Handle exception CommandError sent from ad-hoc commands implementations

darcs-hash:20080307071827-86b55-450e960980325a71b51772f3d608f09ae2a2abe8.gz
This commit is contained in:
David Rousselie
2008-03-07 08:18:27 +01:00
parent 45aa2e0579
commit 6e46ad62b5
2 changed files with 68 additions and 16 deletions

View File

@@ -53,11 +53,28 @@ account_type_node_re = re.compile("^[^@/]+/.*$")
account_node_re = re.compile("^[^@/]+@[^/]+/?.*$") account_node_re = re.compile("^[^@/]+@[^/]+/?.*$")
class FieldNoType(Field): class FieldNoType(Field):
"""
Field with no 'type' attribut
"""
def complete_xml_element(self, xmlnode, doc): def complete_xml_element(self, xmlnode, doc):
"""Remove the 'type' attribut"""
result = Field.complete_xml_element(self, xmlnode, doc) result = Field.complete_xml_element(self, xmlnode, doc)
result.unsetProp("type") result.unsetProp("type")
return result return result
class CommandError(Exception):
"""
Exception use by commands to report errors
"""
def __init__ (self, error_type="service-unavailable"):
"""
CommandError constructor
"""
Exception.__init__(self)
self.type = error_type
class CommandManager(object): class CommandManager(object):
"""Handle Ad-Hoc commands""" """Handle Ad-Hoc commands"""
@@ -233,6 +250,7 @@ class CommandManager(object):
self.sessions[session_id] = update_step_func(session_id) self.sessions[session_id] = update_step_func(session_id)
step = self.sessions[session_id][0] step = self.sessions[session_id][0]
self.parse_form(info_query, session_id) self.parse_form(info_query, session_id)
try:
step_method = "execute_" + short_node + "_" + str(step) step_method = "execute_" + short_node + "_" + str(step)
if hasattr(self, step_method): if hasattr(self, step_method):
(form, result) = getattr(self, step_method)(\ (form, result) = getattr(self, step_method)(\
@@ -254,6 +272,8 @@ class CommandManager(object):
else: else:
return [info_query.make_error_response(\ return [info_query.make_error_response(\
"feature-not-implemented")] "feature-not-implemented")]
except CommandError, error:
return [info_query.make_error_response(error.type)]
def add_actions(self, command_node, actions, default_action_idx=0): def add_actions(self, command_node, actions, default_action_idx=0):
actions_node = command_node.newTextChild(None, "actions", None) actions_node = command_node.newTextChild(None, "actions", None)

View File

@@ -38,7 +38,8 @@ import jcl.tests
from jcl.lang import Lang from jcl.lang import Lang
from jcl.jabber.component import JCLComponent from jcl.jabber.component import JCLComponent
import jcl.jabber.command as command import jcl.jabber.command as command
from jcl.jabber.command import FieldNoType, CommandManager, JCLCommandManager from jcl.jabber.command import FieldNoType, CommandManager, JCLCommandManager, \
CommandError
import jcl.model.account as account import jcl.model.account as account
from jcl.model.account import Account, PresenceAccount, LegacyJID, User from jcl.model.account import Account, PresenceAccount, LegacyJID, User
from jcl.model.tests.account import ExampleAccount, Example2Account from jcl.model.tests.account import ExampleAccount, Example2Account
@@ -390,7 +391,7 @@ class CommandManager_TestCase(unittest.TestCase):
command_node = info_query.set_new_content(command.COMMAND_NS, command_node = info_query.set_new_content(command.COMMAND_NS,
"command") "command")
command_node.setProp("node", "command1") command_node.setProp("node", "command1")
result = self.command_manager.execute_multi_step_command(\ self.command_manager.execute_multi_step_command(\
info_query, "command1", None) info_query, "command1", None)
self.assertTrue(self.command_manager.command1_step_1_called) self.assertTrue(self.command_manager.command1_step_1_called)
@@ -416,10 +417,41 @@ class CommandManager_TestCase(unittest.TestCase):
"command") "command")
command_node.setProp("sessionid", "session_id") command_node.setProp("sessionid", "session_id")
command_node.setProp("node", "command1") command_node.setProp("node", "command1")
result = self.command_manager.execute_multi_step_command(\ self.command_manager.execute_multi_step_command(\
info_query, "command1", lambda session_id: (2, {})) info_query, "command1", lambda session_id: (2, {}))
self.assertTrue(self.multi_step_command1_called) self.assertTrue(self.multi_step_command1_called)
def test_multi_step_command_error_in_command(self):
"""
Test if the multi steps method catch the CommandError exception
and translate it into an IQ error
"""
self.command_manager = MockCommandManager()
def execute_command1(info_query, session_context,
command_node, lang_class):
raise CommandError("feature-not-implemented")
self.command_manager.__dict__["execute_command1_1"] = execute_command1
info_query = Iq(stanza_type="set",
from_jid="user@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS,
"command")
command_node.setProp("node", "command1")
result = self.command_manager.execute_multi_step_command(\
info_query, "command1", None)
result_iq = result[0].xmlnode
self.assertTrue(jcl.tests.is_xml_equal(\
u"<iq from='" + unicode(self.command_manager.component.jid)
+ "' to='user@test.com' type='error' "
+ "xmlns='http://pyxmpp.jabberstudio.org/xmlns/common'>"
+ "<command xmlns='http://jabber.org/protocol/commands'"
+ "node='command1' />"
+ "<error type='cancel'><feature-not-implemented "
+ "xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/></error>"
+ "</iq>",
result_iq, True))
def test_parse_form(self): def test_parse_form(self):
""" """
Check if parse_form method correctly set the session variables Check if parse_form method correctly set the session variables