Complete MailComponent tests

darcs-hash:20070321170603-86b55-109cde21f4ff2e4220c3cc5c7297e0aef755978f.gz
This commit is contained in:
David Rousselie
2007-03-21 18:06:03 +01:00
parent 170f482ae1
commit 978b023ee6
10 changed files with 671 additions and 982 deletions

View File

@@ -35,11 +35,13 @@ sys.setdefaultencoding('utf8')
del sys.setdefaultencoding del sys.setdefaultencoding
import tests import tests
from tests.jmc.jabber.test_component import *
from tests.jmc.model.test_account import * from tests.jmc.model.test_account import *
from tests.jmc.test_lang import *
import jmc import jmc
import jmc.jabber import jmc.jabber
#import jmc.jabber.component import jmc.jabber.component
if __name__ == '__main__': if __name__ == '__main__':
logger = logging.getLogger() logger = logging.getLogger()
@@ -49,18 +51,26 @@ if __name__ == '__main__':
mail_account_suite = unittest.makeSuite(MailAccount_TestCase, "test") mail_account_suite = unittest.makeSuite(MailAccount_TestCase, "test")
imap_account_suite = unittest.makeSuite(IMAPAccount_TestCase, "test") imap_account_suite = unittest.makeSuite(IMAPAccount_TestCase, "test")
pop3_account_suite = unittest.makeSuite(POP3Account_TestCase, "test") pop3_account_suite = unittest.makeSuite(POP3Account_TestCase, "test")
lang_suite = unittest.makeSuite(Lang_TestCase, "test")
mail_component_suite = unittest.makeSuite(MailComponent_TestCase, "test")
jmc_suite = unittest.TestSuite() # jmc_suite = unittest.TestSuite((mail_component_suite))
jmc_suite = unittest.TestSuite((mail_account_suite, \ # jmc_suite = unittest.TestSuite()
# jmc_suite.addTest(MailAccount_TestCase('test_get_register_fields'))
jmc_suite = unittest.TestSuite((lang_suite, \
mail_account_suite, \
imap_account_suite, \ imap_account_suite, \
pop3_account_suite)) pop3_account_suite, \
mail_component_suite))
test_support.run_suite(jmc_suite) test_support.run_suite(jmc_suite)
coverage.stop() coverage.stop()
#coverage.analysis(jmc.jabber.component) coverage.analysis(jmc.jabber.component)
coverage.analysis(jmc.lang)
coverage.analysis(jmc.model.account) coverage.analysis(jmc.model.account)
coverage.report([ coverage.report([jmc.jabber.component, \
jmc.lang, \
jmc.model.account]) jmc.model.account])

View File

@@ -30,17 +30,31 @@ del sys.setdefaultencoding
from sqlobject import * from sqlobject import *
from pyxmpp.message import Message from pyxmpp.message import Message
from jcl.model import account
from jcl.model.account import Account, PresenceAccount
from jmc.jabber.component import MailComponent from jmc.jabber.component import MailComponent
from jmc.model.account import MailPresenceAccount from jmc.model.account import MailAccount, IMAPAccount, POP3Account
DB_PATH = "/tmp/jmc.db"
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
logger = logging.getLogger() logger = logging.getLogger()
logger.addHandler(logging.StreamHandler()) logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
Account.createTable(ifNotExists = True)
PresenceAccount.createTable(ifNotExists = True)
MailAccount.createTable(ifNotExists = True)
IMAPAccount.createTable(ifNotExists = True)
POP3Account.createTable(ifNotExists = True)
del account.hub.threadConnection
component = MailComponent("jmc.localhost", \ component = MailComponent("jmc.localhost", \
"secret", \ "secret", \
"127.0.0.1", \ "127.0.0.1", \
5349, \ 5349, \
"sqlite:///tmp/jmc_test.db") "sqlite://" + DB_URL)
component.account_class = MailPresenceAccount
component.run() component.run()
logger.debug("JMC is exiting") logger.debug("JMC is exiting")

View File

@@ -1,2 +1,5 @@
"""JMC module""" """JMC module"""
__revision__ = "" __revision__ = ""
version = "0.3.0"

File diff suppressed because it is too large Load Diff

View File

