add-user ad-hoc command implementation

darcs-hash:20070723204429-86b55-90311a0b489384658b26da23aeca15a5af6d6b32.gz
This commit is contained in:
David Rousselie
2007-07-23 22:44:29 +02:00
parent f50516181b
commit 8599ac5bf9
13 changed files with 1160 additions and 1019 deletions

View File

@@ -26,5 +26,11 @@ __revision__ = "$Id: error.py,v 1.1 2006/11/05 20:13:48 dax Exp $"
class FieldError(Exception):
"""Error raised when error exists on Jabber Data Form fields"""
pass
def __init__(self, field, error_msg):
Exception.__init__(self)
self.field = field
self.error_msg = error_msg
def __str__(self):
return "Error with " + str(self.field) + " field: " + str(self.error_msg)

View File

@@ -16,7 +16,7 @@ class Handler(object):
Filter account to be processed by the handler
return all accounts. DB connection might already be opened.
"""
accounts = Account.select()
accounts = account.get_all_accounts()
return accounts
def handle(self, stanza, lang_class, data):
@@ -52,8 +52,8 @@ def get_account_filter(self, stanza, lang_class, node=None):
"""Filter stanzas sent to account jid, only if account exists"""
name = stanza.get_to().node
if name is not None:
return account.get_account(name,
stanza.get_from().bare())
return account.get_account(stanza.get_from().bare(),
name)
else:
return None

View File

@@ -21,18 +21,27 @@
##
import re
import datetime
import logging
from pyxmpp.jid import JID
from pyxmpp.jabber.disco import DiscoInfo, DiscoItems, DiscoItem, DiscoIdentity
from pyxmpp.jabber.dataforms import Form, Field
import jcl
from jcl.lang import Lang
from jcl.jabber.disco import DiscoHandler, RootDiscoGetInfoHandler
from jcl.model import account
from jcl.model.account import Account
COMMAND_NS = "http://jabber.org/protocol/commands"
ACTION_COMPLETE = "complete"
ACTION_NEXT = "next"
ACTION_PREVIOUS = "prev"
ACTION_CANCEL = "cancel"
STATUS_COMPLETED = "completed"
STATUS_EXECUTING = "executing"
STATUS_CANCELLED = "cancel"
class FieldNoType(Field):
def complete_xml_element(self, xmlnode, doc):
result = Field.complete_xml_element(self, xmlnode, doc)
@@ -44,10 +53,12 @@ class CommandManager(object):
def __init__(self, component=None, account_manager=None):
"""CommandManager constructor"""
self.__logger = logging.getLogger("jcl.jabber.command.CommandManager")
self.component = component
self.account_manager = account_manager
self.commands = []
self.command_re = re.compile("([^#]*#)?(.*)")
self.sessions = {}
def get_short_command_name(self, command_name):
"""
@@ -55,7 +66,7 @@ class CommandManager(object):
'http://jabber.org/protocol/admin#add-user' -> 'add-user'
"""
match = self.command_re.match(command_name)
return match.group(2)
return match.group(2).replace('-', '_')
def get_command_desc(self, command_name, lang_class):
"""Return localized command description"""
@@ -96,6 +107,86 @@ class CommandManager(object):
else:
return [info_query.make_error_response("feature-not-implemented")]
def generate_session_id(self, node):
return self.get_short_command_name(node) + ":" + \
datetime.datetime.now().isoformat()
def _create_response(self, info_query, completed=True):
xml_command = info_query.xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
node = xml_command.prop("node")
if xml_command.hasProp("sessionid"):
session_id = xml_command.prop("sessionid")
else:
session_id = self.generate_session_id(node)
self.__logger.debug("Creating command execution with session ID " + str(session_id))
response = info_query.make_result_response()
command_node = response.set_new_content(COMMAND_NS, "command")
command_node.setProp("node", node)
if completed:
command_node.setProp("status", STATUS_COMPLETED)
else:
command_node.setProp("status", STATUS_EXECUTING)
command_node.setProp("sessionid",
session_id)
return (response, command_node, session_id)
def parse_form(self, info_query, session_id):
fields = info_query.xpath_eval(\
"c:command/data:x/data:field",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
for field in fields:
field_name = field.prop("var")
value = info_query.xpath_eval(\
"c:command/data:x/data:field[@var='"
+ field_name + "']/data:value",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
if len(value) > 0:
self.__logger.debug("Adding to session '" + session_id
+ "': " + field_name + "="
+ value[0].content)
self.sessions[session_id][1][field_name] = value[0].content
def execute_multi_step_command(self, info_query, short_node, update_step_func):
(response,
command_node,
session_id) = self._create_response(info_query,
False)
lang_class = self.component.lang.get_lang_class_from_node(info_query.get_node())
if not self.sessions.has_key(session_id):
self.sessions[session_id] = (1, {})
else:
self.sessions[session_id] = update_step_func(session_id)
step = self.sessions[session_id][0]
step_method = "execute_" + short_node + "_" + str(step)
self.parse_form(info_query, session_id)
return [response] + getattr(self, step_method)(info_query,
self.sessions[session_id][1],
command_node,
lang_class)
def add_actions(self, command_node, actions, default_action_idx=0):
actions_node = command_node.newTextChild(None, "actions", None)
actions_node.setProp("execute", actions[default_action_idx])
for action in actions:
actions_node.newTextChild(None, action, None)
return actions_node
def cancel_command(self, info_query):
xml_command = info_query.xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
if xml_command.hasProp("sessionid"):
session_id = xml_command.prop("sessionid")
response = info_query.make_result_response()
command_node = response.set_new_content(COMMAND_NS, "command")
command_node.setProp("node", xml_command.prop("node"))
command_node.setProp("status", STATUS_CANCELLED)
command_node.setProp("sessionid",
session_id)
del self.sessions[session_id]
command_manager = CommandManager()
class JCLCommandManager(CommandManager):
@@ -104,6 +195,7 @@ class JCLCommandManager(CommandManager):
def __init__(self, component=None, account_manager=None):
"""JCLCommandManager constructor"""
CommandManager.__init__(self, component, account_manager)
self.__logger = logging.getLogger("jcl.jabber.command.JCLCommandManager")
self.commands.extend(["list",
"http://jabber.org/protocol/admin#add-user",
"http://jabber.org/protocol/admin#delete-user",
@@ -143,24 +235,192 @@ class JCLCommandManager(CommandManager):
def execute_list(self, info_query):
"""Execute command 'list'. List accounts"""
response = info_query.make_result_response()
command_node = response.set_new_content(COMMAND_NS, "command")
command_node.setProp("node", "list")
command_node.setProp("status", "completed")
#command_node.setProp("sessionid", "????") # TODO
self.__logger.debug("Executing 'list' command")
(response, command_node, session_id) = self._create_response(info_query,
"list")
result_form = Form(xmlnode_or_type="result",
title="Registered account") # TODO : add to Lang
result_form.reported_fields.append(FieldNoType(name="name",
label="Account name")) # TODO: add to Lang
bare_from_jid = unicode(info_query.get_from().bare())
for _account in account.get_accounts(bare_from_jid):
print "Adding account : " + str(_account)
fields = [FieldNoType(name="name",
value=_account.name)]
result_form.add_item(fields)
result_form.as_xml(command_node)
return [response]
def execute_add_user(self, info_query):
"""Execute command 'add-user'. Create new account"""
self.__logger.debug("Executing 'add-user' command")
return self.execute_multi_step_command(\
info_query,
"add_user",
lambda session_id: (self.sessions[session_id][0] + 1,
self.sessions[session_id][1]))
def prev_add_user(self, info_query):
"""Execute previous step of command 'add-user'."""
self.__logger.debug("Executing 'add-user' command previous step")
return self.execute_multi_step_command(\
info_query,
"add_user",
lambda session_id: (self.sessions[session_id][0] - 1,
self.sessions[session_id][1]))
def cancel_add_user(self, info_query):
"""Cancel current 'add-user' session"""
self.__logger.debug("Cancelling 'add-user' command")
return self.cancel_command(info_query)
def execute_add_user_1(self, info_query, session_context, command_node, lang_class):
self.__logger.debug("Executing command 'add-user' step 1")
self.add_actions(command_node, [ACTION_NEXT])
result_form = Form(xmlnode_or_type="result",
title=lang_class.select_account_type)
field = result_form.add_field(name="account_type",
field_type="list-single",
label="Account type")
for (account_type, type_label) in \
self.component.account_manager.list_account_types(lang_class):
field.add_option(label=type_label,
values=[account_type])
result_form.add_field(name="user_jid",
field_type="text-single",
label=lang_class.field_user_jid)
result_form.as_xml(command_node)
return []
def execute_add_user_2(self, info_query, session_context, command_node, lang_class):
self.__logger.debug("Executing command 'add-user' step 2")
self.add_actions(command_node, [ACTION_PREVIOUS, ACTION_COMPLETE], 1)
user_jid = session_context["user_jid"]
account_type = session_context["account_type"]
account_class = self.account_manager.get_account_class(account_type)
result_form = self.account_manager.generate_registration_form(lang_class,
account_class,
user_jid)
result_form.as_xml(command_node)
return []
def execute_add_user_3(self, info_query, session_context,
command_node, lang_class):
self.__logger.debug("Executing command 'add-user' step 3")
x_node = info_query.xpath_eval("c:command/jxd:x",
{"c": "http://jabber.org/protocol/commands",
"jxd" : "jabber:x:data"})[0]
x_data = Form(x_node)
to_send = self.component.account_manager.create_account_from_type(\
account_name=session_context["name"],
from_jid=JID(session_context["user_jid"]),
account_type=session_context["account_type"],
lang_class=lang_class,
x_data=x_data)
command_node.setProp("status", STATUS_COMPLETED)
return to_send
def execute_delete_user(self, info_query):
return []
def execute_disable_user(self, info_query):
return []
def execute_reenable_user(self, info_query):
return []
def execute_end_user_session(self, info_query):
return []
def execute_get_user_password(self, info_query):
return []
def execute_change_user_password(self, info_query):
return []
def execute_get_user_roster(self, info_query):
return []
def execute_get_user_lastlogin(self, info_query):
return []
def execute_user_stats(self, info_query):
return []
def execute_edit_blacklist(self, info_query):
return []
def execute_add_to_blacklist_in(self, info_query):
return []
def execute_add_to_blacklist_out(self, info_query):
return []
def execute_edit_whitelist(self, info_query):
return []
def execute_add_to_whitelist_in(self, info_query):
return []
def execute_add_to_whitelist_out(self, info_query):
return []
def execute_get_registered_users_num(self, info_query):
return []
def execute_get_disabled_users_num(self, info_query):
return []
def execute_get_online_users_num(self, info_query):
return []
def execute_get_active_users_num(self, info_query):
return []
def execute_get_idle_users_num(self, info_query):
return []
def execute_get_registered_users_list(self, info_query):
return []
def execute_get_disabled_users_list(self, info_query):
return []
def execute_get_online_users(self, info_query):
return []
def execute_get_active_users(self, info_query):
return []
def execute_get_idle_users(self, info_query):
return []
def execute_announce(self, info_query):
return []
def execute_set_motd(self, info_query):
return []
def execute_edit_motd(self, info_query):
return []
def execute_delete_motd(self, info_query):
return []
def execute_set_welcome(self, info_query):
return []
def execute_delete_welcome(self, info_query):
return []
def execute_edit_admin(self, info_query):
return []
def execute_restart(self, info_query):
return []
def execute_shutdown(self, info_query):
return []
class CommandRootDiscoGetInfoHandler(RootDiscoGetInfoHandler):
def handle(self, info_query, lang_class, node, disco_obj, data):

View File

@@ -30,7 +30,6 @@ __revision__ = "$Id: component.py,v 1.3 2005/09/18 20:24:07 dax Exp $"
import sys
import threading
import time
import logging
import signal
import re
@@ -38,35 +37,33 @@ import traceback
from Queue import Queue
from sqlobject.inheritance import InheritableSQLObject
from sqlobject.sqlbuilder import AND
import pyxmpp.error as error
from pyxmpp.jid import JID
from pyxmpp.jabberd.component import Component
from pyxmpp.jabber.disco import DiscoInfo, DiscoItems, DiscoItem, DiscoIdentity
from pyxmpp.message import Message
from pyxmpp.presence import Presence
from pyxmpp.jabber.dataforms import Form, Field, Option
from pyxmpp.jabber.dataforms import Form
import jcl
from jcl.error import FieldError
from jcl.jabber import Handler
from jcl.jabber.disco import AccountDiscoGetInfoHandler, \
AccountTypeDiscoGetInfoHandler, RootDiscoGetItemsHandler, \
AccountTypeDiscoGetItemsHandler
from jcl.jabber.message import PasswordMessageHandler
import jcl.jabber.command as command
from jcl.jabber.command import CommandDiscoGetItemsHandler, \
CommandDiscoGetInfoHandler, CommandManager, JCLCommandManager, \
CommandDiscoGetInfoHandler, JCLCommandManager, \
CommandRootDiscoGetInfoHandler
from jcl.jabber.presence import AccountPresenceAvailableHandler, \
RootPresenceAvailableHandler, AccountPresenceUnavailableHandler, \
RootPresenceUnavailableHandler, AccountPresenceSubscribeHandler, \
RootPresenceSubscribeHandler, AccountPresenceUnsubscribeHandler
from jcl.jabber.register import RootSetRegisterHandler, \
AccountSetRegisterHandler, AccountTypeSetRegisterHandler
import jcl.model as model
from jcl.model import account
from jcl.model.account import Account, LegacyJID
from jcl.model.account import Account
from jcl.lang import Lang
VERSION = "0.1"
@@ -125,6 +122,9 @@ class JCLComponent(Component, object):
AccountDiscoGetInfoHandler(self),
AccountTypeDiscoGetInfoHandler(self)],
[CommandDiscoGetInfoHandler(self)]]
self.set_register_handlers = [[RootSetRegisterHandler(self),
AccountSetRegisterHandler(self),
AccountTypeSetRegisterHandler(self)]]
self.__logger = logging.getLogger("jcl.jabber.JCLComponent")
self.lang = lang
@@ -313,11 +313,11 @@ class JCLComponent(Component, object):
result += handler_result
break
except Exception, e:
type, value, stack = sys.exc_info()
error_type, value, stack = sys.exc_info()
self.__logger.error("Error with handler " + str(handler) +
" with " + str(stanza) + "\n%s\n%s"
% (e, "".join(traceback.format_exception
(type, value, stack, 5))))
(error_type, value, stack, 5))))
result += [Message(from_jid=stanza.get_to(),
to_jid=stanza.get_from(),
stanza_type="error",
@@ -431,7 +431,6 @@ class JCLComponent(Component, object):
lang_class = \
self.lang.get_lang_class_from_node(info_query.get_node())
from_jid = info_query.get_from()
base_from_jid = unicode(from_jid.bare())
remove = info_query.xpath_eval("r:query/r:remove",
{"r" : "jabber:iq:register"})
if remove:
@@ -442,7 +441,6 @@ class JCLComponent(Component, object):
x_node = info_query.xpath_eval("jir:query/jxd:x",
{"jir" : "jabber:iq:register",
"jxd" : "jabber:x:data"})[0]
x_data = Form(x_node)
if not "name" in x_data or x_data["name"].value == "":
iq_error = info_query.make_error_response("not-acceptable")
@@ -453,36 +451,19 @@ class JCLComponent(Component, object):
text.setNs(text.newNs(error.STANZA_ERROR_NS, None))
self.stream.send(iq_error)
return
account_name = x_data["name"].value
return self.apply_behavior(\
return self.apply_registered_behavior(\
self.set_register_handlers,
info_query,
lambda name, from_jid, account_type, lang_class: \
self.account_manager.account_set_register(name,
from_jid,
lang_class,
x_data,
info_query),
lambda name, from_jid, account_type, lang_class: \
self.account_manager.account_type_set_register(account_name,
from_jid,
account_type,
lang_class,
x_data,
info_query),
lambda name, from_jid, account_type, lang_class: \
self.account_manager.root_set_register(account_name,
from_jid,
lang_class,
x_data,
info_query),
send_result=True)
apply_handle_func=lambda handle_func, stanza, lang_class, data, result: \
handle_func(stanza, lang_class, data, x_data))
def handle_presence_available(self, stanza):
"""Handle presence availability
if presence sent to the component ('if not name'), presence is sent to
all accounts for current user. Otherwise, send presence from current account.
"""
result = self.apply_registered_behavior(self.presence_available_handlers, stanza)
result = self.apply_registered_behavior(self.presence_available_handlers,
stanza)
return result
@@ -539,16 +520,24 @@ class JCLComponent(Component, object):
Handle command IQ
"""
self.__logger.debug("COMMAND")
_command = info_query.xpath_eval("c:command",
{"c": command.COMMAND_NS})[0]
command_node = _command.prop("node")
action = _command.prop("action")
if action is None:
action = "execute"
result = command.command_manager.apply_command_action(info_query,
command_node,
action)
self.send_stanzas(result)
try:
result = command.command_manager.apply_command_action(info_query,
command_node,
action)
self.send_stanzas(result)
except:
type, value, stack = sys.exc_info()
self.__logger.error("Error in command " + str(command_node) +
" with " + str(info_query) + "\n%s"
% ("".join(traceback.format_exception
(type, value, stack, 5))))
return 1
###########################################################################
@@ -579,6 +568,7 @@ class AccountManager(object):
"""AccountManager constructor"""
self.__logger = logging.getLogger("jcl.jabber.JCLComponent")
self.regexp_type = re.compile("(.*)Account$")
self._account_classes = None
self.account_classes = (Account,)
self.component = component
self.account_types = []
@@ -613,17 +603,14 @@ class AccountManager(object):
"""Handle get_register on an account.
Return a preinitialized form"""
info_query = info_query.make_result_response()
account_class = self.get_account_class(account_type + "Account")
account_class = self.get_account_class(account_type)
model.db_connect()
accounts = account_class.select(\
AND(account_class.q.name == name,
account_class.q.user_jid == unicode(from_jid.bare())))
if accounts is not None:
_account = account.get_account(from_jid.bare(), name, account_class)
if _account is not None:
query = info_query.new_query("jabber:iq:register")
_account = accounts[0]
model.db_disconnect()
self.get_reg_form_init(lang_class,
_account).as_xml(query)
self.generate_registration_form_init(lang_class,
_account).as_xml(query)
else:
model.db_disconnect()
return [info_query]
@@ -634,16 +621,16 @@ class AccountManager(object):
query = info_query.new_query("jabber:iq:register")
from_jid = info_query.get_from()
bare_from_jid = unicode(from_jid.bare())
self.get_reg_form(lang_class,
account_class,
bare_from_jid).as_xml(query)
self.generate_registration_form(lang_class,
account_class,
bare_from_jid).as_xml(query)
return [info_query]
def account_type_get_register(self, info_query, account_type, lang_class):
"""Handle get_register on an account_type node"""
return self._account_type_get_register(\
info_query,
self.get_account_class(account_type + "Account"),
self.get_account_class(account_type),
lang_class)
def root_get_register(self, info_query, lang_class):
@@ -659,7 +646,7 @@ class AccountManager(object):
those accounts from the DataBase"""
model.db_connect()
result = []
for _account in Account.select(Account.q.user_jid == unicode(user_jid)):
for _account in account.get_accounts(user_jid):
self.__logger.debug("Deleting " + _account.name
+ " for " + unicode(user_jid))
# get_jid
@@ -679,132 +666,123 @@ class AccountManager(object):
model.db_disconnect()
return result
def _populate_account(self, _account, lang_class, x_data,
info_query, new_account, first_account):
def populate_account(self, _account, lang_class, x_data,
new_account, first_account, from_jid=None):
"""Populate given account"""
if from_jid is None:
from_jid = _account.user_jid
field = None
result = []
from_jid = info_query.get_from()
bare_from_jid = unicode(from_jid.bare())
model.db_connect()
try:
for (field, field_type, field_options, field_post_func,
field_default_func) in _account.get_register_fields():
if field is not None:
if field in x_data:
value = x_data[field].value
else:
value = None
setattr(_account, field,
field_post_func(value, field_default_func,
bare_from_jid))
except FieldError, exception:
_account.destroySelf()
type, value, stack = sys.exc_info()
self.__logger.error("Error while populating account: %s\n%s" %
(exception, "".join(traceback.format_exception
(type, value, stack, 5))))
iq_error = info_query.make_error_response("not-acceptable")
text = iq_error.get_error().xmlnode.newTextChild(\
None,
"text",
lang_class.mandatory_field % (field))
text.setNs(text.newNs(error.STANZA_ERROR_NS, None))
result.append(iq_error)
model.db_disconnect()
return result
result.append(info_query.make_result_response())
for (field, field_type, field_options, field_post_func,
field_default_func) in _account.get_register_fields():
if field is not None:
if field in x_data:
value = x_data[field].value
else:
value = None
setattr(_account, field,
field_post_func(value, field_default_func,
from_jid.bare()))
# TODO : _account.user_jid or from_jid
if first_account:
# component subscribe user presence when registering the first
# account
result.append(Presence(from_jid=self.component.jid,
to_jid=_account.user_jid,
to_jid=from_jid,
stanza_type="subscribe"))
if new_account:
# subscribe to user presence if this is a new account
result.append(Message(\
from_jid=self.component.jid,
to_jid=_account.user_jid,
to_jid=from_jid,
subject=_account.get_new_message_subject(lang_class),
body=_account.get_new_message_body(lang_class)))
result.append(Presence(from_jid=_account.jid,
to_jid=_account.user_jid,
to_jid=from_jid,
stanza_type="subscribe"))
else:
result.append(Message(\
from_jid=self.component.jid,
to_jid=_account.user_jid,
to_jid=from_jid,
subject=_account.get_update_message_subject(lang_class),
body=_account.get_update_message_body(lang_class)))
model.db_disconnect()
return result
def account_set_register(self, name, from_jid, lang_class,
x_data, info_query):
def update_account(self,
account_name,
from_jid,
lang_class,
x_data):
"""Update account"""
self.__logger.debug("Updating account " + name)
self.__logger.debug("Updating account " + account_name)
bare_from_jid = from_jid.bare()
model.db_connect()
accounts = Account.select(\
AND(Account.q.name == name,
Account.q.user_jid == unicode(bare_from_jid)))
accounts_count = accounts.count()
_account = list(accounts)[0]
model.db_disconnect()
if accounts_count > 1:
# Just print a warning, only the first account will be use
self.__logger.error("There might not exist 2 accounts for " +
bare_from_jid + " and named " + name)
if accounts_count >= 1:
return self._populate_account(_account, lang_class,
x_data, info_query, False, False)
_account = account.get_account(bare_from_jid,
account_name)
if _account is not None:
return self.populate_account(_account, lang_class,
x_data,
new_account=False,
first_account=False,
from_jid=from_jid)
else:
self.__logger.error("Account " + name +
self.__logger.error("Account " + account_name +
" was not found, cannot update it")
return []
def _account_type_set_register(self, name,
def create_account(self,
account_name,
from_jid,
account_class,
lang_class,
x_data):
"""Create new account from account_class"""
bare_from_jid = from_jid.bare()
first_account = (account.get_accounts_count(bare_from_jid) == 0)
model.db_connect()
_account = account_class(user_jid=unicode(bare_from_jid),
name=account_name,
jid=self.get_account_jid(account_name))
model.db_disconnect()
try:
return self.populate_account(_account, lang_class, x_data,
new_account=True,
first_account=first_account,
from_jid=from_jid)
except FieldError, field_error:
model.db_connect()
_account.destroySelf()
model.db_disconnect()
raise field_error
def create_account_from_type(self,
account_name,
from_jid,
account_type,
lang_class,
x_data):
"""Create new account from its type name"""
account_class = self.get_account_class(account_type)
return self.create_account(account_name,
from_jid,
account_class,
lang_class,
x_data,
info_query):
"""Create new account from account_class"""
model.db_connect()
bare_from_jid = from_jid.bare()
_account = account_class(user_jid=unicode(bare_from_jid),
name=name,
jid=self.get_account_jid(name))
all_accounts = Account.select(\
Account.q.user_jid == unicode(bare_from_jid))
first_account = (all_accounts.count() > 0)
model.db_disconnect()
return self._populate_account(_account, lang_class, x_data,
info_query, True, first_account)
x_data)
def account_type_set_register(self, name,
from_jid,
account_type,
lang_class,
x_data,
info_query):
"""Create new typed account"""
return self._account_type_set_register(\
name, from_jid,
self.get_account_class(account_type + "Account"), lang_class,
x_data, info_query)
def root_set_register(self, name, from_jid, lang_class,
x_data, info_query):
def create_default_account(self,
account_name,
bare_from_jid,
lang_class,
x_data):
"""Create new account when managing only one account type"""
if not self.has_multiple_account_type:
return self._account_type_set_register(name, from_jid,
self.account_classes[0],
lang_class, x_data,
info_query)
return self.create_account(account_name, bare_from_jid,
self.account_classes[0],
lang_class, x_data)
else:
return []
###### presence generic handlers ######
def send_presence_all(self, presence):
"""Send presence to all account. Optimized to use only one sql
@@ -844,12 +822,10 @@ class AccountManager(object):
else:
resource = ""
model.db_connect()
accounts = account_class.select(account_class.q.user_jid == \
unicode(bare_from_jid))
if accounts.count() == 0:
return
for _account in accounts:
yield (_account, resource, account_type)
accounts = account.get_accounts(bare_from_jid, account_class)
if accounts:
for _account in accounts:
yield (_account, resource, account_type)
model.db_disconnect()
def list_account_types(self, lang_class):
@@ -862,9 +838,15 @@ class AccountManager(object):
type_label = account_type
yield (account_type, type_label)
def get_account_class(self, account_class_name):
def get_account_class(self, account_type=None,
account_class_name=None):
"""Return account class definition from declared classes in
account_classes from its class name"""
if account_type is not None:
account_class_name = account_type + "Account"
elif account_class_name is None:
self.__logger.error("account_type and account_class_name are None")
return None
self.__logger.debug("Looking for " + account_class_name)
for _account_class in self.account_classes:
if _account_class.__name__.lower() == account_class_name.lower():
@@ -873,7 +855,7 @@ class AccountManager(object):
self.__logger.debug(account_class_name + " not found")
return None
def get_reg_form(self, lang_class, _account_class, bare_from_jid):
def generate_registration_form(self, lang_class, _account_class, bare_from_jid):
"""
Return register form based on language and account class
"""
@@ -885,7 +867,6 @@ class AccountManager(object):
name="name",
required=True)
model.db_connect()
for (field_name,
field_type,
field_options,
@@ -895,6 +876,7 @@ class AccountManager(object):
if field_name is None:
# TODO : Add page when empty tuple given
pass
else:
lang_label_attr = "field_" + field_name
if hasattr(lang_class, lang_label_attr):
@@ -920,15 +902,14 @@ class AccountManager(object):
except:
self.__logger.debug("Setting field " + field_name + " required")
field.required = True
model.db_disconnect()
return reg_form
def get_reg_form_init(self, lang_class, _account):
def generate_registration_form_init(self, lang_class, _account):
"""
Return register form for an existing account (update)
"""
reg_form = self.get_reg_form(lang_class, _account.__class__,
_account.user_jid)
reg_form = self.generate_registration_form(lang_class, _account.__class__,
_account.user_jid)
reg_form["name"].value = _account.name
reg_form["name"].type = "hidden"
for field in reg_form.fields:

View File

@@ -133,7 +133,7 @@ class AccountTypeDiscoGetItemsHandler(DiscoHandler):
account_type = data
from_jid = stanza.get_from()
self.__logger.debug("Listing account for " + account_type)
account_class = self.component.account_manager.get_account_class(account_type + "Account")
account_class = self.component.account_manager.get_account_class(account_type)
if account_class is not None:
disco_items = DiscoItems()
for (_account, resource, account_type) in \
@@ -146,6 +146,6 @@ class AccountTypeDiscoGetItemsHandler(DiscoHandler):
_account.long_name)
return [disco_items]
else:
self.__logger.error("Error: " + account_class.__name__
self.__logger.error("Error: " + str(account_class)
+ " class not in account_classes")
return []

View File

@@ -22,9 +22,6 @@
import re
from sqlobject.sqlbuilder import AND
from pyxmpp.message import Message
from jcl.jabber import Handler
import jcl.model.account as account
from jcl.model.account import Account
@@ -42,8 +39,8 @@ class PasswordMessageHandler(Handler):
Return the uniq account associated with a name and user JID.
DB connection might already be opened.
"""
_account = account.get_account(stanza.get_to().node,
stanza.get_from().bare())
_account = account.get_account(stanza.get_from().bare(),
stanza.get_to().node)
if hasattr(_account, 'password') \
and hasattr(_account, 'waiting_password_reply') \
and (getattr(_account, 'waiting_password_reply') == True) \

View File

@@ -21,13 +21,30 @@
##
import unittest
import sys
import os
from pyxmpp.jabber.dataforms import Form, Field
from pyxmpp.presence import Presence
from pyxmpp.jabber.dataforms import Form
from pyxmpp.iq import Iq
from pyxmpp.message import Message
from sqlobject.dbconnection import TheURIOpener
from jcl.lang import Lang
from jcl.jabber.component import JCLComponent
import jcl.jabber.command as command
from jcl.jabber.command import FieldNoType
from jcl.jabber.command import FieldNoType, JCLCommandManager
import jcl.model as model
import jcl.model.account as account
from jcl.model.account import Account
from jcl.model.tests.account import ExampleAccount, Example2Account
if sys.platform == "win32":
DB_PATH = "/c|/temp/jcl_test.db"
else:
DB_PATH = "/tmp/jcl_test.db"
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
class FieldNoType_TestCase(unittest.TestCase):
def test_complete_xml_element(self):
@@ -39,21 +56,521 @@ class FieldNoType_TestCase(unittest.TestCase):
self.assertFalse(fake_iq.xmlnode.hasProp("type"))
class CommandManager_TestCase(unittest.TestCase):
def test_get_long_command_desc(self):
command_desc = command.command_manager.get_command_desc("http://jabber.org/protocol/admin#test-command",
Lang.en)
self.assertEquals(command_desc, "test-command")
def test_get_short_command_name_form_long_name(self):
command_name = command.command_manager.get_short_command_name("http://jabber.org/protocol/admin#test-command")
self.assertEquals(command_name, "test_command")
def test_get_command_desc(self):
command_desc = command.command_manager.get_command_desc("test-command",
Lang.en)
self.assertEquals(command_desc, "test-command")
def test_get_short_command_name(self):
command_name = command.command_manager.get_short_command_name("test-command")
self.assertEquals(command_name, "test_command")
class JCLCommandManager_TestCase(unittest.TestCase):
def setUp(self):
self.comp = JCLComponent("jcl.test.com",
"password",
"localhost",
"5347")
model.db_connection_str = 'sqlite://' + DB_URL
model.db_connect()
Account.createTable(ifNotExists=True)
ExampleAccount.createTable(ifNotExists=True)
Example2Account.createTable(ifNotExists = True)
model.db_disconnect()
self.command_manager = JCLCommandManager(self.comp,
self.comp.account_manager)
def tearDown(self):
model.db_connect()
Example2Account.dropTable(ifExists=True)
ExampleAccount.dropTable(ifExists=True)
Account.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
def test_execute_add_user(self):
self.comp.account_manager.account_classes = (ExampleAccount,
Example2Account)
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#add-user",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertNotEquals(xml_command.prop("sessionid"), None)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 1)
self.assertEquals(actions[0].prop("execute"), "next")
self.assertEquals(actions[0].children.name, "next")
options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(options), 2)
self.assertEquals(options[0].prop("label"), "Example")
self.assertEquals(options[0].children.name, "value")
self.assertEquals(options[0].children.content, "Example")
self.assertEquals(options[1].prop("label"), "Example2")
self.assertEquals(options[1].children.name, "value")
self.assertEquals(options[1].children.content, "Example2")
user_jid_field = result[0].xpath_eval("c:command/data:x/data:field[2]",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertNotEquals(user_jid_field, None)
self.assertEquals(len(user_jid_field), 1)
self.assertEquals(user_jid_field[0].prop("var"), "user_jid")
self.assertEquals(user_jid_field[0].prop("type"), "text-single")
self.assertEquals(user_jid_field[0].prop("label"), Lang.en.field_user_jid)
# Second step
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
session_id = xml_command.prop("sessionid")
command_node.setProp("sessionid", session_id)
command_node.setProp("action", "next")
submit_form = Form(xmlnode_or_type="submit")
submit_form.add_field(field_type="list-single",
name="account_type",
value="Example")
submit_form.add_field(field_type="text-single",
name="user_jid",
value="user2@test.com")
submit_form.as_xml(command_node)
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#add-user",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertEquals(xml_command.prop("sessionid"), session_id)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 1)
self.assertEquals(actions[0].prop("execute"), "complete")
self.assertEquals(actions[0].children.name, "prev")
self.assertEquals(actions[0].children.next.name, "complete")
fields = result[0].xpath_eval("c:command/data:x/data:field",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(fields), 6)
context_session = self.command_manager.sessions[session_id][1]
self.assertEquals(context_session["account_type"], "Example")
self.assertEquals(context_session["user_jid"], "user2@test.com")
# Third step
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
command_node.setProp("action", "finish")
command_node.setProp("sessionid", session_id)
command_node.setProp("action", "complete")
submit_form = Form(xmlnode_or_type="submit")
submit_form.add_field(field_type="text-single",
name="name",
value="account1")
submit_form.add_field(field_type="text-single",
name="login",
value="login1")
submit_form.add_field(field_type="text-private",
name="password",
value="pass1")
submit_form.add_field(field_type="boolean",
name="store_password",
value="1")
submit_form.add_field(field_type="list-single",
name="test_enum",
value="choice2")
submit_form.add_field(field_type="text-single",
name="test_int",
value="42")
submit_form.as_xml(command_node)
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#add-user",
"execute")
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "completed")
self.assertEquals(xml_command.prop("sessionid"), session_id)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 0)
self.assertEquals(context_session["name"], "account1")
self.assertEquals(context_session["login"], "login1")
self.assertEquals(context_session["password"], "pass1")
self.assertEquals(context_session["store_password"], "1")
self.assertEquals(context_session["test_enum"], "choice2")
self.assertEquals(context_session["test_int"], "42")
model.db_connect()
_account = account.get_account("user2@test.com",
"account1")
self.assertNotEquals(_account, None)
self.assertEquals(_account.user_jid, "user2@test.com")
self.assertEquals(_account.name, "account1")
self.assertEquals(_account.jid, "account1@jcl.test.com")
model.db_disconnect()
stanza_sent = result
self.assertEquals(len(stanza_sent), 4)
iq_result = stanza_sent[0]
self.assertTrue(isinstance(iq_result, Iq))
self.assertEquals(iq_result.get_node().prop("type"), "result")
self.assertEquals(iq_result.get_from(), "jcl.test.com")
self.assertEquals(iq_result.get_to(), "user1@test.com")
presence_component = stanza_sent[1]
self.assertTrue(isinstance(presence_component, Presence))
self.assertEquals(presence_component.get_from(), "jcl.test.com")
self.assertEquals(presence_component.get_to(), "user2@test.com")
self.assertEquals(presence_component.get_node().prop("type"),
"subscribe")
message = stanza_sent[2]
self.assertTrue(isinstance(message, Message))
self.assertEquals(message.get_from(), "jcl.test.com")
self.assertEquals(message.get_to(), "user2@test.com")
self.assertEquals(message.get_subject(),
_account.get_new_message_subject(Lang.en))
self.assertEquals(message.get_body(),
_account.get_new_message_body(Lang.en))
presence_account = stanza_sent[3]
self.assertTrue(isinstance(presence_account, Presence))
self.assertEquals(presence_account.get_from(), "account1@jcl.test.com")
self.assertEquals(presence_account.get_to(), "user2@test.com")
self.assertEquals(presence_account.get_node().prop("type"),
"subscribe")
# def test_execute_delete_user(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_disable_user(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_reenable_user(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_end_user_session(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_user_password(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_change_user_password(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_user_roster(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_user_last_login(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_user_stats(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_edit_blacklist(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_add_to_blacklist_in(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_add_to_blacklist_out(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_edit_whitelist(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_add_to_whitelist_in(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_add_to_whitelist_out(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_registered_users_num(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_disabled_users_num(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_online_users_num(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_active_users_num(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_idle_users_num(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_registered_users_list(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_disabled_users_list(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_online_users(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_active_users(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_get_idle_users(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_announce(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_set_motd(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_edit_motd(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_delete_motd(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_set_welcome(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_delete_welcome(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_edit_admin(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_restart(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_shutdown(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(CommandManager_TestCase, 'test'))
suite.addTest(unittest.makeSuite(FieldNoType_TestCase, 'test'))
suite.addTest(unittest.makeSuite(JCLCommandManager_TestCase, 'test'))
return suite
if __name__ == '__main__':

File diff suppressed because it is too large Load Diff

View File

@@ -104,6 +104,8 @@ class Lang:
field_dnd_action_0 = field_action_0
field_offline_action_0 = field_action_0
field_user_jid = u"User JID"
error_subject = u"Error"
error_body = u"An error has occured:\n\t%s"
@@ -111,6 +113,7 @@ class Lang:
get_gateway_prompt = u"Email address"
command_list = u"List accounts"
select_account_type = u"Select account type"
class fr:
component_name = u"composant générique Jabber Component Library"
@@ -147,6 +150,8 @@ class Lang:
field_dnd_action_0 = field_action_0
field_offline_action_0 = field_action_0
field_user_jid = u"JID utilisateur"
error_subject = u"Erreur"
error_body = u"Une erreur est survenue :\n\t%s"
@@ -154,6 +159,7 @@ class Lang:
get_gateway_prompt = u"Adresse email"
command_list = u"Liste les comptes"
select_account_type = u"Selectionner le type de comptes"
class nl:
# TODO: when finish, delete this line and uncomment in tests/lang.py the makeSuite(Language_nl_TestCase, 'test') line

View File

@@ -27,6 +27,7 @@ def db_disconnect():
"""
Delete connection associated to the current thread.
"""
pass
#if jcl.model.db_connected:
#del jcl.model.hub.threadConnection
#jcl.model.db_connected = False

View File

@@ -27,7 +27,7 @@
__revision__ = "$Id: account.py,v 1.3 2005/09/18 20:24:07 dax Exp $"
from sqlobject.inheritance import InheritableSQLObject
from sqlobject.col import StringCol, EnumCol, IntCol, BoolCol, ForeignKey
from sqlobject.col import StringCol, IntCol, BoolCol, ForeignKey
from sqlobject.joins import MultipleJoin
from sqlobject.sqlbuilder import AND
@@ -50,35 +50,13 @@ def int_post_func(field_value, default_func, bare_from_jid):
return int(default_func(bare_from_jid))
return int(field_value)
def mandatory_field(field_value):
def mandatory_field(field_name, field_value):
"""Used as default function for field that must be specified
and cannot have an empty value"""
if field_value is None or str(field_value) == "":
raise FieldError # TODO : add translated message
raise FieldError(field_name, "Field required") # TODO : add translated message
return field_value
def get_account(name, bare_user_jid):
result = None
model.db_connect()
accounts = Account.select(\
AND(Account.q.name == name,
Account.q.user_jid == unicode(bare_user_jid)))
if accounts.count() > 0:
result = accounts[0]
model.db_disconnect()
return result
def get_accounts(bare_user_jid):
model.db_connect()
accounts = Account.select(\
Account.q.user_jid == unicode(bare_user_jid))
if accounts.count() == 0:
model.db_disconnect()
return
for _account in accounts:
yield _account
model.db_disconnect()
class Account(InheritableSQLObject):
"""Base Account class"""
_cacheValue = False
@@ -162,6 +140,51 @@ class Account(InheritableSQLObject):
"""Return localized message body for existing account"""
return lang_class.update_account_message_body
def get_account(bare_user_jid, name, account_class=Account):
result = None
model.db_connect()
accounts = account_class.select(\
AND(account_class.q.name == name,
account_class.q.user_jid == unicode(bare_user_jid)))
if accounts.count() > 0:
result = accounts[0]
model.db_disconnect()
return result
def get_accounts(bare_user_jid, account_class=Account):
model.db_connect()
accounts = account_class.select(\
account_class.q.user_jid == unicode(bare_user_jid))
if accounts.count() == 0:
model.db_disconnect()
return
for _account in accounts:
yield _account
model.db_disconnect()
def get_all_accounts(account_class=Account):
model.db_connect()
accounts = account_class.select()
if accounts.count() == 0:
model.db_disconnect()
return
for _account in accounts:
yield _account
model.db_disconnect()
def get_accounts_count(bare_user_jid, account_class=Account):
model.db_connect()
accounts_count = account_class.select(\
account_class.q.user_jid == unicode(bare_user_jid)).count()
model.db_disconnect()
return accounts_count
def get_all_accounts_count(account_class=Account):
model.db_connect()
accounts_count = account_class.select().count()
model.db_disconnect()
return accounts_count
class PresenceAccount(Account):
DO_NOTHING = 0
DO_SOMETHING = 1

View File

@@ -58,7 +58,7 @@ class ExampleAccount(Account):
return Account.get_register_fields(real_class) + \
[("login", "text-single", None,
lambda field_value, default_func, bare_from_jid: \
account.mandatory_field(field_value),
account.mandatory_field("login", field_value),
lambda bare_from_jid: ""),
("password", "text-private", None,
lambda field_value, default_func, bare_from_jid: \
@@ -152,15 +152,15 @@ class AccountModule_TestCase(unittest.TestCase):
def test_mandatory_field_empty(self):
self.assertRaises(FieldError,
account.mandatory_field,
"")
"test", "")
def test_mandatory_field_none(self):
self.assertRaises(FieldError,
account.mandatory_field,
None)
"test", None)
def test_mandatory_field_empty(self):
self.assertEquals(account.mandatory_field("value"),
self.assertEquals(account.mandatory_field("test", "value"),
"value")
class InheritableAccount_TestCase(unittest.TestCase):

View File

@@ -108,6 +108,8 @@ class Language_TestCase(unittest.TestCase):
self.assertNotEquals(self.lang_class.field_dnd_action_0, None)
self.assertNotEquals(self.lang_class.field_offline_action_0, None)
self.assertNotEquals(self.lang_class.field_user_jid, None)
self.assertNotEquals(self.lang_class.error_subject, None)
self.assertNotEquals(self.lang_class.error_body % (""), None)
@@ -115,6 +117,7 @@ class Language_TestCase(unittest.TestCase):
self.assertNotEquals(self.lang_class.get_gateway_prompt, None)
self.assertNotEquals(self.lang_class.command_list, None)
self.assertNotEquals(self.lang_class.select_account_type, None)
class Language_fr_TestCase(Language_TestCase):
def setUp(self):