From 613f87b4b924d81fd143b1acfba9be5fe77cd2fa Mon Sep 17 00:00:00 2001 From: David Rousselie Date: Mon, 8 Oct 2007 22:20:51 +0200 Subject: [PATCH] Make 'add-user' ad-hoc command accessible to not admin users darcs-hash:20071008202051-86b55-34cddc397121d970740ea4d1ec6b53794ccfb637.gz --- src/jcl/jabber/command.py | 16 ++-- src/jcl/jabber/component.py | 7 ++ src/jcl/jabber/tests/command.py | 163 +++++++++++++++++++++++++++++++- 3 files changed, 176 insertions(+), 10 deletions(-) diff --git a/src/jcl/jabber/command.py b/src/jcl/jabber/command.py index 7674342..7f945c7 100644 --- a/src/jcl/jabber/command.py +++ b/src/jcl/jabber/command.py @@ -86,11 +86,10 @@ class CommandManager(object): def list_commands(self, jid, disco_items, lang_class): """Return DiscoItem for each supported commands""" - bare_from_jid = unicode(jid.bare()) for command_name in self.commands.keys(): must_be_admin = self.commands[command_name] if not must_be_admin or \ - (must_be_admin and bare_from_jid in self.component.get_admins()): + (must_be_admin and self.component.is_admin(jid)): command_desc = self.get_command_desc(command_name, lang_class) DiscoItem(disco_items, @@ -114,7 +113,7 @@ class CommandManager(object): must_be_admin = self.commands[command_name] if not must_be_admin or \ (must_be_admin and - unicode(info_query.get_from().bare()) in self.component.get_admins()): + self.component.is_admin(info_query.get_from())): short_command_name = self.get_short_command_name(command_name) action_command_method = "apply_" + action + "_command" if hasattr(self, action_command_method): @@ -266,7 +265,7 @@ class JCLCommandManager(CommandManager): """ CommandManager.__init__(self, component, account_manager) self.__logger = logging.getLogger("jcl.jabber.command.JCLCommandManager") - self.commands["http://jabber.org/protocol/admin#add-user"] = True + self.commands["http://jabber.org/protocol/admin#add-user"] = False self.commands["http://jabber.org/protocol/admin#delete-user"] = True self.commands["http://jabber.org/protocol/admin#disable-user"] = True self.commands["http://jabber.org/protocol/admin#reenable-user"] = True @@ -469,9 +468,10 @@ class JCLCommandManager(CommandManager): self.component.account_manager.list_account_types(lang_class): field.add_option(label=type_label, values=[account_type]) - result_form.add_field(name="user_jid", - field_type="jid-single", - label=lang_class.field_user_jid) + if self.component.is_admin(info_query.get_from()): + result_form.add_field(name="user_jid", + field_type="jid-single", + label=lang_class.field_user_jid) result_form.as_xml(command_node) return (result_form, []) @@ -479,6 +479,8 @@ class JCLCommandManager(CommandManager): command_node, lang_class): self.__logger.debug("Executing command 'add-user' step 2") self.add_actions(command_node, [ACTION_PREVIOUS, ACTION_COMPLETE], 1) + if not self.component.is_admin(info_query.get_from()): + session_context["user_jid"] = [unicode(info_query.get_from().bare())] user_jid = session_context["user_jid"][0] account_type = session_context["account_type"][0] account_class = self.account_manager.get_account_class(account_type) diff --git a/src/jcl/jabber/component.py b/src/jcl/jabber/component.py index 12bf1e3..678f2bd 100644 --- a/src/jcl/jabber/component.py +++ b/src/jcl/jabber/component.py @@ -608,6 +608,13 @@ class JCLComponent(Component, object): def set_admins(self, admins): self.set_config_parameter("component", "admins", ",".join(admins)) + def is_admin(self, jid): + if isinstance(jid, JID): + jid_str = unicode(jid.bare()) + else: + jid_str = unicode(jid) + return jid_str in self.get_admins() + def get_welcome_message(self): return self.get_config_parameter("component", "welcome_message") diff --git a/src/jcl/jabber/tests/command.py b/src/jcl/jabber/tests/command.py index a6cc9a4..36db2d8 100644 --- a/src/jcl/jabber/tests/command.py +++ b/src/jcl/jabber/tests/command.py @@ -53,13 +53,19 @@ class FieldNoType_TestCase(unittest.TestCase): field.complete_xml_element(fake_iq.xmlnode, None) self.assertFalse(fake_iq.xmlnode.hasProp("type")) -class MockComponent(object): +class MockComponent(JCLComponent): jid = JID("jcl.test.com") + def __init__(self): + pass + def get_admins(self): return ["admin@test.com"] class CommandManager_TestCase(unittest.TestCase): + def setUp(self): + command.command_manager.commands = {} + def test_get_short_command_name_form_long_name(self): command_name = command.command_manager.get_short_command_name("http://jabber.org/protocol/admin#test-command") self.assertEquals(command_name, "test_command") @@ -81,7 +87,6 @@ class CommandManager_TestCase(unittest.TestCase): self.assertEquals(items[0].get_name(), "command2") def test_list_commands_as_admin(self): - command.command_manager.commands = {} command.command_manager.commands["command1"] = True command.command_manager.commands["command2"] = False command.command_manager.component = MockComponent() @@ -96,7 +101,6 @@ class CommandManager_TestCase(unittest.TestCase): self.assertEquals(items[1].get_name(), "command2") def test_list_commands_as_admin_fulljid(self): - command.command_manager.commands = {} command.command_manager.commands["command1"] = True command.command_manager.commands["command2"] = False command.command_manager.component = MockComponent() @@ -647,6 +651,159 @@ class JCLCommandManager_TestCase(JCLTestCase): self.assertEquals(presence_account.get_node().prop("type"), "subscribe") + def test_execute_add_user_not_admin(self): + self.comp.account_manager.account_classes = (ExampleAccount, + Example2Account) + info_query = Iq(stanza_type="set", + from_jid="test1@test.com", + to_jid="jcl.test.com") + command_node = info_query.set_new_content(command.COMMAND_NS, "command") + command_node.setProp("node", "http://jabber.org/protocol/admin#add-user") + result = self.command_manager.apply_command_action(info_query, + "http://jabber.org/protocol/admin#add-user", + "execute") + self.assertNotEquals(result, None) + self.assertEquals(len(result), 1) + xml_command = result[0].xpath_eval("c:command", + {"c": "http://jabber.org/protocol/commands"})[0] + self.assertEquals(xml_command.prop("status"), "executing") + self.assertNotEquals(xml_command.prop("sessionid"), None) + self.__check_actions(result[0], ["next"]) + x_data = result[0].xpath_eval("c:command/data:x", + {"c": "http://jabber.org/protocol/commands", + "data": "jabber:x:data"}) + self.assertEquals(len(x_data), 1) + self.assertEquals(x_data[0].prop("type"), "form") + fields = result[0].xpath_eval("c:command/data:x/data:field", + {"c": "http://jabber.org/protocol/commands", + "data": "jabber:x:data"}) + self.assertEquals(len(fields), 1) + options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option", + {"c": "http://jabber.org/protocol/commands", + "data": "jabber:x:data"}) + self.assertEquals(len(options), 2) + self.assertEquals(options[0].prop("label"), "Example") + self.assertEquals(options[0].children.name, "value") + self.assertEquals(options[0].children.content, "Example") + self.assertEquals(options[1].prop("label"), "Example2") + self.assertEquals(options[1].children.name, "value") + self.assertEquals(options[1].children.content, "Example2") + + # Second step + info_query = Iq(stanza_type="set", + from_jid="test1@test.com", + to_jid="jcl.test.com") + command_node = info_query.set_new_content(command.COMMAND_NS, "command") + command_node.setProp("node", "http://jabber.org/protocol/admin#add-user") + session_id = xml_command.prop("sessionid") + command_node.setProp("sessionid", session_id) + command_node.setProp("action", "next") + submit_form = Form(xmlnode_or_type="submit") + submit_form.add_field(field_type="list-single", + name="account_type", + value="Example") + submit_form.as_xml(command_node) + result = self.command_manager.apply_command_action(info_query, + "http://jabber.org/protocol/admin#add-user", + "next") + self.assertNotEquals(result, None) + self.assertEquals(len(result), 1) + xml_command = result[0].xpath_eval("c:command", + {"c": "http://jabber.org/protocol/commands"})[0] + self.assertEquals(xml_command.prop("status"), "executing") + self.assertEquals(xml_command.prop("sessionid"), session_id) + self.__check_actions(result[0], ["prev", "complete"], 1) + x_data = result[0].xpath_eval("c:command/data:x", + {"c": "http://jabber.org/protocol/commands", + "data": "jabber:x:data"}) + self.assertEquals(len(x_data), 1) + self.assertEquals(x_data[0].prop("type"), "form") + fields = result[0].xpath_eval("c:command/data:x/data:field", + {"c": "http://jabber.org/protocol/commands", + "data": "jabber:x:data"}) + self.assertEquals(len(fields), 6) + context_session = self.command_manager.sessions[session_id][1] + self.assertEquals(context_session["account_type"], ["Example"]) + self.assertEquals(context_session["user_jid"], ["test1@test.com"]) + + # Third step + info_query = Iq(stanza_type="set", + from_jid="test1@test.com", + to_jid="jcl.test.com") + command_node = info_query.set_new_content(command.COMMAND_NS, "command") + command_node.setProp("node", "http://jabber.org/protocol/admin#add-user") + command_node.setProp("sessionid", session_id) + command_node.setProp("action", "complete") + submit_form = Form(xmlnode_or_type="submit") + submit_form.add_field(field_type="text-single", + name="name", + value="account1") + submit_form.add_field(field_type="text-single", + name="login", + value="login1") + submit_form.add_field(field_type="text-private", + name="password", + value="pass1") + submit_form.add_field(field_type="boolean", + name="store_password", + value="1") + submit_form.add_field(field_type="list-single", + name="test_enum", + value="choice2") + submit_form.add_field(field_type="text-single", + name="test_int", + value="42") + submit_form.as_xml(command_node) + result = self.command_manager.apply_command_action(info_query, + "http://jabber.org/protocol/admin#add-user", + "execute") + xml_command = result[0].xpath_eval("c:command", + {"c": "http://jabber.org/protocol/commands"})[0] + self.assertEquals(xml_command.prop("status"), "completed") + self.assertEquals(xml_command.prop("sessionid"), session_id) + self.__check_actions(result[0]) + self.assertEquals(context_session["name"], ["account1"]) + self.assertEquals(context_session["login"], ["login1"]) + self.assertEquals(context_session["password"], ["pass1"]) + self.assertEquals(context_session["store_password"], ["1"]) + self.assertEquals(context_session["test_enum"], ["choice2"]) + self.assertEquals(context_session["test_int"], ["42"]) + model.db_connect() + _account = account.get_account("test1@test.com", + "account1") + self.assertNotEquals(_account, None) + self.assertEquals(_account.user.jid, "test1@test.com") + self.assertEquals(_account.name, "account1") + self.assertEquals(_account.jid, "account1@jcl.test.com") + model.db_disconnect() + stanza_sent = result + self.assertEquals(len(stanza_sent), 4) + iq_result = stanza_sent[0] + self.assertTrue(isinstance(iq_result, Iq)) + self.assertEquals(iq_result.get_node().prop("type"), "result") + self.assertEquals(iq_result.get_from(), "jcl.test.com") + self.assertEquals(iq_result.get_to(), "test1@test.com") + presence_component = stanza_sent[1] + self.assertTrue(isinstance(presence_component, Presence)) + self.assertEquals(presence_component.get_from(), "jcl.test.com") + self.assertEquals(presence_component.get_to(), "test1@test.com") + self.assertEquals(presence_component.get_node().prop("type"), + "subscribe") + message = stanza_sent[2] + self.assertTrue(isinstance(message, Message)) + self.assertEquals(message.get_from(), "jcl.test.com") + self.assertEquals(message.get_to(), "test1@test.com") + self.assertEquals(message.get_subject(), + _account.get_new_message_subject(Lang.en)) + self.assertEquals(message.get_body(), + _account.get_new_message_body(Lang.en)) + presence_account = stanza_sent[3] + self.assertTrue(isinstance(presence_account, Presence)) + self.assertEquals(presence_account.get_from(), "account1@jcl.test.com") + self.assertEquals(presence_account.get_to(), "test1@test.com") + self.assertEquals(presence_account.get_node().prop("type"), + "subscribe") + def test_execute_add_user_prev(self): self.comp.account_manager.account_classes = (ExampleAccount, Example2Account)