'announce' ad-hoc command implementation

darcs-hash:20070813201612-86b55-321928f30ee496b17cdd5ba639fff73dd7c5156f.gz
This commit is contained in:
David Rousselie
2007-08-13 22:16:12 +02:00
parent 14046323d6
commit 70f66e6e8e
3 changed files with 120 additions and 41 deletions

View File

@@ -28,6 +28,7 @@ import logging
from pyxmpp.jid import JID from pyxmpp.jid import JID
from pyxmpp.jabber.disco import DiscoInfo, DiscoItems, DiscoItem, DiscoIdentity from pyxmpp.jabber.disco import DiscoInfo, DiscoItems, DiscoItem, DiscoIdentity
from pyxmpp.jabber.dataforms import Form, Field from pyxmpp.jabber.dataforms import Form, Field
from pyxmpp.message import Message
from jcl.jabber.disco import DiscoHandler, RootDiscoGetInfoHandler from jcl.jabber.disco import DiscoHandler, RootDiscoGetInfoHandler
from jcl.model import account from jcl.model import account
@@ -742,14 +743,35 @@ class JCLCommandManager(CommandManager):
limit=limit, limit=limit,
filter=(Account.q._status != account.OFFLINE)) filter=(Account.q._status != account.OFFLINE))
def execute_get_active_users(self, info_query): def execute_announce_1(self, info_query, session_context,
return [] command_node, lang_class):
self.add_actions(command_node, [ACTION_NEXT])
result_form = Form(xmlnode_or_type="result",
title="TODO",
instructions="TODO")
result_form.add_field(field_type="hidden",
name="FORM_TYPE",
value="http://jabber.org/protocol/admin")
result_form.add_field(name="announcement",
field_type="text-multi",
label="TODO",
required=True)
result_form.as_xml(command_node)
return (result_form, [])
def execute_get_idle_users(self, info_query): def execute_announce_2(self, info_query, session_context,
return [] command_node, lang_class):
self.__logger.debug("Executing command 'announce' step 2")
def execute_announce(self, info_query): announcement = session_context["announcement"][0]
return [] accounts = account.get_all_user_jids(\
filter=(Account.q._status != account.OFFLINE))
result = []
for _account in accounts:
result.append(Message(from_jid=self.component.jid,
to_jid=_account.user_jid,
body=announcement))
command_node.setProp("status", STATUS_COMPLETED)
return (None, result)
def execute_set_motd(self, info_query): def execute_set_motd(self, info_query):
return [] return []

View File

