- Use the RECENT flag for searches on the IMAP server with write access, so that the RECENT flag is cleared on new messages, and fetch read-only so that the SEEN flag is not set on fetched messages. - It might work on Exchange 2003, which does not support STORE flag UNSEEN. - This changes the get_mail loop: each MailConnection type now implements get_next_mail_index, because the logic differs between IMAP and POP3 connections. darcs-hash:20060207213333-86b55-9c2ccf31fb9ae9e6dd454c907f3188bc6080b817.gz
271 lines
13 KiB
Python
271 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
##
|
|
## mailconnection_test.py
|
|
## Login : David Rousselie <david.rousselie@happycoders.org>
|
|
## Started on Fri May 13 11:32:51 2005 David Rousselie
|
|
## $Id: test_mailconnection.py,v 1.2 2005/09/18 20:24:07 David Rousselie Exp $
|
|
##
|
|
## Copyright (C) 2005 David Rousselie
|
|
## This program is free software; you can redistribute it and/or modify
|
|
## it under the terms of the GNU General Public License as published by
|
|
## the Free Software Foundation; either version 2 of the License, or
|
|
## (at your option) any later version.
|
|
##
|
|
## This program is distributed in the hope that it will be useful,
|
|
## but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
## GNU General Public License for more details.
|
|
##
|
|
## You should have received a copy of the GNU General Public License
|
|
## along with this program; if not, write to the Free Software
|
|
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
##
|
|
|
|
import unittest
|
|
from jabber.mailconnection import IMAPConnection, \
|
|
POP3Connection, \
|
|
MailConnection
|
|
import dummy_server
|
|
import email_generator
|
|
import thread
|
|
import re
|
|
import sys
|
|
import string
|
|
|
|
class MailConnection_TestCase(unittest.TestCase):
    """Tests for the decoding and formatting methods of MailConnection.

    Each test method is produced by ``make_test`` from three things: a
    description of the email to generate, the call to run on that email
    and the value that call is expected to return.
    """

    def setUp(self):
        """Create the MailConnection instance exercised by each test."""
        self.connection = MailConnection()

    def make_test(email_type, tested_func, expected_res):
        """Build a test method checking ``tested_func`` on a generated email.

        This is called while the class body executes, which is why it
        takes no ``self``.

        email_type   -- ``(encoded, multipart, header)`` tuple, passed in
                        that order to ``email_generator.generate``.
        tested_func  -- callable taking ``(self, email)``; its result is
                        the value under test.
        expected_res -- value the result is compared against.

        Returns the function to bind as a ``test_*`` class attribute.
        """
        def inner(self):
            encoded, multipart, header = email_type
            email = email_generator.generate(encoded, multipart, header)
            # assertEqual is the supported spelling; the assertEquals
            # alias is deprecated and removed from recent Python versions.
            self.assertEqual(tested_func(self, email), expected_res)
        return inner

    def _summary_with_unencoded_suffix(self, email):
        """Quote the Subject and From headers, append plain text, summarize.

        Each header is replaced by its current value wrapped in double
        quotes and followed by `` not encoded part``; the Subject header
        is rewritten first, then From, and finally the summary of the
        modified email is returned.
        """
        email.replace_header("Subject",
                             "\"" + str(email["Subject"])
                             + "\" not encoded part")
        email.replace_header("From",
                             "\"" + str(email["From"])
                             + "\" not encoded part")
        return self.connection.format_message_summary(email)

    test_get_decoded_part_not_encoded = make_test(
        (False, False, False),
        lambda self, email: self.connection.get_decoded_part(email, None),
        u"Not encoded single part")

    test_get_decoded_part_encoded = make_test(
        (True, False, False),
        lambda self, email: self.connection.get_decoded_part(email, None),
        u"Encoded single part with 'iso-8859-15' charset (éàê)")

    test_format_message_summary_not_encoded = make_test(
        (False, False, True),
        lambda self, email: self.connection.format_message_summary(email),
        (u"From : not encoded from\nSubject : not encoded subject\n\n",
         u"not encoded from"))

    test_format_message_summary_encoded = make_test(
        (True, False, True),
        lambda self, email: self.connection.format_message_summary(email),
        (u"From : encoded from (éàê)\nSubject : encoded subject "
         u"(éàê)\n\n",
         u"encoded from (éàê)"))

    test_format_message_summary_partial_encoded = make_test(
        (True, False, True),
        _summary_with_unencoded_suffix,
        (u"From : \"encoded from (éàê)\" not encoded part\nSubject "
         u": \"encoded subject (éàê)\" not encoded part\n\n",
         u"\"encoded from (éàê)\" not encoded part"))

    test_format_message_single_not_encoded = make_test(
        (False, False, True),
        lambda self, email: self.connection.format_message(email),
        (u"From : not encoded from\nSubject : not encoded subject"
         u"\n\nNot encoded single part\n",
         u"not encoded from"))

    test_format_message_single_encoded = make_test(
        (True, False, True),
        lambda self, email: self.connection.format_message(email),
        (u"From : encoded from (éàê)\nSubject : encoded subject "
         u"(éàê)\n\nEncoded single part with 'iso-8859-15' charset"
         u" (éàê)\n",
         u"encoded from (éàê)"))

    test_format_message_multi_not_encoded = make_test(
        (False, True, True),
        lambda self, email: self.connection.format_message(email),
        (u"From : not encoded from\nSubject : not encoded subject"
         u"\n\nNot encoded multipart1\nNot encoded multipart2\n",
         u"not encoded from"))

    test_format_message_multi_encoded = make_test(
        (True, True, True),
        lambda self, email: self.connection.format_message(email),
        (u"From : encoded from (éàê)\nSubject : encoded subject (éà"
         u"ê)\n\nutf-8 multipart1 with no charset (éàê)"
         u"\nEncoded multipart2 with 'iso-8859-15' charset (éàê)\n"
         u"Encoded multipart3 with no charset (éàê)\n",
         u"encoded from (éàê)"))
