SMTP send_email real implementation

darcs-hash:20070618175813-86b55-990d6242426508e186abfc454d69fc9e0cd424b0.gz
This commit is contained in:
David Rousselie
2007-06-18 19:58:13 +02:00
parent d6b91f8d02
commit 86a1367f26
5 changed files with 314 additions and 79 deletions

View File

@@ -243,9 +243,11 @@ class SendMailMessageHandler(MailHandler):
def handle(self, message, lang_class, accounts): def handle(self, message, lang_class, accounts):
to_node = message.get_to().node to_node = message.get_to().node
to_email = to_node.replace('%', '@', 1) to_email = to_node.replace('%', '@', 1)
accounts[0].send_email(to_email, accounts[0].send_email(\
message.get_subject(), accounts[0].create_email(accounts[0].default_from,
message.get_body()) to_email,
message.get_subject(),
message.get_body()))
return self.send_mail_result(message, lang_class, to_email) return self.send_mail_result(message, lang_class, to_email)
class RootSendMailMessageHandler(SendMailMessageHandler): class RootSendMailMessageHandler(SendMailMessageHandler):
@@ -253,7 +255,7 @@ class RootSendMailMessageHandler(SendMailMessageHandler):
def __init__(self): def __init__(self):
SendMailMessageHandler.__init__(self) SendMailMessageHandler.__init__(self)
self.to_regexp = re.compile("^\s*(to|TO)\s*:\s*(?P<to_email>.*)") self.to_regexp = re.compile("^\s*(to|TO|To)\s*:\s*(?P<to_email>.*)")
self.__logger = logging.getLogger(\ self.__logger = logging.getLogger(\
"jmc.jabber.component.RootSendMailMessageHandler") "jmc.jabber.component.RootSendMailMessageHandler")
@@ -285,8 +287,11 @@ class RootSendMailMessageHandler(SendMailMessageHandler):
message_body.append(line) message_body.append(line)
message_body.extend(lines) message_body.extend(lines)
if to_email is not None: if to_email is not None:
accounts[0].send_email(to_email, message.get_subject(), accounts[0].send_email(\
"\n".join(message_body)) accounts[0].create_email(accounts[0].default_from,
to_email,
message.get_subject(),
"\n".join(message_body)))
return self.send_mail_result(message, lang_class, to_email) return self.send_mail_result(message, lang_class, to_email)
else: else:
return [Message(from_jid=message.get_to(), return [Message(from_jid=message.get_to(),

View File

@@ -142,6 +142,16 @@ class MockPOP3Account(MockMailAccount, POP3Account):
IMAPAccount._init(self, *args, **kw) IMAPAccount._init(self, *args, **kw)
MockMailAccount._init(self) MockMailAccount._init(self)
class MockSMTPAccount(object):
def __init__(self):
self.default_from = "user1@test.com"
def create_email(self, from_email, to_email, subject, body):
return (from_email, to_email, subject, body)
def send_email(self, email):
self.email = email
class MailComponent_TestCase(unittest.TestCase): class MailComponent_TestCase(unittest.TestCase):
def setUp(self): def setUp(self):
if os.path.exists(DB_PATH): if os.path.exists(DB_PATH):
@@ -525,16 +535,11 @@ class SendMailMessageHandler_TestCase(unittest.TestCase):
to_jid="user%test.com@jcl.test.com", to_jid="user%test.com@jcl.test.com",
subject="message subject", subject="message subject",
body="message body") body="message body")
class MockSMTPAccount(object):
def send_email(self, to_email, subject, body):
self.to_email = to_email
self.subject = subject
self.body = body
accounts = [MockSMTPAccount()] accounts = [MockSMTPAccount()]
result = self.handler.handle(message, Lang.en, accounts) result = self.handler.handle(message, Lang.en, accounts)
self.assertEquals(accounts[0].to_email, "user@test.com") self.assertEquals(accounts[0].email[1], "user@test.com")
self.assertEquals(accounts[0].subject, "message subject") self.assertEquals(accounts[0].email[2], "message subject")
self.assertEquals(accounts[0].body, "message body") self.assertEquals(accounts[0].email[3], "message body")
self.assertEquals(len(result), 1) self.assertEquals(len(result), 1)
self.assertEquals(result[0].stanza_type, "message") self.assertEquals(result[0].stanza_type, "message")
self.assertEquals(result[0].get_from(), "user%test.com@jcl.test.com") self.assertEquals(result[0].get_from(), "user%test.com@jcl.test.com")
@@ -632,16 +637,12 @@ class RootSendMailMessageHandler_TestCase(unittest.TestCase):
subject="message subject", subject="message subject",
body="to: user@test.com\n" \ body="to: user@test.com\n" \
"message body\nother line") "message body\nother line")
class MockSMTPAccount(object):
def send_email(self, to_email, subject, body):
self.to_email = to_email
self.subject = subject
self.body = body
accounts = [MockSMTPAccount()] accounts = [MockSMTPAccount()]
result = self.handler.handle(message, Lang.en, accounts) result = self.handler.handle(message, Lang.en, accounts)
self.assertEquals(accounts[0].to_email, "user@test.com") self.assertEquals(accounts[0].email[1], "user@test.com")
self.assertEquals(accounts[0].subject, "message subject") self.assertEquals(accounts[0].email[2], "message subject")
self.assertEquals(accounts[0].body, "message body\nother line") self.assertEquals(accounts[0].email[3],
"message body\nother line")
self.assertEquals(len(result), 1) self.assertEquals(len(result), 1)
self.assertEquals(result[0].get_type(), None) self.assertEquals(result[0].get_type(), None)
self.assertEquals(result[0].get_from(), "jcl.test.com") self.assertEquals(result[0].get_from(), "jcl.test.com")
@@ -656,11 +657,6 @@ class RootSendMailMessageHandler_TestCase(unittest.TestCase):
to_jid="jcl.test.com", to_jid="jcl.test.com",
subject="message subject", subject="message subject",
body="message body") body="message body")
class MockSMTPAccount(object):
def send_email(self, to_email, subject, body):
self.to_email = to_email
self.subject = subject
self.body = body
accounts = [MockSMTPAccount()] accounts = [MockSMTPAccount()]
result = self.handler.handle(message, Lang.en, accounts) result = self.handler.handle(message, Lang.en, accounts)
self.assertEquals(len(result), 1) self.assertEquals(len(result), 1)

