Move unit tests in source folder

darcs-hash:20070513183302-86b55-98a5e67621ece44958f215e98ba1c92e32c4ea51.gz
This commit is contained in:
David Rousselie
2007-05-13 20:33:02 +02:00
parent cde3a9f16f
commit 0f95cc4678
15 changed files with 87 additions and 40 deletions

View File

@@ -0,0 +1,14 @@
"""JMC test module"""
__revision__ = ""
import unittest
from jmc.jabber.tests import component
def suite():
suite = unittest.TestSuite()
suite.addTest(component.suite())
return suite
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View File

@@ -0,0 +1,510 @@
##
## test_component.py
## Login : <dax@happycoders.org>
## Started on Wed Feb 14 18:04:49 2007 David Rousselie
## $Id$
##
## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import unittest
import os
import sys
from sqlobject import *
from sqlobject.dbconnection import TheURIOpener
from jcl.model import account
from jcl.model.account import Account, PresenceAccount
from jmc.model.account import MailAccount, IMAPAccount, POP3Account
from jmc.jabber.component import MailComponent
if sys.platform == "win32":
DB_PATH = "/c|/temp/test.db"
else:
DB_PATH = "/tmp/test.db"
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
class MockStream(object):
def __init__(self, \
jid = "",
secret = "",
server = "",
port = "",
keepalive = True):
self.sent = []
self.connection_started = False
self.connection_stopped = False
self.eof = False
self.socket = []
def send(self, iq):
self.sent.append(iq)
def set_iq_set_handler(self, iq_type, ns, handler):
if not iq_type in ["query"]:
raise Exception("IQ type unknown: " + iq_type)
if not ns in ["jabber:iq:version", \
"jabber:iq:register", \
"http://jabber.org/protocol/disco#items", \
"http://jabber.org/protocol/disco#info"]:
raise Exception("Unknown namespace: " + ns)
if handler is None:
raise Exception("Handler must not be None")
set_iq_get_handler = set_iq_set_handler
def set_presence_handler(self, status, handler):
if not status in ["available", \
"unavailable", \
"probe", \
"subscribe", \
"subscribed", \
"unsubscribe", \
"unsubscribed"]:
raise Exception("Status unknown: " + status)
if handler is None:
raise Exception("Handler must not be None")
def set_message_handler(self, msg_type, handler):
if not msg_type in ["normal"]:
raise Exception("Message type unknown: " + msg_type)
if handler is None:
raise Exception("Handler must not be None")
def connect(self):
self.connection_started = True
def disconnect(self):
self.connection_stopped = True
def loop_iter(self, timeout):
time.sleep(timeout)
def close(self):
pass
class MockMailAccount(object):
def _init(self):
self.connected = False
self.has_connected = False
self.marked_all_as_read = False
self._action = PresenceAccount.DO_NOTHING
def connect(self):
self.connected = True
self.has_connected = True
def mark_all_as_read(self):
self.marked_all_as_read = True
def disconnect(self):
self.connected = False
def get_action(self):
return self._action
action = property(get_action)
class MockIMAPAccount(MockMailAccount, IMAPAccount):
def _init(self, *args, **kw):
IMAPAccount._init(self, *args, **kw)
MockMailAccount._init(self)
class MockPOP3Account(MockMailAccount, POP3Account):
def _init(self, *args, **kw):
IMAPAccount._init(self, *args, **kw)
MockMailAccount._init(self)
class MailComponent_TestCase(unittest.TestCase):
def setUp(self):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.comp = MailComponent("jmc.test.com",
"password",
"localhost",
"5347",
'sqlite://' + DB_URL)
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
Account.createTable(ifNotExists = True)
PresenceAccount.createTable(ifNotExists = True)
MailAccount.createTable(ifNotExists = True)
IMAPAccount.createTable(ifNotExists = True)
POP3Account.createTable(ifNotExists = True)
MockIMAPAccount.createTable(ifNotExists = True)
MockPOP3Account.createTable(ifNotExists = True)
del account.hub.threadConnection
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
MockPOP3Account.dropTable(ifExists = True)
MockIMAPAccount.dropTable(ifExists = True)
POP3Account.dropTable(ifExists = True)
IMAPAccount.dropTable(ifExists = True)
MailAccount.dropTable(ifExists = True)
PresenceAccount.dropTable(ifExists = True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
###########################################################################
# 'feed' test methods
###########################################################################
def test_feed_live_email_init_no_password(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
self.assertFalse(account11.waiting_password_reply)
account11.live_email_only = True
account11.password = None
result = self.comp.feeder.feed(account11)
self.assertEquals(len(result), 1)
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertTrue(account11.first_check)
self.assertTrue(account11.waiting_password_reply)
self.assertFalse(account11.in_error)
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
del account.hub.threadConnection
def test_feed_live_email_init_no_password2(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.waiting_password_reply = True
account11.live_email_only = True
account11.password = None
result = self.comp.feeder.feed(account11)
self.assertEquals(result, [])
self.assertTrue(account11.first_check)
self.assertTrue(account11.waiting_password_reply)
self.assertFalse(account11.in_error)
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
def test_feed_interval_no_check(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = PresenceAccount.DO_NOTHING
account11.first_check = False
self.assertEquals(account11.lastcheck, 0)
account11.interval = 2
result = self.comp.feeder.feed(account11)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 1)
del account.hub.threadConnection
def test_feed_interval_check(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = PresenceAccount.DO_NOTHING
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
result = self.comp.feeder.feed(account11)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 0)
del account.hub.threadConnection
def test_feed_no_password(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.RETRIEVE
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = None
self.assertFalse(account11.waiting_password_reply)
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(len(result), 1)
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
del account.hub.threadConnection
def test_feed_unknown_action(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = 42 # Unknown action
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: []
result = self.comp.feeder.feed(account11)
self.assertTrue(account11.in_error)
self.assertEquals(len(result), 1)
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
del account.hub.threadConnection
def test_feed_retrieve_no_mail(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.RETRIEVE
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: []
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
def test_feed_retrieve_mail(self):
def mock_get_mail(index):
return [("body1", "from1@test.com"), \
("body2", "from2@test.com")][index]
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.RETRIEVE
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: [0, 1]
account11.get_mail = mock_get_mail
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
self.assertEquals(len(result), 2)
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].stanza_type, "message")
self.assertEquals(result[0].get_subject(), \
account11.default_lang_class.new_mail_subject \
% ("from1@test.com"))
self.assertEquals(result[0].get_body(), "body1")
self.assertEquals(result[1].get_from(), "account11@jmc.test.com")
self.assertEquals(result[1].get_to(), "test1@test.com")
self.assertEquals(result[1].stanza_type, "message")
self.assertEquals(result[1].get_subject(), \
account11.default_lang_class.new_mail_subject \
% ("from2@test.com"))
self.assertEquals(result[1].get_body(), "body2")
del account.hub.threadConnection
def test_feed_digest_no_mail(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.DIGEST
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: []
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
def test_feed_digest_mail(self):
def mock_get_mail_summary(index):
return [("body1", "from1@test.com"), \
("body2", "from2@test.com")][index]
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11._action = MailAccount.DIGEST
account11.status = account.ONLINE
account11.first_check = False
account11.lastcheck = 1
account11.interval = 2
account11.password = "password"
account11.get_mail_list = lambda: [0, 1]
account11.get_mail_summary = mock_get_mail_summary
result = self.comp.feeder.feed(account11)
self.assertFalse(account11.in_error)
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
self.assertEquals(len(result), 1)
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].stanza_type, "message")
self.assertEquals(result[0].get_subject(), \
account11.default_lang_class.new_digest_subject \
% (2))
self.assertEquals(result[0].get_body(), \
"body1\n----------------------------------\nbody2\n----------------------------------\n")
del account.hub.threadConnection
###########################################################################
# 'initialize_live_email' test methods
###########################################################################
def test_initialize_live_email(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.live_email_only = True
account11.password = "password"
(continue_checking, result) = self.comp.feeder.initialize_live_email(account11)
self.assertEquals(continue_checking, True)
self.assertEquals(result, [])
self.assertFalse(account11.first_check)
self.assertFalse(account11.waiting_password_reply)
self.assertFalse(account11.in_error)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertTrue(account11.marked_all_as_read)
del account.hub.threadConnection
def test_initialize_live_email_connection_error(self):
def raiser():
raise Exception
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.connect = raiser
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.live_email_only = True
account11.password = "password"
(continue_checking, result) = self.comp.feeder.initialize_live_email(account11)
self.assertEquals(continue_checking, False)
self.assertEquals(len(result), 1)
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertTrue(account11.first_check)
self.assertFalse(account11.waiting_password_reply)
self.assertTrue(account11.in_error)
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
del account.hub.threadConnection
def test_initialize_live_email_mark_as_read_error(self):
def raiser():
raise Exception
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.mark_all_as_read = raiser
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.live_email_only = True
account11.password = "password"
(continue_checking, result) = self.comp.feeder.initialize_live_email(account11)
self.assertEquals(continue_checking, False)
self.assertEquals(len(result), 1)
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertTrue(account11.first_check)
self.assertFalse(account11.waiting_password_reply)
self.assertTrue(account11.in_error)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
del account.hub.threadConnection
def test_initialize_live_email_disconnection_error(self):
def raiser():
raise Exception
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
account11.disconnect = raiser
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
self.assertFalse(account11.in_error)
account11.live_email_only = True
account11.password = "password"
(continue_checking, result) = self.comp.feeder.initialize_live_email(account11)
self.assertEquals(len(result), 1)
self.assertEquals(result[0].get_to(), "test1@test.com")
self.assertEquals(result[0].get_from(), "account11@jmc.test.com")
self.assertEquals(continue_checking, False)
self.assertTrue(account11.first_check)
self.assertFalse(account11.waiting_password_reply)
self.assertTrue(account11.in_error)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertTrue(account11.marked_all_as_read)
del account.hub.threadConnection
def suite():
return unittest.makeSuite(MailComponent_TestCase, 'test')
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View File

@@ -133,7 +133,7 @@ class MailAccount(PresenceAccount):
# Define constants
DIGEST = 1
RETRIEVE = 2
default_encoding = "iso-8859-1"
default_encoding = "utf-8"
possibles_actions = [PresenceAccount.DO_NOTHING, \
DIGEST, \
RETRIEVE]
@@ -223,13 +223,13 @@ class MailAccount(PresenceAccount):
if content_charset:
result = unicode(part.get_payload(decode=True).decode(content_charset))
else:
result = unicode(part.get_payload(decode=True))
result = unicode(part.get_payload(decode=True).decode(MailAccount.default_encoding))
except Exception, e:
try:
result = unicode(part.get_payload(decode=True).decode("iso-8859-1"))
result = unicode(part.get_payload(decode=True))
except Exception, e:
try:
result = unicode(part.get_payload(decode=True).decode(default_encoding))
result = unicode(part.get_payload(decode=True).decode("iso-8859-1"))
except Exception, e:
if charset_hint is not None:
try:
@@ -252,13 +252,13 @@ class MailAccount(PresenceAccount):
charset_hint = from_decoded[i][1]
email_from += unicode(from_decoded[i][0].decode(from_decoded[i][1]))
else:
email_from += unicode(from_decoded[i][0])
email_from += unicode(from_decoded[i][0].decode(MailAccount.default_encoding))
except Exception,e:
try:
email_from += unicode(from_decoded[i][0].decode("iso-8859-1"))
email_from += unicode(from_decoded[i][0])
except Exception, e:
try:
email_from += unicode(from_decoded[i][0].decode(default_encoding))
email_from += unicode(from_decoded[i][0].decode("iso-8859-1"))
except Exception, e:
type, value, stack = sys.exc_info()
print >>sys.stderr, "".join(traceback.format_exception
@@ -273,13 +273,13 @@ class MailAccount(PresenceAccount):
charset_hint = subject_decoded[i][1]
result += unicode(subject_decoded[i][0].decode(subject_decoded[i][1]))
else:
result += unicode(subject_decoded[i][0])
result += unicode(subject_decoded[i][0].decode(MailAccount.default_encoding))
except Exception,e:
try:
result += unicode(subject_decoded[i][0].decode("iso-8859-1"))
result += unicode(subject_decoded[i][0])
except Exception, e:
try:
result += unicode(subject_decoded[i][0].decode(default_encoding))
result += unicode(subject_decoded[i][0].decode("iso-8859-1"))
except Exception, e:
if charset_hint is not None:
try:

View File

@@ -0,0 +1,14 @@
"""JMC test module"""
__revision__ = ""
import unittest
from jmc.model.tests import account
def suite():
suite = unittest.TestSuite()
suite.addTest(account.suite())
return suite
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View File

@@ -0,0 +1,400 @@
# -*- coding: utf-8 -*-
##
## test_account.py
## Login : <dax@happycoders.org>
## Started on Wed Feb 14 08:23:17 2007 David Rousselie
## $Id$
##
## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import unittest
import os
import thread
import sys
from sqlobject import *
from sqlobject.dbconnection import TheURIOpener
from jcl.model import account
from jcl.model.account import Account, PresenceAccount
from jmc.model.account import MailAccount, POP3Account, IMAPAccount
from jcl.model.tests.account import PresenceAccount_TestCase
from jmc.model.tests import email_generator, server
if sys.platform == "win32":
DB_PATH = "/c|/temp/test.db"
else:
DB_PATH = "/tmp/test.db"
DB_URL = DB_PATH # + "?debug=1&debugThreading=1"
class MailAccount_TestCase(PresenceAccount_TestCase):
def setUp(self):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
Account.createTable(ifNotExists = True)
PresenceAccount.createTable(ifNotExists = True)
MailAccount.createTable(ifNotExists = True)
self.account = MailAccount(user_jid = "user1@test.com", \
name = "account1", \
jid = "account1@jmc.test.com")
del account.hub.threadConnection
self.account_class = MailAccount
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
MailAccount.dropTable(ifExists = True)
PresenceAccount.dropTable(ifExists = True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def make_test(email_type, tested_func, expected_res):
def inner(self):
encoded, multipart, header = email_type
email = email_generator.generate(encoded, \
multipart, \
header)
part = tested_func(self, email)
self.assertEquals(part, expected_res)
return inner
test_get_decoded_part_not_encoded = \
make_test((False, False, False), \
lambda self, email: self.account.get_decoded_part(email, None), \
u"Not encoded single part")
test_get_decoded_part_encoded = \
make_test((True, False, False), \
lambda self, email: self.account.get_decoded_part(email, None), \
u"Encoded single part with 'iso-8859-15' charset (éàê)")
test_format_message_summary_not_encoded = \
make_test((False, False, True), \
lambda self, email: self.account.format_message_summary(email), \
(u"From : not encoded from\nSubject : not encoded subject\n\n", \
u"not encoded from"))
test_format_message_summary_encoded = \
make_test((True, False, True), \
lambda self, email: self.account.format_message_summary(email), \
(u"From : encoded from (éàê)\nSubject : encoded subject " + \
u"(éàê)\n\n", \
u"encoded from (éàê)"))
test_format_message_summary_partial_encoded = \
make_test((True, False, True), \
lambda self, email: \
email.replace_header("Subject", \
"\" " + str(email["Subject"]) \
+ " \" not encoded part") or \
email.replace_header("From", \
"\" " + str(email["From"]) \
+ " \" not encoded part") or \
self.account.format_message_summary(email), \
(u"From : \"encoded from (éàê)\" not encoded part\nSubject " + \
u": \"encoded subject (éàê)\" not encoded part\n\n", \
u"\"encoded from (éàê)\" not encoded part"))
test_format_message_single_not_encoded = \
make_test((False, False, True), \
lambda self, email: self.account.format_message(email), \
(u"From : not encoded from\nSubject : not encoded subject" + \
u"\n\nNot encoded single part\n", \
u"not encoded from"))
test_format_message_single_encoded = \
make_test((True, False, True), \
lambda self, email: self.account.format_message(email), \
(u"From : encoded from (éàê)\nSubject : encoded subject " + \
u"(éàê)\n\nEncoded single part with 'iso-8859-15' charset" + \
u" (éàê)\n", \
u"encoded from (éàê)"))
test_format_message_multi_not_encoded = \
make_test((False, True, True), \
lambda self, email: self.account.format_message(email), \
(u"From : not encoded from\nSubject : not encoded subject" + \
u"\n\nNot encoded multipart1\nNot encoded multipart2\n", \
u"not encoded from"))
test_format_message_multi_encoded = \
make_test((True, True, True), \
lambda self, email: self.account.format_message(email), \
(u"From : encoded from (éàê)\nSubject : encoded subject (éà" + \
u"ê)\n\nutf-8 multipart1 with no charset (éàê)" + \
u"\nEncoded multipart2 with 'iso-8859-15' charset (éàê)\n" + \
u"Encoded multipart3 with no charset (éàê)\n", \
u"encoded from (éàê)"))
def test_get_register_fields(self):
register_fields = MailAccount.get_register_fields()
self.assertEquals(len(register_fields), 14)
class POP3Account_TestCase(unittest.TestCase):
def setUp(self):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
Account.createTable(ifNotExists = True)
PresenceAccount.createTable(ifNotExists = True)
MailAccount.createTable(ifNotExists = True)
POP3Account.createTable(ifNotExists = True)
self.pop3_account = POP3Account(user_jid = "user1@test.com", \
name = "account1", \
jid = "account1@jmc.test.com", \
login = "login")
self.pop3_account.password = "pass"
self.pop3_account.host = "localhost"
self.pop3_account.port = 1110
self.pop3_account.ssl = False
del account.hub.threadConnection
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
POP3Account.dropTable(ifExists = True)
MailAccount.dropTable(ifExists = True)
PresenceAccount.dropTable(ifExists = True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.server = None
self.pop3_account = None
def make_test(responses = None, queries = None, core = None):
def inner(self):
self.server = server.DummyServer("localhost", 1110)
thread.start_new_thread(self.server.serve, ())
self.server.responses = ["+OK connected\r\n", \
"+OK name is a valid mailbox\r\n", \
"+OK pass\r\n"]
if responses:
self.server.responses += responses
self.server.queries = ["USER login\r\n", \
"PASS pass\r\n"]
if queries:
self.server.queries += queries
self.server.queries += ["QUIT\r\n"]
self.pop3_account.connect()
self.failUnless(self.pop3_account.connection, \
"Cannot establish connection")
if core:
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
core(self)
del account.hub.threadConnection
self.pop3_account.disconnect()
self.failUnless(self.server.verify_queries(), \
"Sended queries does not match expected queries.")
return inner
test_connection = make_test()
test_get_mail_list = \
make_test(["+OK 2 20\r\n"], \
["STAT\r\n"], \
lambda self: \
self.assertEquals(self.pop3_account.get_mail_list(), \
["1", "2"]))
test_get_mail_summary = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"+OK\r\n"], \
["RETR 1\r\n",
"RSET\r\n"], \
lambda self: \
self.assertEquals(self.pop3_account.get_mail_summary(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n", \
u"user@test.com")))
test_get_mail = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"+OK\r\n"], \
["RETR 1\r\n",
"RSET\r\n"], \
lambda self: \
self.assertEquals(self.pop3_account.get_mail(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n" + \
u"mymessage\n", \
u"user@test.com")))
test_unsupported_reset_command_get_mail_summary = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"-ERR unknown command\r\n"], \
["RETR 1\r\n",
"RSET\r\n"], \
lambda self: \
self.assertEquals(self.pop3_account.get_mail_summary(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n", \
u"user@test.com")))
test_unsupported_reset_command_get_mail = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n",
"-ERR unknown command\r\n"], \
["RETR 1\r\n",
"RSET\r\n"], \
lambda self: \
self.assertEquals(self.pop3_account.get_mail(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n" + \
u"mymessage\n", \
u"user@test.com")))
def test_get_register_fields(self):
register_fields = POP3Account.get_register_fields()
self.assertEquals(len(register_fields), 14)
class IMAPAccount_TestCase(unittest.TestCase):
def setUp(self):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
Account.createTable(ifNotExists = True)
PresenceAccount.createTable(ifNotExists = True)
MailAccount.createTable(ifNotExists = True)
IMAPAccount.createTable(ifNotExists = True)
self.imap_account = IMAPAccount(user_jid = "user1@test.com", \
name = "account1", \
jid = "account1@jmc.test.com", \
login = "login")
self.imap_account.password = "pass"
self.imap_account.host = "localhost"
self.imap_account.port = 1143
self.imap_account.ssl = False
del account.hub.threadConnection
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
IMAPAccount.dropTable(ifExists = True)
MailAccount.dropTable(ifExists = True)
PresenceAccount.dropTable(ifExists = True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.server = None
self.imap_account = None
def make_test(responses = None, queries = None, core = None):
def inner(self):
self.server = server.DummyServer("localhost", 1143)
thread.start_new_thread(self.server.serve, ())
self.server.responses = ["* OK [CAPABILITY IMAP4 LOGIN-REFERRALS " + \
"AUTH=PLAIN]\n", \
lambda data: "* CAPABILITY IMAP4 " + \
"LOGIN-REFERRALS AUTH=PLAIN\n" + \
data.split()[0] + \
" OK CAPABILITY completed\n", \
lambda data: data.split()[0] + \
" OK LOGIN completed\n"]
if responses:
self.server.responses += responses
self.server.queries = ["^[^ ]* CAPABILITY", \
"^[^ ]* LOGIN login \"pass\""]
if queries:
self.server.queries += queries
self.server.queries += ["^[^ ]* LOGOUT"]
self.imap_account.connect()
self.failUnless(self.imap_account.connection, \
"Cannot establish connection")
if core:
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
core(self)
del account.hub.threadConnection
self.imap_account.disconnect()
self.failUnless(self.server.verify_queries())
return inner
test_connection = make_test()
test_get_mail_list = make_test([lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" +\
" [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\n", \
lambda data: "* SEARCH 9 10 \n" + \
data.split()[0] + " OK SEARCH completed\n"], \
["^[^ ]* SELECT INBOX", \
"^[^ ]* SEARCH RECENT"], \
lambda self: \
self.assertEquals(self.imap_account.get_mail_list(), ['9', '10']))
test_get_mail_summary = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n", \
lambda data: "* 1 FETCH ((RFC822) {12}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n"], \
["^[^ ]* EXAMINE INBOX", \
"^[^ ]* FETCH 1 \(RFC822\)"], \
lambda self: self.assertEquals(self.imap_account.get_mail_summary(1), \
(u"From : None\nSubject : None\n\n", \
u"None")))
test_get_mail = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n", \
lambda data: "* 1 FETCH ((RFC822) {11}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n"], \
["^[^ ]* EXAMINE INBOX", \
"^[^ ]* FETCH 1 \(RFC822\)",], \
lambda self: self.assertEquals(self.imap_account.get_mail(1), \
(u"From : None\nSubject : None\n\nbody text\r\n\n", \
u"None")))
def test_get_register_fields(self):
register_fields = IMAPAccount.get_register_fields()
self.assertEquals(len(register_fields), 15)
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(MailAccount_TestCase, 'test'))
suite.addTest(unittest.makeSuite(POP3Account_TestCase, 'test'))
suite.addTest(unittest.makeSuite(IMAPAccount_TestCase, 'test'))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View File

@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
## email_generator.py
## Login : David Rousselie <david.rousselie@happycoders.org>
## Started on Tue May 17 15:33:35 2005
## $Id: email_generator.py,v 1.1 2005/07/11 20:39:31 dax Exp $
##
## Copyright (C) 2005
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
from email.Header import Header
from email.MIMEText import MIMEText
from email.MIMEMultipart import MIMEMultipart
def _create_multipart(encoded):
msg = MIMEMultipart()
if encoded:
part1 = MIMEText("utf-8 multipart1 with no charset (éàê)", _charset = "")
msg.attach(part1)
part2 = MIMEText("Encoded multipart2 with 'iso-8859-15' charset (<28><><EFBFBD>)", \
_charset = "iso-8859-15")
msg.attach(part2)
part3 = MIMEText("Encoded multipart3 with no charset (<28><><EFBFBD>)", \
_charset = "")
msg.attach(part3)
else:
part1 = MIMEText("Not encoded multipart1")
msg.attach(part1)
part2 = MIMEText("Not encoded multipart2")
msg.attach(part2)
return msg
def _create_singlepart(encoded):
if encoded:
return MIMEText("Encoded single part with 'iso-8859-15' charset (<28><><EFBFBD>)", \
_charset = "iso-8859-15")
else:
return MIMEText("Not encoded single part")
def generate(encoded, multipart, header):
msg = None
if multipart:
msg = _create_multipart(encoded)
else:
msg = _create_singlepart(encoded)
if header:
if encoded:
msg['Subject'] = Header("encoded subject (<28><><EFBFBD>)", "iso-8859-15")
msg['From'] = Header("encoded from (<28><><EFBFBD>)", "iso-8859-15")
else:
msg['Subject'] = Header("not encoded subject")
msg['From'] = Header("not encoded from")
return msg

View File

@@ -0,0 +1,209 @@
##
## dummy_server.py
## Login : David Rousselie <david.rousselie@happycoders.org>
## Started on Fri May 13 12:53:17 2005
## $Id: dummy_server.py,v 1.1 2005/07/11 20:39:31 dax Exp $
##
## Copyright (C) 2005
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import sys
import time
import traceback
import re
import os
import socket
import types
import select
import xml.dom.minidom
from pyxmpp import xmlextra
def xmldiff(node1, node2):
if node1.nodeType == node1.TEXT_NODE:
if not node2.nodeType == node2.TEXT_NODE \
or re.compile(node2.data + "$").match(node1.data) is None:
raise Exception("data in text node " + node1.data + " does not match " + node2.data)
elif node1.nodeType == node1.DOCUMENT_NODE:
if not node2.nodeType == node2.DOCUMENT_NODE:
raise Exception("node1 is Document but not node2 (" + node2.nodeType + ")")
elif node1.tagName != node2.tagName:
raise Exception("Different tag name : " + node1.tagName + " != " + node2.tagName)
else:
for attr in node1._get_attributes().keys():
if not node2.hasAttribute(attr) \
or node1.getAttribute(attr) != node2.getAttribute(attr):
raise Exception("(" + node1.tagName + ") Different attributes : " + node1.getAttribute(attr) + " != " + node2.getAttribute(attr))
if len(node1.childNodes) != len(node2.childNodes):
raise Exception("(" + node1.tagName + ") Different children number : " + str(len(node1.childNodes)) + " != " + str(len(node2.childNodes)))
for i in range(len(node1.childNodes)):
xmldiff(node1.childNodes[i], node2.childNodes[i])
class DummyServer:
def __init__(self, host, port, responses = None):
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error, msg:
print >>sys.stderr, msg
s = None
raise socket.error
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(sa)
s.listen(1)
except socket.error, msg:
print >>sys.stderr, msg
s.close()
s = None
raise socket.error
break
self.socket = s
self.responses = None
self.queries = None
self.real_queries = []
def serve(self):
conn = None
try:
conn, addr = self.socket.accept()
rfile = conn.makefile('rb', -1)
if self.responses:
data = None
for idx in range(len(self.responses)):
# if response is a function apply it (it must return a string)
# it is given previous received data
if isinstance(self.responses[idx], types.FunctionType):
response = self.responses[idx](data)
else:
response = self.responses[idx]
if response is not None:
#print >>sys.stderr, 'Sending : ', response
conn.send(response)
data = rfile.readline()
if not data:
break
else:
self.real_queries.append(data)
#print >>sys.stderr, 'Receive : ', data
if conn is not None:
conn.close()
if self.socket is not None:
self.socket.close()
self.socket = None
except:
type, value, stack = sys.exc_info()
print >>sys.stderr, "".join(traceback.format_exception
(type, value, stack, 5))
def verify_queries(self):
result = True
queries_len = len(self.queries)
if queries_len == len(self.real_queries):
for idx in range(queries_len):
real_query = self.real_queries[idx]
match = (re.compile(self.queries[idx], re.M).match(real_query) is not None)
if not match:
result = False
print >>sys.stderr, "Unexpected query :\n" + \
"Expected query : _" + self.queries[idx] + "_\n" + \
"Receive query : _" + real_query + "_\n"
else:
result = False
print >>sys.stderr, "Expected " + str(queries_len) + \
" queries, got " + str(len(self.real_queries)) + \
"\t" + str(self.real_queries)
return result
class XMLDummyServer(DummyServer):
def __init__(self, host, port, responses, stream_handler):
DummyServer.__init__(self, host, port, responses)
self._reader = xmlextra.StreamReader(stream_handler)
def serve(self):
try:
conn, addr = self.socket.accept()
if self.responses:
data = None
for idx in range(len(self.responses)):
try:
# TODO : this approximation is not clean
# received size is based on the expected size in self.queries
data = conn.recv(1024 + len(self.queries[idx]))
# print "receive : " + data
if data:
## TODO : without this log, test_set_register in test_component wait forever
#print "-----------RECEIVE1 " + data
r = self._reader.feed(data)
except:
type, value, stack = sys.exc_info()
print "".join (traceback.format_exception
(type, value, stack, 5))
raise
# TODO verify got all data </stream>
if data:
self.real_queries.append(data)
# if response is a function apply it (it must return a string)
# it is given previous received data
if isinstance(self.responses[idx], types.FunctionType):
response = self.responses[idx](data)
else:
response = self.responses[idx]
if response is not None:
# print >>sys.stderr, '---------SENDING : ', response
conn.send(response)
data = conn.recv(1024)
if data:
# print "-----------RECEIVE2 " + data
r = self._reader.feed(data)
self.real_queries.append(data)
conn.close()
self.socket.close()
self.socket = None
except:
type, value, stack = sys.exc_info()
print "".join (traceback.format_exception
(type, value, stack, 5))
raise
def verify_queries(self):
result = True
full_real_queries = ""
full_recv_queries = ""
for idx in range(len(self.real_queries)):
full_real_queries += self.real_queries[idx].rstrip(os.linesep)
for idx in range(len(self.queries)):
full_recv_queries += self.queries[idx].rstrip(os.linesep)
# Do not receive it but add it so that xml parsing can succeed
#full_real_queries += "</stream:stream>"
real_query = xml.dom.minidom.parseString(full_real_queries)
recv_query = xml.dom.minidom.parseString(full_recv_queries)
try:
utils.xmldiff(real_query, recv_query)
except Exception, msg:
result = False
print >>sys.stderr, msg
return result
def test():
server = DummyServer(("localhost", 4242))
server.responses = ["rep1\n", "rep2\n"]
server.serve()
if __name__ == '__main__':
test()

18
src/jmc/tests/__init__.py Normal file
View File

@@ -0,0 +1,18 @@
"""JMC test module"""
__revision__ = ""
import unittest
from jmc.tests import lang
from jmc.jabber import tests as jabber
from jmc.model import tests as model
def suite():
suite = unittest.TestSuite()
suite.addTest(lang.suite())
suite.addTest(jabber.suite())
suite.addTest(model.suite())
return suite
if __name__ == '__main__':
unittest.main(defaultTest='suite')

71
src/jmc/tests/lang.py Normal file
View File

@@ -0,0 +1,71 @@
# -*- coding: UTF-8 -*-
##
## test_lang.py
## Login : David Rousselie <david.rousselie@happycoders.org>
## Started on Fri May 20 10:46:58 2005
## $Id: test_lang.py,v 1.1 2005/07/11 20:39:31 dax Exp $
##
## Copyright (C) 2005
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
# TODO : Reuse JCL Lang tests
import unittest
from jmc.lang import Lang
from pyxmpp.iq import Iq
class Lang_TestCase(unittest.TestCase):
def setUp(self):
self.lang = Lang()
def tearDown(self):
self.lang = None
def test_get_lang_class_exist(self):
lang_class = self.lang.get_lang_class("fr")
self.assertEquals(lang_class, Lang.fr)
def test_get_lang_class_not_exist(self):
lang_class = self.lang.get_lang_class("not_exist")
self.assertEquals(lang_class, Lang.en)
def test_get_lang_class_long_code(self):
lang_class = self.lang.get_lang_class("fr_FR")
self.assertEquals(lang_class, Lang.fr)
def test_get_lang_from_node(self):
iq = Iq(from_jid = "test@test.com", \
to_jid = "test2@test.com", \
stanza_type = "get")
iq_node = iq.get_node()
iq_node.setLang("fr")
lang = self.lang.get_lang_from_node(iq_node)
self.assertEquals(lang, "fr")
def test_get_lang_class_from_node(self):
iq = Iq(from_jid = "test@test.com", \
to_jid = "test2@test.com", \
stanza_type = "get")
iq_node = iq.get_node()
iq_node.setLang("fr")
lang = self.lang.get_lang_class_from_node(iq_node)
self.assertEquals(lang, Lang.fr)
def suite():
return unittest.makeSuite(Lang_TestCase, 'test')
if __name__ == '__main__':
unittest.main(defaultTest='suite')