diff --git a/src/jcl/jabber/command.py b/src/jcl/jabber/command.py index 7730eba..586d670 100644 --- a/src/jcl/jabber/command.py +++ b/src/jcl/jabber/command.py @@ -110,21 +110,25 @@ class CommandManager(object): def apply_command_action(self, info_query, command_name, action): """Apply action on command""" - must_be_admin = self.commands[command_name] - if not must_be_admin or \ - (must_be_admin and - unicode(info_query.get_from().bare()) in self.component.get_admins()): - short_command_name = self.get_short_command_name(command_name) - action_command_method = "apply_" + action + "_command" - if hasattr(self, action_command_method): - return getattr(self, action_command_method)(info_query, - short_command_name) + if self.commands.has_key(command_name): + must_be_admin = self.commands[command_name] + if not must_be_admin or \ + (must_be_admin and + unicode(info_query.get_from().bare()) in self.component.get_admins()): + short_command_name = self.get_short_command_name(command_name) + action_command_method = "apply_" + action + "_command" + if hasattr(self, action_command_method): + return getattr(self, action_command_method)(info_query, + short_command_name) + else: + return [info_query.make_error_response(\ + "feature-not-implemented")] else: return [info_query.make_error_response(\ - "feature-not-implemented")] + "forbidden")] else: return [info_query.make_error_response(\ - "forbidden")] + "feature-not-implemented")] def apply_execute_command(self, info_query, short_command_name): return self.execute_multi_step_command(\ @@ -260,8 +264,8 @@ class JCLCommandManager(CommandManager): self.commands["http://jabber.org/protocol/admin#disable-user"] = True self.commands["http://jabber.org/protocol/admin#reenable-user"] = True self.commands["http://jabber.org/protocol/admin#end-user-session"] = True - self.commands["http://jabber.org/protocol/admin#get-user-password"] = True - self.commands["http://jabber.org/protocol/admin#change-user-password"] = True + #self.commands["http://jabber.org/protocol/admin#get-user-password"] = True + #self.commands["http://jabber.org/protocol/admin#change-user-password"] = True self.commands["http://jabber.org/protocol/admin#get-user-roster"] = True self.commands["http://jabber.org/protocol/admin#get-user-lastlogin"] = True self.commands["http://jabber.org/protocol/admin#get-registered-users-num"] = True diff --git a/src/jcl/jabber/tests/command.py b/src/jcl/jabber/tests/command.py index fb20432..d5817cf 100644 --- a/src/jcl/jabber/tests/command.py +++ b/src/jcl/jabber/tests/command.py @@ -1279,215 +1279,217 @@ class JCLCommandManager_TestCase(JCLTestCase): self.assertEquals(presence_component.get_node().prop("type"), "unavailable") - def test_execute_get_user_password(self): - self.comp.account_manager.account_classes = (ExampleAccount, - Example2Account) - model.db_connect() - user1 = User(jid="test1@test.com") - user2 = User(jid="test2@test.com") - account11 = ExampleAccount(user=user1, - name="account11", - jid="account11@jcl.test.com") - account11.password = "pass1" - account12 = Example2Account(user=user1, - name="account12", - jid="account12@jcl.test.com") - account21 = ExampleAccount(user=user2, - name="account21", - jid="account21@jcl.test.com") - account22 = ExampleAccount(user=user2, - name="account11", - jid="account11@jcl.test.com") - model.db_disconnect() - info_query = Iq(stanza_type="set", - from_jid="admin@test.com", - to_jid="jcl.test.com") - command_node = info_query.set_new_content(command.COMMAND_NS, "command") - command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-password") - result = self.command_manager.apply_command_action(info_query, - "http://jabber.org/protocol/admin#get-user-password", - "execute") - self.assertNotEquals(result, None) - self.assertEquals(len(result), 1) - xml_command = result[0].xpath_eval("c:command", - {"c": "http://jabber.org/protocol/commands"})[0] - self.assertEquals(xml_command.prop("status"), "executing") - self.assertNotEquals(xml_command.prop("sessionid"), None) - self.__check_actions(result[0], ["next"]) +# disabled command +# def test_execute_get_user_password(self): +# self.comp.account_manager.account_classes = (ExampleAccount, +# Example2Account) +# model.db_connect() +# user1 = User(jid="test1@test.com") +# user2 = User(jid="test2@test.com") +# account11 = ExampleAccount(user=user1, +# name="account11", +# jid="account11@jcl.test.com") +# account11.password = "pass1" +# account12 = Example2Account(user=user1, +# name="account12", +# jid="account12@jcl.test.com") +# account21 = ExampleAccount(user=user2, +# name="account21", +# jid="account21@jcl.test.com") +# account22 = ExampleAccount(user=user2, +# name="account11", +# jid="account11@jcl.test.com") +# model.db_disconnect() +# info_query = Iq(stanza_type="set", +# from_jid="admin@test.com", +# to_jid="jcl.test.com") +# command_node = info_query.set_new_content(command.COMMAND_NS, "command") +# command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-password") +# result = self.command_manager.apply_command_action(info_query, +# "http://jabber.org/protocol/admin#get-user-password", +# "execute") +# self.assertNotEquals(result, None) +# self.assertEquals(len(result), 1) +# xml_command = result[0].xpath_eval("c:command", +# {"c": "http://jabber.org/protocol/commands"})[0] +# self.assertEquals(xml_command.prop("status"), "executing") +# self.assertNotEquals(xml_command.prop("sessionid"), None) +# self.__check_actions(result[0], ["next"]) - # Second step - info_query = Iq(stanza_type="set", - from_jid="admin@test.com", - to_jid="jcl.test.com") - command_node = info_query.set_new_content(command.COMMAND_NS, "command") - command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-password") - session_id = xml_command.prop("sessionid") - command_node.setProp("sessionid", session_id) - command_node.setProp("action", "next") - submit_form = Form(xmlnode_or_type="submit") - submit_form.add_field(field_type="jid-single", - name="user_jid", - value="test1@test.com") - submit_form.as_xml(command_node) - result = self.command_manager.apply_command_action(info_query, - "http://jabber.org/protocol/admin#get-user-password", - "execute") - self.assertNotEquals(result, None) - self.assertEquals(len(result), 1) - xml_command = result[0].xpath_eval("c:command", - {"c": "http://jabber.org/protocol/commands"})[0] - self.assertEquals(xml_command.prop("status"), "executing") - self.assertEquals(xml_command.prop("sessionid"), session_id) - self.__check_actions(result[0], ["prev", "complete"], 1) - context_session = self.command_manager.sessions[session_id][1] - self.assertEquals(context_session["user_jid"], - ["test1@test.com"]) +# # Second step +# info_query = Iq(stanza_type="set", +# from_jid="admin@test.com", +# to_jid="jcl.test.com") +# command_node = info_query.set_new_content(command.COMMAND_NS, "command") +# command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-password") +# session_id = xml_command.prop("sessionid") +# command_node.setProp("sessionid", session_id) +# command_node.setProp("action", "next") +# submit_form = Form(xmlnode_or_type="submit") +# submit_form.add_field(field_type="jid-single", +# name="user_jid", +# value="test1@test.com") +# submit_form.as_xml(command_node) +# result = self.command_manager.apply_command_action(info_query, +# "http://jabber.org/protocol/admin#get-user-password", +# "execute") +# self.assertNotEquals(result, None) +# self.assertEquals(len(result), 1) +# xml_command = result[0].xpath_eval("c:command", +# {"c": "http://jabber.org/protocol/commands"})[0] +# self.assertEquals(xml_command.prop("status"), "executing") +# self.assertEquals(xml_command.prop("sessionid"), session_id) +# self.__check_actions(result[0], ["prev", "complete"], 1) +# context_session = self.command_manager.sessions[session_id][1] +# self.assertEquals(context_session["user_jid"], +# ["test1@test.com"]) - # Third step - info_query = Iq(stanza_type="set", - from_jid="admin@test.com", - to_jid="jcl.test.com") - command_node = info_query.set_new_content(command.COMMAND_NS, "command") - command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-password") - command_node.setProp("sessionid", session_id) - command_node.setProp("action", "complete") - submit_form = Form(xmlnode_or_type="submit") - submit_form.add_field(field_type="list-single", - name="account_name", - value="account11/test1@test.com") - submit_form.as_xml(command_node) - result = self.command_manager.apply_command_action(info_query, - "http://jabber.org/protocol/admin#get-user-password", - "execute") - xml_command = result[0].xpath_eval("c:command", - {"c": "http://jabber.org/protocol/commands"})[0] - self.assertEquals(xml_command.prop("status"), "completed") - self.assertEquals(xml_command.prop("sessionid"), session_id) - self.__check_actions(result[0]) - self.assertEquals(context_session["account_name"], - ["account11/test1@test.com"]) - stanza_sent = result - self.assertEquals(len(stanza_sent), 1) - iq_result = stanza_sent[0] - self.assertTrue(isinstance(iq_result, Iq)) - self.assertEquals(iq_result.get_node().prop("type"), "result") - self.assertEquals(iq_result.get_from(), "jcl.test.com") - self.assertEquals(iq_result.get_to(), "admin@test.com") - fields = iq_result.xpath_eval("c:command/data:x/data:field", - {"c": "http://jabber.org/protocol/commands", - "data": "jabber:x:data"}) - self.assertEquals(len(fields), 3) - self.assertEquals(fields[0].prop("var"), "FORM_TYPE") - self.assertEquals(fields[0].prop("type"), "hidden") - self.assertEquals(fields[0].children.name, "value") - self.assertEquals(fields[0].children.content, - "http://jabber.org/protocol/admin") - self.assertEquals(fields[1].prop("var"), "accountjids") - self.assertEquals(fields[1].children.name, "value") - self.assertEquals(fields[1].children.content, - "test1@test.com") - self.assertEquals(fields[2].prop("var"), "password") - self.assertEquals(fields[2].children.name, "value") - self.assertEquals(fields[2].children.content, - "pass1") +# # Third step +# info_query = Iq(stanza_type="set", +# from_jid="admin@test.com", +# to_jid="jcl.test.com") +# command_node = info_query.set_new_content(command.COMMAND_NS, "command") +# command_node.setProp("node", "http://jabber.org/protocol/admin#get-user-password") +# command_node.setProp("sessionid", session_id) +# command_node.setProp("action", "complete") +# submit_form = Form(xmlnode_or_type="submit") +# submit_form.add_field(field_type="list-single", +# name="account_name", +# value="account11/test1@test.com") +# submit_form.as_xml(command_node) +# result = self.command_manager.apply_command_action(info_query, +# "http://jabber.org/protocol/admin#get-user-password", +# "execute") +# xml_command = result[0].xpath_eval("c:command", +# {"c": "http://jabber.org/protocol/commands"})[0] +# self.assertEquals(xml_command.prop("status"), "completed") +# self.assertEquals(xml_command.prop("sessionid"), session_id) +# self.__check_actions(result[0]) +# self.assertEquals(context_session["account_name"], +# ["account11/test1@test.com"]) +# stanza_sent = result +# self.assertEquals(len(stanza_sent), 1) +# iq_result = stanza_sent[0] +# self.assertTrue(isinstance(iq_result, Iq)) +# self.assertEquals(iq_result.get_node().prop("type"), "result") +# self.assertEquals(iq_result.get_from(), "jcl.test.com") +# self.assertEquals(iq_result.get_to(), "admin@test.com") +# fields = iq_result.xpath_eval("c:command/data:x/data:field", +# {"c": "http://jabber.org/protocol/commands", +# "data": "jabber:x:data"}) +# self.assertEquals(len(fields), 3) +# self.assertEquals(fields[0].prop("var"), "FORM_TYPE") +# self.assertEquals(fields[0].prop("type"), "hidden") +# self.assertEquals(fields[0].children.name, "value") +# self.assertEquals(fields[0].children.content, +# "http://jabber.org/protocol/admin") +# self.assertEquals(fields[1].prop("var"), "accountjids") +# self.assertEquals(fields[1].children.name, "value") +# self.assertEquals(fields[1].children.content, +# "test1@test.com") +# self.assertEquals(fields[2].prop("var"), "password") +# self.assertEquals(fields[2].children.name, "value") +# self.assertEquals(fields[2].children.content, +# "pass1") - def test_execute_change_user_password(self): - self.comp.account_manager.account_classes = (ExampleAccount, - Example2Account) - model.db_connect() - user1 = User(jid="test1@test.com") - account11 = ExampleAccount(user=user1, - name="account11", - jid="account11@jcl.test.com") - account11.password = "pass1" - account12 = Example2Account(user=user1, - name="account12", - jid="account12@jcl.test.com") - user2 = User(jid="test2@test.com") - account21 = ExampleAccount(user=user2, - name="account21", - jid="account21@jcl.test.com") - account22 = ExampleAccount(user=user2, - name="account11", - jid="account11@jcl.test.com") - model.db_disconnect() - info_query = Iq(stanza_type="set", - from_jid="admin@test.com", - to_jid="jcl.test.com") - command_node = info_query.set_new_content(command.COMMAND_NS, "command") - command_node.setProp("node", "http://jabber.org/protocol/admin#change-user-password") - result = self.command_manager.apply_command_action(info_query, - "http://jabber.org/protocol/admin#change-user-password", - "execute") - self.assertNotEquals(result, None) - self.assertEquals(len(result), 1) - xml_command = result[0].xpath_eval("c:command", - {"c": "http://jabber.org/protocol/commands"})[0] - self.assertEquals(xml_command.prop("status"), "executing") - self.assertNotEquals(xml_command.prop("sessionid"), None) - self.__check_actions(result[0], ["next"]) +# disabled command +# def test_execute_change_user_password(self): +# self.comp.account_manager.account_classes = (ExampleAccount, +# Example2Account) +# model.db_connect() +# user1 = User(jid="test1@test.com") +# account11 = ExampleAccount(user=user1, +# name="account11", +# jid="account11@jcl.test.com") +# account11.password = "pass1" +# account12 = Example2Account(user=user1, +# name="account12", +# jid="account12@jcl.test.com") +# user2 = User(jid="test2@test.com") +# account21 = ExampleAccount(user=user2, +# name="account21", +# jid="account21@jcl.test.com") +# account22 = ExampleAccount(user=user2, +# name="account11", +# jid="account11@jcl.test.com") +# model.db_disconnect() +# info_query = Iq(stanza_type="set", +# from_jid="admin@test.com", +# to_jid="jcl.test.com") +# command_node = info_query.set_new_content(command.COMMAND_NS, "command") +# command_node.setProp("node", "http://jabber.org/protocol/admin#change-user-password") +# result = self.command_manager.apply_command_action(info_query, +# "http://jabber.org/protocol/admin#change-user-password", +# "execute") +# self.assertNotEquals(result, None) +# self.assertEquals(len(result), 1) +# xml_command = result[0].xpath_eval("c:command", +# {"c": "http://jabber.org/protocol/commands"})[0] +# self.assertEquals(xml_command.prop("status"), "executing") +# self.assertNotEquals(xml_command.prop("sessionid"), None) +# self.__check_actions(result[0], ["next"]) - # Second step - info_query = Iq(stanza_type="set", - from_jid="admin@test.com", - to_jid="jcl.test.com") - command_node = info_query.set_new_content(command.COMMAND_NS, "command") - command_node.setProp("node", "http://jabber.org/protocol/admin#change-user-password") - session_id = xml_command.prop("sessionid") - command_node.setProp("sessionid", session_id) - command_node.setProp("action", "next") - submit_form = Form(xmlnode_or_type="submit") - submit_form.add_field(field_type="jid-single", - name="user_jid", - value="test1@test.com") - submit_form.as_xml(command_node) - result = self.command_manager.apply_command_action(info_query, - "http://jabber.org/protocol/admin#change-user-password", - "execute") - self.assertNotEquals(result, None) - self.assertEquals(len(result), 1) - xml_command = result[0].xpath_eval("c:command", - {"c": "http://jabber.org/protocol/commands"})[0] - self.assertEquals(xml_command.prop("status"), "executing") - self.assertEquals(xml_command.prop("sessionid"), session_id) - self.__check_actions(result[0], ["prev", "complete"], 1) - context_session = self.command_manager.sessions[session_id][1] - self.assertEquals(context_session["user_jid"], - ["test1@test.com"]) - fields = result[0].xpath_eval("c:command/data:x/data:field", - {"c": "http://jabber.org/protocol/commands", - "data": "jabber:x:data"}) - self.assertEquals(len(fields), 2) +# # Second step +# info_query = Iq(stanza_type="set", +# from_jid="admin@test.com", +# to_jid="jcl.test.com") +# command_node = info_query.set_new_content(command.COMMAND_NS, "command") +# command_node.setProp("node", "http://jabber.org/protocol/admin#change-user-password") +# session_id = xml_command.prop("sessionid") +# command_node.setProp("sessionid", session_id) +# command_node.setProp("action", "next") +# submit_form = Form(xmlnode_or_type="submit") +# submit_form.add_field(field_type="jid-single", +# name="user_jid", +# value="test1@test.com") +# submit_form.as_xml(command_node) +# result = self.command_manager.apply_command_action(info_query, +# "http://jabber.org/protocol/admin#change-user-password", +# "execute") +# self.assertNotEquals(result, None) +# self.assertEquals(len(result), 1) +# xml_command = result[0].xpath_eval("c:command", +# {"c": "http://jabber.org/protocol/commands"})[0] +# self.assertEquals(xml_command.prop("status"), "executing") +# self.assertEquals(xml_command.prop("sessionid"), session_id) +# self.__check_actions(result[0], ["prev", "complete"], 1) +# context_session = self.command_manager.sessions[session_id][1] +# self.assertEquals(context_session["user_jid"], +# ["test1@test.com"]) +# fields = result[0].xpath_eval("c:command/data:x/data:field", +# {"c": "http://jabber.org/protocol/commands", +# "data": "jabber:x:data"}) +# self.assertEquals(len(fields), 2) - # Third step - info_query = Iq(stanza_type="set", - from_jid="admin@test.com", - to_jid="jcl.test.com") - command_node = info_query.set_new_content(command.COMMAND_NS, "command") - command_node.setProp("node", "http://jabber.org/protocol/admin#change-user-password") - command_node.setProp("sessionid", session_id) - command_node.setProp("action", "complete") - submit_form = Form(xmlnode_or_type="submit") - submit_form.add_field(field_type="list-single", - name="account_name", - value="account11/test1@test.com") - submit_form.add_field(field_type="text-private", - name="password", - value="pass2") - submit_form.as_xml(command_node) - result = self.command_manager.apply_command_action(info_query, - "http://jabber.org/protocol/admin#change-user-password", - "execute") - xml_command = result[0].xpath_eval("c:command", - {"c": "http://jabber.org/protocol/commands"})[0] - self.assertEquals(xml_command.prop("status"), "completed") - self.assertEquals(xml_command.prop("sessionid"), session_id) - self.__check_actions(result[0]) - self.assertEquals(context_session["account_name"], - ["account11/test1@test.com"]) - self.assertEquals(context_session["password"], - ["pass2"]) - self.assertEquals(account11.password, "pass2") +# # Third step +# info_query = Iq(stanza_type="set", +# from_jid="admin@test.com", +# to_jid="jcl.test.com") +# command_node = info_query.set_new_content(command.COMMAND_NS, "command") +# command_node.setProp("node", "http://jabber.org/protocol/admin#change-user-password") +# command_node.setProp("sessionid", session_id) +# command_node.setProp("action", "complete") +# submit_form = Form(xmlnode_or_type="submit") +# submit_form.add_field(field_type="list-single", +# name="account_name", +# value="account11/test1@test.com") +# submit_form.add_field(field_type="text-private", +# name="password", +# value="pass2") +# submit_form.as_xml(command_node) +# result = self.command_manager.apply_command_action(info_query, +# "http://jabber.org/protocol/admin#change-user-password", +# "execute") +# xml_command = result[0].xpath_eval("c:command", +# {"c": "http://jabber.org/protocol/commands"})[0] +# self.assertEquals(xml_command.prop("status"), "completed") +# self.assertEquals(xml_command.prop("sessionid"), session_id) +# self.__check_actions(result[0]) +# self.assertEquals(context_session["account_name"], +# ["account11/test1@test.com"]) +# self.assertEquals(context_session["password"], +# ["pass2"]) +# self.assertEquals(account11.password, "pass2") def test_execute_get_user_roster(self): self.comp.account_manager.account_classes = (ExampleAccount, diff --git a/src/jcl/jabber/tests/component.py b/src/jcl/jabber/tests/component.py index f20d21c..40c17f2 100644 --- a/src/jcl/jabber/tests/component.py +++ b/src/jcl/jabber/tests/component.py @@ -770,7 +770,7 @@ class JCLComponent_TestCase(JCLTestCase): to_jid="jcl.test.com") disco_items = self.comp.disco_get_items("http://jabber.org/protocol/commands", info_query) - self.assertEquals(len(disco_items.get_items()), 24) + self.assertEquals(len(disco_items.get_items()), 22) ########################################################################### # 'handle_get_version' tests