View File

@@ -4,18 +4,18 @@
## Login : <dax@happycoders.org> ## Login : <dax@happycoders.org>
## Started on Fri Jan 19 18:21:44 2007 David Rousselie ## Started on Fri Jan 19 18:21:44 2007 David Rousselie
## $Id$ ## $Id$
## ##
## Copyright (C) 2007 David Rousselie ## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify ## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by ## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or ## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version. ## (at your option) any later version.
## ##
## This program is distributed in the hope that it will be useful, ## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of ## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details. ## GNU General Public License for more details.
## ##
## You should have received a copy of the GNU General Public License ## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software ## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
@@ -24,12 +24,15 @@
import sys import sys
import logging import logging
import email import email
import email.Header from email.Header import Header
from email.MIMEText import MIMEText
from email.MIMEMultipart import MIMEMultipart
import traceback import traceback
import poplib import poplib
import imaplib import imaplib
import socket import socket
import smtplib
from sqlobject.inheritance import InheritableSQLObject from sqlobject.inheritance import InheritableSQLObject
from sqlobject.col import StringCol, IntCol, BoolCol from sqlobject.col import StringCol, IntCol, BoolCol
@@ -80,9 +83,9 @@ class MYPOP3(poplib.POP3):
self.port = port self.port = port
msg = "getaddrinfo returns an empty list" msg = "getaddrinfo returns an empty list"
self.sock = None self.sock = None
for res in socket.getaddrinfo(self.host, for res in socket.getaddrinfo(self.host,
self.port, self.port,
0, 0,
socket.SOCK_STREAM): socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res af, socktype, proto, canonname, sa = res
try: try:
@@ -103,7 +106,7 @@ class MYPOP3(poplib.POP3):
self.welcome = self._getresp() self.welcome = self._getresp()
class MYPOP3_SSL(poplib.POP3_SSL): class MYPOP3_SSL(poplib.POP3_SSL):
def __init__(self, host, port=poplib.POP3_SSL_PORT, keyfile=None, def __init__(self, host, port=poplib.POP3_SSL_PORT, keyfile=None,
certfile=None): certfile=None):
self.host = host self.host = host
self.port = port self.port = port
@@ -112,7 +115,7 @@ class MYPOP3_SSL(poplib.POP3_SSL):
self.buffer = "" self.buffer = ""
msg = "getaddrinfo returns an empty list" msg = "getaddrinfo returns an empty list"
self.sock = None self.sock = None
for res in socket.getaddrinfo(self.host, self.port, 0, for res in socket.getaddrinfo(self.host, self.port, 0,
socket.SOCK_STREAM): socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res af, socktype, proto, canonname, sa = res
try: try:
@@ -154,11 +157,11 @@ class MailAccount(PresenceAccount):
interval = IntCol(default=5) interval = IntCol(default=5)
store_password = BoolCol(default=True) store_password = BoolCol(default=True)
live_email_only = BoolCol(default=False) live_email_only = BoolCol(default=False)
lastcheck = IntCol(default=0) lastcheck = IntCol(default=0)
waiting_password_reply = BoolCol(default=False) waiting_password_reply = BoolCol(default=False)
first_check = BoolCol(default=True) first_check = BoolCol(default=True)
def _init(self, *args, **kw): def _init(self, *args, **kw):
"""MailAccount init """MailAccount init
Initialize class attributes""" Initialize class attributes"""
@@ -167,7 +170,7 @@ class MailAccount(PresenceAccount):
self.connection = None self.connection = None
self.connected = False self.connected = False
self.default_lang_class = Lang.en self.default_lang_class = Lang.en
def _get_register_fields(cls, real_class=None): def _get_register_fields(cls, real_class=None):
"""See Account._get_register_fields """See Account._get_register_fields
""" """
@@ -204,13 +207,13 @@ class MailAccount(PresenceAccount):
("interval", "text-single", None, ("interval", "text-single", None,
account.int_post_func, account.int_post_func,
lambda bare_from_jid: 5)] lambda bare_from_jid: 5)]
get_register_fields = classmethod(_get_register_fields) get_register_fields = classmethod(_get_register_fields)
def _get_default_port(cls): def _get_default_port(cls):
return 42 return 42
get_default_port = classmethod(_get_default_port) get_default_port = classmethod(_get_default_port)
def _get_presence_actions_fields(cls): def _get_presence_actions_fields(cls):
"""See PresenceAccount._get_presence_actions_fields """See PresenceAccount._get_presence_actions_fields
""" """
@@ -226,7 +229,7 @@ class MailAccount(PresenceAccount):
MailAccount.DIGEST), MailAccount.DIGEST),
'offline_action': (cls.possibles_actions, 'offline_action': (cls.possibles_actions,
PresenceAccount.DO_NOTHING)} PresenceAccount.DO_NOTHING)}
get_presence_actions_fields = classmethod(_get_presence_actions_fields) get_presence_actions_fields = classmethod(_get_presence_actions_fields)
def get_decoded_part(self, part, charset_hint): def get_decoded_part(self, part, charset_hint):
@@ -255,7 +258,7 @@ class MailAccount(PresenceAccount):
(type, value, stack, 5)) (type, value, stack, 5))
return result return result
def format_message(self, email_msg, include_body = True): def format_message(self, email_msg, include_body = True):
from_decoded = email.Header.decode_header(email_msg["From"]) from_decoded = email.Header.decode_header(email_msg["From"])
charset_hint = None charset_hint = None
@@ -312,7 +315,7 @@ class MailAccount(PresenceAccount):
print >>sys.stderr, \ print >>sys.stderr, \
"".join(traceback.format_exception "".join(traceback.format_exception
(type, value, stack, 5)) (type, value, stack, 5))
result += u"\n\n" result += u"\n\n"
if include_body: if include_body:
@@ -329,7 +332,7 @@ class MailAccount(PresenceAccount):
def format_message_summary(self, email_msg): def format_message_summary(self, email_msg):
return self.format_message(email_msg, False) return self.format_message(email_msg, False)
def get_status_msg(self): def get_status_msg(self):
return self.get_type() + "://" + self.login + "@" + self.host + ":" + \ return self.get_type() + "://" + self.login + "@" + self.host + ":" + \
unicode(self.port) unicode(self.port)
@@ -371,13 +374,13 @@ class IMAPAccount(MailAccount):
[("mailbox", "text-single", None, [("mailbox", "text-single", None,
account.default_post_func, account.default_post_func,
lambda bare_from_jid: "INBOX")] lambda bare_from_jid: "INBOX")]
get_register_fields = classmethod(_get_register_fields) get_register_fields = classmethod(_get_register_fields)
def _get_default_port(cls): def _get_default_port(cls):
"""Return default IMAP server port""" """Return default IMAP server port"""
return 143 return 143
get_default_port = classmethod(_get_default_port) get_default_port = classmethod(_get_default_port)
def _init(self, *args, **kw): def _init(self, *args, **kw):
@@ -388,7 +391,7 @@ class IMAPAccount(MailAccount):
if self.ssl: if self.ssl:
return "imaps" return "imaps"
return "imap" return "imap"
def get_status(self): def get_status(self):
return MailAccount.get_status(self) + "/" + self.mailbox return MailAccount.get_status(self) + "/" + self.mailbox
@@ -426,7 +429,7 @@ class IMAPAccount(MailAccount):
return self.format_message(\ return self.format_message(\
email.message_from_string(data[0][1])) email.message_from_string(data[0][1]))
return u"Error while fetching mail " + str(index) return u"Error while fetching mail " + str(index)
def get_mail_summary(self, index): def get_mail_summary(self, index):
self.__logger.debug("Getting mail summary " + str(index)) self.__logger.debug("Getting mail summary " + str(index))
typ, data = self.connection.select(self.mailbox, True) typ, data = self.connection.select(self.mailbox, True)
@@ -444,13 +447,13 @@ class IMAPAccount(MailAccount):
def mark_all_as_read(self): def mark_all_as_read(self):
self.get_mail_list() self.get_mail_list()
type = property(get_type) type = property(get_type)
class POP3Account(MailAccount): class POP3Account(MailAccount):
nb_mail = IntCol(default=0) nb_mail = IntCol(default=0)
lastmail = IntCol(default=0) lastmail = IntCol(default=0)
def _init(self, *args, **kw): def _init(self, *args, **kw):
MailAccount._init(self, *args, **kw) MailAccount._init(self, *args, **kw)
self.__logger = logging.getLogger("jmc.model.account.POP3Account") self.__logger = logging.getLogger("jmc.model.account.POP3Account")
@@ -458,7 +461,7 @@ class POP3Account(MailAccount):
def _get_default_port(cls): def _get_default_port(cls):
"""Return default POP3 server port""" """Return default POP3 server port"""
return 110 return 110
get_default_port = classmethod(_get_default_port) get_default_port = classmethod(_get_default_port)
def get_type(self): def get_type(self):
@@ -470,7 +473,7 @@ class POP3Account(MailAccount):
def connect(self): def connect(self):
self.__logger.debug("Connecting to POP3 server " self.__logger.debug("Connecting to POP3 server "
+ self.login + "@" + self.host + ":" + + self.login + "@" + self.host + ":" +
str(self.port) + ". SSL=" + str(self.ssl)) str(self.port) + ". SSL=" + str(self.ssl))
if self.ssl: if self.ssl:
self.connection = MYPOP3_SSL(self.host, self.port) self.connection = MYPOP3_SSL(self.host, self.port)
@@ -482,7 +485,7 @@ class POP3Account(MailAccount):
self.connection.user(self.login) self.connection.user(self.login)
self.connection.pass_(self.password) self.connection.pass_(self.password)
self.connected = True self.connected = True
def disconnect(self): def disconnect(self):
self.__logger.debug("Disconnecting from POP3 server " self.__logger.debug("Disconnecting from POP3 server "
@@ -493,7 +496,7 @@ class POP3Account(MailAccount):
def get_mail_list(self): def get_mail_list(self):
self.__logger.debug("Getting mail list") self.__logger.debug("Getting mail list")
count, size = self.connection.stat() count, size = self.connection.stat()
self.nb_mail = count self.nb_mail = count
return [str(i) for i in range(1, count + 1)] return [str(i) for i in range(1, count + 1)]
def get_mail(self, index): def get_mail(self, index):
@@ -535,7 +538,7 @@ class POP3Account(MailAccount):
def mark_all_as_read(self): def mark_all_as_read(self):
self.get_mail_list() self.get_mail_list()
self.lastmail = self.nb_mail self.lastmail = self.nb_mail
class SMTPAccount(Account): class SMTPAccount(Account):
"""Send email account""" """Send email account"""
@@ -559,7 +562,7 @@ class SMTPAccount(Account):
def _get_default_port(cls): def _get_default_port(cls):
"""Return default SMTP server port""" """Return default SMTP server port"""
return 25 return 25
get_default_port = classmethod(_get_default_port) get_default_port = classmethod(_get_default_port)
def _get_register_fields(cls, real_class=None): def _get_register_fields(cls, real_class=None):
@@ -628,12 +631,55 @@ class SMTPAccount(Account):
("default_account", "boolean", None, ("default_account", "boolean", None,
default_account_post_func, default_account_post_func,
default_account_default_func)] default_account_default_func)]
get_register_fields = classmethod(_get_register_fields) get_register_fields = classmethod(_get_register_fields)
def send_email(self, to_email, subject, body): def create_email(self, from_email, to_email, subject, body):
"""Create new email"""
email = MIMEText(body)
email['Subject'] = Header(str(subject))
email['From'] = Header(str(from_email))
email['To'] = Header(str(to_email))
return email
def __say_hello(self, connection):
if not (200 <= connection.ehlo()[0] <= 299):
(code, resp) = connection.helo()
if not (200 <= code <= 299):
raise SMTPHeloError(code, resp)
def send_email(self, email):
"""Send email according to current account parameters"""
self.__logger.debug("Sending email:\n" self.__logger.debug("Sending email:\n"
"From: " + self.default_from + "\n" + + str(email))
"To: " + to_email + "\n" + smtp_connection = smtplib.SMTP()
"Subject: " + subject + "\n\n" + if self.__logger.getEffectiveLevel() == logging.DEBUG:
body) smtp_connection.set_debuglevel(1)
smtp_connection.connect(self.host, self.port)
self.__say_hello(smtp_connection)
if self.tls:
smtp_connection.starttls()
self.__say_hello(smtp_connection)
if self.login is not None and len(self.login) > 0:
auth_methods = smtp_connection.esmtp_features["auth"].split()
auth_methods.reverse()
current_error = None
for auth_method in auth_methods:
self.__logger.debug("Trying to authenticate using "
+ auth_method + " method")
smtp_connection.esmtp_features["auth"] = auth_method
try:
smtp_connection.login(self.login, self.password)
current_error = None
self.__logger.debug("Successfuly to authenticate using "
+ auth_method + " method")
break
except smtplib.SMTPAuthenticationError, error:
self.__logger.debug("Failed to authenticate using "
+ auth_method + " method")
current_error = error
if current_error is not None:
raise current_error
smtp_connection.sendmail(str(email['From']), str(email['To']),
email.as_string())
smtp_connection.quit()