@@ -21,31 +21,11 @@
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
## ##
class Lang: import jcl.lang
def __init__(self, default_lang = "en"):
self.default_lang = default_lang
def get_lang_from_node(self, node):
lang = node.getLang()
if lang is None:
print "Using default lang " + self.default_lang
lang = self.default_lang
return lang
def get_lang_class(self, lang): class Lang(jcl.lang.Lang):
if lang is not None: class en(jcl.lang.Lang.en):
lang = lang[:2]
if hasattr(Lang, lang):
return getattr(Lang, lang)
return getattr(Lang, self.default_lang)
def get_lang_class_from_node(self, node):
return self.get_lang_class(self.get_lang_from_node(node))
class en:
register_title = u"Jabber Mail connection registration"
register_instructions = u"Enter connection parameters"
account_name = u"Connection name"
account_login = u"Login" account_login = u"Login"
account_password = u"Password" account_password = u"Password"
account_password_store = u"Store password on Jabber server?" account_password_store = u"Store password on Jabber server?"
@@ -70,15 +50,6 @@ class Lang:
update_account_message_subject = u"Updated %s connection '%s'" update_account_message_subject = u"Updated %s connection '%s'"
update_account_message_body = u"Registered with username '%s' and " \ update_account_message_body = u"Registered with username '%s' and " \
"password '%s' on '%s'" "password '%s' on '%s'"
new_account_message_subject = u"New %s connection '%s' created"
new_account_message_body = u"Registered with " \
"username '%s' and password '%s' on '%s'"
ask_password_subject = u"Password request"
ask_password_body = u"Reply to this message with the password " \
"for the following account: \n" \
"\thost = %s\n" \
"\tlogin = %s\n"
password_saved_for_session = u"Password will be kept during your Jabber session"
check_error_subject = u"Error while checking emails." check_error_subject = u"Error while checking emails."
check_error_body = u"An error appears while checking emails:\n\t%s" check_error_body = u"An error appears while checking emails:\n\t%s"
new_mail_subject = u"New email from %s" new_mail_subject = u"New email from %s"

View File

