Force check emails ad-hoc command implementation and add forgotten command.py file
darcs-hash:20071030172731-86b55-97be8bfd051a2d489b9e24d47c8ddb67ad953b5c.gz
This commit is contained in:
102
src/jmc/jabber/command.py
Normal file
102
src/jmc/jabber/command.py
Normal file
@@ -0,0 +1,102 @@
|
||||
##
|
||||
## command.py
|
||||
## Login : David Rousselie <dax@happycoders.org>
|
||||
## Started on Tue Oct 23 18:45:19 2007 David Rousselie
|
||||
## $Id$
|
||||
##
|
||||
## Copyright (C) 2007 David Rousselie
|
||||
## This program is free software; you can redistribute it and/or modify
|
||||
## it under the terms of the GNU General Public License as published by
|
||||
## the Free Software Foundation; either version 2 of the License, or
|
||||
## (at your option) any later version.
|
||||
##
|
||||
## This program is distributed in the hope that it will be useful,
|
||||
## but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
## GNU General Public License for more details.
|
||||
##
|
||||
## You should have received a copy of the GNU General Public License
|
||||
## along with this program; if not, write to the Free Software
|
||||
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
##
|
||||
|
||||
import logging
|
||||
|
||||
from pyxmpp.jabber.dataforms import Form
|
||||
import jcl.model.account as account
|
||||
|
||||
from jmc.model.account import MailAccount
|
||||
from jcl.jabber.command import JCLCommandManager
|
||||
import jcl.jabber.command as command
|
||||
|
||||
class MailCommandManager(JCLCommandManager):
|
||||
"""
|
||||
Implement ad-hoc commands specific to JMC:
|
||||
- retrieve email attachments
|
||||
- force email checking
|
||||
"""
|
||||
|
||||
def __init__(self, component=None, account_manager=None):
|
||||
"""
|
||||
JMCCommandManager constructor.
|
||||
Register JMC ad-hoc commands
|
||||
"""
|
||||
JCLCommandManager.__init__(self, component, account_manager)
|
||||
self.__logger = logging.getLogger("jmc.jabber.command.JMCCommandManager")
|
||||
#self.commands["jmc#retrieve-attachment"] = (False, command.account_node_re)
|
||||
self.commands["jmc#force-check"] = (False, command.account_node_re)
|
||||
|
||||
# Delayed for JMC 0.3.1
|
||||
def execute_retrieve_attachment_1(self, info_query, session_context,
|
||||
command_node, lang_class):
|
||||
# TODO : translate
|
||||
self.__logger.debug("Executing command 'retrieve-attachment' step 1")
|
||||
self.add_actions(command_node, [command.ACTION_NEXT])
|
||||
bare_from_jid = info_query.get_from().bare()
|
||||
account_name = info_query.get_to().node
|
||||
print str(bare_from_jid)
|
||||
print str(account_name)
|
||||
_account = account.get_account(bare_from_jid, account_name,
|
||||
MailAccount)
|
||||
if _account is not None:
|
||||
result_form = Form(xmlnode_or_type="form",
|
||||
title="TODO:TITLE",
|
||||
instructions="TODO:INS")
|
||||
field = result_form.add_field(name="attachments",
|
||||
field_type="list-multi",
|
||||
label="select attachments")
|
||||
field.add_option(label="Next",
|
||||
values=["-1"])
|
||||
for (mail_id, mail_title) in _account.get_mail_with_attachment_list():
|
||||
field.add_option(label=mail_title,
|
||||
values=[mail_id])
|
||||
result_form.as_xml(command_node)
|
||||
return (result_form, [])
|
||||
else:
|
||||
# TODO Error
|
||||
return (None, [])
|
||||
|
||||
# TODO: retrieve step2: Delayed to JMC 0.3.1
|
||||
|
||||
def execute_force_check_1(self, info_query, session_context,
|
||||
command_node, lang_class):
|
||||
self.__logger.debug("Executing command 'force-check' step 1")
|
||||
self.add_actions(command_node, [command.ACTION_COMPLETE])
|
||||
session_context["user_jids"] = [unicode(info_query.get_from().bare())]
|
||||
return (self.add_form_select_accounts(session_context, command_node,
|
||||
lang_class, "TODO:TITLE",
|
||||
"TODO:DESC", format_as_xml=True,
|
||||
show_user_jid=False), [])
|
||||
|
||||
def execute_force_check_2(self, info_query, session_context,
|
||||
command_node, lang_class):
|
||||
self.__logger.debug("Executing command 'force-check' step 2")
|
||||
command_node.setProp("status", command.STATUS_COMPLETED)
|
||||
accounts = []
|
||||
for account_name in session_context["account_names"]:
|
||||
name, user_jid = self.get_name_and_jid(account_name)
|
||||
_account = account.get_account(user_jid, name)
|
||||
_account.lastcheck = _account.interval - 1
|
||||
accounts.append(_account)
|
||||
self.component.check_email_accounts(accounts, lang_class)
|
||||
return (None, [])
|
||||
@@ -146,6 +146,11 @@ class MailComponent(FeederComponent):
|
||||
AccountDiscoGetInfoHandler,
|
||||
IMAPAccountDiscoGetInfoHandler(self))
|
||||
|
||||
def check_email_accounts(self, accounts, lang_class=None):
|
||||
if lang_class is None:
|
||||
lang_class = self.lang.get_default_lang_class()
|
||||
self.handler.handle(None, lang_class, accounts)
|
||||
|
||||
class MailFeeder(Feeder):
|
||||
"""Email check"""
|
||||
|
||||
|
||||
317
src/jmc/jabber/tests/command.py
Normal file
317
src/jmc/jabber/tests/command.py
Normal file
@@ -0,0 +1,317 @@
|
||||
##
|
||||
## command.py
|
||||
## Login : David Rousselie <dax@happycoders.org>
|
||||
## Started on Tue Oct 23 18:53:28 2007 David Rousselie
|
||||
## $Id$
|
||||
##
|
||||
## Copyright (C) 2007 David Rousselie
|
||||
## This program is free software; you can redistribute it and/or modify
|
||||
## it under the terms of the GNU General Public License as published by
|
||||
## the Free Software Foundation; either version 2 of the License, or
|
||||
## (at your option) any later version.
|
||||
##
|
||||
## This program is distributed in the hope that it will be useful,
|
||||
## but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
## GNU General Public License for more details.
|
||||
##
|
||||
## You should have received a copy of the GNU General Public License
|
||||
## along with this program; if not, write to the Free Software
|
||||
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
##
|
||||
|
||||
import unittest
|
||||
import tempfile
|
||||
from ConfigParser import ConfigParser
|
||||
import os
|
||||
|
||||
from pyxmpp.iq import Iq
|
||||
from pyxmpp.jabber.dataforms import Form
|
||||
|
||||
import jcl.tests
|
||||
from jcl.jabber.tests.command import JCLCommandManager_TestCase
|
||||
from jcl.jabber.feeder import Feeder
|
||||
from jcl.model.account import Account, PresenceAccount, LegacyJID, \
|
||||
User
|
||||
import jcl.jabber.command as command
|
||||
|
||||
from jmc.model.account import POP3Account, IMAPAccount, SMTPAccount, \
|
||||
MailAccount
|
||||
from jmc.jabber.component import MailComponent
|
||||
from jmc.jabber.command import MailCommandManager
|
||||
|
||||
from jmc.jabber.tests.component import MockIMAPAccount
|
||||
|
||||
class MailCommandManager_TestCase(JCLCommandManager_TestCase):
|
||||
def setUp(self):
|
||||
JCLCommandManager_TestCase.setUp(self, tables=[POP3Account, IMAPAccount,
|
||||
SMTPAccount, MailAccount,
|
||||
MockIMAPAccount])
|
||||
self.config_file = tempfile.mktemp(".conf", "jmctest", jcl.tests.DB_DIR)
|
||||
self.config = ConfigParser()
|
||||
self.config.read(self.config_file)
|
||||
self.comp = MailComponent("jmc.test.com",
|
||||
"password",
|
||||
"localhost",
|
||||
"5347",
|
||||
self.config,
|
||||
self.config_file)
|
||||
self.comp.set_admins(["admin@test.com"])
|
||||
self.command_manager = command.command_manager
|
||||
|
||||
def tearDown(self):
|
||||
JCLCommandManager_TestCase.tearDown(self)
|
||||
if os.path.exists(self.config_file):
|
||||
os.unlink(self.config_file)
|
||||
|
||||
# def test_execute_retrieve_attachment(self):
|
||||
# self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
|
||||
# SMTPAccount, MockIMAPAccount)
|
||||
# account1 = MockIMAPAccount(user=User(jid="test1@test.com"),
|
||||
# name="account1",
|
||||
# jid="account1@" + unicode(self.comp.jid))
|
||||
# info_query = Iq(stanza_type="set",
|
||||
# from_jid="test1@test.com",
|
||||
# to_jid="account1@" + unicode(self.comp.jid))
|
||||
# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
# command_node.setProp("node", "jmc#retrieve-attachment")
|
||||
# result = self.command_manager.apply_command_action(info_query,
|
||||
# "jmc#retrieve-attachment",
|
||||
# "execute")
|
||||
# self.assertNotEquals(result, None)
|
||||
# self.assertEquals(len(result), 1)
|
||||
# print str(result[0].xmlnode)
|
||||
# xml_command = result[0].xpath_eval("c:command",
|
||||
# {"c": "http://jabber.org/protocol/commands"})[0]
|
||||
# self.assertEquals(xml_command.prop("status"), "executing")
|
||||
# self.assertNotEquals(xml_command.prop("sessionid"), None)
|
||||
# self._check_actions(result[0], ["next"])
|
||||
# print str(result[0].xmlnode)
|
||||
# x_data = result[0].xpath_eval("c:command/data:x",
|
||||
# {"c": "http://jabber.org/protocol/commands",
|
||||
# "data": "jabber:x:data"})
|
||||
# self.assertEquals(len(x_data), 1)
|
||||
# self.assertEquals(x_data[0].prop("type"), "form")
|
||||
# options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
|
||||
# {"c": "http://jabber.org/protocol/commands",
|
||||
# "data": "jabber:x:data"})
|
||||
# self.assertEquals(len(options), 3)
|
||||
# self.assertEquals(options[0].prop("label"), "Next")
|
||||
# self.assertEquals(options[0].children.name, "value")
|
||||
# self.assertEquals(options[0].children.content, "-1")
|
||||
# self.assertEquals(options[1].prop("label"), "mail 1")
|
||||
# self.assertEquals(options[1].children.name, "value")
|
||||
# self.assertEquals(options[1].children.content, "1")
|
||||
# self.assertEquals(options[2].prop("label"), "mail 2")
|
||||
# self.assertEquals(options[2].children.name, "value")
|
||||
# self.assertEquals(options[2].children.content, "2")
|
||||
|
||||
# # Delayed to JMC 0.3.1
|
||||
# return
|
||||
# # Second step: TODO
|
||||
# info_query = Iq(stanza_type="set",
|
||||
# from_jid="admin@test.com",
|
||||
# to_jid=self.comp.jid)
|
||||
# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
# command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
|
||||
# session_id = xml_command.prop("sessionid")
|
||||
# command_node.setProp("sessionid", session_id)
|
||||
# command_node.setProp("action", "next")
|
||||
# submit_form = Form(xmlnode_or_type="submit")
|
||||
# submit_form.add_field(field_type="list-single",
|
||||
# name="account_type",
|
||||
# value="Example")
|
||||
# submit_form.add_field(field_type="jid-single",
|
||||
# name="user_jid",
|
||||
# value="user2@test.com")
|
||||
# submit_form.as_xml(command_node)
|
||||
# result = self.command_manager.apply_command_action(info_query,
|
||||
# "http://jabber.org/protocol/admin#add-user",
|
||||
# "next")
|
||||
# self.assertNotEquals(result, None)
|
||||
# self.assertEquals(len(result), 1)
|
||||
# xml_command = result[0].xpath_eval("c:command",
|
||||
# {"c": "http://jabber.org/protocol/commands"})[0]
|
||||
# self.assertEquals(xml_command.prop("status"), "executing")
|
||||
# self.assertEquals(xml_command.prop("sessionid"), session_id)
|
||||
# self._check_actions(result[0], ["prev", "complete"], 1)
|
||||
# x_data = result[0].xpath_eval("c:command/data:x",
|
||||
# {"c": "http://jabber.org/protocol/commands",
|
||||
# "data": "jabber:x:data"})
|
||||
# self.assertEquals(len(x_data), 1)
|
||||
# self.assertEquals(x_data[0].prop("type"), "form")
|
||||
# fields = result[0].xpath_eval("c:command/data:x/data:field",
|
||||
# {"c": "http://jabber.org/protocol/commands",
|
||||
# "data": "jabber:x:data"})
|
||||
# self.assertEquals(len(fields), 6)
|
||||
# context_session = self.command_manager.sessions[session_id][1]
|
||||
# self.assertEquals(context_session["account_type"], ["Example"])
|
||||
# self.assertEquals(context_session["user_jid"], ["user2@test.com"])
|
||||
|
||||
# # Third step
|
||||
# info_query = Iq(stanza_type="set",
|
||||
# from_jid="admin@test.com",
|
||||
# to_jid=self.comp.jid)
|
||||
# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
# command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
|
||||
# command_node.setProp("sessionid", session_id)
|
||||
# command_node.setProp("action", "complete")
|
||||
# submit_form = Form(xmlnode_or_type="submit")
|
||||
# submit_form.add_field(field_type="text-single",
|
||||
# name="name",
|
||||
# value="account1")
|
||||
# submit_form.add_field(field_type="text-single",
|
||||
# name="login",
|
||||
# value="login1")
|
||||
# submit_form.add_field(field_type="text-private",
|
||||
# name="password",
|
||||
# value="pass1")
|
||||
# submit_form.add_field(field_type="boolean",
|
||||
# name="store_password",
|
||||
# value="1")
|
||||
# submit_form.add_field(field_type="list-single",
|
||||
# name="test_enum",
|
||||
# value="choice2")
|
||||
# submit_form.add_field(field_type="text-single",
|
||||
# name="test_int",
|
||||
# value="42")
|
||||
# submit_form.as_xml(command_node)
|
||||
|
||||
# result = self.command_manager.apply_command_action(info_query,
|
||||
# "http://jabber.org/protocol/admin#add-user",
|
||||
# "execute")
|
||||
# xml_command = result[0].xpath_eval("c:command",
|
||||
# {"c": "http://jabber.org/protocol/commands"})[0]
|
||||
# self.assertEquals(xml_command.prop("status"), "completed")
|
||||
# self.assertEquals(xml_command.prop("sessionid"), session_id)
|
||||
# self._check_actions(result[0])
|
||||
|
||||
# self.assertEquals(context_session["name"], ["account1"])
|
||||
# self.assertEquals(context_session["login"], ["login1"])
|
||||
# self.assertEquals(context_session["password"], ["pass1"])
|
||||
# self.assertEquals(context_session["store_password"], ["1"])
|
||||
# self.assertEquals(context_session["test_enum"], ["choice2"])
|
||||
# self.assertEquals(context_session["test_int"], ["42"])
|
||||
|
||||
# model.db_connect()
|
||||
# _account = account.get_account("user2@test.com",
|
||||
# "account1")
|
||||
# self.assertNotEquals(_account, None)
|
||||
# self.assertEquals(_account.user.jid, "user2@test.com")
|
||||
# self.assertEquals(_account.name, "account1")
|
||||
# self.assertEquals(_account.jid, "account1@" + unicode(self.comp.jid))
|
||||
# model.db_disconnect()
|
||||
|
||||
# stanza_sent = result
|
||||
# self.assertEquals(len(stanza_sent), 4)
|
||||
# iq_result = stanza_sent[0]
|
||||
# self.assertTrue(isinstance(iq_result, Iq))
|
||||
# self.assertEquals(iq_result.get_node().prop("type"), "result")
|
||||
# self.assertEquals(iq_result.get_from(), self.comp.jid)
|
||||
# self.assertEquals(iq_result.get_to(), "admin@test.com")
|
||||
# presence_component = stanza_sent[1]
|
||||
# self.assertTrue(isinstance(presence_component, Presence))
|
||||
# self.assertEquals(presence_component.get_from(), self.comp.jid)
|
||||
# self.assertEquals(presence_component.get_to(), "user2@test.com")
|
||||
# self.assertEquals(presence_component.get_node().prop("type"),
|
||||
# "subscribe")
|
||||
# message = stanza_sent[2]
|
||||
# self.assertTrue(isinstance(message, Message))
|
||||
# self.assertEquals(message.get_from(), self.comp.jid)
|
||||
# self.assertEquals(message.get_to(), "user2@test.com")
|
||||
# self.assertEquals(message.get_subject(),
|
||||
# _account.get_new_message_subject(Lang.en))
|
||||
# self.assertEquals(message.get_body(),
|
||||
# _account.get_new_message_body(Lang.en))
|
||||
# presence_account = stanza_sent[3]
|
||||
# self.assertTrue(isinstance(presence_account, Presence))
|
||||
# self.assertEquals(presence_account.get_from(), "account1@" + unicode(self.comp.jid))
|
||||
# self.assertEquals(presence_account.get_to(), "user2@test.com")
|
||||
# self.assertEquals(presence_account.get_node().prop("type"),
|
||||
# "subscribe")
|
||||
|
||||
def test_execute_force_check(self):
|
||||
self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
|
||||
SMTPAccount, MockIMAPAccount)
|
||||
class MockFeederHandler(Feeder):
|
||||
def __init__(self, component):
|
||||
Feeder.__init__(self, component)
|
||||
self.checked_accounts = []
|
||||
|
||||
def feed(self, _account):
|
||||
self.checked_accounts.append(_account)
|
||||
assert(_account.lastcheck == (_account.interval - 1))
|
||||
return []
|
||||
|
||||
self.comp.handler.feeder = MockFeederHandler(self.comp)
|
||||
user1 = User(jid="test1@test.com")
|
||||
user2 = User(jid="test2@test.com")
|
||||
account11 = MockIMAPAccount(user=user1,
|
||||
name="account11",
|
||||
jid="account11@" + unicode(self.comp.jid))
|
||||
account12 = MockIMAPAccount(user=user1,
|
||||
name="account12",
|
||||
jid="account12@" + unicode(self.comp.jid))
|
||||
account21 = MockIMAPAccount(user=user2,
|
||||
name="account21",
|
||||
jid="account21@" + unicode(self.comp.jid))
|
||||
account22 = MockIMAPAccount(user=user2,
|
||||
name="account11",
|
||||
jid="account11@" + unicode(self.comp.jid))
|
||||
info_query = Iq(stanza_type="set",
|
||||
from_jid="test1@test.com",
|
||||
to_jid=self.comp.jid)
|
||||
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
command_node.setProp("node", "jmc#force-check")
|
||||
result = self.command_manager.apply_command_action(info_query,
|
||||
"jmc#force-check",
|
||||
"execute")
|
||||
self.assertNotEquals(result, None)
|
||||
self.assertEquals(len(result), 1)
|
||||
xml_command = result[0].xpath_eval("c:command",
|
||||
{"c": "http://jabber.org/protocol/commands"})[0]
|
||||
self.assertEquals(xml_command.prop("status"), "executing")
|
||||
self.assertNotEquals(xml_command.prop("sessionid"), None)
|
||||
self._check_actions(result[0], ["complete"])
|
||||
session_id = xml_command.prop("sessionid")
|
||||
context_session = self.command_manager.sessions[session_id][1]
|
||||
self.assertEquals(context_session["user_jids"],
|
||||
["test1@test.com"])
|
||||
|
||||
# Second step
|
||||
info_query = Iq(stanza_type="set",
|
||||
from_jid="admin@test.com",
|
||||
to_jid=self.comp.jid)
|
||||
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
command_node.setProp("node", "jmc#force-check")
|
||||
command_node.setProp("sessionid", session_id)
|
||||
command_node.setProp("action", "complete")
|
||||
submit_form = Form(xmlnode_or_type="submit")
|
||||
submit_form.add_field(field_type="list-multi",
|
||||
name="account_names",
|
||||
values=["account11/test1@test.com",
|
||||
"account12/test1@test.com"])
|
||||
submit_form.as_xml(command_node)
|
||||
result = self.command_manager.apply_command_action(info_query,
|
||||
"jmc#force-check",
|
||||
"execute")
|
||||
xml_command = result[0].xpath_eval("c:command",
|
||||
{"c": "http://jabber.org/protocol/commands"})[0]
|
||||
self.assertEquals(xml_command.prop("status"), "completed")
|
||||
self.assertEquals(xml_command.prop("sessionid"), session_id)
|
||||
self._check_actions(result[0])
|
||||
self.assertEquals(context_session["account_names"],
|
||||
["account11/test1@test.com",
|
||||
"account12/test1@test.com"])
|
||||
feeder = self.comp.handler.feeder
|
||||
self.assertEquals(len(feeder.checked_accounts), 2)
|
||||
self.assertEquals(feeder.checked_accounts[0], account11)
|
||||
self.assertEquals(feeder.checked_accounts[1], account12)
|
||||
|
||||
def suite():
|
||||
test_suite = unittest.TestSuite()
|
||||
test_suite.addTest(unittest.makeSuite(MailCommandManager_TestCase, 'test'))
|
||||
return test_suite
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(defaultTest='suite')
|
||||
Reference in New Issue
Block a user