View File

@@ -4,18 +4,18 @@
## Login : <dax@happycoders.org> ## Login : <dax@happycoders.org>
## Started on Wed Feb 14 08:23:17 2007 David Rousselie ## Started on Wed Feb 14 08:23:17 2007 David Rousselie
## $Id$ ## $Id$
## ##
## Copyright (C) 2007 David Rousselie ## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify ## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by ## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or ## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version. ## (at your option) any later version.
## ##
## This program is distributed in the hope that it will be useful, ## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of ## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details. ## GNU General Public License for more details.
## ##
## You should have received a copy of the GNU General Public License ## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software ## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
@@ -168,7 +168,7 @@ class POP3Account_TestCase(InheritableAccount_TestCase):
self.pop3_account.ssl = False self.pop3_account.ssl = False
del account.hub.threadConnection del account.hub.threadConnection
self.account_class = POP3Account self.account_class = POP3Account
def tearDown(self): def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url) account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
POP3Account.dropTable(ifExists = True) POP3Account.dropTable(ifExists = True)
@@ -210,7 +210,7 @@ class POP3Account_TestCase(InheritableAccount_TestCase):
return inner return inner
test_connection = make_test() test_connection = make_test()
test_get_mail_list = \ test_get_mail_list = \
make_test(["+OK 2 20\r\n"], \ make_test(["+OK 2 20\r\n"], \
["STAT\r\n"], \ ["STAT\r\n"], \
@@ -340,7 +340,7 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
self.imap_account.disconnect() self.imap_account.disconnect()
self.failUnless(self.server.verify_queries()) self.failUnless(self.server.verify_queries())
return inner return inner
test_connection = make_test() test_connection = make_test()
test_get_mail_list = make_test([lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" +\ test_get_mail_list = make_test([lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" +\
@@ -433,7 +433,7 @@ class SMTPAccount_TestCase(Account_TestCase):
value = post_func("False", None, "user1@test.com") value = post_func("False", None, "user1@test.com")
self.assertTrue(value) self.assertTrue(value)
del account.hub.threadConnection del account.hub.threadConnection
def test_default_account_post_func_true(self): def test_default_account_post_func_true(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url) account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
account11 = SMTPAccount(user_jid="user1@test.com", account11 = SMTPAccount(user_jid="user1@test.com",
@@ -466,6 +466,194 @@ class SMTPAccount_TestCase(Account_TestCase):
self.assertTrue(account12.default_account) self.assertTrue(account12.default_account)
del account.hub.threadConnection del account.hub.threadConnection
def test_create_email(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
del account.hub.threadConnection
email = account11.create_email("from@test.com",
"to@test.com",
"subject",
"body")
self.assertEqual(email['From'], "from@test.com")
self.assertEqual(email['To'], "to@test.com")
self.assertEqual(email['Subject'], "subject")
self.assertEqual(email.get_payload(), "body")
def make_test(self, responses=None, queries=None, core=None):
def inner():
self.server = server.DummyServer("localhost", 1025)
thread.start_new_thread(self.server.serve, ())
self.server.responses = []
if responses:
self.server.responses += responses
self.server.responses += ["221 localhost closing connection\r\n"]
self.server.queries = []
if queries:
self.server.queries += queries
self.server.queries += ["quit\r\n"]
if core:
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
core(self)
del account.hub.threadConnection
self.failUnless(self.server.verify_queries())
return inner
def test_send_email_esmtp_no_auth(self):
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
smtp_account = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
smtp_account.host = "localhost"
smtp_account.port = 1025
del account.hub.threadConnection
email = smtp_account.create_email("from@test.com",
"to@test.com",
"subject",
"body")
test_func = self.make_test(["220 localhost ESMTP\r\n",
"250-localhost Hello 127.0.0.1\r\n"
+ "250-SIZE 52428800\r\n"
+ "250-PIPELINING\r\n"
+ "250 HELP\r\n",
"250 OK\r\n",
"250 Accepted\r\n",
"354 Enter message\r\n",
None, None, None, None,
None, None, None, None,
"250 OK\r\n"],
["ehlo \[127.0.0.1\]\r\n",
"mail FROM:<" + str(email['From']) + ">.*",
"rcpt TO:<" + str(email['To']) + ">\r\n",
"data\r\n"] +
email.as_string().split("\n") + [".\r\n"],
lambda self: \
smtp_account.send_email(email))
test_func()
def test_send_email_no_auth(self):
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
smtp_account = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
smtp_account.host = "localhost"
smtp_account.port = 1025
del account.hub.threadConnection
email = smtp_account.create_email("from@test.com",
"to@test.com",
"subject",
"body")
test_func = self.make_test(["220 localhost SMTP\r\n",
"504 ESMTP not supported\r\n",
"250-localhost Hello 127.0.0.1\r\n"
+ "250-SIZE 52428800\r\n"
+ "250-PIPELINING\r\n"
+ "250 HELP\r\n",
"250 OK\r\n",
"250 Accepted\r\n",
"354 Enter message\r\n",
None, None, None, None,
None, None, None, None,
"250 OK\r\n"],
["ehlo \[127.0.0.1\]\r\n",
"helo \[127.0.0.1\]\r\n",
"mail FROM:<" + str(email['From']) + ">.*",
"rcpt TO:<" + str(email['To']) + ">\r\n",
"data\r\n"] +
email.as_string().split("\n") + [".\r\n"],
lambda self: \
smtp_account.send_email(email))
test_func()
def test_send_email_esmtp_auth(self):
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
smtp_account = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
smtp_account.host = "localhost"
smtp_account.port = 1025
smtp_account.login = "user"
smtp_account.password = "pass"
del account.hub.threadConnection
email = smtp_account.create_email("from@test.com",
"to@test.com",
"subject",
"body")
test_func = self.make_test(["220 localhost ESMTP\r\n",
"250-localhost Hello 127.0.0.1\r\n"
+ "250-SIZE 52428800\r\n"
+ "250-AUTH PLAIN LOGIN CRAM-MD5\r\n"
+ "250-PIPELINING\r\n"
+ "250 HELP\r\n",
"334 ZGF4IDNmNDM2NzY0YzBhNjgyMTQ1MzhhZGNiMjE2YTYxZjRm\r\n",
"235 Authentication succeeded\r\n",
"250 OK\r\n",
"250 Accepted\r\n",
"354 Enter message\r\n",
None, None, None, None,
None, None, None, None,
"250 OK\r\n"],
["ehlo \[127.0.0.1\]\r\n",
"AUTH CRAM-MD5\r\n",
".*\r\n",
"mail FROM:<" + str(email['From']) + ">.*",
"rcpt TO:<" + str(email['To']) + ">\r\n",
"data\r\n"] +
email.as_string().split("\n") + [".\r\n"],
lambda self: \
smtp_account.send_email(email))
test_func()
def test_send_email_esmtp_auth_method2(self):
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
smtp_account = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
smtp_account.host = "localhost"
smtp_account.port = 1025
smtp_account.login = "user"
smtp_account.password = "pass"
del account.hub.threadConnection
email = smtp_account.create_email("from@test.com",
"to@test.com",
"subject",
"body")
test_func = self.make_test(["220 localhost ESMTP\r\n",
"250-localhost Hello 127.0.0.1\r\n"
+ "250-SIZE 52428800\r\n"
+ "250-AUTH PLAIN LOGIN CRAM-MD5\r\n"
+ "250-PIPELINING\r\n"
+ "250 HELP\r\n",
"334 ZGF4IDNmNDM2NzY0YzBhNjgyMTQ1MzhhZGNiMjE2YTYxZjRm\r\n",
"535 Incorrect Authentication data\r\n",
"334 asd235r4\r\n",
"235 Authentication succeeded\r\n",
"250 OK\r\n",
"250 Accepted\r\n",
"354 Enter message\r\n",
None, None, None, None,
None, None, None, None,
"250 OK\r\n"],
["ehlo \[127.0.0.1\]\r\n",
"AUTH CRAM-MD5\r\n",
".*\r\n",
"AUTH LOGIN .*\r\n",
".*\r\n",
"mail FROM:<" + str(email['From']) + ">.*",
"rcpt TO:<" + str(email['To']) + ">\r\n",
"data\r\n"] +
email.as_string().split("\n") + [".\r\n"],
lambda self: \
smtp_account.send_email(email))
test_func()
def suite(): def suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(MailAccount_TestCase, 'test')) suite.addTest(unittest.makeSuite(MailAccount_TestCase, 'test'))

View File

@@ -1,20 +1,20 @@
## ##
## dummy_server.py ## dummy_server.py
## Login : David Rousselie <david.rousselie@happycoders.org> ## Login : David Rousselie <david.rousselie@happycoders.org>
## Started on Fri May 13 12:53:17 2005 ## Started on Fri May 13 12:53:17 2005
## $Id: dummy_server.py,v 1.1 2005/07/11 20:39:31 dax Exp $ ## $Id: dummy_server.py,v 1.1 2005/07/11 20:39:31 dax Exp $
## ##
## Copyright (C) 2005 ## Copyright (C) 2005
## This program is free software; you can redistribute it and/or modify ## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by ## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or ## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version. ## (at your option) any later version.
## ##
## This program is distributed in the hope that it will be useful, ## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of ## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details. ## GNU General Public License for more details.
## ##
## You should have received a copy of the GNU General Public License ## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software ## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
@@ -75,7 +75,7 @@ class DummyServer:
self.responses = None self.responses = None
self.queries = None self.queries = None
self.real_queries = [] self.real_queries = []
def serve(self): def serve(self):
conn = None conn = None
try: try:
@@ -150,7 +150,7 @@ class XMLDummyServer(DummyServer):
r = self._reader.feed(data) r = self._reader.feed(data)
except: except:
type, value, stack = sys.exc_info() type, value, stack = sys.exc_info()
print "".join (traceback.format_exception print "".join (traceback.format_exception
(type, value, stack, 5)) (type, value, stack, 5))
raise raise
# TODO verify got all data </stream> # TODO verify got all data </stream>
@@ -175,7 +175,7 @@ class XMLDummyServer(DummyServer):
self.socket = None self.socket = None
except: except:
type, value, stack = sys.exc_info() type, value, stack = sys.exc_info()
print "".join (traceback.format_exception print "".join (traceback.format_exception
(type, value, stack, 5)) (type, value, stack, 5))
raise raise
@@ -205,5 +205,5 @@ def test():
if __name__ == '__main__': if __name__ == '__main__':
test() test()