get-user-roster ad-hoc command implementation

darcs-hash:20070731042819-86b55-dc9ad0c5efc897ac7387a0fb81b76a681537e453.gz
This commit is contained in:
David Rousselie
2007-07-31 06:28:19 +02:00
parent b0ad50e260
commit ddc71a414b
10 changed files with 224 additions and 239 deletions

View File

@@ -44,7 +44,12 @@ if __name__ == '__main__':
logger = logging.getLogger() logger = logging.getLogger()
logger.addHandler(logging.StreamHandler()) logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.CRITICAL) logger.setLevel(logging.CRITICAL)
try:
print "Trying to launch test with testoob"
import testoob
testoob.main(defaultTest='suite')
except ImportError:
print "Falling back to standard unittest"
unittest.main(defaultTest='suite') unittest.main(defaultTest='suite')
coverage.stop() coverage.stop()

View File

@@ -334,9 +334,11 @@ class JCLCommandManager(CommandManager):
return (self.add_form_select_user_jids(command_node, lang_class), []) return (self.add_form_select_user_jids(command_node, lang_class), [])
def select_user_jid_step_1(self, info_query, session_context, def select_user_jid_step_1(self, info_query, session_context,
command_node, lang_class): command_node, lang_class,
actions=[ACTION_NEXT],
default_action=0):
self.__logger.debug("Executing select_user_jid step 1") self.__logger.debug("Executing select_user_jid step 1")
self.add_actions(command_node, [ACTION_NEXT]) self.add_actions(command_node, actions, default_action)
return (self.add_form_select_user_jid(command_node, lang_class), []) return (self.add_form_select_user_jid(command_node, lang_class), [])
def select_accounts_step_2(self, info_query, session_context, def select_accounts_step_2(self, info_query, session_context,
@@ -520,8 +522,36 @@ class JCLCommandManager(CommandManager):
command_node.setProp("status", STATUS_COMPLETED) command_node.setProp("status", STATUS_COMPLETED)
return (None, []) return (None, [])
def execute_get_user_roster(self, info_query): def execute_get_user_roster_1(self, info_query, session_context,
return [] command_node, lang_class):
(result_form, result) = self.select_user_jid_step_1(info_query,
session_context,
command_node,
lang_class,
actions=[ACTION_COMPLETE])
return (result_form, result)
def execute_get_user_roster_2(self, info_query, session_context,
command_node, lang_class):
self.__logger.debug("Executing command 'get-user-roster' step 2")
result_form = Form(xmlnode_or_type="result")
result_form.add_field(field_type="hidden",
name="FORM_TYPE",
value="http://jabber.org/protocol/admin")
user_jid = session_context["user_jid"]
result_form.fields.append(FieldNoType(name="user_jid",
value=user_jid))
result_form.as_xml(command_node)
x_form = command_node.children
query = x_form.newChild(None, "query", None)
roster_ns = query.newNs("jabber:iq:roster", None)
query.setNs(roster_ns)
for legacy_jid in account.get_legacy_jids(user_jid):
item = query.newChild(None, "item", None)
item.setProp("jid", legacy_jid.jid)
item.setProp("name", legacy_jid.legacy_address)
command_node.setProp("status", STATUS_COMPLETED)
return (result_form, [])
def execute_get_user_lastlogin(self, info_query): def execute_get_user_lastlogin(self, info_query):
return [] return []

View File

@@ -25,7 +25,6 @@ from pyxmpp.presence import Presence
from jcl.jabber import Handler from jcl.jabber import Handler
import jcl.jabber as jabber import jcl.jabber as jabber
import jcl.model as model
class DefaultPresenceHandler(Handler): class DefaultPresenceHandler(Handler):
"""Handle presence""" """Handle presence"""
@@ -69,6 +68,9 @@ class DefaultUnsubscribeHandler(Handler):
class AccountPresenceHandler(Handler): class AccountPresenceHandler(Handler):
filter = jabber.get_account_filter filter = jabber.get_account_filter
def get_account_presence(self, stanza, lang_class, _account):
raise NotImplemented
def handle(self, stanza, lang_class, data): def handle(self, stanza, lang_class, data):
"""Handle presence sent to an account JID""" """Handle presence sent to an account JID"""
result = [] result = []
@@ -82,9 +84,12 @@ class AccountPresenceAvailableHandler(AccountPresenceHandler):
stanza.get_show(), stanza.get_show(),
lang_class) lang_class)
class RootPresenceHandler(Handler): class RootPresenceHandler(AccountPresenceHandler):
filter = jabber.get_accounts_root_filter filter = jabber.get_accounts_root_filter
def get_root_presence(self, stanza, lang_class, nb_accounts):
raise NotImplemented
def handle(self, stanza, lang_class, data): def handle(self, stanza, lang_class, data):
"""handle presence sent to component JID""" """handle presence sent to component JID"""
result = [] result = []