|
|
|
|
|
|
class POP3Connection_TestCase(unittest.TestCase):
    """Tests running POP3Connection against a scripted dummy server.

    Each test method is produced by ``make_test``, which sets the
    server's ``responses`` and expected ``queries``, performs the
    connect / action / disconnect cycle and finally asks the server to
    verify the queries it received.
    """

    # RETR response shared by the summary and full-message tests.
    _RETR_RESPONSE = ("+OK 10 octets\r\n"
                      "From: user@test.com\r\n"
                      "Subject: subject test\r\n\r\n"
                      "mymessage\r\n.\r\n")

    def setUp(self):
        """Start a dummy server on port 1110 and build the connection."""
        self.server = dummy_server.DummyServer("localhost", 1110)
        thread.start_new_thread(self.server.serve, ())
        self.pop3connection = POP3Connection("login",
                                             "pass",
                                             "localhost",
                                             1110,
                                             ssl=False)

    def tearDown(self):
        """Drop the references to the server and the connection."""
        self.server = None
        self.pop3connection = None

    def make_test(responses=None, queries=None, core=None):
        """Build a test method around one POP3 session.

        This is called while the class body executes, which is why it
        takes no ``self``.

        responses -- extra server responses appended after the greeting
                     and the USER / PASS replies.
        queries   -- extra expected client queries inserted between the
                     USER / PASS queries and the final QUIT.
        core      -- optional callable taking the test instance, run
                     while the connection is open.

        Returns the function to bind as a ``test_*`` class attribute.
        """
        def inner(self):
            self.server.responses = ["+OK connected\r\n",
                                     "+OK name is a valid mailbox\r\n",
                                     "+OK pass\r\n"]
            if responses:
                self.server.responses += responses
            self.server.queries = ["USER login\r\n",
                                   "PASS pass\r\n"]
            if queries:
                self.server.queries += queries
            self.server.queries += ["QUIT\r\n"]
            self.pop3connection.connect()
            # assertTrue / assertEqual replace the deprecated failUnless /
            # assertEquals aliases removed from recent Python versions.
            self.assertTrue(self.pop3connection.connection,
                            "Cannot establish connection")
            if core:
                core(self)
            self.pop3connection.disconnect()
            self.assertTrue(self.server.verify_queries(),
                            "Sended queries does not match expected queries.")
        return inner

    test_connection = make_test()

    test_get_mail_list = make_test(
        ["+OK 2 20\r\n"],
        ["STAT\r\n"],
        lambda self: self.assertEqual(
            self.pop3connection.get_mail_list(),
            ["1", "2"]))

    test_get_mail_summary = make_test(
        [_RETR_RESPONSE],
        ["RETR 1\r\n"],
        lambda self: self.assertEqual(
            self.pop3connection.get_mail_summary(1),
            (u"From : user@test.com\n"
             u"Subject : subject test\n\n",
             u"user@test.com")))

    test_get_mail = make_test(
        [_RETR_RESPONSE],
        ["RETR 1\r\n"],
        lambda self: self.assertEqual(
            self.pop3connection.get_mail(1),
            (u"From : user@test.com\n"
             u"Subject : subject test\n\n"
             u"mymessage\n",
             u"user@test.com")))
