presence tests

darcs-hash:20061013185925-86b55-fbb89113ad11938d8be42606a13a17058ee7ccdf.gz
This commit is contained in:
David Rousselie
2006-10-13 20:59:25 +02:00
parent 7dba7030ac
commit 226fc220e3
5 changed files with 310 additions and 38 deletions

View File

@@ -49,15 +49,15 @@ if __name__ == '__main__':
feeder_component_suite = unittest.makeSuite(FeederComponent_TestCase, "test")
feeder_suite = unittest.makeSuite(Feeder_TestCase, "test")
sender_suite = unittest.makeSuite(Sender_TestCase, "test")
# jcl_suite = unittest.TestSuite()
jcl_suite = unittest.TestSuite()
# jcl_suite.addTest(FeederComponent_TestCase('test_handle_tick'))
# jcl_suite.addTest(FeederComponent_TestCase('test_run'))
# jcl_suite.addTest(FeederComponent_TestCase('test_handle_presence_unavailable_to_account'))
# jcl_suite = unittest.TestSuite((feeder_component_suite))
jcl_suite = unittest.TestSuite((component_suite))
# jcl_suite = unittest.TestSuite((component_suite,
# feeder_component_suite,
# feeder_suite,
# sender_suite))
# jcl_suite = unittest.TestSuite((component_suite))
jcl_suite = unittest.TestSuite((component_suite,
feeder_component_suite,
feeder_suite,
sender_suite))
test_support.run_suite(jcl_suite)

View File

