Complex Account tests

darcs-hash:20061030094313-86b55-47b0746872efdbc5d2c38e79e4859ac0c739abde.gz
This commit is contained in:
David Rousselie
2006-10-30 10:43:13 +01:00
parent dda10fdc0e
commit a675e186a2
7 changed files with 503 additions and 138 deletions

View File

@@ -50,7 +50,7 @@ if __name__ == '__main__':
feeder_suite = unittest.makeSuite(Feeder_TestCase, "test") feeder_suite = unittest.makeSuite(Feeder_TestCase, "test")
sender_suite = unittest.makeSuite(Sender_TestCase, "test") sender_suite = unittest.makeSuite(Sender_TestCase, "test")
jcl_suite = unittest.TestSuite() jcl_suite = unittest.TestSuite()
# jcl_suite.addTest(FeederComponent_TestCase('test_handle_get_register_exist2')) # jcl_suite.addTest(FeederComponent_TestCase('test_handle_get_register_exist_complex'))
# jcl_suite.addTest(FeederComponent_TestCase('test_constructor')) # jcl_suite.addTest(FeederComponent_TestCase('test_constructor'))
# jcl_suite = unittest.TestSuite((feeder_component_suite)) # jcl_suite = unittest.TestSuite((feeder_component_suite))
# jcl_suite = unittest.TestSuite((component_suite)) # jcl_suite = unittest.TestSuite((component_suite))

View File

