delete-user ad-hoc command implementation

darcs-hash:20070725164723-86b55-8b1b3dd6981e23df4653cf5b290ba1ce1aba68e3.gz
This commit is contained in:
David Rousselie
2007-07-25 18:47:23 +02:00
parent 8599ac5bf9
commit 71ec18ff69
8 changed files with 563 additions and 98 deletions

View File

@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
##
## command.py
## Login : David Rousselie <dax@happycoders.org>
@@ -40,7 +41,7 @@ ACTION_CANCEL = "cancel"
STATUS_COMPLETED = "completed"
STATUS_EXECUTING = "executing"
STATUS_CANCELLED = "cancel"
STATUS_CANCELED = "canceled"
class FieldNoType(Field):
def complete_xml_element(self, xmlnode, doc):
@@ -101,25 +102,57 @@ class CommandManager(object):
def apply_command_action(self, info_query, command_name, action):
"""Apply action on command"""
short_command_name = self.get_short_command_name(command_name)
action_command_method = action + "_" + short_command_name
action_command_method = "apply_" + action + "_command"
if hasattr(self, action_command_method):
return getattr(self, action_command_method)(info_query)
return getattr(self, action_command_method)(info_query,
short_command_name)
else:
return [info_query.make_error_response("feature-not-implemented")]
return [info_query.make_error_response(\
"feature-not-implemented")]
def apply_execute_command(self, info_query, short_command_name):
return self.execute_multi_step_command(\
info_query,
short_command_name,
lambda session_id: (self.sessions[session_id][0] + 1,
self.sessions[session_id][1]))
def apply_prev_command(self, info_query, short_command_name):
return self.execute_multi_step_command(\
info_query,
short_command_name,
lambda session_id: (self.sessions[session_id][0] - 1,
self.sessions[session_id][1]))
def apply_cancel_command(self, info_query, short_command_name):
xml_command = info_query.xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
if xml_command.hasProp("sessionid"):
session_id = xml_command.prop("sessionid")
response = info_query.make_result_response()
command_node = response.set_new_content(COMMAND_NS, "command")
command_node.setProp("node", xml_command.prop("node"))
command_node.setProp("status", STATUS_CANCELED)
command_node.setProp("sessionid",
session_id)
del self.sessions[session_id]
return [response]
def generate_session_id(self, node):
return self.get_short_command_name(node) + ":" + \
datetime.datetime.now().isoformat()
def _create_response(self, info_query, completed=True):
xml_command = info_query.xpath_eval("c:command",
xml_command = info_query.xpath_eval(\
"c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
node = xml_command.prop("node")
if xml_command.hasProp("sessionid"):
session_id = xml_command.prop("sessionid")
else:
session_id = self.generate_session_id(node)
self.__logger.debug("Creating command execution with session ID " + str(session_id))
self.__logger.debug("Creating command execution with session ID "
+ str(session_id))
response = info_query.make_result_response()
command_node = response.set_new_content(COMMAND_NS, "command")
command_node.setProp("node", node)
@@ -138,23 +171,29 @@ class CommandManager(object):
"data": "jabber:x:data"})
for field in fields:
field_name = field.prop("var")
value = info_query.xpath_eval(\
values = info_query.xpath_eval(\
"c:command/data:x/data:field[@var='"
+ field_name + "']/data:value",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
if len(value) > 0:
if len(values) > 0:
if len(values) > 1:
values = map(lambda value: value.content, values)
else:
values = values[0].content
self.__logger.debug("Adding to session '" + session_id
+ "': " + field_name + "="
+ value[0].content)
self.sessions[session_id][1][field_name] = value[0].content
+ str(values))
self.sessions[session_id][1][field_name] = values
def execute_multi_step_command(self, info_query, short_node, update_step_func):
def execute_multi_step_command(self, info_query, short_node,
update_step_func):
(response,
command_node,
session_id) = self._create_response(info_query,
False)
lang_class = self.component.lang.get_lang_class_from_node(info_query.get_node())
lang_class = self.component.lang.get_lang_class_from_node(\
info_query.get_node())
if not self.sessions.has_key(session_id):
self.sessions[session_id] = (1, {})
else:
@@ -162,10 +201,15 @@ class CommandManager(object):
step = self.sessions[session_id][0]
step_method = "execute_" + short_node + "_" + str(step)
self.parse_form(info_query, session_id)
return [response] + getattr(self, step_method)(info_query,
if hasattr(self, step_method):
return [response] + getattr(self, step_method)(\
info_query,
self.sessions[session_id][1],
command_node,
lang_class)
else:
return [info_query.make_error_response(\
"feature-not-implemented")]
def add_actions(self, command_node, actions, default_action_idx=0):
actions_node = command_node.newTextChild(None, "actions", None)
@@ -174,19 +218,6 @@ class CommandManager(object):
actions_node.newTextChild(None, action, None)
return actions_node
def cancel_command(self, info_query):
xml_command = info_query.xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
if xml_command.hasProp("sessionid"):
session_id = xml_command.prop("sessionid")
response = info_query.make_result_response()
command_node = response.set_new_content(COMMAND_NS, "command")
command_node.setProp("node", xml_command.prop("node"))
command_node.setProp("status", STATUS_CANCELLED)
command_node.setProp("sessionid",
session_id)
del self.sessions[session_id]
command_manager = CommandManager()
class JCLCommandManager(CommandManager):
@@ -233,11 +264,43 @@ class JCLCommandManager(CommandManager):
"http://jabber.org/protocol/admin#restart",
"http://jabber.org/protocol/admin#shutdown"])
def execute_list(self, info_query):
## Reusable steps
def add_form_select_user_jids(self, command_node, lang_class):
result_form = Form(xmlnode_or_type="result",
title="TODO")
result_form.add_field(name="user_jids",
field_type="jid-multi",
label=lang_class.field_user_jid)
result_form.as_xml(command_node)
return []
def add_form_select_accounts(self, session_context,
command_node, lang_class):
"""
Add a form to select account for user JIDs contained in
session_context[\"user_jids\"]
"""
result_form = Form(xmlnode_or_type="result",
title="TODO")
field = result_form.add_field(name="account_names",
field_type="list-multi",
label="Account")
for (account_type, type_label) in \
self.account_manager.list_account_types(lang_class):
account_class = self.account_manager.get_account_class(\
account_type=account_type)
for user_jid in session_context["user_jids"]:
for _account in account.get_accounts(user_jid, account_class):
field.add_option(label=_account.name + " (" + account_type
+ ") (" + user_jid + ")",
values=[_account.name + "/" + user_jid])
result_form.as_xml(command_node)
return []
def execute_list_1(self, info_query, session_context,
command_node, lang_class):
"""Execute command 'list'. List accounts"""
self.__logger.debug("Executing 'list' command")
(response, command_node, session_id) = self._create_response(info_query,
"list")
result_form = Form(xmlnode_or_type="result",
title="Registered account") # TODO : add to Lang
result_form.reported_fields.append(FieldNoType(name="name",
@@ -248,32 +311,11 @@ class JCLCommandManager(CommandManager):
value=_account.name)]
result_form.add_item(fields)
result_form.as_xml(command_node)
return [response]
command_node.setProp("status", STATUS_COMPLETED)
return []
def execute_add_user(self, info_query):
"""Execute command 'add-user'. Create new account"""
self.__logger.debug("Executing 'add-user' command")
return self.execute_multi_step_command(\
info_query,
"add_user",
lambda session_id: (self.sessions[session_id][0] + 1,
self.sessions[session_id][1]))
def prev_add_user(self, info_query):
"""Execute previous step of command 'add-user'."""
self.__logger.debug("Executing 'add-user' command previous step")
return self.execute_multi_step_command(\
info_query,
"add_user",
lambda session_id: (self.sessions[session_id][0] - 1,
self.sessions[session_id][1]))
def cancel_add_user(self, info_query):
"""Cancel current 'add-user' session"""
self.__logger.debug("Cancelling 'add-user' command")
return self.cancel_command(info_query)
def execute_add_user_1(self, info_query, session_context, command_node, lang_class):
def execute_add_user_1(self, info_query, session_context,
command_node, lang_class):
self.__logger.debug("Executing command 'add-user' step 1")
self.add_actions(command_node, [ACTION_NEXT])
result_form = Form(xmlnode_or_type="result",
@@ -286,7 +328,7 @@ class JCLCommandManager(CommandManager):
field.add_option(label=type_label,
values=[account_type])
result_form.add_field(name="user_jid",
field_type="text-single",
field_type="jid-single",
label=lang_class.field_user_jid)
result_form.as_xml(command_node)
return []
@@ -319,8 +361,28 @@ class JCLCommandManager(CommandManager):
command_node.setProp("status", STATUS_COMPLETED)
return to_send
def execute_delete_user(self, info_query):
return []
def execute_delete_user_1(self, info_query, session_context,
command_node, lang_class):
self.__logger.debug("Executing command 'delete-user' step 1")
self.add_actions(command_node, [ACTION_NEXT])
return self.add_form_select_user_jids(command_node, lang_class)
def execute_delete_user_2(self, info_query, session_context,
command_node, lang_class):
self.__logger.debug("Executing command 'delete-user' step 2")
self.add_actions(command_node, [ACTION_PREVIOUS, ACTION_COMPLETE], 1)
return self.add_form_select_accounts(session_context, command_node, lang_class)
def execute_delete_user_3(self, info_query, session_context,
command_node, lang_class):
self.__logger.debug("Executing command 'delete-user' step 3")
result = []
for account_name in session_context["account_names"]:
name, user_jid = account_name.split("/", 1)[:2]
result += self.account_manager.remove_account_from_name(user_jid,
name)
command_node.setProp("status", STATUS_COMPLETED)
return result
def execute_disable_user(self, info_query):
return []

View File

@@ -37,8 +37,6 @@ import traceback
from Queue import Queue
from sqlobject.sqlbuilder import AND
import pyxmpp.error as error
from pyxmpp.jid import JID
from pyxmpp.jabberd.component import Component
@@ -538,6 +536,10 @@ class JCLComponent(Component, object):
" with " + str(info_query) + "\n%s"
% ("".join(traceback.format_exception
(type, value, stack, 5))))
print("Error in command " + str(command_node) +
" with " + str(info_query) + "\n%s"
% ("".join(traceback.format_exception
(type, value, stack, 5))))
return 1
###########################################################################
@@ -666,6 +668,27 @@ class AccountManager(object):
model.db_disconnect()
return result
def remove_account_from_name(self, user_jid, name):
_account = account.get_account(user_jid, name)
if _account is not None:
return self.remove_account(_account, user_jid)
else:
return []
def remove_account(self, _account, user_jid):
self.__logger.debug("Deleting account: " + str(_account))
result = []
model.db_connect()
result.append(Presence(from_jid=_account.jid,
to_jid=user_jid,
stanza_type="unsubscribe"))
result.append(Presence(from_jid=_account.jid,
to_jid=user_jid,
stanza_type="unsubscribed"))
_account.destroySelf()
model.db_disconnect()
return result
def populate_account(self, _account, lang_class, x_data,
new_account, first_account, from_jid=None):
"""Populate given account"""