@@ -45,9 +45,11 @@ from pyxmpp.message import Message
from pyxmpp.presence import Presence
from pyxmpp.streambase import StreamError, FatalStreamError
import jcl
from jcl.jabber.x import X
from jcl.model import account
from jcl.model.account import Account
from jcl.model.account import *
from jcl.lang import Lang
VERSION = "0.1"
@@ -108,8 +110,7 @@ class JCLComponent(Component):
self.disco_info.add_feature("jabber:iq:version")
self.disco_info.add_feature("jabber:iq:register")
self.__logger = logging.getLogger("jcl.jabber.JCLComponent")
# TODO : self.__lang = Lang(default_lang)
self.__lang = None
self.__lang = Lang() # TODO : Lang(default_lang = from_config)
self.running = False
signal.signal(signal.SIGINT, self.signal_handler)
@@ -364,23 +365,57 @@ class JCLComponent(Component):
from_jid = stanza.get_from()
base_from_jid = unicode(from_jid.bare())
name = stanza.get_to().node
## lang_class = self.__lang.get_lang_class_from_node(stanza.get_node())
lang_class = self.__lang.get_lang_class_from_node(stanza.get_node())
show = stanza.get_show()
self.__logger.debug("SHOW : " + str(show))
if name:
self.__logger.debug("TO : " + name + " " + base_from_jid)
# TODO : if to transport send back available to all user's account
# else send available presence
self.db_connect()
if not name:
accounts = self.account_class.select(\
self.account_class.q.user_jid == base_from_jid)
# TODO: Translate
accounts_length = 0
for account in accounts:
++accounts_length
self._send_presence_available(account, show, lang_class)
presence = Presence(from_jid = self.jid, \
to_jid = from_jid, \
status = \
str(accounts_length) \
+ " accounts registered.", \
show = show, \
stanza_type = "available")
self.stream.send(presence)
else:
accounts = self.account_class.select(\
self.account_class.q.user_jid == base_from_jid
and self.account_class.q.name == name)
for account in accounts:
self._send_presence_available(account, show, lang_class)
self.db_disconnect()
return 1
def handle_presence_unavailable(self, stanza):
"""Handle presence unavailability
"""
self.__logger.debug("PRESENCE_UNAVAILABLE")
## from_jid = stanza.get_from()
## base_from_jid = unicode(from_jid.bare())
# TODO : send unavailable to all user's account if target is transport
# else send unavailable back
from_jid = stanza.get_from()
base_from_jid = unicode(from_jid.bare())
if stanza.get_to() == unicode(self.jid):
self.db_connect()
for account in self.account_class.select(\
self.account_class.q.user_jid == base_from_jid):
account.status = jcl.model.account.OFFLINE
presence = Presence(from_jid = account.jid, \
to_jid = from_jid, \
stanza_type = "unavailable")
self.stream.send(presence)
self.db_disconnect()
presence = Presence(from_jid = stanza.get_to(), \
to_jid = from_jid, \
stanza_type = "unavailable")
self.stream.send(presence)
return 1
def handle_presence_subscribe(self, stanza):
@@ -451,20 +486,38 @@ class JCLComponent(Component):
###########################################################################
# Utils
###########################################################################
def _ask_password(self, from_jid, lang_class, account):
def _send_presence_available(self, account, show, lang_class):
"""Send available presence to account's user and ask for password
if necessary"""
account.default_lang_class = lang_class
old_status = account.status
if show is None:
account.status = account.ONLINE # TODO get real status = (not show)
else:
account.status = show
p = Presence(from_jid = account.jid, \
to_jid = account.user_jid, \
status = account.status_msg, \
show = show, \
stanza_type = "available")
self.stream.send(p)
if account.store_password == False \
and old_status == account.OFFLINE:
self._ask_password(account, lang_class)
def _ask_password(self, account, lang_class):
"""Send a Jabber message to ask for account password
"""
#TODO be JMC independant
if not account.waiting_password_reply \
and account.status != "offline":
account.waiting_password_reply = True
msg = Message(from_jid = account.jid, \
to_jid = from_jid, \
to_jid = account.user_jid, \
stanza_type = "normal", \
subject = u"[PASSWORD] " + \
lang_class.ask_password_subject, \
body = lang_class.ask_password_body % \
(account.host, account.login))
body = lang_class.ask_password_body)# % \
## (account.host, account.login))
self.stream.send(msg)
def get_jid(self, account):
@@ -472,17 +525,17 @@ class JCLComponent(Component):
"""
return account.name + u"@" + unicode(self.jid)
def get_reg_form(self, lang_class, account_class):
def get_reg_form(self, lang_class):
"""Return register form based on language and account class
"""
# TODO
pass
return X()
def get_reg_form_init(self, lang_class, account):
"""Return register form for an existing account (update)
"""
# TODO
pass
return X()
###########################################################################
# Virtual methods

View File

@@ -43,7 +43,6 @@ class Lang:
"""
lang = node.getLang()
if lang is None:
print "Using default lang " + self.default_lang
lang = self.default_lang
return lang

View File

@@ -27,9 +27,14 @@
__revision__ = "$Id: account.py,v 1.3 2005/09/18 20:24:07 dax Exp $"
from sqlobject.main import SQLObject
from sqlobject.col import StringCol
from sqlobject.col import StringCol, BoolCol
from sqlobject.dbconnection import ConnectionHub
from jcl.lang import Lang
OFFLINE = 0
ONLINE = 1
# create a hub to attach a per thread connection
hub = ConnectionHub()
@@ -40,6 +45,15 @@ class Account(SQLObject):
user_jid = StringCol()
name = StringCol()
jid = StringCol()
login = StringCol(default = "")
__password = StringCol(default = "")
__store_password = BoolCol(default = True)
default_lang_class = Lang.en
first_check = True
__status = OFFLINE
waiting_password_reply = False
__volatile_password = ""
def get_long_name(self):
"""Return Human readable account name"""
@@ -47,4 +61,50 @@ class Account(SQLObject):
long_name = property(get_long_name)
def get_status_msg(self):
"""Return current status"""
return self.name
status_msg = property(get_status_msg)
def get_status(self):
"""Return current Jabber status"""
return self.__status
def set_status(self, status):
"""Set current Jabber status"""
if status == OFFLINE:
self.waiting_password_reply = False
if not self.store_password:
self.password = None
else:
self.first_check = True
self.__status = status
status = property(get_status, set_status)
def set_password(self, password):
"""Set password associated to account"""
self.__password = password
def get_password(self):
"""Return password associated to account"""
return self.__password
password = property(get_password, set_password)
def set_store_password(self, store_password):
"""Set store_password attribut
determine if password is persitent or not"""
if store_password:
self.password = property(get_password, set_password)
else:
self.password = property(get_volatile_password, \
set_volatile_password)
self.__store_password = store_password
def get_store_password(self):
"""Return store_password boolean"""
return self.__store_password
store_password = property(get_store_password, set_store_password)