@@ -276,12 +276,12 @@ class JCLComponent(Component, object):
disco_items = DiscoItems() disco_items = DiscoItems()
if not node: if not node:
self.db_connect() self.db_connect()
for account in self.account_class.select(Account.q.user_jid == \ for _account in self.account_class.select(Account.q.user_jid == \
base_from_jid): base_from_jid):
self.__logger.debug(str(account)) self.__logger.debug(str(_account))
DiscoItem(disco_items, \ DiscoItem(disco_items, \
JID(account.jid), \ JID(_account.jid), \
account.name, account.long_name) _account.name, _account.long_name)
self.db_disconnect() self.db_disconnect()
return disco_items return disco_items
@@ -319,6 +319,29 @@ class JCLComponent(Component, object):
self.stream.send(info_query) self.stream.send(info_query)
return 1 return 1
def remove_all_accounts(self, user_jid):
self.db_connect()
for _account in self.account_class.select(\
self.account_class.q.user_jid == user_jid):
self.__logger.debug("Deleting " + _account.name \
+ " for " + user_jid)
presence = Presence(from_jid = self.get_jid(_account), \
to_jid = user_jid, \
stanza_type = "unsubscribe")
self.stream.send(presence)
presence = Presence(from_jid = self.get_jid(_account), \
to_jid = user_jid, \
stanza_type = "unsubscribed")
self.stream.send(presence)
_account.destroySelf()
presence = Presence(from_jid = self.jid, to_jid = user_jid, \
stanza_type = "unsubscribe")
self.stream.send(presence)
presence = Presence(from_jid = self.jid, to_jid = user_jid, \
stanza_type = "unsubscribed")
self.stream.send(presence)
self.db_disconnect()
def handle_set_register(self, info_query): def handle_set_register(self, info_query):
"""Handle user registration response """Handle user registration response
""" """
@@ -330,35 +353,19 @@ class JCLComponent(Component, object):
remove = info_query.xpath_eval("r:query/r:remove", \ remove = info_query.xpath_eval("r:query/r:remove", \
{"r" : "jabber:iq:register"}) {"r" : "jabber:iq:register"})
if remove: if remove:
# for name in self.__storage.keys((base_from_jid,)): self.remove_all_accounts(base_from_jid)
# self.__logger.debug("Deleting " + name \
# + " for " + base_from_jid)
# presence = Presence(from_jid = name + "@" + unicode(self.jid), \
# to_jid = from_jid, \
# stanza_type = "unsubscribe")
# self.stream.send(presence)
# presence = Presence(from_jid = name + "@" + unicode(self.jid), \
# to_jid = from_jid, \
# stanza_type = "unsubscribed")
# self.stream.send(presence)
# del self.__storage[(base_from_jid, name)]
presence = Presence(from_jid = self.jid, to_jid = from_jid, \
stanza_type = "unsubscribe")
self.stream.send(presence)
presence = Presence(from_jid = self.jid, to_jid = from_jid, \
stanza_type = "unsubscribed")
self.stream.send(presence)
return 1 return 1
query = info_query.get_query() query = info_query.get_query()
x_data = X() x_data = X()
x_data.from_xml(query.children) x_data.from_xml(query.children)
# TODO : get info from Xdata
# "name" must not be null
name = x_data.get_field_value("name") name = x_data.get_field_value("name")
if name is None: if name is None:
# TODO make error # TODO : find correct error for mandatory field
print "ERROR" info_query = info_query.make_error_response(\
"resource-constraint")
self.stream.send(info_query)
return
self.db_connect() self.db_connect()
accounts = self.account_class.select(\ accounts = self.account_class.select(\
self.account_class.q.user_jid == base_from_jid \ self.account_class.q.user_jid == base_from_jid \
@@ -368,40 +375,51 @@ class JCLComponent(Component, object):
self.account_class.q.user_jid == base_from_jid) self.account_class.q.user_jid == base_from_jid)
all_accounts_count = all_accounts.count() all_accounts_count = all_accounts.count()
if accounts_count > 1: if accounts_count > 1:
# TODO make error # Just print a warning, only the first account will be use
print "ERROR" print >>sys.stderr, "There might not exist 2 accounts for " + \
if accounts_count == 1: base_from_jid + " and named " + name
account = list(accounts)[0] if accounts_count >= 1:
_account = list(accounts)[0]
else: else:
account = self.account_class(user_jid = base_from_jid, \ _account = self.account_class(user_jid = base_from_jid, \
name = name, \ name = name, \
jid = name + u"@"+unicode(self.jid)) jid = name + u"@"+unicode(self.jid))
try:
for (field, field_type, field_post_func, field_default_func) in \ for (field, field_type, field_post_func, field_default_func) in \
self.account_class.get_register_fields(): self.account_class.get_register_fields():
setattr(account, x_data.get_field_value(field, \ setattr(_account, field, \
x_data.get_field_value(field, \
field_post_func, \ field_post_func, \
field_default_func)) field_default_func))
except FieldError, e:
_account.destroySelf()
# TODO: get correct error from e
info_query = info_query.make_error_response("resource-constraint")
self.stream.send(info_query)
self.db_disconnect()
return
info_query = info_query.make_result_response() info_query = info_query.make_result_response()
self.stream.send(info_query) self.stream.send(info_query)
if all_accounts_count == 0: if all_accounts_count == 0:
self.stream.send(Presence(from_jid = self.jid, to_jid = base_from_jid, \ self.stream.send(Presence(from_jid = self.jid, \
to_jid = base_from_jid, \
stanza_type = "subscribe")) stanza_type = "subscribe"))
if accounts_count == 0: if accounts_count == 0:
self.stream.send(Message(\ self.stream.send(Message(\
from_jid = self.jid, to_jid = from_jid, \ from_jid = self.jid, to_jid = from_jid, \
stanza_type = "normal", \ stanza_type = "normal", \
subject = account.get_new_message_subject(lang_class), \ subject = _account.get_new_message_subject(lang_class), \
body = account.get_new_message_body(lang_class))) body = _account.get_new_message_body(lang_class)))
self.stream.send(Presence(from_jid = self.get_jid(account), \ self.stream.send(Presence(from_jid = self.get_jid(_account), \
to_jid = base_from_jid, \ to_jid = base_from_jid, \
stanza_type = "subscribe")) stanza_type = "subscribe"))
else: else:
self.stream.send(Message(\ self.stream.send(Message(\
from_jid = self.jid, to_jid = from_jid, \ from_jid = self.jid, to_jid = from_jid, \
stanza_type = "normal", \ stanza_type = "normal", \
subject = account.get_update_message_subject(lang_class), \ subject = _account.get_update_message_subject(lang_class), \
body = account.get_update_message_body(lang_class))) body = _account.get_update_message_body(lang_class)))
self.db_disconnect() self.db_disconnect()
def handle_presence_available(self, stanza): def handle_presence_available(self, stanza):
@@ -422,9 +440,9 @@ class JCLComponent(Component, object):
self.account_class.q.user_jid == base_from_jid) self.account_class.q.user_jid == base_from_jid)
# TODO: Translate # TODO: Translate
accounts_length = 0 accounts_length = 0
for account in accounts: for _account in accounts:
++accounts_length ++accounts_length
self._send_presence_available(account, show, lang_class) self._send_presence_available(_account, show, lang_class)
presence = Presence(from_jid = self.jid, \ presence = Presence(from_jid = self.jid, \
to_jid = from_jid, \ to_jid = from_jid, \
status = \ status = \
@@ -437,8 +455,8 @@ class JCLComponent(Component, object):
accounts = self.account_class.select(\ accounts = self.account_class.select(\
self.account_class.q.user_jid == base_from_jid self.account_class.q.user_jid == base_from_jid
and self.account_class.q.name == name) and self.account_class.q.name == name)
for account in accounts: if accounts.count() >= 1:
self._send_presence_available(account, show, lang_class) self._send_presence_available(_account[0], show, lang_class)
self.db_disconnect() self.db_disconnect()
return 1 return 1
@@ -476,9 +494,15 @@ class JCLComponent(Component, object):
"""Handle subscribed presence from user """Handle subscribed presence from user
""" """
self.__logger.debug("PRESENCE_SUBSCRIBED") self.__logger.debug("PRESENCE_SUBSCRIBED")
## name = stanza.get_to().node name = stanza.get_to().node
## from_jid = stanza.get_from() from_jid = stanza.get_from()
## base_from_jid = unicode(from_jid.bare()) base_from_jid = unicode(from_jid.bare())
# accounts = self.account_class.select(\
# self.account_class.q.user_jid == base_from_jid
# and self.account_class.q.name == name)
# if accounts.count() >= 1:
# _account = list(accounts)[0]
# self._send_presence_available(_account, "show", lang_class)
# TODO : send presence available to subscribed user # TODO : send presence available to subscribed user
# is it necessary to send available presence ? # is it necessary to send available presence ?
return 1 return 1
@@ -583,10 +607,10 @@ class JCLComponent(Component, object):
## (account.host, account.login)) ## (account.host, account.login))
self.stream.send(msg) self.stream.send(msg)
def get_jid(self, account): def get_jid(self, _account):
"""Return account jid based on account instance and component jid """Return account jid based on account instance and component jid
""" """
return account.name + u"@" + unicode(self.jid) return _account.name + u"@" + unicode(self.jid)
def get_reg_form(self, lang_class): def get_reg_form(self, lang_class):
"""Return register form based on language and account class """Return register form based on language and account class
@@ -625,7 +649,7 @@ class JCLComponent(Component, object):
reg_form.fields["name"].type = "hidden" reg_form.fields["name"].type = "hidden"
for (field_name, field) in reg_form.fields.items(): for (field_name, field) in reg_form.fields.items():
if hasattr(self.account_class, field_name): if hasattr(self.account_class, field_name):
field.value = getattr(account, field_name) field.value = str(getattr(account, field_name))
return reg_form return reg_form
########################################################################### ###########################################################################

View File

@@ -49,12 +49,13 @@ class Option(object):
class Field(object): class Field(object):
"""Jabber Xdata form Field """Jabber Xdata form Field
""" """
def __init__(self, field_type, label, var, value): def __init__(self, field_type, label, var, value, required = False):
self.type = field_type self.type = field_type
self.__label = label self.__label = label
self.__var = var self.__var = var
self.value = value self.value = value
self.__options = [] self.__options = []
self.required = required
def add_option(self, label, value): def add_option(self, label, value):
"""Add an Option to this field """Add an Option to this field
@@ -78,6 +79,8 @@ class Field(object):
field.setProp("var", self.__var) field.setProp("var", self.__var)
if self.value: if self.value:
field.newChild(None, "value", self.value) field.newChild(None, "value", self.value)
if self.required:
field.newChild(None, "required", None)
for option in self.__options: for option in self.__options:
option.get_xml(field) option.get_xml(field)
return field return field
@@ -108,7 +111,7 @@ class X(object):
def get_field_value(self, field_name, \ def get_field_value(self, field_name, \
post_func = (lambda value: value), \ post_func = (lambda value: value), \
defaut_func = (lambda: None)): default_func = (lambda field_name: None)):
"""Return field value processed by post_func """Return field value processed by post_func
or return default func processing if field does not exist""" or return default func processing if field does not exist"""
if self.fields.has_key(field_name): if self.fields.has_key(field_name):

View File

@@ -36,10 +36,32 @@ from jcl.jabber.error import FieldError
OFFLINE = "offline" OFFLINE = "offline"
ONLINE = "online" ONLINE = "online"
def default_post_func(field_value):
"""Default post process function: do nothing"""
return field_value
def boolean_post_func(field_value):
"""Return a boolean from boolean field value"""
return (field_value == "1" or field_value.lower() == "true")
def int_post_func(field_value):
"""Return an integer from integer field value"""
return int(field_value)
def string_not_null_post_func(field_value):
"""Post process function for not null/empty string"""
if field_value is None or field_value == "":
raise FieldError # TODO : add translated message
return field_value
def mandatory_field(field_name):
"""Used as default function for field that must be specified
and cannot have default value"""
raise FieldError # TODO : add translated message
# create a hub to attach a per thread connection # create a hub to attach a per thread connection
hub = ConnectionHub() hub = ConnectionHub()
class Account2(SQLObject):
pass
class Account(SQLObject): class Account(SQLObject):
"""Base Account class""" """Base Account class"""
@@ -122,26 +144,3 @@ class Account(SQLObject):
def get_update_message_body(self, lang_class): def get_update_message_body(self, lang_class):
"""Return localized message body for existing account""" """Return localized message body for existing account"""
return lang_class.new_account_message_body return lang_class.new_account_message_body
def default_post_func(self, field_value):
"""Default post process function: do nothing"""
return field_value
def boolean_post_func(self, field_value):
"""Return a boolean from boolean field value"""
return (field_value == "1" or field_value.lower() == "true")
def int_post_func(self, field_value):
"""Return an integer from integer field value"""
return int(field_value)
def string_not_null_post_func(self, field_value):
"""Post process function for not null/empty string"""
if field_value is None or field_value == "":
raise FieldError # TODO : add translated message
return field_value
def mandatory_field(self, field_name):
"""Used as default function for field that must be specified
and cannot have default value"""
raise FieldError # TODO : add translated message

View File

@@ -325,13 +325,12 @@ class JCLComponent_TestCase(unittest.TestCase):
fields = iq_sent.xpath_eval("jir:query/jxd:x/jxd:field", \ fields = iq_sent.xpath_eval("jir:query/jxd:x/jxd:field", \
{"jir" : "jabber:iq:register", \ {"jir" : "jabber:iq:register", \
"jxd" : "jabber:x:data"}) "jxd" : "jabber:x:data"})
print str([str(field) for field in fields])
self.assertEquals(len(fields), 1) self.assertEquals(len(fields), 1)
self.assertEquals(fields[0].prop("type"), "text-single") self.assertEquals(fields[0].prop("type"), "text-single")
self.assertEquals(fields[0].prop("var"), "name") self.assertEquals(fields[0].prop("var"), "name")
self.assertEquals(fields[0].prop("label"), Lang.en.account_name) self.assertEquals(fields[0].prop("label"), Lang.en.account_name)
def test_handle_get_register_new2(self): def test_handle_get_register_new_complex(self):
self.comp.account_class = AccountExample self.comp.account_class = AccountExample
self.comp.stream = MockStream() self.comp.stream = MockStream()
self.comp.stream_class = MockStream self.comp.stream_class = MockStream
@@ -424,49 +423,78 @@ class JCLComponent_TestCase(unittest.TestCase):
self.assertEquals(len(value), 1) self.assertEquals(len(value), 1)
self.assertEquals(value[0].content, "account1") self.assertEquals(value[0].content, "account1")
# def test_handle_get_register_exist2(self): def test_handle_get_register_exist_complex(self):
# self.comp.account_class = AccountExample self.comp.account_class = AccountExample
# self.comp.stream = MockStream() self.comp.stream = MockStream()
# self.comp.stream_class = MockStream self.comp.stream_class = MockStream
# account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
# account1 = AccountExample(user_jid = "user1@test.com", \ account1 = AccountExample(user_jid = "user1@test.com", \
# name = "account1", \ name = "account1", \
# jid = "account1@jcl.test.com") jid = "account1@jcl.test.com", \
# del account.hub.threadConnection login = "mylogin", \
# self.comp.handle_get_register(Iq(stanza_type = "get", \ password = "mypassword", \
# from_jid = "user1@test.com", \ store_password = False, \
# to_jid = "account1@jcl.test.com")) test_enum = "choice3", \
# self.assertEquals(len(self.comp.stream.sent), 1) test_int = 21)
# iq_sent = self.comp.stream.sent[0] del account.hub.threadConnection
# self.assertEquals(iq_sent.get_to(), "user1@test.com") self.comp.handle_get_register(Iq(stanza_type = "get", \
# titles = iq_sent.xpath_eval("jir:query/jxd:x/jxd:title", \ from_jid = "user1@test.com", \
# {"jir" : "jabber:iq:register", \ to_jid = "account1@jcl.test.com"))
# "jxd" : "jabber:x:data"}) self.assertEquals(len(self.comp.stream.sent), 1)
# self.assertEquals(len(titles), 1) iq_sent = self.comp.stream.sent[0]
# self.assertEquals(titles[0].content, \ self.assertEquals(iq_sent.get_to(), "user1@test.com")
# Lang.en.register_title) titles = iq_sent.xpath_eval("jir:query/jxd:x/jxd:title", \
# instructions = iq_sent.xpath_eval("jir:query/jxd:x/jxd:instructions", \ {"jir" : "jabber:iq:register", \
# {"jir" : "jabber:iq:register", \ "jxd" : "jabber:x:data"})
# "jxd" : "jabber:x:data"}) self.assertEquals(len(titles), 1)
# self.assertEquals(len(instructions), 1) self.assertEquals(titles[0].content, \
# self.assertEquals(instructions[0].content, \ Lang.en.register_title)
# Lang.en.register_instructions) instructions = iq_sent.xpath_eval("jir:query/jxd:x/jxd:instructions", \
# fields = iq_sent.xpath_eval("jir:query/jxd:x/jxd:field", \ {"jir" : "jabber:iq:register", \
# {"jir" : "jabber:iq:register", \ "jxd" : "jabber:x:data"})
# "jxd" : "jabber:x:data"}) self.assertEquals(len(instructions), 1)
# self.assertEquals(len(fields), 1) self.assertEquals(instructions[0].content, \
# self.assertEquals(len([field Lang.en.register_instructions)
# for field in fields \ fields = iq_sent.xpath_eval("jir:query/jxd:x/jxd:field", \
# if field.prop("type") == "hidden" \ {"jir" : "jabber:iq:register", \
# and field.prop("var") == "name" \ "jxd" : "jabber:x:data"})
# and field.prop("label") == \ self.assertEquals(len(fields), 6)
# Lang.en.account_name]), \ field = fields[0]
# 1) self.assertEquals(field.prop("type"), "hidden")
# value = iq_sent.xpath_eval("jir:query/jxd:x/jxd:field/jxd:value", \ self.assertEquals(field.prop("var"), "name")
# {"jir" : "jabber:iq:register", \ self.assertEquals(field.prop("label"), Lang.en.account_name)
# "jxd" : "jabber:x:data"}) self.assertEquals(field.children.name, "value")
# self.assertEquals(len(value), 1) self.assertEquals(field.children.content, "account1")
# self.assertEquals(value[0].content, "account1") field = fields[1]
self.assertEquals(field.prop("type"), "text-single")
self.assertEquals(field.prop("var"), "login")
self.assertEquals(field.prop("label"), "login")
self.assertEquals(field.children.name, "value")
self.assertEquals(field.children.content, "mylogin")
field = fields[2]
self.assertEquals(field.prop("type"), "text-private")
self.assertEquals(field.prop("var"), "password")
self.assertEquals(field.prop("label"), "password")
self.assertEquals(field.children.name, "value")
self.assertEquals(field.children.content, "mypassword")
field = fields[3]
self.assertEquals(field.prop("type"), "boolean")
self.assertEquals(field.prop("var"), "store_password")
self.assertEquals(field.prop("label"), "store_password")
self.assertEquals(field.children.name, "value")
self.assertEquals(field.children.content, "False")
field = fields[4]
self.assertEquals(field.prop("type"), "list-single")
self.assertEquals(field.prop("var"), "test_enum")
self.assertEquals(field.prop("label"), "test_enum")
self.assertEquals(field.children.name, "value")
self.assertEquals(field.children.content, "choice3")
field = fields[5]
self.assertEquals(field.prop("type"), "text-single")
self.assertEquals(field.prop("var"), "test_int")
self.assertEquals(field.prop("label"), "test_int")
self.assertEquals(field.children.name, "value")
self.assertEquals(field.children.content, "21")
def test_handle_set_register_new(self): def test_handle_set_register_new(self):
self.comp.stream = MockStream() self.comp.stream = MockStream()
@@ -526,9 +554,8 @@ class JCLComponent_TestCase(unittest.TestCase):
self.assertEquals(presence_account.get_node().prop("type"), \ self.assertEquals(presence_account.get_node().prop("type"), \
"subscribe") "subscribe")
def test_handle_set_register_new2(self): def test_handle_set_register_new_complex(self):
# TODO : Add AccountExample fields self.comp.account_class = AccountExample
# self.comp.account_class = AccountExample
self.comp.stream = MockStream() self.comp.stream = MockStream()
self.comp.stream_class = MockStream self.comp.stream_class = MockStream
x_data = X() x_data = X()
@@ -537,6 +564,21 @@ class JCLComponent_TestCase(unittest.TestCase):
x_data.add_field(field_type = "text-single", \ x_data.add_field(field_type = "text-single", \
var = "name", \ var = "name", \
value = "account1") value = "account1")
x_data.add_field(field_type = "text-single", \
var = "login", \
value = "mylogin")
x_data.add_field(field_type = "text-private", \
var = "password", \
value = "mypassword")
x_data.add_field(field_type = "boolean", \
var = "store_password", \
value = "false")
x_data.add_field(field_type = "list-single", \
var = "test_enum", \
value = "choice3")
x_data.add_field(field_type = "text-single", \
var = "test_int", \
value = "43")
iq_set = Iq(stanza_type = "set", \ iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \ from_jid = "user1@test.com", \
to_jid = "jcl.test.com") to_jid = "jcl.test.com")
@@ -553,6 +595,11 @@ class JCLComponent_TestCase(unittest.TestCase):
self.assertEquals(_account.user_jid, "user1@test.com") self.assertEquals(_account.user_jid, "user1@test.com")
self.assertEquals(_account.name, "account1") self.assertEquals(_account.name, "account1")
self.assertEquals(_account.jid, "account1@jcl.test.com") self.assertEquals(_account.jid, "account1@jcl.test.com")
self.assertEquals(_account.login, "mylogin")
self.assertEquals(_account.password, "mypassword")
self.assertFalse(_account.store_password)
self.assertEquals(_account.test_enum, "choice3")
self.assertEquals(_account.test_int, 43)
del account.hub.threadConnection del account.hub.threadConnection
stanza_sent = self.comp.stream.sent stanza_sent = self.comp.stream.sent
@@ -586,11 +633,246 @@ class JCLComponent_TestCase(unittest.TestCase):
self.assertEquals(presence_account.get_node().prop("type"), \ self.assertEquals(presence_account.get_node().prop("type"), \
"subscribe") "subscribe")
def test_handle_set_register_update(self): def test_handle_set_register_new_default_values(self):
pass self.comp.account_class = AccountExample
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
x_data = X()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
x_data.add_field(field_type = "text-single", \
var = "name", \
value = "account1")
x_data.add_field(field_type = "text-single", \
var = "login", \
value = "mylogin")
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
accounts = self.comp.account_class.select(\
self.comp.account_class.q.user_jid == "user1@test.com" \
and self.comp.account_class.q.name == "account1")
self.assertEquals(accounts.count(), 1)
_account = accounts[0]
self.assertEquals(_account.user_jid, "user1@test.com")
self.assertEquals(_account.name, "account1")
self.assertEquals(_account.jid, "account1@jcl.test.com")
self.assertEquals(_account.login, "mylogin")
self.assertEquals(_account.password, None)
self.assertTrue(_account.store_password)
self.assertEquals(_account.test_enum, "choice2")
self.assertEquals(_account.test_int, 44)
del account.hub.threadConnection
def test_handle_set_register_new_name_mandatory(self):
self.comp.account_class = AccountExample
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
x_data = X()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
accounts = self.comp.account_class.select(\
self.comp.account_class.q.user_jid == "user1@test.com" \
and self.comp.account_class.q.name == "account1")
self.assertEquals(accounts.count(), 0)
del account.hub.threadConnection
stanza_sent = self.comp.stream.sent
self.assertEquals(len(stanza_sent), 1)
self.assertTrue(isinstance(stanza_sent[0], Iq))
# TODO : add more assertions
def test_handle_set_register_new_field_mandatory(self):
self.comp.account_class = AccountExample
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
x_data = X()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
x_data.add_field(field_type = "text-single", \
var = "name", \
value = "account1")
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
accounts = self.comp.account_class.select(\
self.comp.account_class.q.user_jid == "user1@test.com" \
and self.comp.account_class.q.name == "account1")
self.assertEquals(accounts.count(), 0)
del account.hub.threadConnection
stanza_sent = self.comp.stream.sent
self.assertEquals(len(stanza_sent), 1)
self.assertTrue(isinstance(stanza_sent[0], Iq))
# TODO : add more assertions
def test_handle_set_register_update_complex(self):
self.comp.account_class = AccountExample
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
existing_account = AccountExample(user_jid = "user1@test.com", \
name = "account1", \
jid = "account1@jcl.test.com", \
login = "mylogin", \
password = "mypassword", \
store_password = True, \
test_enum = "choice1", \
test_int = 21)
another_account = AccountExample(user_jid = "user1@test.com", \
name = "account2", \
jid = "account2@jcl.test.com", \
login = "mylogin", \
password = "mypassword", \
store_password = True, \
test_enum = "choice1", \
test_int = 21)
del account.hub.threadConnection
x_data = X()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
x_data.add_field(field_type = "text-single", \
var = "name", \
value = "account1")
x_data.add_field(field_type = "text-single", \
var = "login", \
value = "mylogin2")
x_data.add_field(field_type = "text-private", \
var = "password", \
value = "mypassword2")
x_data.add_field(field_type = "boolean", \
var = "store_password", \
value = "false")
x_data.add_field(field_type = "list-single", \
var = "test_enum", \
value = "choice3")
x_data.add_field(field_type = "text-single", \
var = "test_int", \
value = "43")
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
accounts = self.comp.account_class.select(\
self.comp.account_class.q.user_jid == "user1@test.com" \
and self.comp.account_class.q.name == "account1")
self.assertEquals(accounts.count(), 1)
_account = accounts[0]
self.assertEquals(_account.user_jid, "user1@test.com")
self.assertEquals(_account.name, "account1")
self.assertEquals(_account.jid, "account1@jcl.test.com")
self.assertEquals(_account.login, "mylogin2")
self.assertEquals(_account.password, "mypassword2")
self.assertFalse(_account.store_password)
self.assertEquals(_account.test_enum, "choice3")
self.assertEquals(_account.test_int, 43)
del account.hub.threadConnection
stanza_sent = self.comp.stream.sent
self.assertEquals(len(stanza_sent), 2)
iq_result = stanza_sent[0]
self.assertTrue(isinstance(iq_result, Iq))
self.assertEquals(iq_result.get_node().prop("type"), "result")
self.assertEquals(iq_result.get_from(), "jcl.test.com")
self.assertEquals(iq_result.get_to(), "user1@test.com")
message = stanza_sent[1]
self.assertTrue(isinstance(message, Message))
self.assertEquals(message.get_from(), "jcl.test.com")
self.assertEquals(message.get_to(), "user1@test.com")
self.assertEquals(message.get_subject(), \
_account.get_update_message_subject(Lang.en))
self.assertEquals(message.get_body(), \
_account.get_update_message_body(Lang.en))
def test_handle_set_register_remove(self): def test_handle_set_register_remove(self):
pass self.comp.stream = MockStream()
self.comp.stream_class = MockStream
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
account11 = Account(user_jid = "user1@test.com", \
name = "account1", \
jid = "account1@jcl.test.com")
account12 = Account(user_jid = "user1@test.com", \
name = "account2", \
jid = "account2@jcl.test.com")
account21 = Account(user_jid = "user2@test.com", \
name = "account1", \
jid = "account1@jcl.test.com")
del account.hub.threadConnection
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
query.newChild(None, "remove", None)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
accounts = self.comp.account_class.select(\
self.comp.account_class.q.user_jid == "user1@test.com")
self.assertEquals(accounts.count(), 0)
accounts = self.comp.account_class.select(\
self.comp.account_class.q.user_jid == "user2@test.com")
self.assertEquals(accounts.count(), 1)
_account = accounts[0]
self.assertEquals(_account.user_jid, "user2@test.com")
self.assertEquals(_account.name, "account1")
self.assertEquals(_account.jid, "account1@jcl.test.com")
del account.hub.threadConnection
stanza_sent = self.comp.stream.sent
self.assertEquals(len(stanza_sent), 6)
presence = stanza_sent[0]
self.assertTrue(isinstance(presence, Presence))
self.assertEquals(presence.get_node().prop("type"), "unsubscribe")
self.assertEquals(presence.get_from(), "account1@jcl.test.com")
self.assertEquals(presence.get_to(), "user1@test.com")
presence = stanza_sent[1]
self.assertTrue(isinstance(presence, Presence))
self.assertEquals(presence.get_node().prop("type"), "unsubscribed")
self.assertEquals(presence.get_from(), "account1@jcl.test.com")
self.assertEquals(presence.get_to(), "user1@test.com")
presence = stanza_sent[2]
self.assertTrue(isinstance(presence, Presence))
self.assertEquals(presence.get_node().prop("type"), "unsubscribe")
self.assertEquals(presence.get_from(), "account2@jcl.test.com")
self.assertEquals(presence.get_to(), "user1@test.com")
presence = stanza_sent[3]
self.assertTrue(isinstance(presence, Presence))
self.assertEquals(presence.get_node().prop("type"), "unsubscribed")
self.assertEquals(presence.get_from(), "account2@jcl.test.com")
self.assertEquals(presence.get_to(), "user1@test.com")
presence = stanza_sent[4]
self.assertTrue(isinstance(presence, Presence))
self.assertEquals(presence.get_node().prop("type"), "unsubscribe")
self.assertEquals(presence.get_from(), "jcl.test.com")
self.assertEquals(presence.get_to(), "user1@test.com")
presence = stanza_sent[5]
self.assertTrue(isinstance(presence, Presence))
self.assertEquals(presence.get_node().prop("type"), "unsubscribed")
self.assertEquals(presence.get_from(), "jcl.test.com")
self.assertEquals(presence.get_to(), "user1@test.com")
def test_handle_presence_available_to_component(self): def test_handle_presence_available_to_component(self):
self.comp.stream = MockStream() self.comp.stream = MockStream()
@@ -686,7 +968,7 @@ class JCLComponent_TestCase(unittest.TestCase):
self.assertEqual(presence.get_from_jid(), "account11@jcl.test.com") self.assertEqual(presence.get_from_jid(), "account11@jcl.test.com")
self.assertEqual(presence.get_to_jid(), "user1@test.com") self.assertEqual(presence.get_to_jid(), "user1@test.com")
def test_handle_presence_available_to_account_live_password2(self): def test_handle_presence_available_to_account_live_password_complex(self):
self.comp.account_class = AccountExample self.comp.account_class = AccountExample
self.comp.stream = MockStream() self.comp.stream = MockStream()
self.comp.stream_class = MockStream self.comp.stream_class = MockStream
@@ -923,7 +1205,7 @@ class JCLComponent_TestCase(unittest.TestCase):
messages_sent = self.comp.stream.sent messages_sent = self.comp.stream.sent
self.assertEqual(len(messages_sent), 0) self.assertEqual(len(messages_sent), 0)
def test_handle_message_password2(self): def test_handle_message_password_complex(self):
self.comp.account_class = AccountExample self.comp.account_class = AccountExample
self.comp.stream = MockStream() self.comp.stream = MockStream()
self.comp.stream_class = MockStream self.comp.stream_class = MockStream

View File

View File

@@ -0,0 +1,57 @@
##
## account.py
## Login : David Rousselie <dax@happycoders.org>
## Started on Sat Oct 28 17:03:30 2006 David Rousselie
## $Id$
##
## Copyright (C) 2006 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
from sqlobject.main import SQLObject
from sqlobject.col import StringCol, BoolCol, EnumCol, IntCol
from jcl.lang import Lang
from jcl.model import account
from jcl.model.account import Account
class AccountExample(Account):
login = StringCol(default = "")
password = StringCol(default = None)
store_password = BoolCol(default = True)
waiting_password_reply = BoolCol(default = False)
test_enum = EnumCol(default = "choice1", enumValues = ["choice1", "choice2", "choice3"])
test_int = IntCol(default = 42)
def _get_register_fields(cls):
def password_post_func(password):
if password is None or password == "":
return None
return password
return [("login", "text-single", account.string_not_null_post_func, \
account.mandatory_field), \
("password", "text-private", password_post_func, \
(lambda field_name: None)), \
("store_password", "boolean", account.boolean_post_func, \
lambda field_name: True), \
("test_enum", "list-single",account.string_not_null_post_func,\
lambda field_name: "choice2"), \
("test_int", "text-single", account.int_post_func, \
lambda field_name: 44)]
get_register_fields = classmethod(_get_register_fields)