View File

@@ -144,16 +144,6 @@ class AccountPresenceUnsubscribeHandler(Handler):
def handle(self, stanza, lang_class, data):
"""Handle \"unsubscribe\" iq sent to account JID"""
result = []
from_jid = stanza.get_from()
_account = data
model.db_connect()
result.append(Presence(from_jid=_account.jid,
to_jid=from_jid,
stanza_type="unsubscribe"))
result.append(Presence(from_jid=_account.jid,
to_jid=from_jid,
stanza_type="unsubscribed"))
_account.destroySelf()
model.db_disconnect()
return result
return self.component.account_manager.remove_account(_account, from_jid)

View File

@@ -128,7 +128,7 @@ class JCLCommandManager_TestCase(unittest.TestCase):
self.assertNotEquals(user_jid_field, None)
self.assertEquals(len(user_jid_field), 1)
self.assertEquals(user_jid_field[0].prop("var"), "user_jid")
self.assertEquals(user_jid_field[0].prop("type"), "text-single")
self.assertEquals(user_jid_field[0].prop("type"), "jid-single")
self.assertEquals(user_jid_field[0].prop("label"), Lang.en.field_user_jid)
# Second step
@@ -144,7 +144,7 @@ class JCLCommandManager_TestCase(unittest.TestCase):
submit_form.add_field(field_type="list-single",
name="account_type",
value="Example")
submit_form.add_field(field_type="text-single",
submit_form.add_field(field_type="jid-single",
name="user_jid",
value="user2@test.com")
submit_form.as_xml(command_node)
@@ -177,7 +177,6 @@ class JCLCommandManager_TestCase(unittest.TestCase):
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
command_node.setProp("action", "finish")
command_node.setProp("sessionid", session_id)
command_node.setProp("action", "complete")
submit_form = Form(xmlnode_or_type="submit")
@@ -259,15 +258,406 @@ class JCLCommandManager_TestCase(unittest.TestCase):
self.assertEquals(presence_account.get_node().prop("type"),
"subscribe")
def test_execute_add_user_prev(self):
self.comp.account_manager.account_classes = (ExampleAccount,
Example2Account)
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#add-user",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertNotEquals(xml_command.prop("sessionid"), None)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 1)
self.assertEquals(actions[0].prop("execute"), "next")
self.assertEquals(actions[0].children.name, "next")
options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(options), 2)
self.assertEquals(options[0].prop("label"), "Example")
self.assertEquals(options[0].children.name, "value")
self.assertEquals(options[0].children.content, "Example")
self.assertEquals(options[1].prop("label"), "Example2")
self.assertEquals(options[1].children.name, "value")
self.assertEquals(options[1].children.content, "Example2")
user_jid_field = result[0].xpath_eval("c:command/data:x/data:field[2]",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertNotEquals(user_jid_field, None)
self.assertEquals(len(user_jid_field), 1)
self.assertEquals(user_jid_field[0].prop("var"), "user_jid")
self.assertEquals(user_jid_field[0].prop("type"), "jid-single")
self.assertEquals(user_jid_field[0].prop("label"), Lang.en.field_user_jid)
# Second step
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
session_id = xml_command.prop("sessionid")
command_node.setProp("sessionid", session_id)
command_node.setProp("action", "next")
submit_form = Form(xmlnode_or_type="submit")
submit_form.add_field(field_type="list-single",
name="account_type",
value="Example")
submit_form.add_field(field_type="jid-single",
name="user_jid",
value="user2@test.com")
submit_form.as_xml(command_node)
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#add-user",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertEquals(xml_command.prop("sessionid"), session_id)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 1)
self.assertEquals(actions[0].prop("execute"), "complete")
self.assertEquals(actions[0].children.name, "prev")
self.assertEquals(actions[0].children.next.name, "complete")
fields = result[0].xpath_eval("c:command/data:x/data:field",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(fields), 6)
context_session = self.command_manager.sessions[session_id][1]
self.assertEquals(context_session["account_type"], "Example")
self.assertEquals(context_session["user_jid"], "user2@test.com")
# First step again
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
command_node.setProp("sessionid", session_id)
command_node.setProp("action", "prev")
submit_form = Form(xmlnode_or_type="submit")
submit_form.add_field(field_type="text-single",
name="name",
value="account1")
submit_form.add_field(field_type="text-single",
name="login",
value="login1")
submit_form.add_field(field_type="text-private",
name="password",
value="pass1")
submit_form.add_field(field_type="boolean",
name="store_password",
value="1")
submit_form.add_field(field_type="list-single",
name="test_enum",
value="choice2")
submit_form.add_field(field_type="text-single",
name="test_int",
value="42")
submit_form.as_xml(command_node)
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#add-user",
"prev")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertEquals(xml_command.prop("sessionid"), session_id)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 1)
self.assertEquals(actions[0].prop("execute"), "next")
self.assertEquals(actions[0].children.name, "next")
options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(options), 2)
self.assertEquals(options[0].prop("label"), "Example")
self.assertEquals(options[0].children.name, "value")
self.assertEquals(options[0].children.content, "Example")
self.assertEquals(options[1].prop("label"), "Example2")
self.assertEquals(options[1].children.name, "value")
self.assertEquals(options[1].children.content, "Example2")
user_jid_field = result[0].xpath_eval("c:command/data:x/data:field[2]",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertNotEquals(user_jid_field, None)
self.assertEquals(len(user_jid_field), 1)
self.assertEquals(user_jid_field[0].prop("var"), "user_jid")
self.assertEquals(user_jid_field[0].prop("type"), "jid-single")
self.assertEquals(user_jid_field[0].prop("label"), Lang.en.field_user_jid)
def test_execute_add_user_cancel(self):
self.comp.account_manager.account_classes = (ExampleAccount,
Example2Account)
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#add-user",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertNotEquals(xml_command.prop("sessionid"), None)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 1)
self.assertEquals(actions[0].prop("execute"), "next")
self.assertEquals(actions[0].children.name, "next")
options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(options), 2)
self.assertEquals(options[0].prop("label"), "Example")
self.assertEquals(options[0].children.name, "value")
self.assertEquals(options[0].children.content, "Example")
self.assertEquals(options[1].prop("label"), "Example2")
self.assertEquals(options[1].children.name, "value")
self.assertEquals(options[1].children.content, "Example2")
user_jid_field = result[0].xpath_eval("c:command/data:x/data:field[2]",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertNotEquals(user_jid_field, None)
self.assertEquals(len(user_jid_field), 1)
self.assertEquals(user_jid_field[0].prop("var"), "user_jid")
self.assertEquals(user_jid_field[0].prop("type"), "jid-single")
self.assertEquals(user_jid_field[0].prop("label"), Lang.en.field_user_jid)
# Second step
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
session_id = xml_command.prop("sessionid")
command_node.setProp("sessionid", session_id)
command_node.setProp("action", "cancel")
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#add-user",
"cancel")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "canceled")
self.assertEquals(xml_command.prop("sessionid"), session_id)
self.assertEquals(xml_command.children, None)
def test_execute_delete_user(self):
self.comp.account_manager.account_classes = (ExampleAccount,
Example2Account)
model.db_connect()
account11 = ExampleAccount(user_jid="test1@test.com",
name="account11",
jid="account11@jcl.test.com")
account12 = Example2Account(user_jid="test1@test.com",
name="account12",
jid="account12@jcl.test.com")
account21 = ExampleAccount(user_jid="test2@test.com",
name="account21",
jid="account21@jcl.test.com")
account22 = ExampleAccount(user_jid="test2@test.com",
name="account11",
jid="account11@jcl.test.com")
account31 = ExampleAccount(user_jid="test3@test.com",
name="account31",
jid="account31@jcl.test.com")
account32 = Example2Account(user_jid="test3@test.com",
name="account32",
jid="account32@jcl.test.com")
model.db_disconnect()
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#delete-user")
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#delete-user",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertNotEquals(xml_command.prop("sessionid"), None)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 1)
self.assertEquals(actions[0].prop("execute"), "next")
self.assertEquals(actions[0].children.name, "next")
user_jid_field = result[0].xpath_eval("c:command/data:x/data:field[1]",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertNotEquals(user_jid_field, None)
self.assertEquals(len(user_jid_field), 1)
self.assertEquals(user_jid_field[0].prop("var"), "user_jids")
self.assertEquals(user_jid_field[0].prop("type"), "jid-multi")
self.assertEquals(user_jid_field[0].prop("label"), Lang.en.field_user_jid)
# Second step
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#delete-user")
session_id = xml_command.prop("sessionid")
command_node.setProp("sessionid", session_id)
command_node.setProp("action", "next")
submit_form = Form(xmlnode_or_type="submit")
submit_form.add_field(field_type="jid-multi",
name="user_jids",
values=["test1@test.com", "test2@test.com"])
submit_form.as_xml(command_node)
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#delete-user",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertEquals(xml_command.prop("sessionid"), session_id)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 1)
self.assertEquals(actions[0].prop("execute"), "complete")
self.assertEquals(actions[0].children.name, "prev")
self.assertEquals(actions[0].children.next.name, "complete")
fields = result[0].xpath_eval("c:command/data:x/data:field",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(fields), 1)
self.assertEquals(fields[0].prop("var"), "account_names")
self.assertEquals(fields[0].prop("type"), "list-multi")
self.assertEquals(fields[0].prop("label"), "Account")
options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(options), 4)
self.assertEquals(options[0].prop("label"),
"account11 (Example) (test1@test.com)")
self.assertEquals(options[0].children.name, "value")
self.assertEquals(options[0].children.content,
"account11/test1@test.com")
self.assertEquals(options[1].prop("label"),
"account21 (Example) (test2@test.com)")
self.assertEquals(options[1].children.name, "value")
self.assertEquals(options[1].children.content,
"account21/test2@test.com")
self.assertEquals(options[2].prop("label"),
"account11 (Example) (test2@test.com)")
self.assertEquals(options[2].children.name, "value")
self.assertEquals(options[2].children.content,
"account11/test2@test.com")
self.assertEquals(options[3].prop("label"),
"account12 (Example2) (test1@test.com)")
self.assertEquals(options[3].children.name, "value")
self.assertEquals(options[3].children.content,
"account12/test1@test.com")
context_session = self.command_manager.sessions[session_id][1]
self.assertEquals(context_session["user_jids"],
["test1@test.com", "test2@test.com"])
# Third step
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node", "http://jabber.org/protocol/admin#delete-user")
command_node.setProp("sessionid", session_id)
command_node.setProp("action", "complete")
submit_form = Form(xmlnode_or_type="submit")
submit_form.add_field(field_type="list-multi",
name="account_names",
values=["account11/test1@test.com",
"account11/test2@test.com"])
submit_form.as_xml(command_node)
result = self.command_manager.apply_command_action(info_query,
"http://jabber.org/protocol/admin#delete-user",
"execute")
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "completed")
self.assertEquals(xml_command.prop("sessionid"), session_id)
actions = result[0].xpath_eval("c:command/c:actions",
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(actions), 0)
self.assertEquals(context_session["account_names"],
["account11/test1@test.com",
"account11/test2@test.com"])
test1_accounts = account.get_accounts("test1@test.com")
count = 0
for test1_account in test1_accounts:
count += 1
self.assertEquals(test1_account.name, "account12")
self.assertEquals(count, 1)
test2_accounts = account.get_accounts("test2@test.com")
count = 0
for test2_account in test2_accounts:
count += 1
self.assertEquals(test2_account.name, "account21")
self.assertEquals(count, 1)
test3_accounts = account.get_accounts("test3@test.com")
count = 0
for test3_account in test3_accounts:
count += 1
self.assertEquals(count, 2)
stanza_sent = result
self.assertEquals(len(stanza_sent), 5)
iq_result = stanza_sent[0]
self.assertTrue(isinstance(iq_result, Iq))
self.assertEquals(iq_result.get_node().prop("type"), "result")
self.assertEquals(iq_result.get_from(), "jcl.test.com")
self.assertEquals(iq_result.get_to(), "user1@test.com")
presence_component = stanza_sent[1]
self.assertTrue(isinstance(presence_component, Presence))
self.assertEquals(presence_component.get_from(), "account11@jcl.test.com")
self.assertEquals(presence_component.get_to(), "test1@test.com")
self.assertEquals(presence_component.get_node().prop("type"),
"unsubscribe")
presence_component = stanza_sent[2]
self.assertTrue(isinstance(presence_component, Presence))
self.assertEquals(presence_component.get_from(), "account11@jcl.test.com")
self.assertEquals(presence_component.get_to(), "test1@test.com")
self.assertEquals(presence_component.get_node().prop("type"),
"unsubscribed")
presence_component = stanza_sent[3]
self.assertTrue(isinstance(presence_component, Presence))
self.assertEquals(presence_component.get_from(), "account11@jcl.test.com")
self.assertEquals(presence_component.get_to(), "test2@test.com")
self.assertEquals(presence_component.get_node().prop("type"),
"unsubscribe")
presence_component = stanza_sent[4]
self.assertTrue(isinstance(presence_component, Presence))
self.assertEquals(presence_component.get_from(), "account11@jcl.test.com")
self.assertEquals(presence_component.get_to(), "test2@test.com")
self.assertEquals(presence_component.get_node().prop("type"),
"unsubscribed")
# def test_execute_delete_user(self):
# #TODO : implement command
# info_query = Iq(stanza_type="set",
# from_jid="user1@test.com",
# to_jid="jcl.test.com")
# result = self.command_manager.execute_add_user(info_query)
# self.assertNotEquals(result, None)
# self.assertEquals(len(result), 1)
# def test_execute_disable_user(self):
# #TODO : implement command