View File

@@ -21,30 +21,21 @@
## ##
import unittest import unittest
import sys
import os
from pyxmpp.presence import Presence from pyxmpp.presence import Presence
from pyxmpp.jabber.dataforms import Form from pyxmpp.jabber.dataforms import Form
from pyxmpp.iq import Iq from pyxmpp.iq import Iq
from pyxmpp.message import Message from pyxmpp.message import Message
from sqlobject.dbconnection import TheURIOpener
from jcl.lang import Lang from jcl.lang import Lang
from jcl.jabber.component import JCLComponent from jcl.jabber.component import JCLComponent
import jcl.jabber.command as command import jcl.jabber.command as command
from jcl.jabber.command import FieldNoType, JCLCommandManager from jcl.jabber.command import FieldNoType, JCLCommandManager
import jcl.model as model import jcl.model as model
import jcl.model.account as account import jcl.model.account as account
from jcl.model.account import Account from jcl.model.account import Account, LegacyJID
from jcl.model.tests.account import ExampleAccount, Example2Account from jcl.model.tests.account import ExampleAccount, Example2Account
from jcl.tests import JCLTestCase
if sys.platform == "win32":
DB_PATH = "/c|/temp/jcl_test.db"
else:
DB_PATH = "/tmp/jcl_test.db"
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
class FieldNoType_TestCase(unittest.TestCase): class FieldNoType_TestCase(unittest.TestCase):
def test_complete_xml_element(self): def test_complete_xml_element(self):
@@ -64,32 +55,17 @@ class CommandManager_TestCase(unittest.TestCase):
command_name = command.command_manager.get_short_command_name("test-command") command_name = command.command_manager.get_short_command_name("test-command")
self.assertEquals(command_name, "test_command") self.assertEquals(command_name, "test_command")
class JCLCommandManager_TestCase(unittest.TestCase): class JCLCommandManager_TestCase(JCLTestCase):
def setUp(self): def setUp(self):
JCLTestCase.setUp(self, tables=[Account, ExampleAccount,
Example2Account, LegacyJID])
self.comp = JCLComponent("jcl.test.com", self.comp = JCLComponent("jcl.test.com",
"password", "password",
"localhost", "localhost",
"5347") "5347")
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists=True)
ExampleAccount.createTable(ifNotExists=True)
Example2Account.createTable(ifNotExists = True)
model.db_disconnect()
self.command_manager = JCLCommandManager(self.comp, self.command_manager = JCLCommandManager(self.comp,
self.comp.account_manager) self.comp.account_manager)
def tearDown(self):
model.db_connect()
Example2Account.dropTable(ifExists=True)
ExampleAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def __check_actions(self, info_query, expected_actions=None, action_index=0): def __check_actions(self, info_query, expected_actions=None, action_index=0):
actions = info_query.xpath_eval("c:command/c:actions", actions = info_query.xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"}) {"c": "http://jabber.org/protocol/commands"})
@@ -1255,14 +1231,104 @@ class JCLCommandManager_TestCase(unittest.TestCase):
"pass2") "pass2")
self.assertEquals(account11.password, "pass2") self.assertEquals(account11.password, "pass2")
# def test_execute_get_user_roster(self): def test_execute_get_user_roster(self):
# #TODO : implement command self.comp.account_manager.account_classes = (ExampleAccount,
# info_query = Iq(stanza_type="set", Example2Account)
# from_jid="user1@test.com", model.db_connect()
# to_jid="jcl.test.com") account11 = ExampleAccount(user_jid="test1@test.com",
# result = self.command_manager.execute_add_user(info_query) name="account11",
# self.assertNotEquals(result, None) jid="account11@jcl.test.com")
# self.assertEquals(len(result), 1) ljid111 = LegacyJID(legacy_address="test111@test.com",
jid="test111%test.com@test.com",
account=account11)
ljid112 = LegacyJID(legacy_address="test112@test.com",
jid="test112%test.com@test.com",
account=account11)
account12 = Example2Account(user_jid="test1@test.com",
name="account12",
jid="account12@jcl.test.com")
ljid121 = LegacyJID(legacy_address="test121@test.com",
jid="test121%test.com@test.com",
account=account12)
account21 = ExampleAccount(user_jid="test2@test.com",
name="account21",
jid="account21@jcl.test.com")
ljid211 = LegacyJID(legacy_address="test211@test.com",
jid="test211%test.com@test.com",
account=account21)
ljid212 = LegacyJID(legacy_address="test212@test.com",
jid="test212%test.com@test.com",
account=account21)
account22 = ExampleAccount(user_jid="test2@test.com",
name="account11",
jid="account11@jcl.test.com")
ljid221 = LegacyJID(legacy_address="test221@test.com",
jid="test221%test.com@test.com",
account=account22)
model.db_disconnect()
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-roster")
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#get-user-roster",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertNotEquals(xml_command.prop("sessionid"), None)
self.__check_actions(result[0], ["complete"])
# Second step
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-roster")
session_id = xml_command.prop("sessionid")
command_node.setProp("sessionid", session_id)
command_node.setProp("action", "complete")
submit_form = Form(xmlnode_or_type="submit")
submit_form.add_field(field_type="jid-single",
name="user_jid",
value="test1@test.com")
submit_form.as_xml(command_node)
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#get-user-roster",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "completed")
self.assertEquals(xml_command.prop("sessionid"), session_id)
self.__check_actions(result[0])
context_session = self.command_manager.sessions[session_id][1]
self.assertEquals(context_session["user_jid"],
"test1@test.com")
fields = result[0].xpath_eval("c:command/data:x/data:field",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(fields), 2)
self.assertEquals(fields[0].prop("var"), "FORM_TYPE")
self.assertEquals(fields[0].prop("type"), "hidden")
self.assertEquals(fields[0].children.name, "value")
self.assertEquals(fields[0].children.content,
"http://jabber.org/protocol/admin")
items = result[0].xpath_eval("c:command/data:x/roster:query/roster:item",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data",
"roster": "jabber:iq:roster"})
self.assertEquals(len(items), 3)
self.assertEquals(items[0].prop("jid"), "test111%test.com@test.com")
self.assertEquals(items[0].prop("name"), "test111@test.com")
self.assertEquals(items[1].prop("jid"), "test112%test.com@test.com")
self.assertEquals(items[1].prop("name"), "test112@test.com")
self.assertEquals(items[2].prop("jid"), "test121%test.com@test.com")
self.assertEquals(items[2].prop("name"), "test121@test.com")
# def test_execute_get_user_last_login(self): # def test_execute_get_user_last_login(self):
# #TODO : implement command # #TODO : implement command

