Retrieve mail list for IMAP and POP3 accounts

darcs-hash:20080226194326-86b55-75c821b0474733504a86e66ddf6881af1ec9b431.gz
This commit is contained in:
David Rousselie
2008-02-26 20:43:26 +01:00
parent ebaa5bdc82
commit 9f24171943
5 changed files with 340 additions and 235 deletions

View File

@@ -221,7 +221,7 @@ class MailFeeder(Feeder):
self.__logger.debug("\t" + _account.login \
+ "@" + _account.host)
_account.connect()
mail_list = _account.get_mail_list()
mail_list = _account.get_new_mail_list()
default_lang_class = _account.default_lang_class
if action == MailAccount.RETRIEVE:
# TODO : use generator (yield)

View File

@@ -288,7 +288,7 @@ class MailComponent_TestCase(JCLTestCase):
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: []
account11.get_new_mail_list = lambda: []
result = self.comp.handler.feeder.feed(account11)
self.assertNotEquals(account11.error, None)
self.assertEquals(len(result), 0)
@@ -312,7 +312,7 @@ class MailComponent_TestCase(JCLTestCase):
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: []
account11.get_new_mail_list = lambda: []
result = self.comp.handler.feeder.feed(account11)
self.assertEquals(account11.error, None)
self.assertEquals(result, [])
@@ -336,7 +336,7 @@ class MailComponent_TestCase(JCLTestCase):
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: [0, 1]
account11.get_new_mail_list = lambda: [0, 1]
account11.get_mail = mock_get_mail
result = self.comp.handler.feeder.feed(account11)
self.assertEquals(account11.error, None)
@@ -370,7 +370,7 @@ class MailComponent_TestCase(JCLTestCase):
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: []
account11.get_new_mail_list = lambda: []
result = self.comp.handler.feeder.feed(account11)
self.assertEquals(account11.error, None)
self.assertEquals(result, [])
@@ -394,7 +394,7 @@ class MailComponent_TestCase(JCLTestCase):
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: [0, 1]
account11.get_new_mail_list = lambda: [0, 1]
account11.get_mail_summary = mock_get_mail_summary
result = self.comp.handler.feeder.feed(account11)
self.assertEquals(account11.error, None)

View File