View File

@@ -313,14 +313,14 @@ class JCLComponent_TestCase(unittest.TestCase):
self.max_tick_count = 1
self.comp.handle_tick = self.__handle_tick_test_time_handler
model.db_connect()
account11 = Account(user_jid = "test1@test.com", \
name = "account11", \
account11 = Account(user_jid="test1@test.com",
name="account11",
jid="account11@jcl.test.com")
account12 = Account(user_jid = "test1@test.com", \
name = "account12", \
account12 = Account(user_jid="test1@test.com",
name="account12",
jid="account12@jcl.test.com")
account2 = Account(user_jid = "test2@test.com", \
name = "account2", \
account2 = Account(user_jid="test2@test.com",
name="account2",
jid="account2@jcl.test.com")
model.db_disconnect()
self.comp.run()
@@ -2519,13 +2519,11 @@ class JCLComponent_TestCase(unittest.TestCase):
{"c": "http://jabber.org/protocol/commands"})
self.assertEquals(len(command_result), 1)
self.assertEquals(command_result[0].prop("status"), "completed")
# TODO : test sessionid prop, usage (cf XEP) ?
items = result[0].xpath_eval("c:command/data:x/data:item",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(items), 2)
class Handler_TestCase(unittest.TestCase):
def setUp(self):
self.handler = Handler(None)

View File

@@ -113,6 +113,7 @@ class Lang:
get_gateway_prompt = u"Email address"
command_list = u"List accounts"
command_add_user = u"Create new account"
select_account_type = u"Select account type"
class fr:
@@ -159,6 +160,7 @@ class Lang:
get_gateway_prompt = u"Adresse email"
command_list = u"Liste les comptes"
command_add_user = u"Créer un compte"
select_account_type = u"Selectionner le type de comptes"
class nl:

View File

@@ -64,7 +64,6 @@ class Account(InheritableSQLObject):
user_jid = StringCol()
name = StringCol()
jid = StringCol()
## Not yet used first_check = BoolCol(default = True)
__status = StringCol(default=OFFLINE, dbName="status")
in_error = BoolCol(default=False)
legacy_jids = MultipleJoin('LegacyJID')

View File

@@ -117,6 +117,7 @@ class Language_TestCase(unittest.TestCase):
self.assertNotEquals(self.lang_class.get_gateway_prompt, None)
self.assertNotEquals(self.lang_class.command_list, None)
self.assertNotEquals(self.lang_class.command_add_user, None)
self.assertNotEquals(self.lang_class.select_account_type, None)
class Language_fr_TestCase(Language_TestCase):