View File

@@ -25,9 +25,8 @@ import unittest
import threading import threading
import time import time
import sys
import os
import re import re
import tempfile
from sqlobject.dbconnection import TheURIOpener from sqlobject.dbconnection import TheURIOpener
@@ -47,12 +46,7 @@ from jcl.model.account import Account, LegacyJID
from jcl.lang import Lang from jcl.lang import Lang
from jcl.model.tests.account import ExampleAccount, Example2Account from jcl.model.tests.account import ExampleAccount, Example2Account
from jcl.tests import JCLTestCase
if sys.platform == "win32":
DB_PATH = "/c|/temp/jcl_test.db"
else:
DB_PATH = "/tmp/jcl_test.db"
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
class MockStream(object): class MockStream(object):
def __init__(self, def __init__(self,
@@ -159,39 +153,20 @@ class HandlerMock(object):
self.handled.append((stanza, lang_class, data)) self.handled.append((stanza, lang_class, data))
return [(stanza, lang_class, data)] return [(stanza, lang_class, data)]
class JCLComponent_TestCase(unittest.TestCase): class JCLComponent_TestCase(JCLTestCase):
########################################################################### ###########################################################################
# Utility methods # Utility methods
########################################################################### ###########################################################################
def setUp(self): def setUp(self):
if os.path.exists(DB_PATH): JCLTestCase.setUp(self, tables=[Account, LegacyJID, ExampleAccount,
os.unlink(DB_PATH) Example2Account])
self.comp = JCLComponent("jcl.test.com", self.comp = JCLComponent("jcl.test.com",
"password", "password",
"localhost", "localhost",
"5347") "5347")
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists=True)
LegacyJID.createTable(ifNotExists=True)
ExampleAccount.createTable(ifNotExists=True)
Example2Account.createTable(ifNotExists=True)
model.db_disconnect()
self.max_tick_count = 1 self.max_tick_count = 1
self.saved_time_handler = None self.saved_time_handler = None
def tearDown(self):
model.db_connect()
Example2Account.dropTable(ifExists=True)
ExampleAccount.dropTable(ifExists=True)
LegacyJID.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
########################################################################### ###########################################################################
# Constructor tests # Constructor tests
########################################################################### ###########################################################################
@@ -853,7 +828,7 @@ class JCLComponent_TestCase(unittest.TestCase):
self.assertEquals(fields[2].prop("type"), "text-private") self.assertEquals(fields[2].prop("type"), "text-private")
self.assertEquals(fields[2].prop("var"), "password") self.assertEquals(fields[2].prop("var"), "password")
self.assertEquals(fields[2].prop("label"), "password") self.assertEquals(fields[2].prop("label"), Lang.en.field_password)
self.assertEquals(fields[3].prop("type"), "boolean") self.assertEquals(fields[3].prop("type"), "boolean")
self.assertEquals(fields[3].prop("var"), "store_password") self.assertEquals(fields[3].prop("var"), "store_password")
@@ -1004,7 +979,7 @@ class JCLComponent_TestCase(unittest.TestCase):
field = fields[2] field = fields[2]
self.assertEquals(field.prop("type"), "text-private") self.assertEquals(field.prop("type"), "text-private")
self.assertEquals(field.prop("var"), "password") self.assertEquals(field.prop("var"), "password")
self.assertEquals(field.prop("label"), "password") self.assertEquals(field.prop("label"), Lang.en.field_password)
self.assertEquals(field.children.name, "value") self.assertEquals(field.children.name, "value")
self.assertEquals(field.children.content, "mypassword") self.assertEquals(field.children.content, "mypassword")
field = fields[3] field = fields[3]
@@ -2524,24 +2499,10 @@ class JCLComponent_TestCase(unittest.TestCase):
"data": "jabber:x:data"}) "data": "jabber:x:data"})
self.assertEquals(len(items), 2) self.assertEquals(len(items), 2)
class Handler_TestCase(unittest.TestCase): class Handler_TestCase(JCLTestCase):
def setUp(self): def setUp(self):
self.handler = Handler(None) self.handler = Handler(None)
if os.path.exists(DB_PATH): JCLTestCase.setUp(self, tables=[Account])
os.unlink(DB_PATH)
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists = True)
model.db_disconnect()
def tearDown(self):
model.db_connect()
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_filter(self): def test_filter(self):
model.db_connect() model.db_connect()