@@ -33,7 +33,8 @@ import socket
from sqlobject.inheritance import InheritableSQLObject from sqlobject.inheritance import InheritableSQLObject
from sqlobject.col import StringCol, IntCol, BoolCol from sqlobject.col import StringCol, IntCol, BoolCol
from jcl.model.account import PresenceAccount from jcl.model import account
from jcl.model.account import Account, PresenceAccount
from jmc.lang import Lang from jmc.lang import Lang
IMAP4_TIMEOUT = 10 IMAP4_TIMEOUT = 10
@@ -148,15 +149,15 @@ class MailAccount(PresenceAccount):
lastcheck = IntCol(default = 0) lastcheck = IntCol(default = 0)
waiting_password_reply = BoolCol(default = False) waiting_password_reply = BoolCol(default = False)
in_error = BoolCol(default = False)
first_check = BoolCol(default = True) first_check = BoolCol(default = True)
def _init(self, *args, **kw): def _init(self, *args, **kw):
"""MailAccount init """MailAccount init
Initialize class attributes""" Initialize class attributes"""
InheritableSQLObject._init(self, *args, **kw) PresenceAccount._init(self, *args, **kw)
self.__logger = logging.getLogger("jmc.model.account.MailAccount") self.__logger = logging.getLogger("jmc.model.account.MailAccount")
self.connection = None self.connection = None
self.connected = False
self.default_lang_class = Lang.en # TODO: use String self.default_lang_class = Lang.en # TODO: use String
def _get_register_fields(cls): def _get_register_fields(cls):
@@ -167,7 +168,7 @@ class MailAccount(PresenceAccount):
return None return None
return password return password
return Account.get_register_fields() + \ return PresenceAccount.get_register_fields() + \
[("login", "text-single", None, account.string_not_null_post_func, \ [("login", "text-single", None, account.string_not_null_post_func, \
account.mandatory_field), \ account.mandatory_field), \
("password", "text-private", None, password_post_func, \ ("password", "text-private", None, password_post_func, \
@@ -314,23 +315,13 @@ class MailAccount(PresenceAccount):
def get_next_mail_index(self, mail_list): def get_next_mail_index(self, mail_list):
raise NotImplementedError raise NotImplementedError
def is_mail_list_valid(self, mail_list):
return (mail_list and mail_list != [] and mail_list[0] != '')
# Does not modify server state but just internal JMC state # Does not modify server state but just internal JMC state
def mark_all_as_read(self): def mark_all_as_read(self):
raise NotImplementedError raise NotImplementedError
def get_action(self):
mapping = {"online": self.online_action,
"chat": self.chat_action,
"away": self.away_action,
"xa": self.xa_action,
"dnd": self.dnd_action,
"offline": self.offline_action}
if mapping.has_key(self.status):
return mapping[self.status]
return PresenceAccount.DO_NOTHING
action = property(get_action)
class IMAPAccount(MailAccount): class IMAPAccount(MailAccount):
mailbox = StringCol(default = "INBOX") # TODO : set default INBOX in reg_form (use get_register_fields last field ?) mailbox = StringCol(default = "INBOX") # TODO : set default INBOX in reg_form (use get_register_fields last field ?)
@@ -344,11 +335,7 @@ class IMAPAccount(MailAccount):
return MailAccount.get_register_fields() + \ return MailAccount.get_register_fields() + \
[("mailbox", "text-single", None, account.string_not_null_post_func, \ [("mailbox", "text-single", None, account.string_not_null_post_func, \
(lambda field_name: "INBOX")), \ (lambda field_name: "INBOX"))]
("password", "text-private", None, password_post_func, \
(lambda field_name: None)), \
("store_password", "boolean", None, account.boolean_post_func, \
lambda field_name: True)]
get_register_fields = classmethod(_get_register_fields) get_register_fields = classmethod(_get_register_fields)
@@ -375,11 +362,13 @@ class IMAPAccount(MailAccount):
else: else:
self.connection = MYIMAP4(self.host, self.port) self.connection = MYIMAP4(self.host, self.port)
self.connection.login(self.login, self.password) self.connection.login(self.login, self.password)
self.connected = True
def disconnect(self): def disconnect(self):
self.__logger.debug("Disconnecting from IMAP server " \ self.__logger.debug("Disconnecting from IMAP server " \
+ self.host) + self.host)
self.connection.logout() self.connection.logout()
self.connected = False
def get_mail_list(self): def get_mail_list(self):
self.__logger.debug("Getting mail list") self.__logger.debug("Getting mail list")
@@ -406,9 +395,10 @@ class IMAPAccount(MailAccount):
return u"Error while fetching mail " + str(index) return u"Error while fetching mail " + str(index)
def get_next_mail_index(self, mail_list): def get_next_mail_index(self, mail_list):
if not mail_list or mail_list[0] == '': if self.is_mail_list_valid(mail_list):
return mail_list.pop(0)
else:
return None return None
return mail_list.pop(0)
def mark_all_as_read(self): def mark_all_as_read(self):
self.get_mail_list() self.get_mail_list()
@@ -443,12 +433,14 @@ class POP3Account(MailAccount):
except: except:
self.connection.user(self.login) self.connection.user(self.login)
self.connection.pass_(self.password) self.connection.pass_(self.password)
self.connected = True
def disconnect(self): def disconnect(self):
self.__logger.debug("Disconnecting from POP3 server " \ self.__logger.debug("Disconnecting from POP3 server " \
+ self.host) + self.host)
self.connection.quit() self.connection.quit()
self.connected = False
def get_mail_list(self): def get_mail_list(self):
self.__logger.debug("Getting mail list") self.__logger.debug("Getting mail list")
@@ -479,13 +471,16 @@ class POP3Account(MailAccount):
return u"Error while fetching mail " + str(index) return u"Error while fetching mail " + str(index)
def get_next_mail_index(self, mail_list): def get_next_mail_index(self, mail_list):
if self.nb_mail == self.lastmail: if self.is_mail_list_valid(mail_list):
if self.nb_mail == self.lastmail:
return None
if self.nb_mail < self.lastmail:
self.lastmail = 0
result = int(mail_list[self.lastmail])
self.lastmail += 1
return result
else:
return None return None
if self.nb_mail < self.lastmail:
self.lastmail = 0
result = int(mail_list[self.lastmail])
self.lastmail += 1
return result
def mark_all_as_read(self): def mark_all_as_read(self):
self.get_mail_list() self.get_mail_list()

View File

@@ -58,6 +58,7 @@ class DummyServer:
try: try:
s = socket.socket(af, socktype, proto) s = socket.socket(af, socktype, proto)
except socket.error, msg: except socket.error, msg:
print >>sys.stderr, msg
s = None s = None
raise socket.error raise socket.error
try: try:
@@ -65,6 +66,7 @@ class DummyServer:
s.bind(sa) s.bind(sa)
s.listen(1) s.listen(1)
except socket.error, msg: except socket.error, msg:
print >>sys.stderr, msg
s.close() s.close()
s = None s = None
raise socket.error raise socket.error

View File

@@ -21,6 +21,471 @@
## ##
import unittest import unittest
import os
from sqlobject import *
from sqlobject.dbconnection import TheURIOpener
from jcl.model import account
from jcl.model.account import Account, PresenceAccount
from jmc.model.account import MailAccount, IMAPAccount, POP3Account
from jmc.jabber.component import MailComponent
DB_PATH = "/tmp/test_jmc.db"
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
class MockStream(object):
def __init__(self, \
jid = "",
secret = "",
server = "",
port = "",
keepalive = True):
self.sent = []
self.connection_started = False
self.connection_stopped = False
self.eof = False
self.socket = []
def send(self, iq):
self.sent.append(iq)
def set_iq_set_handler(self, iq_type, ns, handler):
if not iq_type in ["query"]:
raise Exception("IQ type unknown: " + iq_type)
if not ns in ["jabber:iq:version", \
"jabber:iq:register", \
"http://jabber.org/protocol/disco#items", \
"http://jabber.org/protocol/disco#info"]:
raise Exception("Unknown namespace: " + ns)
if handler is None:
raise Exception("Handler must not be None")
set_iq_get_handler = set_iq_set_handler
def set_presence_handler(self, status, handler):
if not status in ["available", \
"unavailable", \
"probe", \
"subscribe", \
"subscribed", \
"unsubscribe", \
"unsubscribed"]:
raise Exception("Status unknown: " + status)
if handler is None:
raise Exception("Handler must not be None")
def set_message_handler(self, msg_type, handler):
if not msg_type in ["normal"]:
raise Exception("Message type unknown: " + msg_type)
if handler is None:
raise Exception("Handler must not be None")
def connect(self):
self.connection_started = True
def disconnect(self):
self.connection_stopped = True
def loop_iter(self, timeout):
time.sleep(timeout)
def close(self):
pass
class MockMailAccount():
def _init(self):
self.connected = False
self.has_connected = False
self.marked_all_as_read = False
self._action = PresenceAccount.DO_NOTHING
def connect(self):
self.connected = True
self.has_connected = True
def mark_all_as_read(self):
self.marked_all_as_read = True
def disconnect(self):
self.connected = False
def get_action(self):
return self._action
action = property(get_action)
class MockIMAPAccount(MockMailAccount, IMAPAccount):
def _init(self, *args, **kw):
IMAPAccount._init(self, *args, **kw)
MockMailAccount._init(self)
class MockPOP3Account(MockMailAccount, POP3Account):
def _init(self, *args, **kw):
IMAPAccount._init(self, *args, **kw)
MockMailAccount._init(self)
class MailComponent_TestCase(unittest.TestCase): class MailComponent_TestCase(unittest.TestCase):
pass def setUp(self):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.comp = MailComponent("jmc.test.com",
"password",
"localhost",
"5347",
'sqlite://' + DB_URL)
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
Account.createTable(ifNotExists = True)
PresenceAccount.createTable(ifNotExists = True)
MailAccount.createTable(ifNotExists = True)
IMAPAccount.createTable(ifNotExists = True)
POP3Account.createTable(ifNotExists = True)
MockIMAPAccount.createTable(ifNotExists = True)
MockPOP3Account.createTable(ifNotExists = True)
del account.hub.threadConnection
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
MockPOP3Account.dropTable(ifExists = True)
MockIMAPAccount.dropTable(ifExists = True)
POP3Account.dropTable(ifExists = True)
IMAPAccount.dropTable(ifExists = True)
MailAccount.dropTable(ifExists = True)
PresenceAccount.dropTable(ifExists = True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
###########################################################################
# 'feed' test methods
###########################################################################
def test_feed_live_email_init_no_password(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
self.assertFalse(account11.waiting_password_reply)
account11.live_email_only = True
account11.password = None
result = self.comp.feeder.feed(account11)
self.assertEquals(result, [])
self.assertTrue(account11.first_check)
self.assertTrue(account11.waiting_password_reply)
self.assertFalse(account11.in_error)
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
self.assertEquals(len(self.comp.stream.sent), 1)
del account.hub.threadConnection
def test_feed_live_email_init_no_password2(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.waiting_password_reply = True
account11.live_email_only = True
account11.password = None
result = self.comp.feeder.feed(account11)
self.assertEquals(result, [])
self.assertTrue(account11.first_check)
self.assertTrue(account11.waiting_password_reply)
self.assertFalse(account11.in_error)
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
def test_feed_interval_no_check(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = PresenceAccount.DO_NOTHING
account11.first_check = False
self.assertEquals(account11.lastcheck, 0)
account11.interval = 2
result = self.comp.feeder.feed(account11)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 1)
del account.hub.threadConnection
def test_feed_interval_check(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = PresenceAccount.DO_NOTHING
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
result = self.comp.feeder.feed(account11)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 0)
del account.hub.threadConnection
def test_feed_no_password(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.RETRIEVE
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = None
self.assertFalse(account11.waiting_password_reply)
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 1)
del account.hub.threadConnection
def test_feed_unknown_action(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = 42 # Unknown action
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: []
result = self.comp.feeder.feed(account11)
self.assertTrue(account11.in_error)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 1)
del account.hub.threadConnection
def test_feed_retrieve_no_mail(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.RETRIEVE
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: []
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
def test_feed_retrieve_mail(self):
def mock_get_mail(index):
return [("body1", "from1@test.com"), \
("body2", "from2@test.com")][index]
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.RETRIEVE
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: [0, 1]
account11.get_mail = mock_get_mail
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
self.assertEquals(len(result), 2)
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].stanza_type, "message")
self.assertEquals(result[0].get_subject(), \
account11.default_lang_class.new_mail_subject \
% ("from1@test.com"))
self.assertEquals(result[0].get_body(), "body1")
self.assertEquals(result[1].get_from(), "account11@jmc.test.com")
self.assertEquals(result[1].get_to(), "test1@test.com")
self.assertEquals(result[1].stanza_type, "message")
self.assertEquals(result[1].get_subject(), \
account11.default_lang_class.new_mail_subject \
% ("from2@test.com"))
self.assertEquals(result[1].get_body(), "body2")
del account.hub.threadConnection
def test_feed_digest_no_mail(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.DIGEST
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: []
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
def test_feed_digest_mail(self):
def mock_get_mail_summary(index):
return [("body1", "from1@test.com"), \
("body2", "from2@test.com")][index]
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.DIGEST
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: [0, 1]
account11.get_mail_summary = mock_get_mail_summary
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
self.assertEquals(len(result), 1)
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].stanza_type, "message")
self.assertEquals(result[0].get_subject(), \
account11.default_lang_class.new_digest_subject \
% (2))
self.assertEquals(result[0].get_body(), \
"body1\n----------------------------------\nbody2\n----------------------------------\n")
del account.hub.threadConnection
###########################################################################
# 'initialize_live_email' test methods
###########################################################################
def test_initialize_live_email(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.live_email_only = True
account11.password = "password"
result = self.comp.feeder.initialize_live_email(account11)
self.assertEquals(result, True)
self.assertFalse(account11.first_check)
self.assertFalse(account11.waiting_password_reply)
self.assertFalse(account11.in_error)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertTrue(account11.marked_all_as_read)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
def test_initialize_live_email_connection_error(self):
def raiser():
raise Exception
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.connect = raiser
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.live_email_only = True
account11.password = "password"
result = self.comp.feeder.initialize_live_email(account11)
self.assertEquals(result, False)
self.assertTrue(account11.first_check)
self.assertFalse(account11.waiting_password_reply)
self.assertTrue(account11.in_error)
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
self.assertEquals(len(self.comp.stream.sent), 1)
del account.hub.threadConnection
def test_initialize_live_email_mark_as_read_error(self):
def raiser():
raise Exception
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.mark_all_as_read = raiser
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.live_email_only = True
account11.password = "password"
result = self.comp.feeder.initialize_live_email(account11)
self.assertEquals(result, False)
self.assertTrue(account11.first_check)
self.assertFalse(account11.waiting_password_reply)
self.assertTrue(account11.in_error)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
self.assertEquals(len(self.comp.stream.sent), 1)
del account.hub.threadConnection
def test_initialize_live_email_disconnection_error(self):
def raiser():
raise Exception
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.disconnect = raiser
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.live_email_only = True
account11.password = "password"
result = self.comp.feeder.initialize_live_email(account11)
self.assertEquals(result, False)
self.assertTrue(account11.first_check)
self.assertFalse(account11.waiting_password_reply)
self.assertTrue(account11.in_error)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertTrue(account11.marked_all_as_read)
self.assertEquals(len(self.comp.stream.sent), 1)
del account.hub.threadConnection

View File

@@ -139,11 +139,12 @@ class MailAccount_TestCase(unittest.TestCase):
u"Encoded multipart3 with no charset (éàê)\n", \ u"Encoded multipart3 with no charset (éàê)\n", \
u"encoded from (éàê)")) u"encoded from (éàê)"))
def test_get_register_fields(self):
register_fields = MailAccount.get_register_fields()
self.assertEquals(len(register_fields), 14)
class POP3Account_TestCase(unittest.TestCase): class POP3Account_TestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.server = dummy_server.DummyServer("localhost", 1110)
thread.start_new_thread(self.server.serve, ())
if os.path.exists(DB_PATH): if os.path.exists(DB_PATH):
os.unlink(DB_PATH) os.unlink(DB_PATH)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
@@ -177,6 +178,8 @@ class POP3Account_TestCase(unittest.TestCase):
def make_test(responses = None, queries = None, core = None): def make_test(responses = None, queries = None, core = None):
def inner(self): def inner(self):
self.server = dummy_server.DummyServer("localhost", 1110)
thread.start_new_thread(self.server.serve, ())
self.server.responses = ["+OK connected\r\n", \ self.server.responses = ["+OK connected\r\n", \
"+OK name is a valid mailbox\r\n", \ "+OK name is a valid mailbox\r\n", \
"+OK pass\r\n"] "+OK pass\r\n"]
@@ -266,11 +269,12 @@ class POP3Account_TestCase(unittest.TestCase):
u"mymessage\n", \ u"mymessage\n", \
u"user@test.com"))) u"user@test.com")))
def test_get_register_fields(self):
register_fields = POP3Account.get_register_fields()
self.assertEquals(len(register_fields), 14)
class IMAPAccount_TestCase(unittest.TestCase): class IMAPAccount_TestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.server = dummy_server.DummyServer("localhost", 1143)
thread.start_new_thread(self.server.serve, ())
if os.path.exists(DB_PATH): if os.path.exists(DB_PATH):
os.unlink(DB_PATH) os.unlink(DB_PATH)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
@@ -304,6 +308,8 @@ class IMAPAccount_TestCase(unittest.TestCase):
def make_test(responses = None, queries = None, core = None): def make_test(responses = None, queries = None, core = None):
def inner(self): def inner(self):
self.server = dummy_server.DummyServer("localhost", 1143)
thread.start_new_thread(self.server.serve, ())
self.server.responses = ["* OK [CAPABILITY IMAP4 LOGIN-REFERRALS " + \ self.server.responses = ["* OK [CAPABILITY IMAP4 LOGIN-REFERRALS " + \
"AUTH=PLAIN]\n", \ "AUTH=PLAIN]\n", \
lambda data: "* CAPABILITY IMAP4 " + \ lambda data: "* CAPABILITY IMAP4 " + \
@@ -372,3 +378,6 @@ class IMAPAccount_TestCase(unittest.TestCase):
(u"From : None\nSubject : None\n\nbody text\r\n\n", \ (u"From : None\nSubject : None\n\nbody text\r\n\n", \
u"None"))) u"None")))
def test_get_register_fields(self):
register_fields = IMAPAccount.get_register_fields()
self.assertEquals(len(register_fields), 15)

View File

@@ -22,7 +22,7 @@
## ##
import unittest import unittest
from jmc.utils.lang import Lang from jmc.lang import Lang
from pyxmpp.iq import Iq from pyxmpp.iq import Iq