Make 'add-user' ad-hoc command accessible to not admin users
darcs-hash:20071008202051-86b55-34cddc397121d970740ea4d1ec6b53794ccfb637.gz
This commit is contained in:
@@ -86,11 +86,10 @@ class CommandManager(object):
|
||||
|
||||
def list_commands(self, jid, disco_items, lang_class):
|
||||
"""Return DiscoItem for each supported commands"""
|
||||
bare_from_jid = unicode(jid.bare())
|
||||
for command_name in self.commands.keys():
|
||||
must_be_admin = self.commands[command_name]
|
||||
if not must_be_admin or \
|
||||
(must_be_admin and bare_from_jid in self.component.get_admins()):
|
||||
(must_be_admin and self.component.is_admin(jid)):
|
||||
command_desc = self.get_command_desc(command_name,
|
||||
lang_class)
|
||||
DiscoItem(disco_items,
|
||||
@@ -114,7 +113,7 @@ class CommandManager(object):
|
||||
must_be_admin = self.commands[command_name]
|
||||
if not must_be_admin or \
|
||||
(must_be_admin and
|
||||
unicode(info_query.get_from().bare()) in self.component.get_admins()):
|
||||
self.component.is_admin(info_query.get_from())):
|
||||
short_command_name = self.get_short_command_name(command_name)
|
||||
action_command_method = "apply_" + action + "_command"
|
||||
if hasattr(self, action_command_method):
|
||||
@@ -266,7 +265,7 @@ class JCLCommandManager(CommandManager):
|
||||
"""
|
||||
CommandManager.__init__(self, component, account_manager)
|
||||
self.__logger = logging.getLogger("jcl.jabber.command.JCLCommandManager")
|
||||
self.commands["http://jabber.org/protocol/admin#add-user"] = True
|
||||
self.commands["http://jabber.org/protocol/admin#add-user"] = False
|
||||
self.commands["http://jabber.org/protocol/admin#delete-user"] = True
|
||||
self.commands["http://jabber.org/protocol/admin#disable-user"] = True
|
||||
self.commands["http://jabber.org/protocol/admin#reenable-user"] = True
|
||||
@@ -469,6 +468,7 @@ class JCLCommandManager(CommandManager):
|
||||
self.component.account_manager.list_account_types(lang_class):
|
||||
field.add_option(label=type_label,
|
||||
values=[account_type])
|
||||
if self.component.is_admin(info_query.get_from()):
|
||||
result_form.add_field(name="user_jid",
|
||||
field_type="jid-single",
|
||||
label=lang_class.field_user_jid)
|
||||
@@ -479,6 +479,8 @@ class JCLCommandManager(CommandManager):
|
||||
command_node, lang_class):
|
||||
self.__logger.debug("Executing command 'add-user' step 2")
|
||||
self.add_actions(command_node, [ACTION_PREVIOUS, ACTION_COMPLETE], 1)
|
||||
if not self.component.is_admin(info_query.get_from()):
|
||||
session_context["user_jid"] = [unicode(info_query.get_from().bare())]
|
||||
user_jid = session_context["user_jid"][0]
|
||||
account_type = session_context["account_type"][0]
|
||||
account_class = self.account_manager.get_account_class(account_type)
|
||||
|
||||
@@ -608,6 +608,13 @@ class JCLComponent(Component, object):
|
||||
def set_admins(self, admins):
|
||||
self.set_config_parameter("component", "admins", ",".join(admins))
|
||||
|
||||
def is_admin(self, jid):
|
||||
if isinstance(jid, JID):
|
||||
jid_str = unicode(jid.bare())
|
||||
else:
|
||||
jid_str = unicode(jid)
|
||||
return jid_str in self.get_admins()
|
||||
|
||||
def get_welcome_message(self):
|
||||
return self.get_config_parameter("component", "welcome_message")
|
||||
|
||||
|
||||
@@ -53,13 +53,19 @@ class FieldNoType_TestCase(unittest.TestCase):
|
||||
field.complete_xml_element(fake_iq.xmlnode, None)
|
||||
self.assertFalse(fake_iq.xmlnode.hasProp("type"))
|
||||
|
||||
class MockComponent(object):
|
||||
class MockComponent(JCLComponent):
|
||||
jid = JID("jcl.test.com")
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def get_admins(self):
|
||||
return ["admin@test.com"]
|
||||
|
||||
class CommandManager_TestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
command.command_manager.commands = {}
|
||||
|
||||
def test_get_short_command_name_form_long_name(self):
|
||||
command_name = command.command_manager.get_short_command_name("http://jabber.org/protocol/admin#test-command")
|
||||
self.assertEquals(command_name, "test_command")
|
||||
@@ -81,7 +87,6 @@ class CommandManager_TestCase(unittest.TestCase):
|
||||
self.assertEquals(items[0].get_name(), "command2")
|
||||
|
||||
def test_list_commands_as_admin(self):
|
||||
command.command_manager.commands = {}
|
||||
command.command_manager.commands["command1"] = True
|
||||
command.command_manager.commands["command2"] = False
|
||||
command.command_manager.component = MockComponent()
|
||||
@@ -96,7 +101,6 @@ class CommandManager_TestCase(unittest.TestCase):
|
||||
self.assertEquals(items[1].get_name(), "command2")
|
||||
|
||||
def test_list_commands_as_admin_fulljid(self):
|
||||
command.command_manager.commands = {}
|
||||
command.command_manager.commands["command1"] = True
|
||||
command.command_manager.commands["command2"] = False
|
||||
command.command_manager.component = MockComponent()
|
||||
@@ -647,6 +651,159 @@ class JCLCommandManager_TestCase(JCLTestCase):
|
||||
self.assertEquals(presence_account.get_node().prop("type"),
|
||||
"subscribe")
|
||||
|
||||
def test_execute_add_user_not_admin(self):
|
||||
self.comp.account_manager.account_classes = (ExampleAccount,
|
||||
Example2Account)
|
||||
info_query = Iq(stanza_type="set",
|
||||
from_jid="test1@test.com",
|
||||
to_jid="jcl.test.com")
|
||||
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
|
||||
result = self.command_manager.apply_command_action(info_query,
|
||||
"http://jabber.org/protocol/admin#add-user",
|
||||
"execute")
|
||||
self.assertNotEquals(result, None)
|
||||
self.assertEquals(len(result), 1)
|
||||
xml_command = result[0].xpath_eval("c:command",
|
||||
{"c": "http://jabber.org/protocol/commands"})[0]
|
||||
self.assertEquals(xml_command.prop("status"), "executing")
|
||||
self.assertNotEquals(xml_command.prop("sessionid"), None)
|
||||
self.__check_actions(result[0], ["next"])
|
||||
x_data = result[0].xpath_eval("c:command/data:x",
|
||||
{"c": "http://jabber.org/protocol/commands",
|
||||
"data": "jabber:x:data"})
|
||||
self.assertEquals(len(x_data), 1)
|
||||
self.assertEquals(x_data[0].prop("type"), "form")
|
||||
fields = result[0].xpath_eval("c:command/data:x/data:field",
|
||||
{"c": "http://jabber.org/protocol/commands",
|
||||
"data": "jabber:x:data"})
|
||||
self.assertEquals(len(fields), 1)
|
||||
options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
|
||||
{"c": "http://jabber.org/protocol/commands",
|
||||
"data": "jabber:x:data"})
|
||||
self.assertEquals(len(options), 2)
|
||||
self.assertEquals(options[0].prop("label"), "Example")
|
||||
self.assertEquals(options[0].children.name, "value")
|
||||
self.assertEquals(options[0].children.content, "Example")
|
||||
self.assertEquals(options[1].prop("label"), "Example2")
|
||||
self.assertEquals(options[1].children.name, "value")
|
||||
self.assertEquals(options[1].children.content, "Example2")
|
||||
|
||||
# Second step
|
||||
info_query = Iq(stanza_type="set",
|
||||
from_jid="test1@test.com",
|
||||
to_jid="jcl.test.com")
|
||||
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
|
||||
session_id = xml_command.prop("sessionid")
|
||||
command_node.setProp("sessionid", session_id)
|
||||
command_node.setProp("action", "next")
|
||||
submit_form = Form(xmlnode_or_type="submit")
|
||||
submit_form.add_field(field_type="list-single",
|
||||
name="account_type",
|
||||
value="Example")
|
||||
submit_form.as_xml(command_node)
|
||||
result = self.command_manager.apply_command_action(info_query,
|
||||
"http://jabber.org/protocol/admin#add-user",
|
||||
"next")
|
||||
self.assertNotEquals(result, None)
|
||||
self.assertEquals(len(result), 1)
|
||||
xml_command = result[0].xpath_eval("c:command",
|
||||
{"c": "http://jabber.org/protocol/commands"})[0]
|
||||
self.assertEquals(xml_command.prop("status"), "executing")
|
||||
self.assertEquals(xml_command.prop("sessionid"), session_id)
|
||||
self.__check_actions(result[0], ["prev", "complete"], 1)
|
||||
x_data = result[0].xpath_eval("c:command/data:x",
|
||||
{"c": "http://jabber.org/protocol/commands",
|
||||
"data": "jabber:x:data"})
|
||||
self.assertEquals(len(x_data), 1)
|
||||
self.assertEquals(x_data[0].prop("type"), "form")
|
||||
fields = result[0].xpath_eval("c:command/data:x/data:field",
|
||||
{"c": "http://jabber.org/protocol/commands",
|
||||
"data": "jabber:x:data"})
|
||||
self.assertEquals(len(fields), 6)
|
||||
context_session = self.command_manager.sessions[session_id][1]
|
||||
self.assertEquals(context_session["account_type"], ["Example"])
|
||||
self.assertEquals(context_session["user_jid"], ["test1@test.com"])
|
||||
|
||||
# Third step
|
||||
info_query = Iq(stanza_type="set",
|
||||
from_jid="test1@test.com",
|
||||
to_jid="jcl.test.com")
|
||||
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
|
||||
command_node.setProp("sessionid", session_id)
|
||||
command_node.setProp("action", "complete")
|
||||
submit_form = Form(xmlnode_or_type="submit")
|
||||
submit_form.add_field(field_type="text-single",
|
||||
name="name",
|
||||
value="account1")
|
||||
submit_form.add_field(field_type="text-single",
|
||||
name="login",
|
||||
value="login1")
|
||||
submit_form.add_field(field_type="text-private",
|
||||
name="password",
|
||||
value="pass1")
|
||||
submit_form.add_field(field_type="boolean",
|
||||
name="store_password",
|
||||
value="1")
|
||||
submit_form.add_field(field_type="list-single",
|
||||
name="test_enum",
|
||||
value="choice2")
|
||||
submit_form.add_field(field_type="text-single",
|
||||
name="test_int",
|
||||
value="42")
|
||||
submit_form.as_xml(command_node)
|
||||
result = self.command_manager.apply_command_action(info_query,
|
||||
"http://jabber.org/protocol/admin#add-user",
|
||||
"execute")
|
||||
xml_command = result[0].xpath_eval("c:command",
|
||||
{"c": "http://jabber.org/protocol/commands"})[0]
|
||||
self.assertEquals(xml_command.prop("status"), "completed")
|
||||
self.assertEquals(xml_command.prop("sessionid"), session_id)
|
||||
self.__check_actions(result[0])
|
||||
self.assertEquals(context_session["name"], ["account1"])
|
||||
self.assertEquals(context_session["login"], ["login1"])
|
||||
self.assertEquals(context_session["password"], ["pass1"])
|
||||
self.assertEquals(context_session["store_password"], ["1"])
|
||||
self.assertEquals(context_session["test_enum"], ["choice2"])
|
||||
self.assertEquals(context_session["test_int"], ["42"])
|
||||
model.db_connect()
|
||||
_account = account.get_account("test1@test.com",
|
||||
"account1")
|
||||
self.assertNotEquals(_account, None)
|
||||
self.assertEquals(_account.user.jid, "test1@test.com")
|
||||
self.assertEquals(_account.name, "account1")
|
||||
self.assertEquals(_account.jid, "account1@jcl.test.com")
|
||||
model.db_disconnect()
|
||||
stanza_sent = result
|
||||
self.assertEquals(len(stanza_sent), 4)
|
||||
iq_result = stanza_sent[0]
|
||||
self.assertTrue(isinstance(iq_result, Iq))
|
||||
self.assertEquals(iq_result.get_node().prop("type"), "result")
|
||||
self.assertEquals(iq_result.get_from(), "jcl.test.com")
|
||||
self.assertEquals(iq_result.get_to(), "test1@test.com")
|
||||
presence_component = stanza_sent[1]
|
||||
self.assertTrue(isinstance(presence_component, Presence))
|
||||
self.assertEquals(presence_component.get_from(), "jcl.test.com")
|
||||
self.assertEquals(presence_component.get_to(), "test1@test.com")
|
||||
self.assertEquals(presence_component.get_node().prop("type"),
|
||||
"subscribe")
|
||||
message = stanza_sent[2]
|
||||
self.assertTrue(isinstance(message, Message))
|
||||
self.assertEquals(message.get_from(), "jcl.test.com")
|
||||
self.assertEquals(message.get_to(), "test1@test.com")
|
||||
self.assertEquals(message.get_subject(),
|
||||
_account.get_new_message_subject(Lang.en))
|
||||
self.assertEquals(message.get_body(),
|
||||
_account.get_new_message_body(Lang.en))
|
||||
presence_account = stanza_sent[3]
|
||||
self.assertTrue(isinstance(presence_account, Presence))
|
||||
self.assertEquals(presence_account.get_from(), "account1@jcl.test.com")
|
||||
self.assertEquals(presence_account.get_to(), "test1@test.com")
|
||||
self.assertEquals(presence_account.get_node().prop("type"),
|
||||
"subscribe")
|
||||
|
||||
def test_execute_add_user_prev(self):
|
||||
self.comp.account_manager.account_classes = (ExampleAccount,
|
||||
Example2Account)
|
||||
|
||||
Reference in New Issue
Block a user