@@ -2128,41 +2128,86 @@ class JCLCommandManager_TestCase(JCLTestCase):
self.assertEquals(values[24].content, self.assertEquals(values[24].content,
"test1@test.com (account1112 ExampleAccount)") "test1@test.com (account1112 ExampleAccount)")
# def test_execute_get_online_users(self): def test_execute_announce(self):
# #TODO : implement command self.comp.account_manager.account_classes = (ExampleAccount,
# info_query = Iq(stanza_type="set", Example2Account)
# from_jid="user1@test.com", model.db_connect()
# to_jid="jcl.test.com") account11 = ExampleAccount(user_jid="test1@test.com",
# result = self.command_manager.execute_add_user(info_query) name="account11",
# self.assertNotEquals(result, None) jid="account11@jcl.test.com")
# self.assertEquals(len(result), 1) account11.status = account.ONLINE
account12 = Example2Account(user_jid="test1@test.com",
name="account12",
jid="account12@jcl.test.com")
account12.status = "away"
account21 = ExampleAccount(user_jid="test2@test.com",
name="account21",
jid="account21@jcl.test.com")
account22 = ExampleAccount(user_jid="test2@test.com",
name="account11",
jid="account11@jcl.test.com")
account22.status = "xa"
model.db_disconnect()
info_query = Iq(stanza_type="set",
from_jid="user1@test.com",
to_jid="jcl.test.com")
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
command_node.setProp("node",
"http://jabber.org/protocol/admin#announce")
result = self.command_manager.apply_command_action(\
info_query,
"http://jabber.org/protocol/admin#announce",
"execute")
self.assertNotEquals(result, None)
self.assertEquals(len(result), 1)
xml_command = result[0].xpath_eval("c:command",
{"c": "http://jabber.org/protocol/commands"})[0]
self.assertEquals(xml_command.prop("status"), "executing")
self.assertNotEquals(xml_command.prop("sessionid"), None)
self.__check_actions(result[0], ["next"])
fields = result[0].xpath_eval("c:command/data:x/data:field",
{"c": "http://jabber.org/protocol/commands",
"data": "jabber:x:data"})
self.assertEquals(len(fields), 2)
self.assertEquals(fields[1].prop("var"), "announcement")
self.assertEquals(fields[1].prop("type"), "text-multi")
self.assertEquals(fields[1].children.name, "required")
# def test_execute_get_active_users(self): # Second step
# #TODO : implement command info_query = Iq(stanza_type="set",
# info_query = Iq(stanza_type="set", from_jid="user1@test.com",
# from_jid="user1@test.com", to_jid="jcl.test.com")
# to_jid="jcl.test.com") command_node = info_query.set_new_content(command.COMMAND_NS, "command")
# result = self.command_manager.execute_add_user(info_query) command_node.setProp("node",
# self.assertNotEquals(result, None) "http://jabber.org/protocol/admin#announce")
# self.assertEquals(len(result), 1) session_id = xml_command.prop("sessionid")
command_node.setProp("sessionid", session_id)
# def test_execute_get_idle_users(self): command_node.setProp("action", "next")
# #TODO : implement command submit_form = Form(xmlnode_or_type="submit")
# info_query = Iq(stanza_type="set", submit_form.add_field(field_type="text-multi",
# from_jid="user1@test.com", name="announcement",
# to_jid="jcl.test.com") value=["test announce"])
# result = self.command_manager.execute_add_user(info_query) submit_form.as_xml(command_node)
# self.assertNotEquals(result, None) result = self.command_manager.apply_command_action(\
# self.assertEquals(len(result), 1) info_query,
"http://jabber.org/protocol/admin#announce",
# def test_execute_announce(self): "execute")
# #TODO : implement command self.assertNotEquals(result, None)
# info_query = Iq(stanza_type="set", self.assertEquals(len(result), 3)
# from_jid="user1@test.com", xml_command = result[0].xpath_eval("c:command",
# to_jid="jcl.test.com") {"c": "http://jabber.org/protocol/commands"})[0]
# result = self.command_manager.execute_add_user(info_query) self.assertEquals(xml_command.prop("status"), "completed")
# self.assertNotEquals(result, None) self.assertEquals(xml_command.prop("sessionid"), session_id)
# self.assertEquals(len(result), 1) self.__check_actions(result[0])
context_session = self.command_manager.sessions[session_id][1]
self.assertEquals(context_session["announcement"],
["test announce"])
self.assertEquals(result[1].get_from(), "jcl.test.com")
self.assertEquals(result[1].get_to(), "test1@test.com")
self.assertEquals(result[1].get_body(), "test announce")
self.assertEquals(result[2].get_from(), "jcl.test.com")
self.assertEquals(result[2].get_to(), "test2@test.com")
self.assertEquals(result[2].get_body(), "test announce")
# def test_execute_set_motd(self): # def test_execute_set_motd(self):
# #TODO : implement command # #TODO : implement command

View File

@@ -193,6 +193,18 @@ def get_all_accounts_count(account_class=Account, filter=None):
model.db_disconnect() model.db_disconnect()
return accounts_count return accounts_count
def get_all_user_jids(account_class=Account, limit=None, filter=None):
model.db_connect()
accounts = account_class.select(clause=filter, limit=limit,
orderBy=["user_jid"])
current_account = None
for _account in accounts:
if current_account is None \
or current_account.user_jid != _account.user_jid:
current_account = _account
yield _account
model.db_disconnect()
class PresenceAccount(Account): class PresenceAccount(Account):
DO_NOTHING = 0 DO_NOTHING = 0
DO_SOMETHING = 1 DO_SOMETHING = 1