|
|
|
|
|
|
class IMAPConnection_TestCase(unittest.TestCase):
    """Tests running IMAPConnection against a scripted dummy server.

    Each test method is produced by ``make_test``, which sets the
    server's ``responses`` and expected ``queries`` (regular expression
    strings), performs the connect / action / disconnect cycle and
    finally asks the server to verify the queries it received.

    A response is either a plain string or a callable taking ``data``
    and returning a string; the callables here reuse the first
    whitespace-separated token of ``data`` as the reply tag (presumably
    ``data`` is the client command just received -- confirm against
    dummy_server).
    """

    def setUp(self):
        """Start a dummy server on port 1143 and build the connection."""
        self.server = dummy_server.DummyServer("localhost", 1143)
        thread.start_new_thread(self.server.serve, ())
        self.imap_connection = IMAPConnection("login",
                                              "pass",
                                              "localhost",
                                              1143,
                                              ssl=False)

    def tearDown(self):
        """Drop the references to the server and the connection."""
        self.server = None
        self.imap_connection = None

    def make_test(responses=None, queries=None, core=None):
        """Build a test method around one IMAP session.

        This is called while the class body executes, which is why it
        takes no ``self``.

        responses -- extra server responses appended after the greeting
                     and the CAPABILITY / LOGIN replies.
        queries   -- extra expected query patterns inserted between the
                     CAPABILITY / LOGIN patterns and the final LOGOUT.
        core      -- optional callable taking the test instance, run
                     while the connection is open.

        Returns the function to bind as a ``test_*`` class attribute.
        """
        def inner(self):
            self.server.responses = [
                "* OK [CAPABILITY IMAP4 LOGIN-REFERRALS AUTH=PLAIN]\n",
                lambda data: ("* CAPABILITY IMAP4 LOGIN-REFERRALS AUTH=PLAIN\n"
                              + data.split()[0]
                              + " OK CAPABILITY completed\n"),
                lambda data: data.split()[0] + " OK LOGIN completed\n"]
            if responses:
                self.server.responses += responses
            # Raw strings: these are regular expressions.
            self.server.queries = [r"^[^ ]* CAPABILITY",
                                   r'^[^ ]* LOGIN login "pass"']
            if queries:
                self.server.queries += queries
            self.server.queries += [r"^[^ ]* LOGOUT"]
            self.imap_connection.connect()
            # assertTrue / assertEqual replace the deprecated failUnless /
            # assertEquals aliases removed from recent Python versions.
            self.assertTrue(self.imap_connection.connection,
                            "Cannot establish connection")
            if core:
                core(self)
            self.imap_connection.disconnect()
            self.assertTrue(self.server.verify_queries(),
                            "Sent queries do not match expected queries.")
        return inner

    def _tagged(untagged, completion):
        """Return a responder giving ``untagged + <tag> + completion``.

        ``<tag>`` is the first whitespace-separated token of the data the
        responder is called with. Class-body helper only (no ``self``).
        """
        return lambda data: untagged + data.split()[0] + completion

    def _mailbox_status(eol):
        """Return the untagged mailbox status lines, each ended by ``eol``.

        Class-body helper only (no ``self``). Backslashes are written
        explicitly so the flag names keep their literal leading ``\\``.
        """
        return eol.join(["* 42 EXISTS",
                         "* 1 RECENT",
                         "* OK [UNSEEN 9]",
                         "* FLAGS (\\Deleted \\Seen\\*)",
                         "* OK [PERMANENTFLAGS (\\Deleted \\Seen\\*)"]) + eol

    test_connection = make_test()

    test_get_mail_list = make_test(
        [_tagged(_mailbox_status("\n"),
                 " OK [READ-WRITE] SELECT completed\n"),
         _tagged("* SEARCH 9 10 \n",
                 " OK SEARCH completed\n")],
        [r"^[^ ]* SELECT INBOX",
         r"^[^ ]* SEARCH RECENT"],
        lambda self: self.assertEqual(
            self.imap_connection.get_mail_list(),
            ['9', '10']))

    # NOTE(review): the literal size announced below is {12} while
    # test_get_mail announces {11} for the same 11-byte "body text\r\n"
    # payload; kept exactly as in the original fixture -- confirm whether
    # the difference is intentional.
    test_get_mail_summary = make_test(
        [_tagged(_mailbox_status("\r\n"),
                 " OK [READ-WRITE] SELECT completed\r\n"),
         _tagged("* 1 FETCH ((RFC822) {12}\r\nbody text\r\n)\r\n",
                 " OK FETCH completed\r\n")],
        [r"^[^ ]* EXAMINE INBOX",
         r"^[^ ]* FETCH 1 \(RFC822\)"],
        lambda self: self.assertEqual(
            self.imap_connection.get_mail_summary(1),
            (u"From : None\nSubject : None\n\n",
             u"None")))

    test_get_mail = make_test(
        [_tagged(_mailbox_status("\r\n"),
                 " OK [READ-WRITE] SELECT completed\r\n"),
         _tagged("* 1 FETCH ((RFC822) {11}\r\nbody text\r\n)\r\n",
                 " OK FETCH completed\r\n")],
        [r"^[^ ]* EXAMINE INBOX",
         r"^[^ ]* FETCH 1 \(RFC822\)"],
        lambda self: self.assertEqual(
            self.imap_connection.get_mail(1),
            (u"From : None\nSubject : None\n\nbody text\r\n\n",
             u"None")))
|
|
|