Raise exception when an IMAP error occurs

darcs-hash:20080505195658-86b55-43e77ad09f160837cadc7b49dd0b88664f7cdaaf.gz
This commit is contained in:
David Rousselie
2008-05-05 21:56:58 +02:00
parent 3616e35abd
commit 9f52f9ed7e
3 changed files with 80 additions and 20 deletions

View File

@@ -425,20 +425,21 @@ class IMAPAccount(MailAccount):
of tuple (email_index, email_subject) of tuple (email_index, email_subject)
""" """
self.__logger.debug("Getting mail list summary") self.__logger.debug("Getting mail list summary")
typ, count = self.connection.select(self._get_real_mailbox(), True) typ, data = self.connection.select(self._get_real_mailbox(), True)
result = []
if typ == "OK": if typ == "OK":
typ, data = self.connection.fetch(str(start_index) + ":" + typ, data = self.connection.fetch(str(start_index) + ":" +
str(end_index), str(end_index),
"RFC822.header") "RFC822.header")
if typ == 'OK': if typ == 'OK':
result = []
index = start_index index = start_index
for _email in data: for _email in data:
if isinstance(_email, types.TupleType) and len(_email) == 2: if isinstance(_email, types.TupleType) and len(_email) == 2:
subject_header = self.get_decoded_header(email.message_from_string(_email[1])["Subject"])[0] subject_header = self.get_decoded_header(email.message_from_string(_email[1])["Subject"])[0]
result.append((str(index), subject_header)) result.append((str(index), subject_header))
index += 1 index += 1
return result return result
raise Exception(data[0])
def get_new_mail_list(self): def get_new_mail_list(self):
""" """
@@ -446,28 +447,31 @@ class IMAPAccount(MailAccount):
""" """
self.__logger.debug("Getting mail list") self.__logger.debug("Getting mail list")
typ, data = self.connection.select(self._get_real_mailbox()) typ, data = self.connection.select(self._get_real_mailbox())
typ, data = self.connection.search(None, 'RECENT')
if typ == 'OK': if typ == 'OK':
return data[0].split(' ') typ, data = self.connection.search(None, 'RECENT')
return None if typ == 'OK':
return data[0].split(' ')
raise Exception(data[0])
def get_mail(self, index): def get_mail(self, index):
self.__logger.debug("Getting mail " + str(index)) self.__logger.debug("Getting mail " + str(index))
typ, data = self.connection.select(self._get_real_mailbox(), True) typ, data = self.connection.select(self._get_real_mailbox(), True)
typ, data = self.connection.fetch(index, '(RFC822)')
if typ == 'OK': if typ == 'OK':
return self.format_message(\ typ, data = self.connection.fetch(index, '(RFC822)')
email.message_from_string(data[0][1])) if typ == 'OK':
return u"Error while fetching mail " + str(index) return self.format_message(\
email.message_from_string(data[0][1]))
raise Exception(data[0] + " (email " + str(index) + ")")
def get_mail_summary(self, index): def get_mail_summary(self, index):
self.__logger.debug("Getting mail summary " + str(index)) self.__logger.debug("Getting mail summary " + str(index))
typ, data = self.connection.select(self._get_real_mailbox(), True) typ, data = self.connection.select(self._get_real_mailbox(), True)
typ, data = self.connection.fetch(index, '(RFC822.header)')
if typ == 'OK': if typ == 'OK':
return self.format_message_summary(\ typ, data = self.connection.fetch(index, '(RFC822.header)')
email.message_from_string(data[0][1])) if typ == 'OK':
return u"Error while fetching mail " + str(index) return self.format_message_summary(\
email.message_from_string(data[0][1]))
raise Exception(data[0] + " (email " + str(index) + ")")
def get_next_mail_index(self, mail_list): def get_next_mail_index(self, mail_list):
""" """

View File

@@ -431,6 +431,10 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
('2', 'mail subject 2')])) ('2', 'mail subject 2')]))
test_func() test_func()
def test_get_mail_list_summary_inbox_does_not_exist(self):
self.__test_select_inbox_does_not_exist(\
lambda: self.imap_account.get_mail_list_summary(), readonly=True)
def test_get_mail_list_summary_start_index(self): def test_get_mail_list_summary_start_index(self):
test_func = self.make_test(\ test_func = self.make_test(\
[lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\ [lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
@@ -487,6 +491,30 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
['9', '10'])) ['9', '10']))
test_func() test_func()
def __test_select_inbox_does_not_exist(self, tested_func,
exception_message="Mailbox does not exist",
readonly=False):
def check_func(self):
try:
tested_func()
except Exception, e:
self.assertEquals(e.message, exception_message)
return
self.fail("No exception raised when selecting non existing mailbox")
test_func = self.make_test(\
[lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" + \
" [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" + \
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \
data.split()[0] + \
" NO Mailbox does not exist \n"],
["^[^ ]* " + (readonly and "EXAMINE" or "SELECT") + " INBOX"],
check_func)
test_func()
def test_get_new_mail_list_inbox_does_not_exist(self):
self.__test_select_inbox_does_not_exist(\
lambda: self.imap_account_get_new_mail_list())
def test_get_new_mail_list_delimiter1(self): def test_get_new_mail_list_delimiter1(self):
self.imap_account.mailbox = "INBOX/dir1/subdir2" self.imap_account.mailbox = "INBOX/dir1/subdir2"
self.imap_account.delimiter = "." self.imap_account.delimiter = "."
@@ -541,6 +569,29 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
u"None"))) u"None")))
test_func() test_func()
def test_get_mail_summary_inbox_does_not_exist(self):
self.__test_select_inbox_does_not_exist(\
lambda: self.imap_account.get_mail_summary(1),
"Mailbox does not exist (email 1)", True)
def test_get_new_mail_list_inbox_does_not_exist(self):
def check_func(self):
try:
self.imap_account.get_new_mail_list()
except Exception, e:
self.assertEquals(e.message, "Mailbox does not exist")
return
self.fail("No exception raised when selecting non existing mailbox")
test_func = self.make_test(\
[lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" + \
" [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" + \
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \
data.split()[0] + \
" NO Mailbox does not exist \n"],
["^[^ ]* SELECT INBOX"],
check_func)
test_func()
def test_get_mail_summary_delimiter(self): def test_get_mail_summary_delimiter(self):
self.imap_account.mailbox = "INBOX/dir1/subdir2" self.imap_account.mailbox = "INBOX/dir1/subdir2"
self.imap_account.delimiter = "." self.imap_account.delimiter = "."
@@ -579,6 +630,11 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
u"None"))) u"None")))
test_func() test_func()
def test_get_mail_inbox_does_not_exist(self):
self.__test_select_inbox_does_not_exist(\
lambda: self.imap_account.get_mail(1),
"Mailbox does not exist (email 1)", True)
def test_get_mail_delimiter(self): def test_get_mail_delimiter(self):
self.imap_account.mailbox = "INBOX/dir1/subdir2" self.imap_account.mailbox = "INBOX/dir1/subdir2"
self.imap_account.delimiter = "." self.imap_account.delimiter = "."