View File

@@ -22,31 +22,21 @@
## ##
import unittest import unittest
import os
import threading import threading
import time import time
import sys
from sqlobject import * from sqlobject import *
from sqlobject.dbconnection import TheURIOpener
from pyxmpp.message import Message
from jcl.jabber.component import JCLComponent from jcl.jabber.component import JCLComponent
from jcl.jabber.feeder import FeederComponent, Feeder, Sender, MessageSender, \ from jcl.jabber.feeder import FeederComponent, Feeder, Sender, MessageSender, \
HeadlineSender, FeederHandler HeadlineSender, FeederHandler
from jcl.model.account import Account, LegacyJID from jcl.model.account import Account, LegacyJID
import jcl.model as model import jcl.model as model
from jcl.model import account
from jcl.model.tests.account import ExampleAccount, Example2Account from jcl.model.tests.account import ExampleAccount, Example2Account
from jcl.jabber.tests.component import JCLComponent_TestCase, MockStream from jcl.jabber.tests.component import JCLComponent_TestCase, MockStream
if sys.platform == "win32": from jcl.tests import JCLTestCase
DB_PATH = "/c|/temp/jcl_test.db"
else:
DB_PATH = "/tmp/jcl_test.db"
DB_URL = DB_PATH #+ "?debug=1&debugThreading=1"
class FeederMock(object): class FeederMock(object):
def feed(self, _account): def feed(self, _account):
@@ -61,31 +51,12 @@ class SenderMock(object):
class FeederComponent_TestCase(JCLComponent_TestCase): class FeederComponent_TestCase(JCLComponent_TestCase):
def setUp(self): def setUp(self):
if os.path.exists(DB_PATH): JCLTestCase.setUp(self, tables=[Account, LegacyJID, ExampleAccount,
os.unlink(DB_PATH) Example2Account])
self.comp = FeederComponent("jcl.test.com", self.comp = FeederComponent("jcl.test.com",
"password", "password",
"localhost", "localhost",
"5347") "5347")
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists = True)
LegacyJID.createTable(ifNotExists=True)
ExampleAccount.createTable(ifNotExists = True)
Example2Account.createTable(ifNotExists = True)
model.db_disconnect()
def tearDown(self):
model.db_connect()
Account.dropTable(ifExists = True)
LegacyJID.dropTable(ifExists=True)
ExampleAccount.dropTable(ifExists = True)
Example2Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_run(self): def test_run(self):
self.comp.time_unit = 1 self.comp.time_unit = 1
@@ -184,30 +155,16 @@ class Sender_TestCase(unittest.TestCase):
sender = Sender() sender = Sender()
self.assertRaises(NotImplementedError, sender.send, None, None, None) self.assertRaises(NotImplementedError, sender.send, None, None, None)
class MessageSender_TestCase(unittest.TestCase): class MessageSender_TestCase(JCLTestCase):
def setUp(self): def setUp(self):
if os.path.exists(DB_PATH): JCLTestCase.setUp(self, tables=[Account])
os.unlink(DB_PATH)
self.comp = FeederComponent("jcl.test.com", self.comp = FeederComponent("jcl.test.com",
"password", "password",
"localhost", "localhost",
"5347") "5347")
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists = True)
model.db_disconnect()
self.sender = MessageSender(self.comp) self.sender = MessageSender(self.comp)
self.message_type = None self.message_type = None
def tearDown(self):
model.db_connect()
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_send(self): def test_send(self):
self.comp.stream = MockStream() self.comp.stream = MockStream()
self.comp.stream_class = MockStream self.comp.stream_class = MockStream
@@ -231,27 +188,10 @@ class HeadlineSender_TestCase(MessageSender_TestCase):
self.sender = HeadlineSender(self.comp) self.sender = HeadlineSender(self.comp)
self.message_type = "headline" self.message_type = "headline"
class FeederHandler_TestCase(unittest.TestCase): class FeederHandler_TestCase(JCLTestCase):
def setUp(self): def setUp(self):
JCLTestCase.setUp(self, tables=[Account, ExampleAccount])
self.handler = FeederHandler(FeederMock(), SenderMock()) self.handler = FeederHandler(FeederMock(), SenderMock())
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists = True)
ExampleAccount.createTable(ifNotExists = True)
model.db_disconnect()
def tearDown(self):
self.handler = None
model.db_connect()
ExampleAccount.dropTable(ifExists = True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_filter(self): def test_filter(self):
model.db_connect() model.db_connect()

View File

@@ -21,11 +21,8 @@
## ##
import unittest import unittest
import sys
import os
from pyxmpp.message import Message from pyxmpp.message import Message
from sqlobject.dbconnection import TheURIOpener
from jcl.lang import Lang from jcl.lang import Lang
from jcl.jabber.component import JCLComponent from jcl.jabber.component import JCLComponent
@@ -35,38 +32,17 @@ import jcl.model as model
from jcl.model.account import Account from jcl.model.account import Account
from jcl.model.tests.account import ExampleAccount from jcl.model.tests.account import ExampleAccount
from jcl.tests import JCLTestCase
if sys.platform == "win32": class PasswordMessageHandler_TestCase(JCLTestCase):
DB_PATH = "/c|/temp/jcl_test.db"
else:
DB_PATH = "/tmp/jcl_test.db"
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
class PasswordMessageHandler_TestCase(unittest.TestCase):
def setUp(self): def setUp(self):
JCLTestCase.setUp(self, tables=[Account, ExampleAccount])
self.comp = JCLComponent("jcl.test.com", self.comp = JCLComponent("jcl.test.com",
"password", "password",
"localhost", "localhost",
"5347", "5347",
'sqlite://' + DB_URL) self.db_url)
self.handler = PasswordMessageHandler(self.comp) self.handler = PasswordMessageHandler(self.comp)
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists = True)
ExampleAccount.createTable(ifNotExists = True)
model.db_disconnect()
def tearDown(self):
model.db_connect()
ExampleAccount.dropTable(ifExists = True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_filter_waiting_password(self): def test_filter_waiting_password(self):
model.db_connect() model.db_connect()

View File

@@ -22,21 +22,20 @@
import unittest import unittest
import sys import sys
import os
from sqlobject import * from sqlobject import *
from sqlobject.dbconnection import TheURIOpener
from jcl.error import FieldError from jcl.error import FieldError
import jcl.model as model import jcl.model as model
from jcl.model import account from jcl.model import account
from jcl.model.account import Account, PresenceAccount from jcl.model.account import Account, PresenceAccount
from jcl.tests import JCLTestCase
if sys.platform == "win32": if sys.platform == "win32":
DB_PATH = "/c|/temp/jcl_test.db" DB_DIR = "/c|/temp/"
else: else:
DB_PATH = "/tmp/jcl_test.db" DB_DIR = "/tmp/"
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
class ExampleAccount(Account): class ExampleAccount(Account):
login = StringCol(default="") login = StringCol(default="")
@@ -163,10 +162,7 @@ class AccountModule_TestCase(unittest.TestCase):
self.assertEquals(account.mandatory_field("test", "value"), self.assertEquals(account.mandatory_field("test", "value"),
"value") "value")
class InheritableAccount_TestCase(unittest.TestCase): class InheritableAccount_TestCase(JCLTestCase):
def setUp(self):
self.db_url = DB_URL
model.db_connection_str = 'sqlite://' + self.db_url
def test_get_register_fields(self): def test_get_register_fields(self):
""" """
@@ -191,26 +187,9 @@ class InheritableAccount_TestCase(unittest.TestCase):
class Account_TestCase(InheritableAccount_TestCase): class Account_TestCase(InheritableAccount_TestCase):
def setUp(self): def setUp(self):
if os.path.exists(DB_PATH): JCLTestCase.setUp(self, tables=[Account, ExampleAccount])
os.unlink(DB_PATH)
self.db_url = DB_URL
model.db_connection_str = 'sqlite://' + self.db_url
model.db_connect()
Account.createTable(ifNotExists = True)
ExampleAccount.createTable(ifNotExists = True)
model.db_disconnect()
self.account_class = Account self.account_class = Account
def tearDown(self):
model.db_connect()
ExampleAccount.dropTable(ifExists = True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + self.db_url]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_set_status(self): def test_set_status(self):
model.db_connect() model.db_connect()
account11 = Account(user_jid="test1@test.com", account11 = Account(user_jid="test1@test.com",
@@ -240,14 +219,9 @@ class Account_TestCase(InheritableAccount_TestCase):
class PresenceAccount_TestCase(InheritableAccount_TestCase): class PresenceAccount_TestCase(InheritableAccount_TestCase):
def setUp(self): def setUp(self):
if os.path.exists(DB_PATH): JCLTestCase.setUp(self, tables=[Account, PresenceAccount,
os.unlink(DB_PATH) PresenceAccountExample])
self.db_url = DB_URL
model.db_connection_str = 'sqlite://' + self.db_url
model.db_connect() model.db_connect()
Account.createTable(ifNotExists = True)
PresenceAccount.createTable(ifNotExists = True)
PresenceAccountExample.createTable(ifNotExists = True)
self.account = PresenceAccountExample(\ self.account = PresenceAccountExample(\
user_jid="test1@test.com", user_jid="test1@test.com",
name="account11", name="account11",
@@ -255,17 +229,6 @@ class PresenceAccount_TestCase(InheritableAccount_TestCase):
model.db_disconnect() model.db_disconnect()
self.account_class = PresenceAccount self.account_class = PresenceAccount
def tearDown(self):
model.db_connect()
PresenceAccountExample.dropTable(ifExists = True)
PresenceAccount.dropTable(ifExists = True)
Account.dropTable(ifExists = True)
del TheURIOpener.cachedURIs['sqlite://' + self.db_url]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_get_presence_actions_fields(self): def test_get_presence_actions_fields(self):
fields = self.account_class.get_presence_actions_fields() fields = self.account_class.get_presence_actions_fields()
self.assertEquals(len(fields), 6) self.assertEquals(len(fields), 6)

View File

@@ -2,6 +2,44 @@
__revision__ = "" __revision__ = ""
import unittest import unittest
import os
import tempfile
import sys
from sqlobject.dbconnection import TheURIOpener
import jcl.model
if sys.platform == "win32":
DB_DIR = "/c|/temp/"
else:
DB_DIR = "/tmp/"
class JCLTestCase(unittest.TestCase):
def __init__(self, methodName='runTest'):
unittest.TestCase.__init__(self, methodName)
self.tables = []
def setUp(self, tables=[]):
self.tables = tables
self.db_path = tempfile.mktemp("db", "jcltest", DB_DIR)
if os.path.exists(self.db_path):
os.unlink(self.db_path)
self.db_url = "sqlite://" + self.db_path
jcl.model.db_connection_str = self.db_url
jcl.model.db_connect()
for table in tables:
table.createTable(ifNotExists=True)
jcl.model.db_disconnect()
def tearDown(self):
jcl.model.db_connect()
for table in self.tables:
table.dropTable(ifExists=True)
del TheURIOpener.cachedURIs[self.db_url]
jcl.model.hub.threadConnection.close()
jcl.model.db_disconnect()
if os.path.exists(self.db_path):
os.unlink(self.db_path)
from jcl.tests import lang, runner from jcl.tests import lang, runner
from jcl.jabber import tests as jabber from jcl.jabber import tests as jabber

View File

@@ -23,6 +23,7 @@
import unittest import unittest
import sys import sys
import os import os
import tempfile
from sqlobject import * from sqlobject import *
@@ -30,14 +31,12 @@ import jcl
from jcl.runner import JCLRunner from jcl.runner import JCLRunner
import jcl.model as model import jcl.model as model
from jcl.model import account
from jcl.model.account import Account, PresenceAccount from jcl.model.account import Account, PresenceAccount
if sys.platform == "win32": if sys.platform == "win32":
DB_PATH = "/c|/temp/test.db" DB_DIR = "/c|/temp/"
else: else:
DB_PATH = "/tmp/test.db" DB_DIR = "/tmp/"
DB_URL = "sqlite://" + DB_PATH# + "?debug=1&debugThreading=1"
class JCLRunner_TestCase(unittest.TestCase): class JCLRunner_TestCase(unittest.TestCase):
def setUp(self): def setUp(self):
@@ -122,7 +121,9 @@ class JCLRunner_TestCase(unittest.TestCase):
def test__run(self): def test__run(self):
self.runner.pid_file = "/tmp/jcl.pid" self.runner.pid_file = "/tmp/jcl.pid"
self.runner.db_url = DB_URL db_path = tempfile.mktemp("db", "jcltest", DB_DIR)
db_url = "sqlite://" + db_path
self.runner.db_url = db_url
def do_nothing(): def do_nothing():
pass pass
self.runner._run(do_nothing) self.runner._run(do_nothing)
@@ -131,7 +132,7 @@ class JCLRunner_TestCase(unittest.TestCase):
Account.dropTable() Account.dropTable()
PresenceAccount.dropTable() PresenceAccount.dropTable()
model.db_disconnect() model.db_disconnect()
os.unlink(DB_PATH) os.unlink(db_path)
self.assertFalse(os.access("/tmp/jcl.pid", os.F_OK)) self.assertFalse(os.access("/tmp/jcl.pid", os.F_OK))
def test__get_help(self): def test__get_help(self):