Some refactoring

darcs-hash:20070708194231-86b55-484cc22c11e199ca7e91f100f7707ada3d44ed74.gz
This commit is contained in:
David Rousselie
2007-07-08 21:42:31 +02:00
parent 620b500e0b
commit bccd50ea89
11 changed files with 397 additions and 310 deletions

View File

@@ -11,8 +11,8 @@ from jmc.model.account import NoAccountError, SMTPAccount
class MailHandler(Handler):
"""Define filter for email address in JID"""
def __init__(self):
Handler.__init__(self)
def __init__(self, component):
Handler.__init__(self, component)
self.dest_jid_regexp = re.compile(".*%.*")
def filter(self, stanza, lang_class):

View File

@@ -30,11 +30,14 @@ from sqlobject import *
from pyxmpp.message import Message
from pyxmpp.jid import JID
import jcl.jabber as jabber
from jcl.model.account import Account, PresenceAccount, LegacyJID
from jcl.jabber.disco import RootDiscoGetInfoHandler
from jcl.jabber.component import Handler, AccountManager
from jcl.jabber.feeder import FeederComponent, Feeder, MessageSender, \
HeadlineSender, FeederHandler
from jmc.jabber.disco import MailRootDiscoGetInfoHandler
from jmc.jabber.message import SendMailMessageHandler, \
RootSendMailMessageHandler
from jmc.jabber.presence import MailSubscribeHandler, \
@@ -51,7 +54,6 @@ class MailComponent(FeederComponent):
secret,
server,
port,
db_connection_str,
lang=Lang()):
"""Use FeederComponent behavior and setup feeder and sender
attributes.
@@ -61,30 +63,20 @@ class MailComponent(FeederComponent):
secret,
server,
port,
db_connection_str,
lang=lang)
self.handler = MailFeederHandler(MailFeeder(self), MailSender(self))
self.account_manager = MailAccountManager(self)
self.account_manager.account_classes = (IMAPAccount,
POP3Account,
SMTPAccount)
self.msg_handlers += [SendMailMessageHandler(),
RootSendMailMessageHandler()]
self.subscribe_handlers += [MailSubscribeHandler()]
self.unsubscribe_handlers += [MailUnsubscribeHandler()]
self.available_handlers += [MailPresenceHandler()]
self.unavailable_handlers += [MailPresenceHandler()]
class MailAccountManager(AccountManager):
"""JMC specific account behavior"""
def root_disco_get_info(self, node, name, category, type):
"""Add jabber:iq:gateway support"""
disco_info = AccountManager.root_disco_get_info(self, node, name,
category, type)
disco_info.add_feature("jabber:iq:gateway")
disco_info.add_identity(name, "headline", "newmail")
return disco_info
self.msg_handlers += [[SendMailMessageHandler(self),
RootSendMailMessageHandler(self)]]
self.presence_subscribe_handlers += [[MailSubscribeHandler(self)]]
self.presence_unsubscribe_handlers += [[MailUnsubscribeHandler(self)]]
self.presence_available_handlers += [[MailPresenceHandler(self)]]
self.presence_unavailable_handlers += [[MailPresenceHandler(self)]]
jabber.replace_handlers(self.disco_get_info_handlers,
RootDiscoGetInfoHandler,
MailRootDiscoGetInfoHandler(self))
class MailFeeder(Feeder):
"""Email check"""

32
src/jmc/jabber/disco.py Normal file
View File

@@ -0,0 +1,32 @@
##
## disco.py
## Login : David Rousselie <dax@happycoders.org>
## Started on Sun Jul 8 20:55:46 2007 David Rousselie
## $Id$
##
## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
from jcl.jabber.disco import RootDiscoGetInfoHandler
class MailRootDiscoGetInfoHandler(RootDiscoGetInfoHandler):
def handle(self, stanza, lang_class, node, disco_obj, data):
"""Add jabber:iq:gateway support"""
disco_infos = RootDiscoGetInfoHandler.handle(self, stanza, lang_class,
node, disco_obj, data)
disco_infos[0].add_feature("jabber:iq:gateway")
disco_infos[0].add_identity(self.component.name, "headline", "newmail")
return disco_infos

View File

@@ -31,8 +31,8 @@ from jmc.jabber import MailHandler
from jmc.model.account import SMTPAccount
class SendMailMessageHandler(MailHandler):
def __init__(self):
MailHandler.__init__(self)
def __init__(self, component):
MailHandler.__init__(self, component)
self.__logger = logging.getLogger(\
"jmc.jabber.component.SendMailMessageHandler")
@@ -57,8 +57,8 @@ class SendMailMessageHandler(MailHandler):
class RootSendMailMessageHandler(SendMailMessageHandler):
"""Handle message sent to root JID"""
def __init__(self):
SendMailMessageHandler.__init__(self)
def __init__(self, component):
SendMailMessageHandler.__init__(self, component)
self.to_regexp = re.compile("^\s*(to|TO|To)\s*:\s*(?P<to_email>.*)")
self.__logger = logging.getLogger(\
"jmc.jabber.component.RootSendMailMessageHandler")

View File

@@ -31,8 +31,9 @@ from jmc.jabber import MailHandler
class MailPresenceHandler(DefaultPresenceHandler):
"""Define filter for legacy JIDs presence handling"""
def __init__(self):
Handler.__init__(self)
def __init__(self, component):
Handler.__init__(self, component)
self.dest_jid_regexp = re.compile(".*%.*")
def filter(self, stanza, lang_class):
@@ -44,13 +45,14 @@ class MailPresenceHandler(DefaultPresenceHandler):
return None
class MailSubscribeHandler(DefaultSubscribeHandler, MailHandler):
"""Use DefaultSubscribeHandler handle method and MailHandler filter.
"""
Use DefaultSubscribeHandler handle method and MailHandler filter.
Filter email address in JID. Accept and add to LegacyJID table.
"""
def __init__(self):
DefaultSubscribeHandler.__init__(self)
MailHandler.__init__(self)
def __init__(self, component):
DefaultSubscribeHandler.__init__(self, component)
MailHandler.__init__(self, component)
def filter(self, stanza, lang_class):
return MailHandler.filter(self, stanza, lang_class)
@@ -65,12 +67,13 @@ class MailSubscribeHandler(DefaultSubscribeHandler, MailHandler):
return result
class MailUnsubscribeHandler(DefaultUnsubscribeHandler, MailHandler):
"""Use DefaultUnsubscribeHandler handle method and MailHandler filter.
"""
Use DefaultUnsubscribeHandler handle method and MailHandler filter.
"""
def __init__(self):
DefaultUnsubscribeHandler.__init__(self)
MailHandler.__init__(self)
def __init__(self, component):
DefaultUnsubscribeHandler.__init__(self, component)
MailHandler.__init__(self, component)
def filter(self, stanza, lang_class):
return MailHandler.filter(self, stanza, lang_class)

View File

@@ -3,11 +3,12 @@ __revision__ = ""
import unittest
from jmc.jabber.tests import component
from jmc.jabber.tests import component, disco
def suite():
suite = unittest.TestSuite()
suite.addTest(component.suite())
suite.addTest(disco.suite())
return suite
if __name__ == '__main__':

View File

@@ -31,9 +31,10 @@ from sqlobject.dbconnection import TheURIOpener
from pyxmpp.presence import Presence
from pyxmpp.message import Message
import jcl.model as model
from jcl.model import account
from jcl.model.account import Account, PresenceAccount, LegacyJID
from jcl.jabber.tests.component import DefaultSubscribeHandler_TestCase, \
from jcl.jabber.tests.presence import DefaultSubscribeHandler_TestCase, \
DefaultUnsubscribeHandler_TestCase
from jcl.jabber.tests.feeder import FeederMock, SenderMock
@@ -45,7 +46,7 @@ from jmc.jabber.message import SendMailMessageHandler, \
from jmc.jabber.presence import MailSubscribeHandler, \
MailUnsubscribeHandler, MailPresenceHandler
from jmc.jabber.component import MailComponent, MailFeederHandler, \
MailAccountManager, MailSender
MailSender
from jmc.lang import Lang
if sys.platform == "win32":
@@ -55,7 +56,7 @@ else:
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
class MockStream(object):
def __init__(self, \
def __init__(self,
jid="",
secret="",
server="",
@@ -73,9 +74,9 @@ class MockStream(object):
def set_iq_set_handler(self, iq_type, ns, handler):
if not iq_type in ["query"]:
raise Exception("IQ type unknown: " + iq_type)
if not ns in ["jabber:iq:version", \
"jabber:iq:register", \
"http://jabber.org/protocol/disco#items", \
if not ns in ["jabber:iq:version",
"jabber:iq:register",
"http://jabber.org/protocol/disco#items",
"http://jabber.org/protocol/disco#info"]:
raise Exception("Unknown namespace: " + ns)
if handler is None:
@@ -84,12 +85,12 @@ class MockStream(object):
set_iq_get_handler = set_iq_set_handler
def set_presence_handler(self, status, handler):
if not status in ["available", \
"unavailable", \
"probe", \
"subscribe", \
"subscribed", \
"unsubscribe", \
if not status in ["available",
"unavailable",
"probe",
"subscribe",
"subscribed",
"unsubscribe",
"unsubscribed"]:
raise Exception("Status unknown: " + status)
if handler is None:
@@ -162,11 +163,11 @@ class MailComponent_TestCase(unittest.TestCase):
self.comp = MailComponent("jmc.test.com",
"password",
"localhost",
"5347",
'sqlite://' + DB_URL)
"5347")
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists=True)
PresenceAccount.createTable(ifNotExists=True)
MailAccount.createTable(ifNotExists=True)
@@ -175,10 +176,10 @@ class MailComponent_TestCase(unittest.TestCase):
SMTPAccount.createTable(ifNotExists=True)
MockIMAPAccount.createTable(ifNotExists=True)
MockPOP3Account.createTable(ifNotExists=True)
del account.hub.threadConnection
model.db_disconnect()
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
MockPOP3Account.dropTable(ifExists=True)
MockIMAPAccount.dropTable(ifExists=True)
SMTPAccount.dropTable(ifExists=True)
@@ -188,8 +189,8 @@ class MailComponent_TestCase(unittest.TestCase):
PresenceAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
@@ -197,9 +198,9 @@ class MailComponent_TestCase(unittest.TestCase):
# 'feed' test methods
###########################################################################
def test_feed_live_email_init_no_password(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
model.db_connect()
account11 = MockIMAPAccount(user_jid="test1@test.com",
name="account11",
jid="account11@jmc.test.com")
account11.status = account.ONLINE
self.assertTrue(account11.first_check)
@@ -219,10 +220,10 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
del account.hub.threadConnection
model.db_disconnect()
def test_feed_live_email_init_no_password2(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -241,10 +242,10 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertFalse(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
model.db_disconnect()
def test_feed_interval_no_check(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -255,10 +256,10 @@ class MailComponent_TestCase(unittest.TestCase):
result = self.comp.handler.feeder.feed(account11)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 1)
del account.hub.threadConnection
model.db_disconnect()
def test_feed_interval_check(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -269,10 +270,10 @@ class MailComponent_TestCase(unittest.TestCase):
result = self.comp.handler.feeder.feed(account11)
self.assertEquals(result, [])
self.assertEquals(account11.lastcheck, 0)
del account.hub.threadConnection
model.db_disconnect()
def test_feed_no_password(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -293,10 +294,10 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
del account.hub.threadConnection
model.db_disconnect()
def test_feed_unknown_action(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -317,10 +318,10 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertEquals(account11.lastcheck, 0)
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
del account.hub.threadConnection
model.db_disconnect()
def test_feed_retrieve_no_mail(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -338,13 +339,13 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
model.db_disconnect()
def test_feed_retrieve_mail(self):
def mock_get_mail(index):
return [("body1", "from1@test.com"),
("body2", "from2@test.com")][index]
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid="test1@test.com",
name="account11",
jid="account11@jmc.test.com")
@@ -375,10 +376,10 @@ class MailComponent_TestCase(unittest.TestCase):
account11.default_lang_class.new_mail_subject \
% ("from2@test.com"))
self.assertEquals(result[1][2], "body2")
del account.hub.threadConnection
model.db_disconnect()
def test_feed_digest_no_mail(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -396,13 +397,13 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertEquals(len(self.comp.stream.sent), 0)
del account.hub.threadConnection
model.db_disconnect()
def test_feed_digest_mail(self):
def mock_get_mail_summary(index):
return [("body1", "from1@test.com"), \
("body2", "from2@test.com")][index]
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -426,13 +427,13 @@ class MailComponent_TestCase(unittest.TestCase):
% (2))
self.assertEquals(result[0][2], \
"body1\n----------------------------------\nbody2\n----------------------------------\n")
del account.hub.threadConnection
model.db_disconnect()
###########################################################################
# 'initialize_live_email' test methods
###########################################################################
def test_initialize_live_email(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -449,12 +450,12 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertTrue(account11.marked_all_as_read)
del account.hub.threadConnection
model.db_disconnect()
def test_initialize_live_email_connection_error(self):
def raiser():
raise Exception
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -476,12 +477,12 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertFalse(account11.connected)
self.assertFalse(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
del account.hub.threadConnection
model.db_disconnect()
def test_initialize_live_email_mark_as_read_error(self):
def raiser():
raise Exception
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -503,12 +504,12 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertFalse(account11.marked_all_as_read)
del account.hub.threadConnection
model.db_disconnect()
def test_initialize_live_email_disconnection_error(self):
def raiser():
raise Exception
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = MockIMAPAccount(user_jid = "test1@test.com", \
name = "account11", \
jid = "account11@jmc.test.com")
@@ -531,11 +532,11 @@ class MailComponent_TestCase(unittest.TestCase):
self.assertFalse(account11.connected)
self.assertTrue(account11.has_connected)
self.assertTrue(account11.marked_all_as_read)
del account.hub.threadConnection
model.db_disconnect()
class SendMailMessageHandler_TestCase(unittest.TestCase):
def setUp(self):
self.handler = SendMailMessageHandler()
self.handler = SendMailMessageHandler(None)
def test_handle(self):
message = Message(from_jid="user1@test.com",
@@ -556,37 +557,29 @@ class SendMailMessageHandler_TestCase(unittest.TestCase):
self.assertEquals(result[0].get_body(),
Lang.en.send_mail_ok_body % ("user@test.com"))
class MailAccountManager_TestCase(unittest.TestCase):
def test_root_disco_get_info(self):
mam = MailAccountManager(None)
disco_info = mam.root_disco_get_info(None, "JMC", "gateway", "smtp")
self.assertTrue(disco_info.has_feature("jabber:iq:gateway"))
self.assertEquals(len(disco_info.get_identities()), 2)
self.assertTrue(disco_info.identity_is("gateway", "smtp"))
self.assertTrue(disco_info.identity_is("headline", "newmail"))
class RootSendMailMessageHandler_TestCase(unittest.TestCase):
def setUp(self):
self.handler = RootSendMailMessageHandler()
self.handler = RootSendMailMessageHandler(None)
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists=True)
SMTPAccount.createTable(ifNotExists=True)
del account.hub.threadConnection
model.db_disconnect()
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
SMTPAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_filter(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -599,10 +592,10 @@ class RootSendMailMessageHandler_TestCase(unittest.TestCase):
body="message")
accounts = self.handler.filter(message, None)
self.assertEquals(accounts.count(), 1)
del account.hub.threadConnection
model.db_disconnect()
def test_filter_no_default_account(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -615,10 +608,10 @@ class RootSendMailMessageHandler_TestCase(unittest.TestCase):
accounts = self.handler.filter(message, None)
self.assertEquals(accounts.count(), 2)
self.assertEquals(accounts[0].name, "account11")
del account.hub.threadConnection
model.db_disconnect()
def test_filter_wrong_dest(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -630,10 +623,10 @@ class RootSendMailMessageHandler_TestCase(unittest.TestCase):
body="message")
accounts = self.handler.filter(message, None)
self.assertEquals(accounts.count(), 2)
del account.hub.threadConnection
model.db_disconnect()
def test_filter_wrong_user(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -645,7 +638,7 @@ class RootSendMailMessageHandler_TestCase(unittest.TestCase):
body="message")
accounts = self.handler.filter(message, None)
self.assertEquals(accounts.count(), 0)
del account.hub.threadConnection
model.db_disconnect()
def test_handle_email_found_in_header(self):
message = Message(from_jid="user1@test.com",
@@ -688,30 +681,31 @@ class MailSender_TestCase(unittest.TestCase):
def setUp(self):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists=True)
PresenceAccount.createTable(ifNotExists=True)
MailAccount.createTable(ifNotExists=True)
IMAPAccount.createTable(ifNotExists=True)
POP3Account.createTable(ifNotExists=True)
del account.hub.threadConnection
model.db_disconnect()
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
POP3Account.dropTable(ifExists=True)
IMAPAccount.dropTable(ifExists=True)
MailAccount.dropTable(ifExists=True)
PresenceAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_create_message(self):
mail_sender = MailSender()
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = IMAPAccount(user_jid="test1@test.com",
name="account11",
jid="account11@jmc.test.com")
@@ -721,7 +715,7 @@ class MailSender_TestCase(unittest.TestCase):
"subject",
"message body"))
self.assertEquals(message.get_to(), account11.user_jid)
del account.hub.threadConnection
model.db_disconnect()
self.assertEquals(message.get_subject(), "subject")
self.assertEquals(message.get_body(), "message body")
addresses = message.xpath_eval("add:addresses/add:address",
@@ -734,7 +728,7 @@ class MailSender_TestCase(unittest.TestCase):
def test_create_message_digest(self):
mail_sender = MailSender()
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = IMAPAccount(user_jid="test1@test.com",
name="account11",
jid="account11@jmc.test.com")
@@ -744,33 +738,34 @@ class MailSender_TestCase(unittest.TestCase):
"subject",
"message body"))
self.assertEquals(message.get_to(), account11.user_jid)
del account.hub.threadConnection
model.db_disconnect()
self.assertEquals(message.get_subject(), "subject")
self.assertEquals(message.get_body(), "message body")
self.assertEquals(message.get_type(), "headline")
class MailHandler_TestCase(unittest.TestCase):
def setUp(self):
self.handler = MailHandler()
self.handler = MailHandler(None)
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists=True)
SMTPAccount.createTable(ifNotExists=True)
del account.hub.threadConnection
model.db_disconnect()
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
SMTPAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_filter(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -785,10 +780,10 @@ class MailHandler_TestCase(unittest.TestCase):
self.assertNotEquals(accounts, None)
self.assertEquals(accounts.count(), 1)
self.assertEquals(accounts[0].name, "account11")
del account.hub.threadConnection
model.db_disconnect()
def test_filter_root(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -801,10 +796,10 @@ class MailHandler_TestCase(unittest.TestCase):
body="message")
accounts = self.handler.filter(message, None)
self.assertEquals(accounts, None)
del account.hub.threadConnection
model.db_disconnect()
def test_filter_no_default(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid = "user1@test.com", \
name = "account11", \
jid = "account11@jcl.test.com")
@@ -818,10 +813,10 @@ class MailHandler_TestCase(unittest.TestCase):
self.assertNotEquals(accounts, None)
self.assertEquals(accounts.count(), 2)
self.assertEquals(accounts[0].name, "account11")
del account.hub.threadConnection
model.db_disconnect()
def test_filter_wrong_dest(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid = "user1@test.com", \
name = "account11", \
jid = "account11@jcl.test.com")
@@ -833,10 +828,10 @@ class MailHandler_TestCase(unittest.TestCase):
body = "message")
accounts = self.handler.filter(message, None)
self.assertEquals(accounts, None)
del account.hub.threadConnection
model.db_disconnect()
def test_filter_wrong_account(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -852,12 +847,12 @@ class MailHandler_TestCase(unittest.TestCase):
self.assertNotEquals(e, None)
return
finally:
del account.hub.threadConnection
model.db_disconnect()
self.fail("No exception 'NoAccountError' catched")
class MailPresenceHandler_TestCase(unittest.TestCase):
def setUp(self):
self.handler = MailPresenceHandler()
self.handler = MailPresenceHandler(None)
def test_filter(self):
message = Message(from_jid="user1@test.com",
@@ -883,13 +878,13 @@ class MailPresenceHandler_TestCase(unittest.TestCase):
class MailSubscribeHandler_TestCase(DefaultSubscribeHandler_TestCase, MailHandler_TestCase):
def setUp(self):
MailHandler_TestCase.setUp(self)
self.handler = MailSubscribeHandler()
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
self.handler = MailSubscribeHandler(None)
model.db_connect()
LegacyJID.createTable(ifNotExists=True)
del account.hub.threadConnection
model.db_disconnect()
def test_handle(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -899,18 +894,18 @@ class MailSubscribeHandler_TestCase(DefaultSubscribeHandler_TestCase, MailHandle
result = self.handler.handle(presence, Lang.en, [account11])
legacy_jids = LegacyJID.select()
self.assertEquals(legacy_jids.count(), 1)
del account.hub.threadConnection
model.db_disconnect()
class MailUnsubscribeHandler_TestCase(DefaultUnsubscribeHandler_TestCase, MailHandler_TestCase):
def setUp(self):
MailHandler_TestCase.setUp(self)
self.handler = MailUnsubscribeHandler()
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
self.handler = MailUnsubscribeHandler(None)
model.db_connect()
LegacyJID.createTable(ifNotExists=True)
del account.hub.threadConnection
model.db_disconnect()
def test_handle(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -944,25 +939,26 @@ class MailUnsubscribeHandler_TestCase(DefaultUnsubscribeHandler_TestCase, MailHa
removed_legacy_jid = LegacyJID.select(\
LegacyJID.q.jid == "u111%test.com@jcl.test.com")
self.assertEquals(removed_legacy_jid.count(), 0)
del account.hub.threadConnection
model.db_disconnect()
class MailFeederHandler_TestCase(unittest.TestCase):
def setUp(self):
self.handler = MailFeederHandler(FeederMock(), SenderMock())
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists=True)
PresenceAccount.createTable(ifNotExists=True)
MailAccount.createTable(ifNotExists=True)
IMAPAccount.createTable(ifNotExists=True)
POP3Account.createTable(ifNotExists=True)
SMTPAccount.createTable(ifNotExists=True)
del account.hub.threadConnection
model.db_disconnect()
def tearDown(self):
self.handler = None
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
SMTPAccount.dropTable(ifExists=True)
IMAPAccount.dropTable(ifExists=True)
POP3Account.dropTable(ifExists=True)
@@ -970,13 +966,13 @@ class MailFeederHandler_TestCase(unittest.TestCase):
PresenceAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
account.hub.threadConnection.close()
del account.hub.threadConnection
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_filter(self):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jcl.test.com")
@@ -991,13 +987,12 @@ class MailFeederHandler_TestCase(unittest.TestCase):
self.assertEquals(accounts.count(), 2)
self.assertEquals(accounts[0].name, "account12")
self.assertEquals(accounts[1].name, "account13")
del account.hub.threadConnection
model.db_disconnect()
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(MailComponent_TestCase, 'test'))
suite.addTest(unittest.makeSuite(SendMailMessageHandler_TestCase, 'test'))
suite.addTest(unittest.makeSuite(MailAccountManager_TestCase, 'test'))
suite.addTest(unittest.makeSuite(RootSendMailMessageHandler_TestCase, 'test'))
suite.addTest(unittest.makeSuite(MailSender_TestCase, 'test'))
suite.addTest(unittest.makeSuite(MailHandler_TestCase, 'test'))

View File

@@ -0,0 +1,63 @@
##
## disco.py
## Login : David Rousselie <dax@happycoders.org>
## Started on Sun Jul 8 20:59:32 2007 David Rousselie
## $Id$
##
## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import unittest
from jmc.jabber.disco import MailRootDiscoGetInfoHandler
class MockDiscoIndentity(object):
def __init__(self):
self.category = ""
self.type = ""
class MockAccountManager(object):
def __init__(self):
self.has_multiple_account_type = False
class MockComponent(object):
def __init__(self):
self.name = ""
self.disco_identity = MockDiscoIndentity()
self.account_manager = MockAccountManager()
class MailRootDiscoGetInfoHandler_TestCase(unittest.TestCase):
def test_root_disco_get_info(self):
component = MockComponent()
component.name = "Mock component"
component.disco_identity.category = "gateway"
component.disco_identity.type = "smtp"
component.account_manager.has_multiple_account_type = True
handler = MailRootDiscoGetInfoHandler(component)
# stanza, lang_class, node, disco_obj, data
disco_infos = handler.handle(None, None, None, None, None)
self.assertTrue(disco_infos[0].has_feature("jabber:iq:gateway"))
self.assertEquals(len(disco_infos[0].get_identities()), 2)
self.assertTrue(disco_infos[0].identity_is("gateway", "smtp"))
self.assertTrue(disco_infos[0].identity_is("headline", "newmail"))
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(MailRootDiscoGetInfoHandler_TestCase, 'test'))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View File

@@ -29,6 +29,7 @@ import sys
from sqlobject import *
from sqlobject.dbconnection import TheURIOpener
import jcl.model as model
from jcl.model import account
from jcl.model.account import Account, PresenceAccount
from jmc.model.account import MailAccount, POP3Account, IMAPAccount, SMTPAccount
@@ -49,32 +50,33 @@ class MailAccount_TestCase(PresenceAccount_TestCase):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.db_url = DB_URL
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connection_str = 'sqlite://' + self.db_url
model.db_connect()
Account.createTable(ifNotExists=True)
PresenceAccount.createTable(ifNotExists=True)
MailAccount.createTable(ifNotExists=True)
self.account = MailAccount(user_jid = "user1@test.com", \
name = "account1", \
self.account = MailAccount(user_jid="user1@test.com",
name="account1",
jid="account1@jmc.test.com")
del account.hub.threadConnection
model.db_disconnect()
self.account_class = MailAccount
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
MailAccount.dropTable(ifExists=True)
PresenceAccount.dropTable(ifExists=True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + self.db_url]
account.hub.threadConnection.close()
del account.hub.threadConnection
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def make_test(email_type, tested_func, expected_res):
def inner(self):
encoded, multipart, header = email_type
email = email_generator.generate(encoded, \
multipart, \
email = email_generator.generate(encoded,
multipart,
header)
part = tested_func(self, email)
self.assertEquals(part, expected_res)
@@ -153,31 +155,32 @@ class POP3Account_TestCase(InheritableAccount_TestCase):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.db_url = DB_URL
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connection_str = 'sqlite://' + self.db_url
model.db_connect()
Account.createTable(ifNotExists=True)
PresenceAccount.createTable(ifNotExists=True)
MailAccount.createTable(ifNotExists=True)
POP3Account.createTable(ifNotExists=True)
self.pop3_account = POP3Account(user_jid = "user1@test.com", \
name = "account1", \
jid = "account1@jmc.test.com", \
self.pop3_account = POP3Account(user_jid="user1@test.com",
name="account1",
jid="account1@jmc.test.com",
login="login")
self.pop3_account.password = "pass"
self.pop3_account.host = "localhost"
self.pop3_account.port = 1110
self.pop3_account.ssl = False
del account.hub.threadConnection
model.db_disconnect()
self.account_class = POP3Account
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
POP3Account.dropTable(ifExists=True)
MailAccount.dropTable(ifExists=True)
PresenceAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + self.db_url]
account.hub.threadConnection.close()
del account.hub.threadConnection
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.server = None
@@ -187,57 +190,57 @@ class POP3Account_TestCase(InheritableAccount_TestCase):
def inner(self):
self.server = server.DummyServer("localhost", 1110)
thread.start_new_thread(self.server.serve, ())
self.server.responses = ["+OK connected\r\n", \
"+OK name is a valid mailbox\r\n", \
self.server.responses = ["+OK connected\r\n",
"+OK name is a valid mailbox\r\n",
"+OK pass\r\n"]
if responses:
self.server.responses += responses
self.server.queries = ["USER login\r\n", \
self.server.queries = ["USER login\r\n",
"PASS pass\r\n"]
if queries:
self.server.queries += queries
self.server.queries += ["QUIT\r\n"]
self.pop3_account.connect()
self.failUnless(self.pop3_account.connection, \
self.failUnless(self.pop3_account.connection,
"Cannot establish connection")
if core:
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
core(self)
del account.hub.threadConnection
model.db_disconnect()
self.pop3_account.disconnect()
self.failUnless(self.server.verify_queries(), \
self.failUnless(self.server.verify_queries(),
"Sended queries does not match expected queries.")
return inner
test_connection = make_test()
test_get_mail_list = \
make_test(["+OK 2 20\r\n"], \
["STAT\r\n"], \
make_test(["+OK 2 20\r\n"],
["STAT\r\n"],
lambda self: \
self.assertEquals(self.pop3_account.get_mail_list(), \
self.assertEquals(self.pop3_account.get_mail_list(),
["1", "2"]))
test_get_mail_summary = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
make_test(["+OK 10 octets\r\n" +
"From: user@test.com\r\n" +
"Subject: subject test\r\n\r\n" +
"mymessage\r\n.\r\n",
"+OK\r\n"], \
"+OK\r\n"],
["RETR 1\r\n",
"RSET\r\n"], \
"RSET\r\n"],
lambda self: \
self.assertEquals(self.pop3_account.get_mail_summary(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n", \
self.assertEquals(self.pop3_account.get_mail_summary(1),
(u"From : user@test.com\n" +
u"Subject : subject test\n\n",
u"user@test.com")))
test_get_mail = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
make_test(["+OK 10 octets\r\n" +
"From: user@test.com\r\n" +
"Subject: subject test\r\n\r\n" +
"mymessage\r\n.\r\n",
"+OK\r\n"], \
"+OK\r\n"],
["RETR 1\r\n",
"RSET\r\n"], \
lambda self: \
@@ -281,31 +284,32 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.db_url = DB_URL
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connection_str = 'sqlite://' + self.db_url
model.db_connect()
Account.createTable(ifNotExists=True)
PresenceAccount.createTable(ifNotExists=True)
MailAccount.createTable(ifNotExists=True)
IMAPAccount.createTable(ifNotExists=True)
self.imap_account = IMAPAccount(user_jid = "user1@test.com", \
name = "account1", \
jid = "account1@jmc.test.com", \
self.imap_account = IMAPAccount(user_jid="user1@test.com",
name="account1",
jid="account1@jmc.test.com",
login="login")
self.imap_account.password = "pass"
self.imap_account.host = "localhost"
self.imap_account.port = 1143
self.imap_account.ssl = False
del account.hub.threadConnection
model.db_disconnect()
self.account_class = IMAPAccount
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
IMAPAccount.dropTable(ifExists=True)
MailAccount.dropTable(ifExists=True)
PresenceAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + self.db_url]
account.hub.threadConnection.close()
del account.hub.threadConnection
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.server = None
@@ -334,9 +338,9 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
self.failUnless(self.imap_account.connection, \
"Cannot establish connection")
if core:
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
core(self)
del account.hub.threadConnection
model.db_disconnect()
self.imap_account.disconnect()
self.failUnless(self.server.verify_queries())
return inner
@@ -388,26 +392,27 @@ class SMTPAccount_TestCase(Account_TestCase):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
self.db_url = DB_URL
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connection_str = 'sqlite://' + self.db_url
model.db_connect()
Account.createTable(ifNotExists=True)
ExampleAccount.createTable(ifNotExists=True)
SMTPAccount.createTable(ifNotExists=True)
del account.hub.threadConnection
model.db_disconnect()
self.account_class = SMTPAccount
def tearDown(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
SMTPAccount.dropTable(ifExists=True)
ExampleAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + self.db_url]
account.hub.threadConnection.close()
del account.hub.threadConnection
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_default_account_post_func_no_default_true(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
@@ -418,10 +423,10 @@ class SMTPAccount_TestCase(Account_TestCase):
SMTPAccount.get_register_fields()[7]
value = post_func("True", None, "user1@test.com")
self.assertTrue(value)
del account.hub.threadConnection
model.db_disconnect()
def test_default_account_post_func_no_default_false(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
@@ -432,10 +437,10 @@ class SMTPAccount_TestCase(Account_TestCase):
SMTPAccount.get_register_fields()[7]
value = post_func("False", None, "user1@test.com")
self.assertTrue(value)
del account.hub.threadConnection
model.db_disconnect()
def test_default_account_post_func_true(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
@@ -448,10 +453,10 @@ class SMTPAccount_TestCase(Account_TestCase):
value = post_func("True", None, "user1@test.com")
self.assertTrue(value)
self.assertFalse(account12.default_account)
del account.hub.threadConnection
model.db_disconnect()
def test_default_account_post_func_false(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
@@ -464,14 +469,14 @@ class SMTPAccount_TestCase(Account_TestCase):
value = post_func("False", None, "user1@test.com")
self.assertFalse(value)
self.assertTrue(account12.default_account)
del account.hub.threadConnection
model.db_disconnect()
def test_create_email(self):
account.hub.threadConnection = connectionForURI('sqlite://' + self.db_url)
model.db_connect()
account11 = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
del account.hub.threadConnection
model.db_disconnect()
email = account11.create_email("from@test.com",
"to@test.com",
"subject",
@@ -494,22 +499,20 @@ class SMTPAccount_TestCase(Account_TestCase):
self.server.queries += queries
self.server.queries += ["quit\r\n"]
if core:
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
model.db_connect()
core(self)
del account.hub.threadConnection
model.db_disconnect()
self.failUnless(self.server.verify_queries())
return inner
def test_send_email_esmtp_no_auth(self):
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
model.db_connect()
smtp_account = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
smtp_account.host = "localhost"
smtp_account.port = 1025
del account.hub.threadConnection
model.db_disconnect()
email = smtp_account.create_email("from@test.com",
"to@test.com",
"subject",
@@ -535,14 +538,13 @@ class SMTPAccount_TestCase(Account_TestCase):
test_func()
def test_send_email_no_auth(self):
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
model.db_connect()
smtp_account = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
smtp_account.host = "localhost"
smtp_account.port = 1025
del account.hub.threadConnection
model.db_disconnect()
email = smtp_account.create_email("from@test.com",
"to@test.com",
"subject",
@@ -570,8 +572,7 @@ class SMTPAccount_TestCase(Account_TestCase):
test_func()
def test_send_email_esmtp_auth(self):
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
model.db_connect()
smtp_account = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
@@ -579,7 +580,7 @@ class SMTPAccount_TestCase(Account_TestCase):
smtp_account.port = 1025
smtp_account.login = "user"
smtp_account.password = "pass"
del account.hub.threadConnection
model.db_disconnect()
email = smtp_account.create_email("from@test.com",
"to@test.com",
"subject",
@@ -610,8 +611,7 @@ class SMTPAccount_TestCase(Account_TestCase):
test_func()
def test_send_email_esmtp_auth_method2(self):
account.hub.threadConnection = connectionForURI('sqlite://'
+ self.db_url)
model.db_connect()
smtp_account = SMTPAccount(user_jid="user1@test.com",
name="account11",
jid="account11@jmc.test.com")
@@ -619,7 +619,7 @@ class SMTPAccount_TestCase(Account_TestCase):
smtp_account.port = 1025
smtp_account.login = "user"
smtp_account.password = "pass"
del account.hub.threadConnection
model.db_disconnect()
email = smtp_account.create_email("from@test.com",
"to@test.com",
"subject",

View File

@@ -33,11 +33,11 @@ class JMCRunner(JCLRunner):
# define new options
self.check_interval = 1
self.mail_default_encoding = "iso-8859-1"
self.options += [("i:", "check-interval=", "jmc", \
" INTERVAL\t\t\tInterval unit in minute between mail checks", \
lambda arg: setattr(self, "check_interval", int(arg))), \
("e:", "mail-default-encoding=", "jmc", \
" ENCODING\t\tDefault encoding of the component", \
self.options += [("i:", "check-interval=", "jmc",
" INTERVAL\t\t\tInterval unit in minute between mail checks",
lambda arg: setattr(self, "check_interval", int(arg))),
("e:", "mail-default-encoding=", "jmc",
" ENCODING\t\tDefault encoding of the component",
lambda arg: setattr(self, "mail_default_encoding", arg))]
# override JCL default
self.service_jid = "jmc.localhost"
@@ -54,11 +54,10 @@ class JMCRunner(JCLRunner):
def run(self):
def run_func():
component = MailComponent(jid = self.service_jid, \
secret = self.secret, \
server = self.server, \
port = self.port, \
db_connection_str = self.db_url, \
component = MailComponent(jid=self.service_jid,
secret=self.secret,
server=self.server,
port=self.port,
lang=Lang(self.language))
MailAccount.default_encoding = self.mail_default_encoding
component.check_interval = self.check_interval

View File

@@ -28,6 +28,7 @@ from sqlobject import *
from jcl.tests.runner import JCLRunner_TestCase
import jcl.model as model
from jcl.model import account
from jcl.model.account import Account, PresenceAccount
@@ -129,7 +130,8 @@ class JMCRunner_TestCase(JCLRunner_TestCase):
def do_nothing():
pass
self.runner._run(do_nothing)
account.hub.threadConnection = connectionForURI(self.runner.db_url)
model.db_connection_str = self.runner.db_url
model.db_connect()
# dropTable should succeed because tables should exist
Account.dropTable()
PresenceAccount.dropTable()
@@ -137,7 +139,7 @@ class JMCRunner_TestCase(JCLRunner_TestCase):
IMAPAccount.dropTable()
POP3Account.dropTable()
SMTPAccount.dropTable()
del account.hub.threadConnection
model.db_disconnect()
os.unlink(DB_PATH)
self.assertFalse(os.access("/tmp/jmc.pid", os.F_OK))