diff --git a/run_tests.py b/run_tests.py index fe7ff9a..577d3ba 100644 --- a/run_tests.py +++ b/run_tests.py @@ -44,8 +44,13 @@ if __name__ == '__main__': logger = logging.getLogger() logger.addHandler(logging.StreamHandler()) logger.setLevel(logging.CRITICAL) - - unittest.main(defaultTest='suite') + try: + print "Trying to launch test with testoob" + import testoob + testoob.main(defaultTest='suite') + except ImportError: + print "Falling back to standard unittest" + unittest.main(defaultTest='suite') coverage.stop() coverage.analysis(jcl.jabber) diff --git a/src/jcl/jabber/command.py b/src/jcl/jabber/command.py index 70ab598..ff683e9 100644 --- a/src/jcl/jabber/command.py +++ b/src/jcl/jabber/command.py @@ -334,9 +334,11 @@ class JCLCommandManager(CommandManager): return (self.add_form_select_user_jids(command_node, lang_class), []) def select_user_jid_step_1(self, info_query, session_context, - command_node, lang_class): + command_node, lang_class, + actions=[ACTION_NEXT], + default_action=0): self.__logger.debug("Executing select_user_jid step 1") - self.add_actions(command_node, [ACTION_NEXT]) + self.add_actions(command_node, actions, default_action) return (self.add_form_select_user_jid(command_node, lang_class), []) def select_accounts_step_2(self, info_query, session_context, @@ -520,8 +522,36 @@ class JCLCommandManager(CommandManager): command_node.setProp("status", STATUS_COMPLETED) return (None, []) - def execute_get_user_roster(self, info_query): - return [] + def execute_get_user_roster_1(self, info_query, session_context, + command_node, lang_class): + (result_form, result) = self.select_user_jid_step_1(info_query, + session_context, + command_node, + lang_class, + actions=[ACTION_COMPLETE]) + return (result_form, result) + + def execute_get_user_roster_2(self, info_query, session_context, + command_node, lang_class): + self.__logger.debug("Executing command 'get-user-roster' step 2") + result_form = Form(xmlnode_or_type="result") + result_form.add_field(field_type="hidden", + name="FORM_TYPE", + value="http://jabber.org/protocol/admin") + user_jid = session_context["user_jid"] + result_form.fields.append(FieldNoType(name="user_jid", + value=user_jid)) + result_form.as_xml(command_node) + x_form = command_node.children + query = x_form.newChild(None, "query", None) + roster_ns = query.newNs("jabber:iq:roster", None) + query.setNs(roster_ns) + for legacy_jid in account.get_legacy_jids(user_jid): + item = query.newChild(None, "item", None) + item.setProp("jid", legacy_jid.jid) + item.setProp("name", legacy_jid.legacy_address) + command_node.setProp("status", STATUS_COMPLETED) + return (result_form, []) def execute_get_user_lastlogin(self, info_query): return [] diff --git a/src/jcl/jabber/presence.py b/src/jcl/jabber/presence.py index ee5f936..090d62c 100644 --- a/src/jcl/jabber/presence.py +++ b/src/jcl/jabber/presence.py @@ -25,7 +25,6 @@ from pyxmpp.presence import Presence from jcl.jabber import Handler import jcl.jabber as jabber -import jcl.model as model class DefaultPresenceHandler(Handler): """Handle presence""" @@ -69,6 +68,9 @@ class DefaultUnsubscribeHandler(Handler): class AccountPresenceHandler(Handler): filter = jabber.get_account_filter + def get_account_presence(self, stanza, lang_class, _account): + raise NotImplemented + def handle(self, stanza, lang_class, data): """Handle presence sent to an account JID""" result = [] @@ -82,9 +84,12 @@ class AccountPresenceAvailableHandler(AccountPresenceHandler): stanza.get_show(), lang_class) -class RootPresenceHandler(Handler): +class RootPresenceHandler(AccountPresenceHandler): filter = jabber.get_accounts_root_filter + def get_root_presence(self, stanza, lang_class, nb_accounts): + raise NotImplemented + def handle(self, stanza, lang_class, data): """handle presence sent to component JID""" result = [] diff --git a/src/jcl/jabber/tests/command.py b/src/jcl/jabber/tests/command.py index 03996cd..898b626 100644 --- a/src/jcl/jabber/tests/command.py +++ b/src/jcl/jabber/tests/command.py @@ -21,30 +21,21 @@ ## import unittest -import sys -import os from pyxmpp.presence import Presence from pyxmpp.jabber.dataforms import Form from pyxmpp.iq import Iq from pyxmpp.message import Message -from sqlobject.dbconnection import TheURIOpener - from jcl.lang import Lang from jcl.jabber.component import JCLComponent import jcl.jabber.command as command from jcl.jabber.command import FieldNoType, JCLCommandManager import jcl.model as model import jcl.model.account as account -from jcl.model.account import Account +from jcl.model.account import Account, LegacyJID from jcl.model.tests.account import ExampleAccount, Example2Account - -if sys.platform == "win32": - DB_PATH = "/c|/temp/jcl_test.db" -else: - DB_PATH = "/tmp/jcl_test.db" -DB_URL = DB_PATH# + "?debug=1&debugThreading=1" +from jcl.tests import JCLTestCase class FieldNoType_TestCase(unittest.TestCase): def test_complete_xml_element(self): @@ -64,32 +55,17 @@ class CommandManager_TestCase(unittest.TestCase): command_name = command.command_manager.get_short_command_name("test-command") self.assertEquals(command_name, "test_command") -class JCLCommandManager_TestCase(unittest.TestCase): +class JCLCommandManager_TestCase(JCLTestCase): def setUp(self): + JCLTestCase.setUp(self, tables=[Account, ExampleAccount, + Example2Account, LegacyJID]) self.comp = JCLComponent("jcl.test.com", "password", "localhost", "5347") - model.db_connection_str = 'sqlite://' + DB_URL - model.db_connect() - Account.createTable(ifNotExists=True) - ExampleAccount.createTable(ifNotExists=True) - Example2Account.createTable(ifNotExists = True) - model.db_disconnect() self.command_manager = JCLCommandManager(self.comp, self.comp.account_manager) - def tearDown(self): - model.db_connect() - Example2Account.dropTable(ifExists=True) - ExampleAccount.dropTable(ifExists=True) - Account.dropTable(ifExists=True) - del TheURIOpener.cachedURIs['sqlite://' + DB_URL] - model.hub.threadConnection.close() - model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - def __check_actions(self, info_query, expected_actions=None, action_index=0): actions = info_query.xpath_eval("c:command/c:actions", {"c": "http://jabber.org/protocol/commands"}) @@ -1255,14 +1231,104 @@ class JCLCommandManager_TestCase(unittest.TestCase): "pass2") self.assertEquals(account11.password, "pass2") -# def test_execute_get_user_roster(self): -# #TODO : implement command -# info_query = Iq(stanza_type="set", -# from_jid="user1@test.com", -# to_jid="jcl.test.com") -# result = self.command_manager.execute_add_user(info_query) -# self.assertNotEquals(result, None) -# self.assertEquals(len(result), 1) + def test_execute_get_user_roster(self): + self.comp.account_manager.account_classes = (ExampleAccount, + Example2Account) + model.db_connect() + account11 = ExampleAccount(user_jid="test1@test.com", + name="account11", + jid="account11@jcl.test.com") + ljid111 = LegacyJID(legacy_address="test111@test.com", + jid="test111%test.com@test.com", + account=account11) + ljid112 = LegacyJID(legacy_address="test112@test.com", + jid="test112%test.com@test.com", + account=account11) + account12 = Example2Account(user_jid="test1@test.com", + name="account12", + jid="account12@jcl.test.com") + ljid121 = LegacyJID(legacy_address="test121@test.com", + jid="test121%test.com@test.com", + account=account12) + account21 = ExampleAccount(user_jid="test2@test.com", + name="account21", + jid="account21@jcl.test.com") + ljid211 = LegacyJID(legacy_address="test211@test.com", + jid="test211%test.com@test.com", + account=account21) + ljid212 = LegacyJID(legacy_address="test212@test.com", + jid="test212%test.com@test.com", + account=account21) + account22 = ExampleAccount(user_jid="test2@test.com", + name="account11", + jid="account11@jcl.test.com") + ljid221 = LegacyJID(legacy_address="test221@test.com", + jid="test221%test.com@test.com", + account=account22) + model.db_disconnect() + info_query = Iq(stanza_type="set", + from_jid="user1@test.com", + to_jid="jcl.test.com") + command_node = info_query.set_new_content(command.COMMAND_NS, "command") + command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-roster") + result = self.command_manager.apply_command_action(info_query, + "http://jabber.org/protocol/admin#get-user-roster", + "execute") + self.assertNotEquals(result, None) + self.assertEquals(len(result), 1) + xml_command = result[0].xpath_eval("c:command", + {"c": "http://jabber.org/protocol/commands"})[0] + self.assertEquals(xml_command.prop("status"), "executing") + self.assertNotEquals(xml_command.prop("sessionid"), None) + self.__check_actions(result[0], ["complete"]) + + # Second step + info_query = Iq(stanza_type="set", + from_jid="user1@test.com", + to_jid="jcl.test.com") + command_node = info_query.set_new_content(command.COMMAND_NS, "command") + command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-roster") + session_id = xml_command.prop("sessionid") + command_node.setProp("sessionid", session_id) + command_node.setProp("action", "complete") + submit_form = Form(xmlnode_or_type="submit") + submit_form.add_field(field_type="jid-single", + name="user_jid", + value="test1@test.com") + submit_form.as_xml(command_node) + result = self.command_manager.apply_command_action(info_query, + "http://jabber.org/protocol/admin#get-user-roster", + "execute") + self.assertNotEquals(result, None) + self.assertEquals(len(result), 1) + xml_command = result[0].xpath_eval("c:command", + {"c": "http://jabber.org/protocol/commands"})[0] + self.assertEquals(xml_command.prop("status"), "completed") + self.assertEquals(xml_command.prop("sessionid"), session_id) + self.__check_actions(result[0]) + context_session = self.command_manager.sessions[session_id][1] + self.assertEquals(context_session["user_jid"], + "test1@test.com") + fields = result[0].xpath_eval("c:command/data:x/data:field", + {"c": "http://jabber.org/protocol/commands", + "data": "jabber:x:data"}) + self.assertEquals(len(fields), 2) + self.assertEquals(fields[0].prop("var"), "FORM_TYPE") + self.assertEquals(fields[0].prop("type"), "hidden") + self.assertEquals(fields[0].children.name, "value") + self.assertEquals(fields[0].children.content, + "http://jabber.org/protocol/admin") + items = result[0].xpath_eval("c:command/data:x/roster:query/roster:item", + {"c": "http://jabber.org/protocol/commands", + "data": "jabber:x:data", + "roster": "jabber:iq:roster"}) + self.assertEquals(len(items), 3) + self.assertEquals(items[0].prop("jid"), "test111%test.com@test.com") + self.assertEquals(items[0].prop("name"), "test111@test.com") + self.assertEquals(items[1].prop("jid"), "test112%test.com@test.com") + self.assertEquals(items[1].prop("name"), "test112@test.com") + self.assertEquals(items[2].prop("jid"), "test121%test.com@test.com") + self.assertEquals(items[2].prop("name"), "test121@test.com") # def test_execute_get_user_last_login(self): # #TODO : implement command diff --git a/src/jcl/jabber/tests/component.py b/src/jcl/jabber/tests/component.py index d626900..393d273 100644 --- a/src/jcl/jabber/tests/component.py +++ b/src/jcl/jabber/tests/component.py @@ -25,9 +25,8 @@ import unittest import threading import time -import sys -import os import re +import tempfile from sqlobject.dbconnection import TheURIOpener @@ -47,12 +46,7 @@ from jcl.model.account import Account, LegacyJID from jcl.lang import Lang from jcl.model.tests.account import ExampleAccount, Example2Account - -if sys.platform == "win32": - DB_PATH = "/c|/temp/jcl_test.db" -else: - DB_PATH = "/tmp/jcl_test.db" -DB_URL = DB_PATH# + "?debug=1&debugThreading=1" +from jcl.tests import JCLTestCase class MockStream(object): def __init__(self, @@ -159,39 +153,20 @@ class HandlerMock(object): self.handled.append((stanza, lang_class, data)) return [(stanza, lang_class, data)] -class JCLComponent_TestCase(unittest.TestCase): +class JCLComponent_TestCase(JCLTestCase): ########################################################################### # Utility methods ########################################################################### def setUp(self): - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) + JCLTestCase.setUp(self, tables=[Account, LegacyJID, ExampleAccount, + Example2Account]) self.comp = JCLComponent("jcl.test.com", "password", "localhost", "5347") - model.db_connection_str = 'sqlite://' + DB_URL - model.db_connect() - Account.createTable(ifNotExists=True) - LegacyJID.createTable(ifNotExists=True) - ExampleAccount.createTable(ifNotExists=True) - Example2Account.createTable(ifNotExists=True) - model.db_disconnect() self.max_tick_count = 1 self.saved_time_handler = None - def tearDown(self): - model.db_connect() - Example2Account.dropTable(ifExists=True) - ExampleAccount.dropTable(ifExists=True) - LegacyJID.dropTable(ifExists=True) - Account.dropTable(ifExists=True) - del TheURIOpener.cachedURIs['sqlite://' + DB_URL] - model.hub.threadConnection.close() - model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - ########################################################################### # Constructor tests ########################################################################### @@ -853,7 +828,7 @@ class JCLComponent_TestCase(unittest.TestCase): self.assertEquals(fields[2].prop("type"), "text-private") self.assertEquals(fields[2].prop("var"), "password") - self.assertEquals(fields[2].prop("label"), "password") + self.assertEquals(fields[2].prop("label"), Lang.en.field_password) self.assertEquals(fields[3].prop("type"), "boolean") self.assertEquals(fields[3].prop("var"), "store_password") @@ -1004,7 +979,7 @@ class JCLComponent_TestCase(unittest.TestCase): field = fields[2] self.assertEquals(field.prop("type"), "text-private") self.assertEquals(field.prop("var"), "password") - self.assertEquals(field.prop("label"), "password") + self.assertEquals(field.prop("label"), Lang.en.field_password) self.assertEquals(field.children.name, "value") self.assertEquals(field.children.content, "mypassword") field = fields[3] @@ -2524,24 +2499,10 @@ class JCLComponent_TestCase(unittest.TestCase): "data": "jabber:x:data"}) self.assertEquals(len(items), 2) -class Handler_TestCase(unittest.TestCase): +class Handler_TestCase(JCLTestCase): def setUp(self): self.handler = Handler(None) - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - model.db_connection_str = 'sqlite://' + DB_URL - model.db_connect() - Account.createTable(ifNotExists = True) - model.db_disconnect() - - def tearDown(self): - model.db_connect() - Account.dropTable(ifExists = True) - del TheURIOpener.cachedURIs['sqlite://' + DB_URL] - model.hub.threadConnection.close() - model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) + JCLTestCase.setUp(self, tables=[Account]) def test_filter(self): model.db_connect() diff --git a/src/jcl/jabber/tests/feeder.py b/src/jcl/jabber/tests/feeder.py index bffdfef..b3391b5 100644 --- a/src/jcl/jabber/tests/feeder.py +++ b/src/jcl/jabber/tests/feeder.py @@ -22,31 +22,21 @@ ## import unittest -import os import threading import time -import sys from sqlobject import * -from sqlobject.dbconnection import TheURIOpener - -from pyxmpp.message import Message from jcl.jabber.component import JCLComponent from jcl.jabber.feeder import FeederComponent, Feeder, Sender, MessageSender, \ HeadlineSender, FeederHandler from jcl.model.account import Account, LegacyJID import jcl.model as model -from jcl.model import account from jcl.model.tests.account import ExampleAccount, Example2Account from jcl.jabber.tests.component import JCLComponent_TestCase, MockStream -if sys.platform == "win32": - DB_PATH = "/c|/temp/jcl_test.db" -else: - DB_PATH = "/tmp/jcl_test.db" -DB_URL = DB_PATH #+ "?debug=1&debugThreading=1" +from jcl.tests import JCLTestCase class FeederMock(object): def feed(self, _account): @@ -61,31 +51,12 @@ class SenderMock(object): class FeederComponent_TestCase(JCLComponent_TestCase): def setUp(self): - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) + JCLTestCase.setUp(self, tables=[Account, LegacyJID, ExampleAccount, + Example2Account]) self.comp = FeederComponent("jcl.test.com", "password", "localhost", "5347") - model.db_connection_str = 'sqlite://' + DB_URL - model.db_connect() - Account.createTable(ifNotExists = True) - LegacyJID.createTable(ifNotExists=True) - ExampleAccount.createTable(ifNotExists = True) - Example2Account.createTable(ifNotExists = True) - model.db_disconnect() - - def tearDown(self): - model.db_connect() - Account.dropTable(ifExists = True) - LegacyJID.dropTable(ifExists=True) - ExampleAccount.dropTable(ifExists = True) - Example2Account.dropTable(ifExists = True) - del TheURIOpener.cachedURIs['sqlite://' + DB_URL] - model.hub.threadConnection.close() - model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) def test_run(self): self.comp.time_unit = 1 @@ -184,30 +155,16 @@ class Sender_TestCase(unittest.TestCase): sender = Sender() self.assertRaises(NotImplementedError, sender.send, None, None, None) -class MessageSender_TestCase(unittest.TestCase): +class MessageSender_TestCase(JCLTestCase): def setUp(self): - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) + JCLTestCase.setUp(self, tables=[Account]) self.comp = FeederComponent("jcl.test.com", "password", "localhost", "5347") - model.db_connection_str = 'sqlite://' + DB_URL - model.db_connect() - Account.createTable(ifNotExists = True) - model.db_disconnect() self.sender = MessageSender(self.comp) self.message_type = None - def tearDown(self): - model.db_connect() - Account.dropTable(ifExists = True) - del TheURIOpener.cachedURIs['sqlite://' + DB_URL] - model.hub.threadConnection.close() - model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - def test_send(self): self.comp.stream = MockStream() self.comp.stream_class = MockStream @@ -231,27 +188,10 @@ class HeadlineSender_TestCase(MessageSender_TestCase): self.sender = HeadlineSender(self.comp) self.message_type = "headline" -class FeederHandler_TestCase(unittest.TestCase): +class FeederHandler_TestCase(JCLTestCase): def setUp(self): + JCLTestCase.setUp(self, tables=[Account, ExampleAccount]) self.handler = FeederHandler(FeederMock(), SenderMock()) - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - model.db_connection_str = 'sqlite://' + DB_URL - model.db_connect() - Account.createTable(ifNotExists = True) - ExampleAccount.createTable(ifNotExists = True) - model.db_disconnect() - - def tearDown(self): - self.handler = None - model.db_connect() - ExampleAccount.dropTable(ifExists = True) - Account.dropTable(ifExists = True) - del TheURIOpener.cachedURIs['sqlite://' + DB_URL] - model.hub.threadConnection.close() - model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) def test_filter(self): model.db_connect() diff --git a/src/jcl/jabber/tests/message.py b/src/jcl/jabber/tests/message.py index aae8caf..8c6432b 100644 --- a/src/jcl/jabber/tests/message.py +++ b/src/jcl/jabber/tests/message.py @@ -21,11 +21,8 @@ ## import unittest -import sys -import os from pyxmpp.message import Message -from sqlobject.dbconnection import TheURIOpener from jcl.lang import Lang from jcl.jabber.component import JCLComponent @@ -35,38 +32,17 @@ import jcl.model as model from jcl.model.account import Account from jcl.model.tests.account import ExampleAccount +from jcl.tests import JCLTestCase -if sys.platform == "win32": - DB_PATH = "/c|/temp/jcl_test.db" -else: - DB_PATH = "/tmp/jcl_test.db" -DB_URL = DB_PATH# + "?debug=1&debugThreading=1" - -class PasswordMessageHandler_TestCase(unittest.TestCase): +class PasswordMessageHandler_TestCase(JCLTestCase): def setUp(self): + JCLTestCase.setUp(self, tables=[Account, ExampleAccount]) self.comp = JCLComponent("jcl.test.com", "password", "localhost", "5347", - 'sqlite://' + DB_URL) + self.db_url) self.handler = PasswordMessageHandler(self.comp) - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - model.db_connection_str = 'sqlite://' + DB_URL - model.db_connect() - Account.createTable(ifNotExists = True) - ExampleAccount.createTable(ifNotExists = True) - model.db_disconnect() - - def tearDown(self): - model.db_connect() - ExampleAccount.dropTable(ifExists = True) - Account.dropTable(ifExists = True) - del TheURIOpener.cachedURIs['sqlite://' + DB_URL] - model.hub.threadConnection.close() - model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) def test_filter_waiting_password(self): model.db_connect() diff --git a/src/jcl/model/tests/account.py b/src/jcl/model/tests/account.py index 72e2ce9..c54ad2e 100644 --- a/src/jcl/model/tests/account.py +++ b/src/jcl/model/tests/account.py @@ -22,21 +22,20 @@ import unittest import sys -import os from sqlobject import * -from sqlobject.dbconnection import TheURIOpener from jcl.error import FieldError import jcl.model as model from jcl.model import account from jcl.model.account import Account, PresenceAccount +from jcl.tests import JCLTestCase + if sys.platform == "win32": - DB_PATH = "/c|/temp/jcl_test.db" + DB_DIR = "/c|/temp/" else: - DB_PATH = "/tmp/jcl_test.db" -DB_URL = DB_PATH# + "?debug=1&debugThreading=1" + DB_DIR = "/tmp/" class ExampleAccount(Account): login = StringCol(default="") @@ -163,10 +162,7 @@ class AccountModule_TestCase(unittest.TestCase): self.assertEquals(account.mandatory_field("test", "value"), "value") -class InheritableAccount_TestCase(unittest.TestCase): - def setUp(self): - self.db_url = DB_URL - model.db_connection_str = 'sqlite://' + self.db_url +class InheritableAccount_TestCase(JCLTestCase): def test_get_register_fields(self): """ @@ -191,26 +187,9 @@ class InheritableAccount_TestCase(unittest.TestCase): class Account_TestCase(InheritableAccount_TestCase): def setUp(self): - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - self.db_url = DB_URL - model.db_connection_str = 'sqlite://' + self.db_url - model.db_connect() - Account.createTable(ifNotExists = True) - ExampleAccount.createTable(ifNotExists = True) - model.db_disconnect() + JCLTestCase.setUp(self, tables=[Account, ExampleAccount]) self.account_class = Account - def tearDown(self): - model.db_connect() - ExampleAccount.dropTable(ifExists = True) - Account.dropTable(ifExists = True) - del TheURIOpener.cachedURIs['sqlite://' + self.db_url] - model.hub.threadConnection.close() - model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - def test_set_status(self): model.db_connect() account11 = Account(user_jid="test1@test.com", @@ -240,14 +219,9 @@ class Account_TestCase(InheritableAccount_TestCase): class PresenceAccount_TestCase(InheritableAccount_TestCase): def setUp(self): - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - self.db_url = DB_URL - model.db_connection_str = 'sqlite://' + self.db_url + JCLTestCase.setUp(self, tables=[Account, PresenceAccount, + PresenceAccountExample]) model.db_connect() - Account.createTable(ifNotExists = True) - PresenceAccount.createTable(ifNotExists = True) - PresenceAccountExample.createTable(ifNotExists = True) self.account = PresenceAccountExample(\ user_jid="test1@test.com", name="account11", @@ -255,17 +229,6 @@ class PresenceAccount_TestCase(InheritableAccount_TestCase): model.db_disconnect() self.account_class = PresenceAccount - def tearDown(self): - model.db_connect() - PresenceAccountExample.dropTable(ifExists = True) - PresenceAccount.dropTable(ifExists = True) - Account.dropTable(ifExists = True) - del TheURIOpener.cachedURIs['sqlite://' + self.db_url] - model.hub.threadConnection.close() - model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - def test_get_presence_actions_fields(self): fields = self.account_class.get_presence_actions_fields() self.assertEquals(len(fields), 6) diff --git a/src/jcl/tests/__init__.py b/src/jcl/tests/__init__.py index db400bf..1e65ddd 100644 --- a/src/jcl/tests/__init__.py +++ b/src/jcl/tests/__init__.py @@ -2,6 +2,44 @@ __revision__ = "" import unittest +import os +import tempfile +import sys + +from sqlobject.dbconnection import TheURIOpener +import jcl.model + +if sys.platform == "win32": + DB_DIR = "/c|/temp/" +else: + DB_DIR = "/tmp/" + +class JCLTestCase(unittest.TestCase): + def __init__(self, methodName='runTest'): + unittest.TestCase.__init__(self, methodName) + self.tables = [] + + def setUp(self, tables=[]): + self.tables = tables + self.db_path = tempfile.mktemp("db", "jcltest", DB_DIR) + if os.path.exists(self.db_path): + os.unlink(self.db_path) + self.db_url = "sqlite://" + self.db_path + jcl.model.db_connection_str = self.db_url + jcl.model.db_connect() + for table in tables: + table.createTable(ifNotExists=True) + jcl.model.db_disconnect() + + def tearDown(self): + jcl.model.db_connect() + for table in self.tables: + table.dropTable(ifExists=True) + del TheURIOpener.cachedURIs[self.db_url] + jcl.model.hub.threadConnection.close() + jcl.model.db_disconnect() + if os.path.exists(self.db_path): + os.unlink(self.db_path) from jcl.tests import lang, runner from jcl.jabber import tests as jabber diff --git a/src/jcl/tests/runner.py b/src/jcl/tests/runner.py index b9b7fd1..45e263d 100644 --- a/src/jcl/tests/runner.py +++ b/src/jcl/tests/runner.py @@ -23,6 +23,7 @@ import unittest import sys import os +import tempfile from sqlobject import * @@ -30,14 +31,12 @@ import jcl from jcl.runner import JCLRunner import jcl.model as model -from jcl.model import account from jcl.model.account import Account, PresenceAccount if sys.platform == "win32": - DB_PATH = "/c|/temp/test.db" + DB_DIR = "/c|/temp/" else: - DB_PATH = "/tmp/test.db" -DB_URL = "sqlite://" + DB_PATH# + "?debug=1&debugThreading=1" + DB_DIR = "/tmp/" class JCLRunner_TestCase(unittest.TestCase): def setUp(self): @@ -122,7 +121,9 @@ class JCLRunner_TestCase(unittest.TestCase): def test__run(self): self.runner.pid_file = "/tmp/jcl.pid" - self.runner.db_url = DB_URL + db_path = tempfile.mktemp("db", "jcltest", DB_DIR) + db_url = "sqlite://" + db_path + self.runner.db_url = db_url def do_nothing(): pass self.runner._run(do_nothing) @@ -131,7 +132,7 @@ class JCLRunner_TestCase(unittest.TestCase): Account.dropTable() PresenceAccount.dropTable() model.db_disconnect() - os.unlink(DB_PATH) + os.unlink(db_path) self.assertFalse(os.access("/tmp/jcl.pid", os.F_OK)) def test__get_help(self):