Handle method that can be called for all ad-hoc command steps
darcs-hash:20080306080857-86b55-e8b987978e8b1f40b11df308e618bfba7a6ece94.gz
This commit is contained in:
@@ -229,8 +229,18 @@ class CommandManager(object):
|
|||||||
else:
|
else:
|
||||||
self.sessions[session_id] = update_step_func(session_id)
|
self.sessions[session_id] = update_step_func(session_id)
|
||||||
step = self.sessions[session_id][0]
|
step = self.sessions[session_id][0]
|
||||||
step_method = "execute_" + short_node + "_" + str(step)
|
|
||||||
self.parse_form(info_query, session_id)
|
self.parse_form(info_query, session_id)
|
||||||
|
step_method = "execute_" + short_node + "_" + str(step)
|
||||||
|
if hasattr(self, step_method):
|
||||||
|
(form, result) = getattr(self, step_method)(\
|
||||||
|
info_query,
|
||||||
|
self.sessions[session_id][1],
|
||||||
|
command_node,
|
||||||
|
lang_class)
|
||||||
|
return [response] + result
|
||||||
|
else:
|
||||||
|
# A method that can execute all other step
|
||||||
|
step_method = "execute_" + short_node
|
||||||
if hasattr(self, step_method):
|
if hasattr(self, step_method):
|
||||||
(form, result) = getattr(self, step_method)(\
|
(form, result) = getattr(self, step_method)(\
|
||||||
info_query,
|
info_query,
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
##
|
##
|
||||||
## component.py
|
## feeder.py
|
||||||
## Login : David Rousselie <dax@happycoders.org>
|
## Login : David Rousselie <dax@happycoders.org>
|
||||||
## Started on Wed Aug 9 21:04:42 2006 David Rousselie
|
## Started on Wed Aug 9 21:04:42 2006 David Rousselie
|
||||||
## $Id$
|
## $Id$
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ import jcl.tests
|
|||||||
from jcl.lang import Lang
|
from jcl.lang import Lang
|
||||||
from jcl.jabber.component import JCLComponent
|
from jcl.jabber.component import JCLComponent
|
||||||
import jcl.jabber.command as command
|
import jcl.jabber.command as command
|
||||||
from jcl.jabber.command import FieldNoType, JCLCommandManager
|
from jcl.jabber.command import FieldNoType, CommandManager, JCLCommandManager
|
||||||
import jcl.model.account as account
|
import jcl.model.account as account
|
||||||
from jcl.model.account import Account, PresenceAccount, LegacyJID, User
|
from jcl.model.account import Account, PresenceAccount, LegacyJID, User
|
||||||
from jcl.model.tests.account import ExampleAccount, Example2Account
|
from jcl.model.tests.account import ExampleAccount, Example2Account
|
||||||
@@ -63,6 +63,22 @@ class MockComponent(JCLComponent):
|
|||||||
def get_admins(self):
|
def get_admins(self):
|
||||||
return ["admin@test.com"]
|
return ["admin@test.com"]
|
||||||
|
|
||||||
|
class MockCommandManager(CommandManager):
|
||||||
|
""" """
|
||||||
|
def __init__ (self):
|
||||||
|
""" """
|
||||||
|
CommandManager.__init__(self)
|
||||||
|
self.commands["command1"] = (False,
|
||||||
|
command.root_node_re)
|
||||||
|
self.component = MockComponent()
|
||||||
|
self.command1_step_1_called = False
|
||||||
|
|
||||||
|
def execute_command1_1(self, info_query, session_context,
|
||||||
|
command_node, lang_class):
|
||||||
|
""" """
|
||||||
|
self.command1_step_1_called = True
|
||||||
|
return (None, [])
|
||||||
|
|
||||||
class CommandManager_TestCase(unittest.TestCase):
|
class CommandManager_TestCase(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
command.command_manager.commands = {}
|
command.command_manager.commands = {}
|
||||||
@@ -320,18 +336,17 @@ class CommandManager_TestCase(unittest.TestCase):
|
|||||||
"feature-not-implemented")
|
"feature-not-implemented")
|
||||||
|
|
||||||
def test_multi_step_command_unknown_step(self):
|
def test_multi_step_command_unknown_step(self):
|
||||||
command.command_manager.commands["command1"] = (False,
|
command.command_manager = MockCommandManager()
|
||||||
command.root_node_re)
|
command.command_manager.sessions["session_id"] = (1, {})
|
||||||
command.command_manager.component = MockComponent()
|
|
||||||
info_query = Iq(stanza_type="set",
|
info_query = Iq(stanza_type="set",
|
||||||
from_jid="user@test.com",
|
from_jid="user@test.com",
|
||||||
to_jid="jcl.test.com")
|
to_jid="jcl.test.com")
|
||||||
command_node = info_query.set_new_content(command.COMMAND_NS,
|
command_node = info_query.set_new_content(command.COMMAND_NS,
|
||||||
"command")
|
"command")
|
||||||
command_node.setProp("node",
|
command_node.setProp("sessionid", "session_id")
|
||||||
"command1")
|
command_node.setProp("node", "command1")
|
||||||
result = command.command_manager.execute_multi_step_command(\
|
result = command.command_manager.execute_multi_step_command(\
|
||||||
info_query, "command1", None)
|
info_query, "command1", lambda session_id: (2, {}))
|
||||||
self.assertEquals(result[0].get_type(), "error")
|
self.assertEquals(result[0].get_type(), "error")
|
||||||
child = result[0].xmlnode.children
|
child = result[0].xmlnode.children
|
||||||
self.assertEquals(child.name, "command")
|
self.assertEquals(child.name, "command")
|
||||||
@@ -342,6 +357,44 @@ class CommandManager_TestCase(unittest.TestCase):
|
|||||||
self.assertEquals(child.children.name,
|
self.assertEquals(child.children.name,
|
||||||
"feature-not-implemented")
|
"feature-not-implemented")
|
||||||
|
|
||||||
|
def test_multi_step_command_first_step(self):
|
||||||
|
command.command_manager = MockCommandManager()
|
||||||
|
info_query = Iq(stanza_type="set",
|
||||||
|
from_jid="user@test.com",
|
||||||
|
to_jid="jcl.test.com")
|
||||||
|
command_node = info_query.set_new_content(command.COMMAND_NS,
|
||||||
|
"command")
|
||||||
|
command_node.setProp("node", "command1")
|
||||||
|
result = command.command_manager.execute_multi_step_command(\
|
||||||
|
info_query, "command1", None)
|
||||||
|
self.assertTrue(command.command_manager.command1_step_1_called)
|
||||||
|
|
||||||
|
def test_multi_step_command_multi_step_method(self):
|
||||||
|
"""
|
||||||
|
Test if the multi steps method is called if no specific method
|
||||||
|
is implemented
|
||||||
|
"""
|
||||||
|
command.command_manager = MockCommandManager()
|
||||||
|
command.command_manager.sessions["session_id"] = (1, {})
|
||||||
|
self.multi_step_command1_called = False
|
||||||
|
def execute_command1(info_query, session_context,
|
||||||
|
command_node, lang_class):
|
||||||
|
""" """
|
||||||
|
self.multi_step_command1_called = True
|
||||||
|
return (None, [])
|
||||||
|
|
||||||
|
command.command_manager.__dict__["execute_command1"] = execute_command1
|
||||||
|
info_query = Iq(stanza_type="set",
|
||||||
|
from_jid="user@test.com",
|
||||||
|
to_jid="jcl.test.com")
|
||||||
|
command_node = info_query.set_new_content(command.COMMAND_NS,
|
||||||
|
"command")
|
||||||
|
command_node.setProp("sessionid", "session_id")
|
||||||
|
command_node.setProp("node", "command1")
|
||||||
|
result = command.command_manager.execute_multi_step_command(\
|
||||||
|
info_query, "command1", lambda session_id: (2, {}))
|
||||||
|
self.assertTrue(self.multi_step_command1_called)
|
||||||
|
|
||||||
class JCLCommandManagerTestCase(JCLTestCase):
|
class JCLCommandManagerTestCase(JCLTestCase):
|
||||||
def setUp(self, tables=[]):
|
def setUp(self, tables=[]):
|
||||||
tables += [Account, PresenceAccount, ExampleAccount,
|
tables += [Account, PresenceAccount, ExampleAccount,
|
||||||
|
|||||||
Reference in New Issue
Block a user