@@ -29,6 +29,7 @@ from email.Header import Header
from email.MIMEText import MIMEText
from email.MIMEMultipart import MIMEMultipart
import traceback
import types
import poplib
import imaplib
@@ -238,6 +239,32 @@ class MailAccount(PresenceAccount):
get_presence_actions_fields = classmethod(_get_presence_actions_fields)
def get_decoded_header(self, header, charset_hint=None):
decoded_header = email.Header.decode_header(header)
decoded_header_str = u""
for i in range(len(decoded_header)):
try:
if decoded_header[i][1]:
charset_hint = decoded_header[i][1]
decoded_header_str += unicode(decoded_header[i][0].decode(\
decoded_header[i][1]))
else:
decoded_header_str += unicode(decoded_header[i][0].decode(\
MailAccount.default_encoding))
except Exception,e:
try:
decoded_header_str += unicode(decoded_header[i][0])
except Exception, e:
try:
decoded_header_str += unicode(decoded_header[i][0].decode(\
"iso-8859-1"))
except Exception, e:
type, value, stack = sys.exc_info()
print >>sys.stderr, \
"".join(traceback.format_exception
(type, value, stack, 5))
return (decoded_header_str, charset_hint)
def get_decoded_part(self, part, charset_hint):
content_charset = part.get_content_charset()
result = u""
@@ -265,64 +292,12 @@ class MailAccount(PresenceAccount):
return result
def format_message(self, email_msg, include_body = True):
from_decoded = email.Header.decode_header(email_msg["From"])
charset_hint = None
email_from = u""
result = u"From : "
for i in range(len(from_decoded)):
try:
if from_decoded[i][1]:
charset_hint = from_decoded[i][1]
email_from += unicode(from_decoded[i][0].decode(\
from_decoded[i][1]))
else:
email_from += unicode(from_decoded[i][0].decode(\
MailAccount.default_encoding))
except Exception,e:
try:
email_from += unicode(from_decoded[i][0])
except Exception, e:
try:
email_from += unicode(from_decoded[i][0].decode(\
"iso-8859-1"))
except Exception, e:
type, value, stack = sys.exc_info()
print >>sys.stderr, \
"".join(traceback.format_exception
(type, value, stack, 5))
result += email_from + u"\n"
subject_decoded = email.Header.decode_header(email_msg["Subject"])
result += u"Subject : "
for i in range(len(subject_decoded)):
try:
if subject_decoded[i][1]:
charset_hint = subject_decoded[i][1]
result += unicode(subject_decoded[i][0].decode(\
subject_decoded[i][1]))
else:
result += unicode(subject_decoded[i][0].decode(\
MailAccount.default_encoding))
except Exception,e:
try:
result += unicode(subject_decoded[i][0])
except Exception, e:
try:
result += unicode(subject_decoded[i][0].decode(\
"iso-8859-1"))
except Exception, e:
if charset_hint is not None:
try:
result += unicode(subject_decoded[i][0].decode(\
charset_hint))
except Exception, e:
type, value, stack = sys.exc_info()
print >>sys.stderr, \
"".join(traceback.format_exception
(type, value, stack, 5))
result += u"\n\n"
def format_message(self, email_msg, include_body=True):
(email_from, charset_hint) = self.get_decoded_header(email_msg["From"])
result = u"From : " + email_from + "\n"
(email_subject, charset_hint) = self.get_decoded_header(email_msg["Subject"],
charset_hint)
result += u"Subject : " + email_subject + "\n\n"
if include_body:
action = {
@@ -340,7 +315,7 @@ class MailAccount(PresenceAccount):
return self.format_message(email_msg, False)
def get_default_status_msg(self, lang_class):
return self.get_type() + "://" + self.login + "@" + self.host + ":" + \
return self.get_type() + "://" + self.login + "@" + self.host + ":" + \
unicode(self.port)
def connect(self):
@@ -349,7 +324,10 @@ class MailAccount(PresenceAccount):
def disconnect(self):
raise NotImplementedError
def get_mail_list(self):
def get_mail_list_summary(self, start_index=0, end_index=20):
raise NotImplementedError
def get_new_mail_list(self):
raise NotImplementedError
def get_mail(self, index):
@@ -394,15 +372,15 @@ class IMAPAccount(MailAccount):
get_default_port = classmethod(_get_default_port)
def _init(self, *args, **kw):
MailAccount._init(self, *args, **kw)
MailAccount._init(self, *args, **kw)
self.__logger = logging.getLogger("jmc.IMAPConnection")
self._regexp_list = re.compile("\((.*)\) \"(.)\" \"?([^\"]*)\"?$")
self.__cached_folders = {}
def get_type(self):
if self.ssl:
return "imaps"
return "imap"
if self.ssl:
return "imaps"
return "imap"
def _get_real_mailbox(self):
"""
@@ -412,7 +390,7 @@ class IMAPAccount(MailAccount):
return self.mailbox.replace("/", self.delimiter)
def get_status(self):
return MailAccount.get_status(self) + "/" + self.mailbox
return MailAccount.get_status(self) + "/" + self.mailbox
def connect(self):
self.__logger.debug("Connecting to IMAP server "
@@ -427,36 +405,53 @@ class IMAPAccount(MailAccount):
self.connected = True
def disconnect(self):
self.__logger.debug("Disconnecting from IMAP server "
self.__logger.debug("Disconnecting from IMAP server "
+ self.host)
self.connection.logout()
self.connection.logout()
self.connected = False
def get_mail_list(self):
self.__logger.debug("Getting mail list")
typ, data = self.connection.select(self._get_real_mailbox())
typ, data = self.connection.search(None, 'RECENT')
if typ == 'OK':
return data[0].split(' ')
return None
def get_mail_list_summary(self, start_index=1, end_index=20):
self.__logger.debug("Getting mail list summary")
typ, count = self.connection.select(self._get_real_mailbox(), True)
result = []
if typ == "OK":
typ, data = self.connection.fetch(str(start_index) + ":" +
str(end_index),
"RFC822.header")
if typ == 'OK':
index = start_index
for _email in data:
if isinstance(_email, types.TupleType) and len(_email) == 2:
subject_header = self.get_decoded_header(email.message_from_string(_email[1])["Subject"])[0]
result.append((str(index), subject_header))
index += 1
return result
def get_new_mail_list(self):
self.__logger.debug("Getting mail list")
typ, data = self.connection.select(self._get_real_mailbox(), True)
typ, data = self.connection.search(None, 'RECENT')
if typ == 'OK':
return data[0].split(' ')
return None
def get_mail(self, index):
self.__logger.debug("Getting mail " + str(index))
typ, data = self.connection.select(self.mailbox, True)
typ, data = self.connection.fetch(index, '(RFC822)')
if typ == 'OK':
self.__logger.debug("Getting mail " + str(index))
typ, data = self.connection.select(self._get_real_mailbox(), True)
typ, data = self.connection.fetch(index, '(RFC822)')
if typ == 'OK':
return self.format_message(\
email.message_from_string(data[0][1]))
return u"Error while fetching mail " + str(index)
return u"Error while fetching mail " + str(index)
def get_mail_summary(self, index):
self.__logger.debug("Getting mail summary " + str(index))
typ, data = self.connection.select(self.mailbox, True)
typ, data = self.connection.fetch(index, '(RFC822)')
if typ == 'OK':
self.__logger.debug("Getting mail summary " + str(index))
typ, data = self.connection.select(self._get_real_mailbox(), True)
typ, data = self.connection.fetch(index, '(RFC822.header)')
if typ == 'OK':
return self.format_message_summary(\
email.message_from_string(data[0][1]))
return u"Error while fetching mail " + str(index)
return u"Error while fetching mail " + str(index)
def get_next_mail_index(self, mail_list):
if self.is_mail_list_valid(mail_list):
@@ -465,7 +460,7 @@ class IMAPAccount(MailAccount):
return None
def mark_all_as_read(self):
self.get_mail_list()
self.get_new_mail_list()
type = property(get_type)
@@ -525,7 +520,8 @@ class IMAPAccount(MailAccount):
self.delimiter = match.group(2)
else:
self.disconnect()
raise Exception("Cannot find mailbox " + self.mailbox)
raise Exception("Cannot find delimiter for mailbox "
+ self.mailbox)
else:
self.disconnect()
raise Exception("Cannot find mailbox " + self.mailbox)
@@ -578,9 +574,24 @@ class POP3Account(MailAccount):
self.connection.quit()
self.connected = False
def get_mail_list(self):
def get_mail_list_summary(self, start_index=0, end_index=20):
self.__logger.debug("Getting mail list")
count, size = self.connection.stat()
result = []
for index in xrange(1, count + 1):
(ret, data, octets) = self.connection.top(index, 0)
if ret[0:3] == '+OK':
subject_header = self.get_decoded_header(email.message_from_string('\n'.join(data))["Subject"])[0]
result.append((str(index), subject_header))
try:
self.connection.rset()
except:
pass
return result
def get_new_mail_list(self):
self.__logger.debug("Getting new mail list")
count, size = self.connection.stat()
self.nb_mail = count
return [str(i) for i in range(1, count + 1)]
@@ -621,7 +632,7 @@ class POP3Account(MailAccount):
return None
def mark_all_as_read(self):
self.get_mail_list()
self.get_new_mail_list()
self.lastmail = self.nb_mail
@@ -719,16 +730,16 @@ class SMTPAccount(Account):
def create_email(self, from_email, to_email, subject, body, other_headers=None):
"""Create new email"""
email = MIMEText(body)
_email = MIMEText(body)
if subject is None:
subject = ""
email['Subject'] = Header(str(subject))
email['From'] = Header(str(from_email))
email['To'] = Header(str(to_email))
_email['Subject'] = Header(str(subject))
_email['From'] = Header(str(from_email))
_email['To'] = Header(str(to_email))
if other_headers is not None:
for header_name in other_headers.keys():
email[header_name] = Header(other_headers[header_name])
return email
_email[header_name] = Header(other_headers[header_name])
return _email
def __say_hello(self, connection):
if not (200 <= connection.ehlo()[0] <= 299):
@@ -736,10 +747,10 @@ class SMTPAccount(Account):
if not (200 <= code <= 299):
raise SMTPHeloError(code, resp)
def send_email(self, email):
def send_email(self, _email):
"""Send email according to current account parameters"""
self.__logger.debug("Sending email:\n"
+ str(email))
+ str(_email))
smtp_connection = smtplib.SMTP()
if self.__logger.getEffectiveLevel() == logging.DEBUG:
smtp_connection.set_debuglevel(1)
@@ -768,6 +779,6 @@ class SMTPAccount(Account):
current_error = error
if current_error is not None:
raise current_error
smtp_connection.sendmail(str(email['From']), str(email['To']),
email.as_string())
smtp_connection.sendmail(str(_email['From']), str(_email['To']),
_email.as_string())
smtp_connection.quit()

View File

@@ -54,70 +54,78 @@ class MailAccount_TestCase(PresenceAccount_TestCase):
test_get_decoded_part_not_encoded = \
make_test((False, False, False), \
lambda self, email: self.account.get_decoded_part(email, None), \
lambda self, email: \
self.account.get_decoded_part(email, None),
u"Not encoded single part")
test_get_decoded_part_encoded = \
make_test((True, False, False), \
lambda self, email: self.account.get_decoded_part(email, None), \
make_test((True, False, False),
lambda self, email: \
self.account.get_decoded_part(email, None),
u"Encoded single part with 'iso-8859-15' charset (éàê)")
test_format_message_summary_not_encoded = \
make_test((False, False, True), \
lambda self, email: self.account.format_message_summary(email), \
(u"From : not encoded from\nSubject : not encoded subject\n\n", \
make_test((False, False, True),
lambda self, email: \
self.account.format_message_summary(email),
(u"From : not encoded from\nSubject : not encoded subject\n\n",
u"not encoded from"))
test_format_message_summary_encoded = \
make_test((True, False, True), \
lambda self, email: self.account.format_message_summary(email), \
make_test((True, False, True),
lambda self, email: \
self.account.format_message_summary(email),
(u"From : encoded from (éàê)\nSubject : encoded subject " + \
u"(éàê)\n\n", \
u"(éàê)\n\n",
u"encoded from (éàê)"))
test_format_message_summary_partial_encoded = \
make_test((True, False, True), \
make_test((True, False, True),
lambda self, email: \
email.replace_header("Subject", \
"\" " + str(email["Subject"]) \
+ " \" not encoded part") or \
email.replace_header("From", \
"\" " + str(email["From"]) \
+ " \" not encoded part") or \
self.account.format_message_summary(email), \
email.replace_header("Subject",
"\" " + str(email["Subject"]) \
+ " \" not encoded part") or \
email.replace_header("From",
"\" " + str(email["From"]) \
+ " \" not encoded part") or \
self.account.format_message_summary(email),
(u"From : \"encoded from (éàê)\" not encoded part\nSubject " + \
u": \"encoded subject (éàê)\" not encoded part\n\n", \
u": \"encoded subject (éàê)\" not encoded part\n\n",
u"\"encoded from (éàê)\" not encoded part"))
test_format_message_single_not_encoded = \
make_test((False, False, True), \
lambda self, email: self.account.format_message(email), \
make_test((False, False, True),
lambda self, email: \
self.account.format_message(email),
(u"From : not encoded from\nSubject : not encoded subject" + \
u"\n\nNot encoded single part\n", \
u"\n\nNot encoded single part\n",
u"not encoded from"))
test_format_message_single_encoded = \
make_test((True, False, True), \
lambda self, email: self.account.format_message(email), \
make_test((True, False, True),
lambda self, email: \
self.account.format_message(email),
(u"From : encoded from (éàê)\nSubject : encoded subject " + \
u"(éàê)\n\nEncoded single part with 'iso-8859-15' charset" + \
u" (éàê)\n", \
u"(éàê)\n\nEncoded single part with 'iso-8859-15' charset" + \
u" (éàê)\n",
u"encoded from (éàê)"))
test_format_message_multi_not_encoded = \
make_test((False, True, True), \
lambda self, email: self.account.format_message(email), \
make_test((False, True, True),
lambda self, email: \
self.account.format_message(email),
(u"From : not encoded from\nSubject : not encoded subject" + \
u"\n\nNot encoded multipart1\nNot encoded multipart2\n", \
u"\n\nNot encoded multipart1\nNot encoded multipart2\n",
u"not encoded from"))
test_format_message_multi_encoded = \
make_test((True, True, True), \
lambda self, email: self.account.format_message(email), \
make_test((True, True, True),
lambda self, email: \
self.account.format_message(email),
(u"From : encoded from (éàê)\nSubject : encoded subject (éà" + \
u"ê)\n\nutf-8 multipart1 with no charset (éàê)" + \
u"\nEncoded multipart2 with 'iso-8859-15' charset (éàê)\n" + \
u"Encoded multipart3 with no charset (éàê)\n", \
u"ê)\n\nutf-8 multipart1 with no charset (éàê)" + \
u"\nEncoded multipart2 with 'iso-8859-15' charset (éàê)\n" + \
u"Encoded multipart3 with no charset (éàê)\n",
u"encoded from (éàê)"))
class POP3Account_TestCase(InheritableAccount_TestCase):
@@ -161,72 +169,90 @@ class POP3Account_TestCase(InheritableAccount_TestCase):
"Sended queries does not match expected queries.")
return inner
test_connection = make_test()
test_connection = make_test
test_get_mail_list = \
test_get_mail_list_summary = \
make_test(["+OK 2 20\r\n",
"+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: mail subject 1\r\n.\r\n",
"+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: mail subject 2\r\n.\r\n",
"+OK\r\n"],
["STAT\r\n",
"TOP 1 0\r\n",
"TOP 2 0\r\n",
"RSET\r\n"],
lambda self: \
self.assertEquals(self.pop3_account.get_mail_list_summary(),
[("1", "mail subject 1"),
("2", "mail subject 2")]))
test_get_new_mail_list = \
make_test(["+OK 2 20\r\n"],
["STAT\r\n"],
lambda self: \
self.assertEquals(self.pop3_account.get_mail_list(),
self.assertEquals(self.pop3_account.get_new_mail_list(),
["1", "2"]))
test_get_mail_summary = \
make_test(["+OK 10 octets\r\n" +
"From: user@test.com\r\n" +
"Subject: subject test\r\n\r\n" +
"mymessage\r\n.\r\n",
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"+OK\r\n"],
["RETR 1\r\n",
"RSET\r\n"],
lambda self: \
self.assertEquals(self.pop3_account.get_mail_summary(1),
(u"From : user@test.com\n" +
u"Subject : subject test\n\n",
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n",
u"user@test.com")))
test_get_mail = \
make_test(["+OK 10 octets\r\n" +
"From: user@test.com\r\n" +
"Subject: subject test\r\n\r\n" +
"mymessage\r\n.\r\n",
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"+OK\r\n"],
["RETR 1\r\n",
"RSET\r\n"], \
"RSET\r\n"],
lambda self: \
self.assertEquals(self.pop3_account.get_mail(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n" + \
u"mymessage\n", \
u"user@test.com")))
self.assertEquals(self.pop3_account.get_mail(1),
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n" + \
u"mymessage\n",
u"user@test.com")))
test_unsupported_reset_command_get_mail_summary = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"-ERR unknown command\r\n"], \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"-ERR unknown command\r\n"],
["RETR 1\r\n",
"RSET\r\n"], \
"RSET\r\n"],
lambda self: \
self.assertEquals(self.pop3_account.get_mail_summary(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n", \
u"user@test.com")))
self.assertEquals(self.pop3_account.get_mail_summary(1),
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n",
u"user@test.com")))
test_unsupported_reset_command_get_mail = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"-ERR unknown command\r\n"], \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"-ERR unknown command\r\n"],
["RETR 1\r\n",
"RSET\r\n"], \
"RSET\r\n"],
lambda self: \
self.assertEquals(self.pop3_account.get_mail(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n" + \
u"mymessage\n", \
u"user@test.com")))
self.assertEquals(self.pop3_account.get_mail(1),
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n" + \
u"mymessage\n",
u"user@test.com")))
class IMAPAccount_TestCase(InheritableAccount_TestCase):
def setUp(self):
@@ -278,21 +304,43 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
test_func = self.make_test()
test_func()
def test_get_mail_list(self):
test_func = self.make_test([lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" +\
" [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\n", \
lambda data: "* SEARCH 9 10 \n" + \
data.split()[0] + " OK SEARCH completed\n"], \
["^[^ ]* SELECT INBOX", \
"^[^ ]* SEARCH RECENT"], \
lambda self: \
self.assertEquals(self.imap_account.get_mail_list(), ['9', '10']))
def test_get_mail_list_summary(self):
test_func = self.make_test(\
[lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n",
lambda data: "* 1 FETCH ((RFC822.header) {38}\r\n" + \
"Subject: mail subject 1\r\n\r\nbody text\r\n)\r\n" + \
"* 2 FETCH ((RFC822.header) {38}\r\n" + \
"Subject: mail subject 2\r\n\r\nbody text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n"],
["^[^ ]* EXAMINE INBOX",
"^[^ ]* FETCH 1:20 RFC822.header"],
lambda self: \
self.assertEquals(self.imap_account.get_mail_list_summary(),
[('1', 'mail subject 1'),
('2', 'mail subject 2')]))
test_func()
def test_get_mail_list_delimiter1(self):
def test_get_new_mail_list(self):
test_func = self.make_test(\
[lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" + \
" [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" + \
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\n",
lambda data: "* SEARCH 9 10 \n" + \
data.split()[0] + " OK SEARCH completed\n"],
["^[^ ]* EXAMINE INBOX",
"^[^ ]* SEARCH RECENT"],
lambda self: \
self.assertEquals(self.imap_account.get_new_mail_list(),
['9', '10']))
test_func()
def test_get_new_mail_list_delimiter1(self):
self.imap_account.mailbox = "INBOX/dir1/subdir2"
self.imap_account.delimiter = "."
test_func = self.make_test( \
@@ -300,16 +348,17 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
" [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" + \
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\n", \
lambda data: "* SEARCH 9 10 \n" + \
data.split()[0] + " OK SEARCH completed\n"], \
["^[^ ]* SELECT \"?INBOX\.dir1\.subdir2\"?",
"^[^ ]* SEARCH RECENT"], \
lambda self: \
self.assertEquals(self.imap_account.get_mail_list(), ['9', '10']))
" OK [READ-WRITE] SELECT completed\n",
lambda data: "* SEARCH 9 10 \n" + \
data.split()[0] + " OK SEARCH completed\n"],
["^[^ ]* EXAMINE \"?INBOX\.dir1\.subdir2\"?",
"^[^ ]* SEARCH RECENT"],
lambda self: \
self.assertEquals(self.imap_account.get_new_mail_list(),
['9', '10']))
test_func()
def test_get_mail_list_delimiter2(self):
def test_get_new_mail_list_delimiter2(self):
self.imap_account.mailbox = "INBOX/dir1/subdir2"
self.imap_account.delimiter = "/"
test_func = self.make_test( \
@@ -317,45 +366,90 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
" [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" + \
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\n", \
lambda data: "* SEARCH 9 10 \n" + \
data.split()[0] + " OK SEARCH completed\n"], \
["^[^ ]* SELECT \"?INBOX/dir1/subdir2\"?",
"^[^ ]* SEARCH RECENT"], \
lambda self: \
self.assertEquals(self.imap_account.get_mail_list(), ['9', '10']))
" OK [READ-WRITE] SELECT completed\n",
lambda data: "* SEARCH 9 10 \n" + \
data.split()[0] + " OK SEARCH completed\n"],
["^[^ ]* EXAMINE \"?INBOX/dir1/subdir2\"?",
"^[^ ]* SEARCH RECENT"],
lambda self: \
self.assertEquals(self.imap_account.get_new_mail_list(),
['9', '10']))
test_func()
def test_get_mail_summary(self):
test_func = self.make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n", \
lambda data: "* 1 FETCH ((RFC822) {12}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n"],
["^[^ ]* EXAMINE INBOX",
"^[^ ]* FETCH 1 \(RFC822\)"],
lambda self: self.assertEquals(self.imap_account.get_mail_summary(1),
(u"From : None\nSubject : None\n\n",
u"None")))
test_func = self.make_test(\
[lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n",
lambda data: "* 1 FETCH ((RFC822) {12}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n"],
["^[^ ]* EXAMINE INBOX",
"^[^ ]* FETCH 1 \(RFC822.header\)"],
lambda self: \
self.assertEquals(self.imap_account.get_mail_summary(1),
(u"From : None\nSubject : None\n\n",
u"None")))
test_func()
def test_get_mail_summary_delimiter(self):
self.imap_account.mailbox = "INBOX/dir1/subdir2"
self.imap_account.delimiter = "."
test_func = self.make_test(\
[lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n",
lambda data: "* 1 FETCH ((RFC822) {12}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n"],
["^[^ ]* EXAMINE \"?INBOX\.dir1\.subdir2\"?",
"^[^ ]* FETCH 1 \(RFC822.header\)"],
lambda self: \
self.assertEquals(self.imap_account.get_mail_summary(1),
(u"From : None\nSubject : None\n\n",
u"None")))
test_func()
def test_get_mail(self):
test_func = self.make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n", \
lambda data: "* 1 FETCH ((RFC822) {11}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n"], \
["^[^ ]* EXAMINE INBOX", \
"^[^ ]* FETCH 1 \(RFC822\)",], \
lambda self: self.assertEquals(self.imap_account.get_mail(1), \
(u"From : None\nSubject : None\n\nbody text\r\n\n", \
u"None")))
test_func = self.make_test(\
[lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" + \
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" + \
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n",
lambda data: "* 1 FETCH ((RFC822) {11}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n"],
["^[^ ]* EXAMINE INBOX",
"^[^ ]* FETCH 1 \(RFC822\)"],
lambda self: \
self.assertEquals(self.imap_account.get_mail(1),
(u"From : None\nSubject : None\n\nbody text\r\n\n",
u"None")))
test_func()
def test_get_mail_delimiter(self):
self.imap_account.mailbox = "INBOX/dir1/subdir2"
self.imap_account.delimiter = "."
test_func = self.make_test(\
[lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" + \
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" + \
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n",
lambda data: "* 1 FETCH ((RFC822) {11}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n"],
["^[^ ]* EXAMINE \"?INBOX\.dir1\.subdir2\"?",
"^[^ ]* FETCH 1 \(RFC822\)"],
lambda self: \
self.assertEquals(self.imap_account.get_mail(1),
(u"From : None\nSubject : None\n\nbody text\r\n\n",
u"None")))
test_func()
def test_build_folder_cache(self):
@@ -576,7 +670,7 @@ class SMTPAccount_TestCase(Account_TestCase):
None, None, None, None,
None, None, None, None,
"250 OK\r\n"],
["ehlo \[127.0.0.1\]\r\n",
["ehlo \[127.0...1\]\r\n",
"mail FROM:<" + str(email['From']) + ">.*",
"rcpt TO:<" + str(email['To']) + ">\r\n",
"data\r\n"] +
@@ -609,8 +703,8 @@ class SMTPAccount_TestCase(Account_TestCase):
None, None, None, None,
None, None, None, None,
"250 OK\r\n"],
["ehlo \[127.0.0.1\]\r\n",
"helo \[127.0.0.1\]\r\n",
["ehlo \[127.0...1\]\r\n",
"helo \[127.0...1\]\r\n",
"mail FROM:<" + str(email['From']) + ">.*",
"rcpt TO:<" + str(email['To']) + ">\r\n",
"data\r\n"] +
@@ -647,7 +741,7 @@ class SMTPAccount_TestCase(Account_TestCase):
None, None, None, None,
None, None, None, None,
"250 OK\r\n"],
["ehlo \[127.0.0.1\]\r\n",
["ehlo \[127.0...1\]\r\n",
"AUTH CRAM-MD5\r\n",
".*\r\n",
"mail FROM:<" + str(email['From']) + ">.*",
@@ -688,7 +782,7 @@ class SMTPAccount_TestCase(Account_TestCase):
None, None, None, None,
None, None, None, None,
"250 OK\r\n"],
["ehlo \[127.0.0.1\]\r\n",
["ehlo \[127.0...1\]\r\n",
"AUTH CRAM-MD5\r\n",
".*\r\n",
"AUTH LOGIN .*\r\n",