View File

@@ -34,6 +34,7 @@ from sqlobject.dbconnection import TheURIOpener
from pyxmpp.iq import Iq
from pyxmpp.stanza import Stanza
from pyxmpp.presence import Presence
from jcl.jabber.component import JCLComponent
from jcl.model import account
@@ -277,7 +278,7 @@ class JCLComponent_TestCase(unittest.TestCase):
self.assertEquals(disco_items.get_items(), [])
def test_get_reg_form(self):
self.comp.get_reg_form(Lang.en, Account)
## self.comp.get_reg_form(Lang.en, Account)
# TODO
self.assertTrue(True)
@@ -285,7 +286,7 @@ class JCLComponent_TestCase(unittest.TestCase):
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account1 = Account(user_jid = "", name = "", jid = "")
del account.hub.threadConnection
self.comp.get_reg_form_init(Lang.en, account1)
## self.comp.get_reg_form_init(Lang.en, account1)
# TODO
self.assertTrue(True)
@@ -298,22 +299,181 @@ class JCLComponent_TestCase(unittest.TestCase):
iq_sent = self.comp.stream.sent[0]
self.assertEquals(iq_sent.get_to(), "user1@test.com")
self.assertEquals(len(iq_sent.xpath_eval("*/*")), 2)
name_node = iq_sent.xpath_eval("*/*")[0]
version_node = iq_sent.xpath_eval("*/*")[1]
self.assertEquals(name_node.content, self.comp.name)
self.assertEquals(version_node.content, self.comp.version)
name_nodes = iq_sent.xpath_eval("jiv:query/jiv:name", \
{"jiv" : "jabber:iq:version"})
self.assertEquals(len(name_nodes), 1)
self.assertEquals(name_nodes[0].content, self.comp.name)
version_nodes = iq_sent.xpath_eval("jiv:query/jiv:version", \
{"jiv" : "jabber:iq:version"})
self.assertEquals(len(version_nodes), 1)
self.assertEquals(version_nodes[0].content, self.comp.version)
def test_handle_get_register(self):
def test_handle_get_register_new(self):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
self.comp.handle_get_register(Iq(stanza_type = "get", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com"))
self.assertEquals(len(self.comp.stream.sent), 1)
iq_sent = self.comp.stream.sent[0]
# print str(iq_sent.get_node())
self.assertEquals(iq_sent.get_to(), "user1@test.com")
# self.assertEquals(len(iq_sent.xpath_eval("*/*")), 1)
# TODO
def test_handle_get_register_exist(self):
pass
def test_handle_set_register(self):
pass
def test_handle_presence_available(self):
pass
def test_handle_presence_available_to_component(self):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = Account(user_jid = "user1@test.com", \
name = "account11", \
jid = "account11@jcl.test.com")
account12 = Account(user_jid = "user1@test.com", \
name = "account12", \
jid = "account12@jcl.test.com")
account2 = Account(user_jid = "user2@test.com", \
name = "account2", \
jid = "account2@jcl.test.com")
del account.hub.threadConnection
## TODO: "online" exist ?
self.comp.handle_presence_available(Presence(\
stanza_type = "available", \
show = "online", \
from_jid = "user1@test.com",\
to_jid = "jcl.test.com"))
presence_sent = self.comp.stream.sent
self.assertEqual(len(presence_sent), 3)
self.assertEqual(len([presence \
for presence in presence_sent \
if presence.get_to_jid() == "user1@test.com"]), \
3)
self.assertEqual(len([presence \
for presence in presence_sent \
if presence.get_from_jid() == \
"jcl.test.com" \
and presence.get_show() == "online"]), \
1)
self.assertEqual(len([presence \
for presence in presence_sent \
if presence.get_from_jid() == \
"account11@jcl.test.com" \
and presence.get_show() == "online"]), \
1)
self.assertEqual(len([presence \
for presence in presence_sent \
if presence.get_from_jid() == \
"account12@jcl.test.com" \
and presence.get_show() == "online"]), \
1)
def test_handle_presence_unavailable(self):
pass
def test_handle_presence_available_to_account(self):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = Account(user_jid = "user1@test.com", \
name = "account11", \
jid = "account11@jcl.test.com")
account12 = Account(user_jid = "user1@test.com", \
name = "account12", \
jid = "account12@jcl.test.com")
account2 = Account(user_jid = "user2@test.com", \
name = "account2", \
jid = "account2@jcl.test.com")
del account.hub.threadConnection
## TODO: "online" exist ?
self.comp.handle_presence_available(Presence(\
stanza_type = "available", \
show = "online", \
from_jid = "user1@test.com",\
to_jid = "account11@jcl.test.com"))
presence_sent = self.comp.stream.sent
self.assertEqual(len(presence_sent), 1)
self.assertEqual(presence_sent[0].get_to(), "user1@test.com")
self.assertEqual(presence_sent[0].get_from(), "account11@jcl.test.com")
self.assertEqual(presence_sent[0].get_show(), "online")
def test_handle_presence_unavailable_to_component(self):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = Account(user_jid = "user1@test.com", \
name = "account11", \
jid = "account11@jcl.test.com")
account12 = Account(user_jid = "user1@test.com", \
name = "account12", \
jid = "account12@jcl.test.com")
account2 = Account(user_jid = "user2@test.com", \
name = "account2", \
jid = "account2@jcl.test.com")
del account.hub.threadConnection
self.comp.handle_presence_unavailable(Presence(\
stanza_type = "unavailable", \
from_jid = "user1@test.com",\
to_jid = "jcl.test.com"))
presence_sent = self.comp.stream.sent
self.assertEqual(len(presence_sent), 3)
self.assertEqual(len([presence \
for presence in presence_sent \
if presence.get_to_jid() == "user1@test.com"]), \
3)
self.assertEqual(\
len([presence \
for presence in presence_sent \
if presence.get_from_jid() == \
"jcl.test.com" \
and presence.xpath_eval("@type")[0].get_content() \
== "unavailable"]), \
1)
self.assertEqual(\
len([presence \
for presence in presence_sent \
if presence.get_from_jid() == \
"account11@jcl.test.com" \
and presence.xpath_eval("@type")[0].get_content() \
== "unavailable"]), \
1)
self.assertEqual(\
len([presence \
for presence in presence_sent \
if presence.get_from_jid() == \
"account12@jcl.test.com" \
and presence.xpath_eval("@type")[0].get_content() \
== "unavailable"]), \
1)
def test_handle_presence_unavailable_to_account(self):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = Account(user_jid = "user1@test.com", \
name = "account11", \
jid = "account11@jcl.test.com")
account12 = Account(user_jid = "user1@test.com", \
name = "account12", \
jid = "account12@jcl.test.com")
account2 = Account(user_jid = "user2@test.com", \
name = "account2", \
jid = "account2@jcl.test.com")
del account.hub.threadConnection
self.comp.handle_presence_unavailable(Presence(\
stanza_type = "unavailable", \
from_jid = "user1@test.com",\
to_jid = "account11@jcl.test.com"))
presence_sent = self.comp.stream.sent
self.assertEqual(len(presence_sent), 1)
self.assertEqual(presence_sent[0].get_to(), "user1@test.com")
self.assertEqual(presence_sent[0].get_from(), "account11@jcl.test.com")
self.assertEqual(\
presence_sent[0].xpath_eval("@type")[0].get_content(), \
"unavailable")
def test_handle_presence_subscribe(self):
pass