get-email ad-hoc command basic behavior
darcs-hash:20080306073419-86b55-b04b36cd21ba4b85897295aef07362a69402a84e.gz
This commit is contained in:
@@ -24,11 +24,13 @@ import logging
|
||||
import re
|
||||
|
||||
from pyxmpp.jabber.dataforms import Form
|
||||
|
||||
import jcl.model.account as account
|
||||
import jcl.jabber.command as command
|
||||
from jcl.jabber.command import JCLCommandManager
|
||||
|
||||
from jmc.model.account import MailAccount
|
||||
from jcl.jabber.command import JCLCommandManager
|
||||
import jcl.jabber.command as command
|
||||
from jmc.jabber.feeder import MailSender
|
||||
|
||||
class MailCommandManager(JCLCommandManager):
|
||||
"""
|
||||
@@ -46,6 +48,7 @@ class MailCommandManager(JCLCommandManager):
|
||||
self.__logger = logging.getLogger("jmc.jabber.command.JMCCommandManager")
|
||||
#self.commands["jmc#retrieve-attachment"] = (False, command.account_node_re)
|
||||
self.commands["jmc#force-check"] = (False, re.compile(".*"))
|
||||
self.commands["jmc#get-email"] = (False, command.account_node_re)
|
||||
|
||||
# Delayed to JMC 0.3.1
|
||||
def execute_retrieve_attachment_1(self, info_query, session_context,
|
||||
@@ -84,10 +87,11 @@ class MailCommandManager(JCLCommandManager):
|
||||
self.__logger.debug("Executing command 'force-check' step 1 on root node")
|
||||
self.add_actions(command_node, [command.ACTION_COMPLETE])
|
||||
session_context["user_jids"] = [unicode(info_query.get_from().bare())]
|
||||
return (self.add_form_select_accounts(session_context, command_node,
|
||||
lang_class, "TODO:TITLE",
|
||||
"TODO:DESC", format_as_xml=True,
|
||||
show_user_jid=False), [])
|
||||
return (self.add_form_select_accounts(\
|
||||
session_context, command_node, lang_class,
|
||||
lang_class.command_force_check,
|
||||
lang_class.command_force_check_1_description,
|
||||
format_as_xml=True, show_user_jid=False), [])
|
||||
|
||||
def execute_force_check_1(self, info_query, session_context,
|
||||
command_node, lang_class):
|
||||
@@ -117,3 +121,56 @@ class MailCommandManager(JCLCommandManager):
|
||||
return (None, [])
|
||||
|
||||
execute_force_check_2 = execute_force_check_1
|
||||
|
||||
def execute_get_email_1(self, info_query, session_context,
|
||||
command_node, lang_class):
|
||||
self.__logger.debug("Executing command 'get-email' step 1")
|
||||
self.add_actions(command_node, [command.ACTION_COMPLETE])
|
||||
bare_from_jid = info_query.get_from().bare()
|
||||
account_name = info_query.get_to().node
|
||||
_account = account.get_account(bare_from_jid, account_name)
|
||||
if _account is not None:
|
||||
result_form = Form(\
|
||||
xmlnode_or_type="form",
|
||||
title=lang_class.command_get_email,
|
||||
instructions=lang_class.command_get_email_1_description)
|
||||
email_list = _account.get_mail_list_summary()
|
||||
field = result_form.add_field(name="emails",
|
||||
field_type="list-multi",
|
||||
label=lang_class.field_email_subject)
|
||||
for (email_index, email_subject) in email_list:
|
||||
field.add_option(label=email_subject, values=[email_index])
|
||||
result_form.add_field(name="fetch_more",
|
||||
field_type="boolean",
|
||||
label=lang_class.field_select_more_emails)
|
||||
result_form.as_xml(command_node)
|
||||
return (result_form, [])
|
||||
else:
|
||||
# TODO Error
|
||||
return (None, [])
|
||||
|
||||
def execute_get_email_2(self, info_query, session_context,
|
||||
command_node, lang_class):
|
||||
self.__logger.debug("Executing command 'get-email' step 2")
|
||||
result = []
|
||||
mail_sender = MailSender(self.component)
|
||||
bare_from_jid = info_query.get_from().bare()
|
||||
account_name = info_query.get_to().node
|
||||
_account = account.get_account(bare_from_jid, account_name)
|
||||
if _account is not None:
|
||||
for email_index in session_context["emails"]:
|
||||
(email_body, email_from) = _account.get_mail(email_index)
|
||||
result.append(\
|
||||
mail_sender.create_full_email_message(\
|
||||
email_from,
|
||||
lang_class.mail_subject % (email_from),
|
||||
email_body,
|
||||
_account))
|
||||
result_form = Form(\
|
||||
xmlnode_or_type="form",
|
||||
title=lang_class.command_get_email,
|
||||
instructions=lang_class.command_get_email_2_description \
|
||||
% (len(session_context["emails"])))
|
||||
result_form.as_xml(command_node)
|
||||
command_node.setProp("status", command.STATUS_COMPLETED)
|
||||
return (None, result)
|
||||
|
||||
@@ -21,22 +21,19 @@
|
||||
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
##
|
||||
|
||||
import logging
|
||||
|
||||
from pyxmpp.jid import JID
|
||||
|
||||
from sqlobject.sqlbuilder import AND
|
||||
|
||||
import jcl.jabber as jabber
|
||||
import jcl.model as model
|
||||
from jcl.model import account
|
||||
from jcl.model.account import Account, User, PresenceAccount
|
||||
from jcl.model.account import Account, User
|
||||
from jcl.jabber.disco import \
|
||||
AccountTypeDiscoGetInfoHandler, AccountDiscoGetInfoHandler
|
||||
from jcl.jabber.feeder import FeederComponent, Feeder, MessageSender, \
|
||||
HeadlineSender, FeederHandler
|
||||
from jcl.jabber.feeder import FeederComponent
|
||||
from jcl.jabber.command import CommandRootDiscoGetInfoHandler
|
||||
from jcl.jabber.component import AccountManager
|
||||
from jmc.model.account import IMAPAccount, POP3Account, \
|
||||
SMTPAccount
|
||||
|
||||
from jmc.jabber.disco import MailRootDiscoGetInfoHandler, \
|
||||
IMAPAccountDiscoGetItemsHandler, MailAccountTypeDiscoGetInfoHandler, \
|
||||
@@ -45,10 +42,9 @@ from jmc.jabber.message import SendMailMessageHandler, \
|
||||
RootSendMailMessageHandler
|
||||
from jmc.jabber.presence import MailSubscribeHandler, \
|
||||
MailUnsubscribeHandler, MailPresenceHandler
|
||||
from jmc.model.account import MailAccount, IMAPAccount, POP3Account, \
|
||||
SMTPAccount
|
||||
from jmc.lang import Lang
|
||||
from jmc.jabber.command import MailCommandManager
|
||||
from jmc.jabber.feeder import MailFeederHandler, MailFeeder, MailSender
|
||||
|
||||
class MailAccountManager(AccountManager):
|
||||
def account_get_register(self, info_query,
|
||||
@@ -156,147 +152,3 @@ class MailComponent(FeederComponent):
|
||||
if lang_class is None:
|
||||
lang_class = self.lang.get_default_lang_class()
|
||||
self.handler.handle(None, lang_class, accounts)
|
||||
|
||||
class MailFeeder(Feeder):
|
||||
"""Email check"""
|
||||
|
||||
def __init__(self, component):
|
||||
"""MailFeeder constructor"""
|
||||
Feeder.__init__(self, component)
|
||||
self.__logger = logging.getLogger("jmc.jabber.component.MailFeeder")
|
||||
|
||||
def initialize_live_email(self, _account):
|
||||
"""For live email checking account, mark emails received while
|
||||
offline as read.
|
||||
Return a boolean to continue mail checking or not
|
||||
(if waiting for password).
|
||||
"""
|
||||
if _account.password is None:
|
||||
if not _account.waiting_password_reply:
|
||||
account_manager = self.component.account_manager
|
||||
self.component.send_stanzas(\
|
||||
account_manager.ask_password(_account,
|
||||
_account.default_lang_class))
|
||||
return False
|
||||
try:
|
||||
_account.connect()
|
||||
_account.mark_all_as_read()
|
||||
_account.disconnect()
|
||||
_account.first_check = False
|
||||
_account.error = None
|
||||
return True
|
||||
except Exception, e:
|
||||
if _account.connected:
|
||||
try:
|
||||
_account.disconnect()
|
||||
except:
|
||||
# We have done everything we could
|
||||
_account.connected = False
|
||||
self.component.send_error(_account, e)
|
||||
return False
|
||||
|
||||
def feed(self, _account):
|
||||
"""Check for new emails for given MailAccount and return a list of
|
||||
those emails or a summary.
|
||||
"""
|
||||
self.__logger.debug("MailFeeder.feed")
|
||||
result = []
|
||||
if _account.first_check and _account.live_email_only:
|
||||
continue_checking = self.initialize_live_email(_account)
|
||||
if not continue_checking:
|
||||
return result
|
||||
_account.lastcheck += 1
|
||||
if _account.lastcheck == _account.interval:
|
||||
_account.lastcheck = 0
|
||||
action = _account.action
|
||||
if action != PresenceAccount.DO_NOTHING:
|
||||
try:
|
||||
if _account.password is None:
|
||||
account_manager = self.component.account_manager
|
||||
self.component.send_stanzas(\
|
||||
account_manager.ask_password(_account,
|
||||
_account.default_lang_class))
|
||||
return result
|
||||
self.__logger.debug("Checking " + _account.name)
|
||||
self.__logger.debug("\t" + _account.login \
|
||||
+ "@" + _account.host)
|
||||
_account.connect()
|
||||
mail_list = _account.get_new_mail_list()
|
||||
default_lang_class = _account.default_lang_class
|
||||
if action == MailAccount.RETRIEVE:
|
||||
# TODO : use generator (yield)
|
||||
mail_index = _account.get_next_mail_index(mail_list)
|
||||
while mail_index is not None:
|
||||
(body, email_from) = _account.get_mail(mail_index)
|
||||
result.append((email_from,
|
||||
default_lang_class.new_mail_subject\
|
||||
% (email_from),
|
||||
body))
|
||||
mail_index = _account.get_next_mail_index(mail_list)
|
||||
elif action == MailAccount.DIGEST:
|
||||
body = ""
|
||||
new_mail_count = 0
|
||||
mail_index = _account.get_next_mail_index(mail_list)
|
||||
while mail_index is not None:
|
||||
(tmp_body, from_email) = \
|
||||
_account.get_mail_summary(mail_index)
|
||||
body += tmp_body
|
||||
body += "\n----------------------------------\n"
|
||||
mail_index = _account.get_next_mail_index(mail_list)
|
||||
new_mail_count += 1
|
||||
if body != "":
|
||||
result.append((None,
|
||||
default_lang_class.new_digest_subject\
|
||||
% (new_mail_count),
|
||||
body))
|
||||
else:
|
||||
raise Exception("Unkown action: " + str(action) \
|
||||
+ "\nPlease reconfigure account.")
|
||||
_account.disconnect()
|
||||
_account.error = None
|
||||
self.__logger.debug("\nCHECK_MAIL ends " + _account.jid)
|
||||
except Exception, e:
|
||||
if _account.connected:
|
||||
try:
|
||||
_account.disconnect()
|
||||
except:
|
||||
# We have done everything we could
|
||||
_account.connected = False
|
||||
self.component.send_error(_account, e)
|
||||
return result
|
||||
|
||||
class MailSender(HeadlineSender):
|
||||
"""Send emails messages to jabber users"""
|
||||
|
||||
def send(self, to_account, data):
|
||||
"""Call MessageSender send method"""
|
||||
MessageSender.send(self, to_account, data)
|
||||
|
||||
def create_message(self, to_account, data):
|
||||
"""Send given emails (in data) as Jabber messages"""
|
||||
email_from, subject, body = data
|
||||
if to_account.action == MailAccount.RETRIEVE:
|
||||
message = MessageSender.create_message(self, to_account,
|
||||
(subject, body))
|
||||
msg_node = message.get_node()
|
||||
addresses_node = msg_node.newChild(None, "addresses", None)
|
||||
address_ns = addresses_node.newNs("http://jabber.org/protocol/address", None)
|
||||
addresses_node.setNs(address_ns)
|
||||
replyto_address_node = addresses_node.newChild(address_ns, "address", None)
|
||||
replyto_address_node.setProp("type", "replyto")
|
||||
replyto_jid = email_from.replace('@', '%', 1) + "@" \
|
||||
+ unicode(JID(to_account.jid).domain)
|
||||
replyto_address_node.setProp("jid", replyto_jid)
|
||||
elif to_account.action == MailAccount.DIGEST:
|
||||
message = HeadlineSender.create_message(self, to_account,
|
||||
(subject, body))
|
||||
else:
|
||||
message = None
|
||||
return message
|
||||
|
||||
class MailFeederHandler(FeederHandler):
|
||||
def filter(self, stanza, lang_class):
|
||||
"""Return only email account type to check mail from
|
||||
"""
|
||||
accounts = account.get_all_accounts(account_class=MailAccount)
|
||||
return accounts
|
||||
|
||||
@@ -24,27 +24,31 @@ import unittest
|
||||
import tempfile
|
||||
from ConfigParser import ConfigParser
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
|
||||
from pyxmpp.iq import Iq
|
||||
from pyxmpp.jabber.dataforms import Form
|
||||
from pyxmpp.jabber.dataforms import Field
|
||||
|
||||
import jcl.tests
|
||||
from jcl.jabber.tests.command import JCLCommandManager_TestCase
|
||||
from jcl.tests import JCLTestCase
|
||||
from jcl.jabber.feeder import Feeder
|
||||
from jcl.model.account import User
|
||||
from jcl.model.account import User, Account, PresenceAccount
|
||||
from jcl.jabber.tests.command import JCLCommandManagerTestCase
|
||||
import jcl.jabber.command as command
|
||||
|
||||
from jmc.model.account import POP3Account, IMAPAccount, SMTPAccount, \
|
||||
MailAccount
|
||||
from jmc.jabber.component import MailComponent
|
||||
|
||||
from jmc.lang import Lang
|
||||
from jmc.jabber.tests.component import MockIMAPAccount
|
||||
from jmc.jabber.command import MailCommandManager
|
||||
|
||||
class MailCommandManager_TestCase(JCLCommandManager_TestCase):
|
||||
def setUp(self):
|
||||
JCLCommandManager_TestCase.setUp(self, tables=[POP3Account, IMAPAccount,
|
||||
SMTPAccount, MailAccount,
|
||||
MockIMAPAccount])
|
||||
class MailCommandManagerTestCase(JCLCommandManagerTestCase):
|
||||
def setUp(self, tables=[]):
|
||||
tables += [POP3Account, IMAPAccount, SMTPAccount, MailAccount,
|
||||
MockIMAPAccount, User, Account, PresenceAccount]
|
||||
JCLTestCase.setUp(self, tables=tables)
|
||||
self.config_file = tempfile.mktemp(".conf", "jmctest", jcl.tests.DB_DIR)
|
||||
self.config = ConfigParser()
|
||||
self.config.read(self.config_file)
|
||||
@@ -55,307 +59,337 @@ class MailCommandManager_TestCase(JCLCommandManager_TestCase):
|
||||
self.config,
|
||||
self.config_file)
|
||||
self.comp.set_admins(["admin@test.com"])
|
||||
self.command_manager = command.command_manager
|
||||
self.command_manager = MailCommandManager(self.comp,
|
||||
self.comp.account_manager)
|
||||
self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
|
||||
SMTPAccount, MockIMAPAccount)
|
||||
self.user1 = User(jid="test1@test.com")
|
||||
self.account11 = MockIMAPAccount(user=self.user1,
|
||||
name="account11",
|
||||
jid="account11@" + unicode(self.comp.jid))
|
||||
self.account12 = MockIMAPAccount(user=self.user1,
|
||||
name="account12",
|
||||
jid="account12@" + unicode(self.comp.jid))
|
||||
self.user2 = User(jid="test2@test.com")
|
||||
self.account21 = MockIMAPAccount(user=self.user2,
|
||||
name="account21",
|
||||
jid="account21@" + unicode(self.comp.jid))
|
||||
self.account22 = MockIMAPAccount(user=self.user2,
|
||||
name="account11",
|
||||
jid="account11@" + unicode(self.comp.jid))
|
||||
self.user3 = User(jid="test3@test.com")
|
||||
self.account31 = MockIMAPAccount(user=self.user3,
|
||||
name="account31",
|
||||
jid="account31@" + unicode(self.comp.jid))
|
||||
self.account32 = MockIMAPAccount(user=self.user3,
|
||||
name="account32",
|
||||
jid="account32@" + unicode(self.comp.jid))
|
||||
self.info_query = Iq(stanza_type="set",
|
||||
from_jid="admin@test.com",
|
||||
to_jid=self.comp.jid)
|
||||
self.command_node = self.info_query.set_new_content(command.COMMAND_NS,
|
||||
"command")
|
||||
class MockFeederHandler(Feeder):
|
||||
def __init__(self, component):
|
||||
Feeder.__init__(self, component)
|
||||
self.checked_accounts = []
|
||||
|
||||
def feed(self, _account):
|
||||
self.checked_accounts.append(_account)
|
||||
assert(_account.lastcheck == (_account.interval - 1))
|
||||
return []
|
||||
|
||||
self.comp.handler.feeder = MockFeederHandler(self.comp)
|
||||
|
||||
def tearDown(self):
|
||||
JCLCommandManager_TestCase.tearDown(self)
|
||||
JCLTestCase.tearDown(self)
|
||||
if os.path.exists(self.config_file):
|
||||
os.unlink(self.config_file)
|
||||
|
||||
# def test_execute_retrieve_attachment(self):
|
||||
# self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
|
||||
# SMTPAccount, MockIMAPAccount)
|
||||
# account1 = MockIMAPAccount(user=User(jid="test1@test.com"),
|
||||
# name="account1",
|
||||
# jid="account1@" + unicode(self.comp.jid))
|
||||
# info_query = Iq(stanza_type="set",
|
||||
# from_jid="test1@test.com",
|
||||
# to_jid="account1@" + unicode(self.comp.jid))
|
||||
# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
# command_node.setProp("node", "jmc#retrieve-attachment")
|
||||
# result = self.command_manager.apply_command_action(info_query,
|
||||
# "jmc#retrieve-attachment",
|
||||
# "execute")
|
||||
# self.assertNotEquals(result, None)
|
||||
# self.assertEquals(len(result), 1)
|
||||
# print str(result[0].xmlnode)
|
||||
# xml_command = result[0].xpath_eval("c:command",
|
||||
# {"c": "http://jabber.org/protocol/commands"})[0]
|
||||
# self.assertEquals(xml_command.prop("status"), "executing")
|
||||
# self.assertNotEquals(xml_command.prop("sessionid"), None)
|
||||
# self._check_actions(result[0], ["next"])
|
||||
# print str(result[0].xmlnode)
|
||||
# x_data = result[0].xpath_eval("c:command/data:x",
|
||||
# {"c": "http://jabber.org/protocol/commands",
|
||||
# "data": "jabber:x:data"})
|
||||
# self.assertEquals(len(x_data), 1)
|
||||
# self.assertEquals(x_data[0].prop("type"), "form")
|
||||
# options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
|
||||
# {"c": "http://jabber.org/protocol/commands",
|
||||
# "data": "jabber:x:data"})
|
||||
# self.assertEquals(len(options), 3)
|
||||
# self.assertEquals(options[0].prop("label"), "Next")
|
||||
# self.assertEquals(options[0].children.name, "value")
|
||||
# self.assertEquals(options[0].children.content, "-1")
|
||||
# self.assertEquals(options[1].prop("label"), "mail 1")
|
||||
# self.assertEquals(options[1].children.name, "value")
|
||||
# self.assertEquals(options[1].children.content, "1")
|
||||
# self.assertEquals(options[2].prop("label"), "mail 2")
|
||||
# self.assertEquals(options[2].children.name, "value")
|
||||
# self.assertEquals(options[2].children.content, "2")
|
||||
class MailCommandManagerForceCheckCommand_TestCase(MailCommandManagerTestCase):
|
||||
"""
|
||||
Test 'force-check' ad-hoc command
|
||||
"""
|
||||
|
||||
# # Delayed to JMC 0.3.1
|
||||
# return
|
||||
# # Second step: TODO
|
||||
# info_query = Iq(stanza_type="set",
|
||||
# from_jid="admin@test.com",
|
||||
# to_jid=self.comp.jid)
|
||||
# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
# command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
|
||||
# session_id = xml_command.prop("sessionid")
|
||||
# command_node.setProp("sessionid", session_id)
|
||||
# command_node.setProp("action", "next")
|
||||
# submit_form = Form(xmlnode_or_type="submit")
|
||||
# submit_form.add_field(field_type="list-single",
|
||||
# name="account_type",
|
||||
# value="Example")
|
||||
# submit_form.add_field(field_type="jid-single",
|
||||
# name="user_jid",
|
||||
# value="user2@test.com")
|
||||
# submit_form.as_xml(command_node)
|
||||
# result = self.command_manager.apply_command_action(info_query,
|
||||
# "http://jabber.org/protocol/admin#add-user",
|
||||
# "next")
|
||||
# self.assertNotEquals(result, None)
|
||||
# self.assertEquals(len(result), 1)
|
||||
# xml_command = result[0].xpath_eval("c:command",
|
||||
# {"c": "http://jabber.org/protocol/commands"})[0]
|
||||
# self.assertEquals(xml_command.prop("status"), "executing")
|
||||
# self.assertEquals(xml_command.prop("sessionid"), session_id)
|
||||
# self._check_actions(result[0], ["prev", "complete"], 1)
|
||||
# x_data = result[0].xpath_eval("c:command/data:x",
|
||||
# {"c": "http://jabber.org/protocol/commands",
|
||||
# "data": "jabber:x:data"})
|
||||
# self.assertEquals(len(x_data), 1)
|
||||
# self.assertEquals(x_data[0].prop("type"), "form")
|
||||
# fields = result[0].xpath_eval("c:command/data:x/data:field",
|
||||
# {"c": "http://jabber.org/protocol/commands",
|
||||
# "data": "jabber:x:data"})
|
||||
# self.assertEquals(len(fields), 6)
|
||||
# context_session = self.command_manager.sessions[session_id][1]
|
||||
# self.assertEquals(context_session["account_type"], ["Example"])
|
||||
# self.assertEquals(context_session["user_jid"], ["user2@test.com"])
|
||||
|
||||
# # Third step
|
||||
# info_query = Iq(stanza_type="set",
|
||||
# from_jid="admin@test.com",
|
||||
# to_jid=self.comp.jid)
|
||||
# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
# command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
|
||||
# command_node.setProp("sessionid", session_id)
|
||||
# command_node.setProp("action", "complete")
|
||||
# submit_form = Form(xmlnode_or_type="submit")
|
||||
# submit_form.add_field(field_type="text-single",
|
||||
# name="name",
|
||||
# value="account1")
|
||||
# submit_form.add_field(field_type="text-single",
|
||||
# name="login",
|
||||
# value="login1")
|
||||
# submit_form.add_field(field_type="text-private",
|
||||
# name="password",
|
||||
# value="pass1")
|
||||
# submit_form.add_field(field_type="boolean",
|
||||
# name="store_password",
|
||||
# value="1")
|
||||
# submit_form.add_field(field_type="list-single",
|
||||
# name="test_enum",
|
||||
# value="choice2")
|
||||
# submit_form.add_field(field_type="text-single",
|
||||
# name="test_int",
|
||||
# value="42")
|
||||
# submit_form.as_xml(command_node)
|
||||
|
||||
# result = self.command_manager.apply_command_action(info_query,
|
||||
# "http://jabber.org/protocol/admin#add-user",
|
||||
# "execute")
|
||||
# xml_command = result[0].xpath_eval("c:command",
|
||||
# {"c": "http://jabber.org/protocol/commands"})[0]
|
||||
# self.assertEquals(xml_command.prop("status"), "completed")
|
||||
# self.assertEquals(xml_command.prop("sessionid"), session_id)
|
||||
# self._check_actions(result[0])
|
||||
|
||||
# self.assertEquals(context_session["name"], ["account1"])
|
||||
# self.assertEquals(context_session["login"], ["login1"])
|
||||
# self.assertEquals(context_session["password"], ["pass1"])
|
||||
# self.assertEquals(context_session["store_password"], ["1"])
|
||||
# self.assertEquals(context_session["test_enum"], ["choice2"])
|
||||
# self.assertEquals(context_session["test_int"], ["42"])
|
||||
|
||||
# model.db_connect()
|
||||
# _account = account.get_account("user2@test.com",
|
||||
# "account1")
|
||||
# self.assertNotEquals(_account, None)
|
||||
# self.assertEquals(_account.user.jid, "user2@test.com")
|
||||
# self.assertEquals(_account.name, "account1")
|
||||
# self.assertEquals(_account.jid, "account1@" + unicode(self.comp.jid))
|
||||
# model.db_disconnect()
|
||||
|
||||
# stanza_sent = result
|
||||
# self.assertEquals(len(stanza_sent), 4)
|
||||
# iq_result = stanza_sent[0]
|
||||
# self.assertTrue(isinstance(iq_result, Iq))
|
||||
# self.assertEquals(iq_result.get_node().prop("type"), "result")
|
||||
# self.assertEquals(iq_result.get_from(), self.comp.jid)
|
||||
# self.assertEquals(iq_result.get_to(), "admin@test.com")
|
||||
# presence_component = stanza_sent[1]
|
||||
# self.assertTrue(isinstance(presence_component, Presence))
|
||||
# self.assertEquals(presence_component.get_from(), self.comp.jid)
|
||||
# self.assertEquals(presence_component.get_to(), "user2@test.com")
|
||||
# self.assertEquals(presence_component.get_node().prop("type"),
|
||||
# "subscribe")
|
||||
# message = stanza_sent[2]
|
||||
# self.assertTrue(isinstance(message, Message))
|
||||
# self.assertEquals(message.get_from(), self.comp.jid)
|
||||
# self.assertEquals(message.get_to(), "user2@test.com")
|
||||
# self.assertEquals(message.get_subject(),
|
||||
# _account.get_new_message_subject(Lang.en))
|
||||
# self.assertEquals(message.get_body(),
|
||||
# _account.get_new_message_body(Lang.en))
|
||||
# presence_account = stanza_sent[3]
|
||||
# self.assertTrue(isinstance(presence_account, Presence))
|
||||
# self.assertEquals(presence_account.get_from(), "account1@" + unicode(self.comp.jid))
|
||||
# self.assertEquals(presence_account.get_to(), "user2@test.com")
|
||||
# self.assertEquals(presence_account.get_node().prop("type"),
|
||||
# "subscribe")
|
||||
def setUp(self, tables=[]):
|
||||
"""
|
||||
Prepare data
|
||||
"""
|
||||
MailCommandManagerTestCase.setUp(self, tables)
|
||||
self.command_node.setProp("node", "jmc#force-check")
|
||||
|
||||
def test_execute_force_check(self):
|
||||
self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
|
||||
SMTPAccount, MockIMAPAccount)
|
||||
class MockFeederHandler(Feeder):
|
||||
def __init__(self, component):
|
||||
Feeder.__init__(self, component)
|
||||
self.checked_accounts = []
|
||||
|
||||
def feed(self, _account):
|
||||
self.checked_accounts.append(_account)
|
||||
assert(_account.lastcheck == (_account.interval - 1))
|
||||
return []
|
||||
|
||||
self.comp.handler.feeder = MockFeederHandler(self.comp)
|
||||
user1 = User(jid="test1@test.com")
|
||||
user2 = User(jid="test2@test.com")
|
||||
account11 = MockIMAPAccount(user=user1,
|
||||
name="account11",
|
||||
jid="account11@" + unicode(self.comp.jid))
|
||||
account12 = MockIMAPAccount(user=user1,
|
||||
name="account12",
|
||||
jid="account12@" + unicode(self.comp.jid))
|
||||
account21 = MockIMAPAccount(user=user2,
|
||||
name="account21",
|
||||
jid="account21@" + unicode(self.comp.jid))
|
||||
account22 = MockIMAPAccount(user=user2,
|
||||
name="account11",
|
||||
jid="account11@" + unicode(self.comp.jid))
|
||||
info_query = Iq(stanza_type="set",
|
||||
from_jid="test1@test.com",
|
||||
to_jid="account11@" + unicode(self.comp.jid))
|
||||
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
command_node.setProp("node", "jmc#force-check")
|
||||
result = self.command_manager.apply_command_action(info_query,
|
||||
self.info_query.set_from("test1@test.com")
|
||||
self.info_query.set_to("account11@" + unicode(self.comp.jid))
|
||||
result = self.command_manager.apply_command_action(\
|
||||
self.info_query,
|
||||
"jmc#force-check",
|
||||
"execute")
|
||||
self.assertNotEquals(result, None)
|
||||
self.assertEquals(len(result), 1)
|
||||
xml_command = result[0].xpath_eval("c:command",
|
||||
{"c": "http://jabber.org/protocol/commands"})[0]
|
||||
self.assertEquals(xml_command.prop("status"), "completed")
|
||||
self._check_actions(result[0])
|
||||
result_iq = result[0].xmlnode
|
||||
result_iq.setNs(None)
|
||||
self.assertTrue(jcl.tests.is_xml_equal(\
|
||||
u"<iq from='account11@" + unicode(self.comp.jid)
|
||||
+ "' to='test1@test.com' type='result'>"
|
||||
+ "<command xmlns='http://jabber.org/protocol/commands' "
|
||||
+ "status='completed'>"
|
||||
+ "</command></iq>",
|
||||
result_iq, True, test_sibling=False))
|
||||
feeder = self.comp.handler.feeder
|
||||
self.assertEquals(len(feeder.checked_accounts), 1)
|
||||
self.assertEquals(feeder.checked_accounts[0], account11)
|
||||
self.assertEquals(feeder.checked_accounts[0], self.account11)
|
||||
|
||||
def test_execute_force_check_root_node(self):
|
||||
self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
|
||||
SMTPAccount, MockIMAPAccount)
|
||||
class MockFeederHandler(Feeder):
|
||||
def __init__(self, component):
|
||||
Feeder.__init__(self, component)
|
||||
self.checked_accounts = []
|
||||
|
||||
def feed(self, _account):
|
||||
self.checked_accounts.append(_account)
|
||||
assert(_account.lastcheck == (_account.interval - 1))
|
||||
return []
|
||||
|
||||
self.comp.handler.feeder = MockFeederHandler(self.comp)
|
||||
user1 = User(jid="test1@test.com")
|
||||
user2 = User(jid="test2@test.com")
|
||||
account11 = MockIMAPAccount(user=user1,
|
||||
name="account11",
|
||||
jid="account11@" + unicode(self.comp.jid))
|
||||
account12 = MockIMAPAccount(user=user1,
|
||||
name="account12",
|
||||
jid="account12@" + unicode(self.comp.jid))
|
||||
account21 = MockIMAPAccount(user=user2,
|
||||
name="account21",
|
||||
jid="account21@" + unicode(self.comp.jid))
|
||||
account22 = MockIMAPAccount(user=user2,
|
||||
name="account11",
|
||||
jid="account11@" + unicode(self.comp.jid))
|
||||
info_query = Iq(stanza_type="set",
|
||||
from_jid="test1@test.com",
|
||||
to_jid=self.comp.jid)
|
||||
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
command_node.setProp("node", "jmc#force-check")
|
||||
result = self.command_manager.apply_command_action(info_query,
|
||||
self.info_query.set_from("test1@test.com")
|
||||
result = self.command_manager.apply_command_action(\
|
||||
self.info_query,
|
||||
"jmc#force-check",
|
||||
"execute")
|
||||
self.assertNotEquals(result, None)
|
||||
self.assertEquals(len(result), 1)
|
||||
xml_command = result[0].xpath_eval("c:command",
|
||||
{"c": "http://jabber.org/protocol/commands"})[0]
|
||||
self.assertEquals(xml_command.prop("status"), "executing")
|
||||
self.assertNotEquals(xml_command.prop("sessionid"), None)
|
||||
self._check_actions(result[0], ["complete"])
|
||||
session_id = xml_command.prop("sessionid")
|
||||
result_iq = result[0].xmlnode
|
||||
result_iq.setNs(None)
|
||||
self.assertTrue(jcl.tests.is_xml_equal(\
|
||||
u"<iq from='jmc.test.com' to='test1@test.com' type='result'>"
|
||||
+ "<command xmlns='http://jabber.org/protocol/commands'"
|
||||
+ "status='executing'>"
|
||||
+ "<actions execute='complete'><complete/></actions>"
|
||||
+ "<x xmlns='jabber:x:data' type='form'>"
|
||||
+ "<title>" + Lang.en.command_force_check + "</title>"
|
||||
+ "<instructions>" + Lang.en.command_force_check_1_description
|
||||
+ "</instructions>"
|
||||
+ "<field var='account_names' type='list-multi' label='"
|
||||
+ Lang.en.field_accounts + "'>"
|
||||
+ "<option label=\"account11 (IMAP)\">"
|
||||
+ "<value>account11/test1@test.com</value></option>"
|
||||
+ "<option label=\"account12 (IMAP)\">"
|
||||
+ "<value>account12/test1@test.com</value></option>"
|
||||
+ "<option label=\"account11 (MockIMAP)\">"
|
||||
+ "<value>account11/test1@test.com</value></option>"
|
||||
+ "<option label=\"account12 (MockIMAP)\">"
|
||||
+ "<value>account12/test1@test.com</value></option>"
|
||||
+ "</field></x></command></iq>",
|
||||
result_iq, True))
|
||||
session_id = result_iq.children.prop("sessionid")
|
||||
self.assertNotEquals(session_id, None)
|
||||
context_session = self.command_manager.sessions[session_id][1]
|
||||
self.assertEquals(context_session["user_jids"],
|
||||
["test1@test.com"])
|
||||
|
||||
# Second step
|
||||
info_query = Iq(stanza_type="set",
|
||||
from_jid="admin@test.com",
|
||||
to_jid=self.comp.jid)
|
||||
command_node = info_query.set_new_content(command.COMMAND_NS, "command")
|
||||
command_node.setProp("node", "jmc#force-check")
|
||||
command_node.setProp("sessionid", session_id)
|
||||
command_node.setProp("action", "complete")
|
||||
submit_form = Form(xmlnode_or_type="submit")
|
||||
submit_form.add_field(field_type="list-multi",
|
||||
info_query = self.prepare_submit(\
|
||||
node="jmc#force-check",
|
||||
session_id=session_id,
|
||||
from_jid="test1@test.com",
|
||||
to_jid=unicode(self.comp.jid),
|
||||
fields=[Field(field_type="list-multi",
|
||||
name="account_names",
|
||||
values=["account11/test1@test.com",
|
||||
"account12/test1@test.com"])
|
||||
submit_form.as_xml(command_node)
|
||||
result = self.command_manager.apply_command_action(info_query,
|
||||
"account12/test1@test.com"])],
|
||||
action="complete")
|
||||
result = self.command_manager.apply_command_action(\
|
||||
info_query,
|
||||
"jmc#force-check",
|
||||
"execute")
|
||||
xml_command = result[0].xpath_eval("c:command",
|
||||
{"c": "http://jabber.org/protocol/commands"})[0]
|
||||
self.assertEquals(xml_command.prop("status"), "completed")
|
||||
self.assertEquals(xml_command.prop("sessionid"), session_id)
|
||||
self._check_actions(result[0])
|
||||
result_iq = result[0].xmlnode
|
||||
result_iq.setNs(None)
|
||||
self.assertTrue(jcl.tests.is_xml_equal(\
|
||||
u"<iq from='" + unicode(self.comp.jid)
|
||||
+ "' to='test1@test.com' type='result'>"
|
||||
+ "<command xmlns='http://jabber.org/protocol/commands' "
|
||||
+ "status='completed'>"
|
||||
+ "</command></iq>",
|
||||
result_iq, True, test_sibling=False))
|
||||
self.assertEquals(context_session["account_names"],
|
||||
["account11/test1@test.com",
|
||||
"account12/test1@test.com"])
|
||||
feeder = self.comp.handler.feeder
|
||||
self.assertEquals(len(feeder.checked_accounts), 2)
|
||||
self.assertEquals(feeder.checked_accounts[0], account11)
|
||||
self.assertEquals(feeder.checked_accounts[1], account12)
|
||||
self.assertEquals(feeder.checked_accounts[0], self.account11)
|
||||
self.assertEquals(feeder.checked_accounts[1], self.account12)
|
||||
|
||||
class MailCommandManagerGetEmailCommand_TestCase(MailCommandManagerTestCase):
|
||||
"""
|
||||
Test 'get-email' ad-hoc command
|
||||
"""
|
||||
|
||||
def setUp(self, tables=[]):
|
||||
"""
|
||||
Prepare data
|
||||
"""
|
||||
MailCommandManagerTestCase.setUp(self, tables)
|
||||
self.command_node.setProp("node", "jmc#get-email")
|
||||
def get_email(email_index):
|
||||
"""
|
||||
Mock method for IMAPAccount.get_email
|
||||
"""
|
||||
return ("mail body " + str(email_index),
|
||||
"from" + str(email_index) + "@test.com")
|
||||
self.account11.__dict__["get_mail"] = get_email
|
||||
|
||||
def check_step_1 (self, result):
|
||||
"""
|
||||
Check first step result of get-email ad-hoc command
|
||||
"""
|
||||
result_iq = result[0].xmlnode
|
||||
result_iq.setNs(None)
|
||||
self.assertTrue(jcl.tests.is_xml_equal(\
|
||||
u"<iq from='account11@" + unicode(self.comp.jid)
|
||||
+ "' to='test1@test.com' type='result'>"
|
||||
+ "<command xmlns='http://jabber.org/protocol/commands'"
|
||||
+ "status='executing'>"
|
||||
+ "<actions execute='complete'><complete/></actions>"
|
||||
+ "<x xmlns='jabber:x:data' type='form'>"
|
||||
+ "<title>" + Lang.en.command_get_email + "</title>"
|
||||
+ "<instructions>" + Lang.en.command_get_email_1_description
|
||||
+ "</instructions>"
|
||||
+ "<field var='emails' type='list-multi' label='"
|
||||
+ Lang.en.field_email_subject + "'>"
|
||||
+ "<option label=\"mail 1\">"
|
||||
+ "<value>1</value></option>"
|
||||
+ "<option label=\"mail 2\">"
|
||||
+ "<value>2</value></option>"
|
||||
+ "</field><field var='fetch_more' type='boolean' label='"
|
||||
+ Lang.en.field_select_more_emails + "'>"
|
||||
+ "</field></x></command></iq>",
|
||||
result_iq, True))
|
||||
session_id = result_iq.children.prop("sessionid")
|
||||
self.assertNotEquals(session_id, None)
|
||||
return session_id
|
||||
|
||||
def test_execute_get_email(self):
|
||||
"""
|
||||
Test single email retrieval
|
||||
"""
|
||||
self.info_query.set_from("test1@test.com")
|
||||
self.info_query.set_to("account11@" + unicode(self.comp.jid))
|
||||
result = self.command_manager.apply_command_action(\
|
||||
self.info_query,
|
||||
"jmc#get-email",
|
||||
"execute")
|
||||
session_id = self.check_step_1(result)
|
||||
|
||||
# Second step
|
||||
info_query = self.prepare_submit(\
|
||||
node="jmc#get-email",
|
||||
session_id=session_id,
|
||||
from_jid="test1@test.com",
|
||||
to_jid="account11@jmc.test.com",
|
||||
fields=[Field(field_type="list-multi",
|
||||
name="emails",
|
||||
values=["1"]),
|
||||
Field(field_type="boolean",
|
||||
name="fetch_more",
|
||||
value=False)],
|
||||
action="complete")
|
||||
result = self.command_manager.apply_command_action(\
|
||||
info_query,
|
||||
"jmc#get-email",
|
||||
"execute")
|
||||
self.assertEquals(len(result), 2)
|
||||
result_iq = result[0].xmlnode
|
||||
result_iq.setNs(None)
|
||||
self.assertTrue(jcl.tests.is_xml_equal(\
|
||||
u"<iq from='account11@" + unicode(self.comp.jid)
|
||||
+ "' to='test1@test.com' type='result'>"
|
||||
+ "<command xmlns='http://jabber.org/protocol/commands' "
|
||||
+ "status='completed'>"
|
||||
+ "<x xmlns='jabber:x:data' type='form'>"
|
||||
+ "<title>" + Lang.en.command_get_email + "</title>"
|
||||
+ "<instructions>" + Lang.en.command_get_email_2_description
|
||||
% (1) + "</instructions>"
|
||||
+ "</x></command></iq>",
|
||||
result_iq, True, test_sibling=False))
|
||||
result_iq = result[1].xmlnode
|
||||
self.assertTrue(jcl.tests.is_xml_equal(\
|
||||
u"<message from='account11@" + unicode(self.comp.jid)
|
||||
+ "' to='test1@test.com' "
|
||||
+ "xmlns='http://pyxmpp.jabberstudio.org/xmlns/common'>"
|
||||
+ "<subject>" + Lang.en.mail_subject % ("from1@test.com")
|
||||
+ "</subject>"
|
||||
+ "<body>mail body 1</body>"
|
||||
+ "<addresses xmlns='http://jabber.org/protocol/address'>"
|
||||
+ "<address type='replyto' jid='from1%test.com@jmc.test.com'/>"
|
||||
+ "</addresses>"
|
||||
+ "</message>",
|
||||
result_iq, True, test_sibling=False))
|
||||
|
||||
def test_execute_get_emails(self):
|
||||
"""
|
||||
Test multiple emails retrieval
|
||||
"""
|
||||
self.info_query.set_from("test1@test.com")
|
||||
self.info_query.set_to("account11@" + unicode(self.comp.jid))
|
||||
result = self.command_manager.apply_command_action(\
|
||||
self.info_query,
|
||||
"jmc#get-email",
|
||||
"execute")
|
||||
session_id = self.check_step_1(result)
|
||||
|
||||
# Second step
|
||||
info_query = self.prepare_submit(\
|
||||
node="jmc#get-email",
|
||||
session_id=session_id,
|
||||
from_jid="test1@test.com",
|
||||
to_jid="account11@jmc.test.com",
|
||||
fields=[Field(field_type="list-multi",
|
||||
name="emails",
|
||||
values=["1", "2"]),
|
||||
Field(field_type="boolean",
|
||||
name="fetch_more",
|
||||
value=False)],
|
||||
action="complete")
|
||||
result = self.command_manager.apply_command_action(\
|
||||
info_query,
|
||||
"jmc#get-email",
|
||||
"execute")
|
||||
self.assertEquals(len(result), 3)
|
||||
result_iq = result[0].xmlnode
|
||||
result_iq.setNs(None)
|
||||
self.assertTrue(jcl.tests.is_xml_equal(\
|
||||
u"<iq from='account11@" + unicode(self.comp.jid)
|
||||
+ "' to='test1@test.com' type='result'>"
|
||||
+ "<command xmlns='http://jabber.org/protocol/commands' "
|
||||
+ "status='completed'>"
|
||||
+ "<x xmlns='jabber:x:data' type='form'>"
|
||||
+ "<title>" + Lang.en.command_get_email + "</title>"
|
||||
+ "<instructions>" + Lang.en.command_get_email_2_description
|
||||
% (2) + "</instructions>"
|
||||
+ "</x></command></iq>",
|
||||
result_iq, True, test_sibling=False))
|
||||
result_iq = result[1].xmlnode
|
||||
self.assertTrue(jcl.tests.is_xml_equal(\
|
||||
u"<message from='account11@" + unicode(self.comp.jid)
|
||||
+ "' to='test1@test.com' "
|
||||
+ "xmlns='http://pyxmpp.jabberstudio.org/xmlns/common'>"
|
||||
+ "<subject>" + Lang.en.mail_subject % ("from1@test.com")
|
||||
+ "</subject>"
|
||||
+ "<body>mail body 1</body>"
|
||||
+ "<addresses xmlns='http://jabber.org/protocol/address'>"
|
||||
+ "<address type='replyto' jid='from1%test.com@jmc.test.com'/>"
|
||||
+ "</addresses>"
|
||||
+ "</message>",
|
||||
result_iq, True, test_sibling=False))
|
||||
result_iq = result[2].xmlnode
|
||||
self.assertTrue(jcl.tests.is_xml_equal(\
|
||||
u"<message from='account11@" + unicode(self.comp.jid)
|
||||
+ "' to='test1@test.com' "
|
||||
+ "xmlns='http://pyxmpp.jabberstudio.org/xmlns/common'>"
|
||||
+ "<subject>" + Lang.en.mail_subject % ("from2@test.com")
|
||||
+ "</subject>"
|
||||
+ "<body>mail body 2</body>"
|
||||
+ "<addresses xmlns='http://jabber.org/protocol/address'>"
|
||||
+ "<address type='replyto' jid='from2%test.com@jmc.test.com'/>"
|
||||
+ "</addresses>"
|
||||
+ "</message>",
|
||||
result_iq, True, test_sibling=False))
|
||||
|
||||
def suite():
|
||||
test_suite = unittest.TestSuite()
|
||||
test_suite.addTest(unittest.makeSuite(MailCommandManager_TestCase, 'test'))
|
||||
test_suite.addTest(unittest.makeSuite(MailCommandManagerForceCheckCommand_TestCase, 'test'))
|
||||
test_suite.addTest(unittest.makeSuite(MailCommandManagerGetEmailCommand_TestCase, 'test'))
|
||||
return test_suite
|
||||
|
||||
if __name__ == '__main__':
|
||||
if '-v' in sys.argv:
|
||||
logger = logging.getLogger()
|
||||
logger.addHandler(logging.StreamHandler())
|
||||
logger.setLevel(logging.INFO)
|
||||
unittest.main(defaultTest='suite')
|
||||
|
||||
@@ -155,6 +155,11 @@ class MockIMAPAccount(MockMailAccount, IMAPAccount):
|
||||
return [("1", "mail 1"),
|
||||
("2", "mail 2")]
|
||||
|
||||
def get_mail_list_summary(self):
|
||||
return [("1", "mail 1"),
|
||||
("2", "mail 2")]
|
||||
|
||||
|
||||
class MockPOP3Account(MockMailAccount, POP3Account):
|
||||
def _init(self, *args, **kw):
|
||||
POP3Account._init(self, *args, **kw)
|
||||
|
||||
@@ -324,7 +324,7 @@ class MailAccount(PresenceAccount):
|
||||
def disconnect(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def get_mail_list_summary(self, start_index=0, end_index=20):
|
||||
def get_mail_list_summary(self, start_index=1, end_index=20):
|
||||
raise NotImplementedError
|
||||
|
||||
def get_new_mail_list(self):
|
||||
@@ -411,6 +411,10 @@ class IMAPAccount(MailAccount):
|
||||
self.connected = False
|
||||
|
||||
def get_mail_list_summary(self, start_index=1, end_index=20):
|
||||
"""
|
||||
Get a list of emails starting from start_index and ending at end_index
|
||||
of tuple (email_index, email_subject)
|
||||
"""
|
||||
self.__logger.debug("Getting mail list summary")
|
||||
typ, count = self.connection.select(self._get_real_mailbox(), True)
|
||||
result = []
|
||||
@@ -428,6 +432,9 @@ class IMAPAccount(MailAccount):
|
||||
return result
|
||||
|
||||
def get_new_mail_list(self):
|
||||
"""
|
||||
Get a list of new emails indexes
|
||||
"""
|
||||
self.__logger.debug("Getting mail list")
|
||||
typ, data = self.connection.select(self._get_real_mailbox(), True)
|
||||
typ, data = self.connection.search(None, 'RECENT')
|
||||
@@ -574,11 +581,13 @@ class POP3Account(MailAccount):
|
||||
self.connection.quit()
|
||||
self.connected = False
|
||||
|
||||
def get_mail_list_summary(self, start_index=0, end_index=20):
|
||||
def get_mail_list_summary(self, start_index=1, end_index=20):
|
||||
self.__logger.debug("Getting mail list")
|
||||
count, size = self.connection.stat()
|
||||
result = []
|
||||
for index in xrange(1, count + 1):
|
||||
if count < end_index:
|
||||
end_index = count
|
||||
for index in xrange(start_index, end_index + 1):
|
||||
(ret, data, octets) = self.connection.top(index, 0)
|
||||
if ret[0:3] == '+OK':
|
||||
subject_header = self.get_decoded_header(email.message_from_string('\n'.join(data))["Subject"])[0]
|
||||
|
||||
@@ -189,6 +189,42 @@ class POP3Account_TestCase(InheritableAccount_TestCase):
|
||||
[("1", "mail subject 1"),
|
||||
("2", "mail subject 2")]))
|
||||
|
||||
test_get_mail_list_summary_start_index = \
|
||||
make_test(["+OK 3 30\r\n",
|
||||
"+OK 10 octets\r\n" + \
|
||||
"From: user@test.com\r\n" + \
|
||||
"Subject: mail subject 2\r\n.\r\n",
|
||||
"+OK 10 octets\r\n" + \
|
||||
"From: user@test.com\r\n" + \
|
||||
"Subject: mail subject 3\r\n.\r\n",
|
||||
"+OK\r\n"],
|
||||
["STAT\r\n",
|
||||
"TOP 2 0\r\n",
|
||||
"TOP 3 0\r\n",
|
||||
"RSET\r\n"],
|
||||
lambda self: \
|
||||
self.assertEquals(self.pop3_account.get_mail_list_summary(start_index=2),
|
||||
[("2", "mail subject 2"),
|
||||
("3", "mail subject 3")]))
|
||||
|
||||
test_get_mail_list_summary_end_index = \
|
||||
make_test(["+OK 3 30\r\n",
|
||||
"+OK 10 octets\r\n" + \
|
||||
"From: user@test.com\r\n" + \
|
||||
"Subject: mail subject 1\r\n.\r\n",
|
||||
"+OK 10 octets\r\n" + \
|
||||
"From: user@test.com\r\n" + \
|
||||
"Subject: mail subject 2\r\n.\r\n",
|
||||
"+OK\r\n"],
|
||||
["STAT\r\n",
|
||||
"TOP 1 0\r\n",
|
||||
"TOP 2 0\r\n",
|
||||
"RSET\r\n"],
|
||||
lambda self: \
|
||||
self.assertEquals(self.pop3_account.get_mail_list_summary(end_index=2),
|
||||
[("1", "mail subject 1"),
|
||||
("2", "mail subject 2")]))
|
||||
|
||||
test_get_new_mail_list = \
|
||||
make_test(["+OK 2 20\r\n"],
|
||||
["STAT\r\n"],
|
||||
@@ -324,6 +360,46 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
|
||||
('2', 'mail subject 2')]))
|
||||
test_func()
|
||||
|
||||
def test_get_mail_list_summary_start_index(self):
|
||||
test_func = self.make_test(\
|
||||
[lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
|
||||
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
|
||||
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
|
||||
data.split()[0] + \
|
||||
" OK [READ-WRITE] SELECT completed\r\n",
|
||||
lambda data: "* 2 FETCH ((RFC822.header) {38}\r\n" + \
|
||||
"Subject: mail subject 2\r\n\r\nbody text\r\n)\r\n" + \
|
||||
"* 3 FETCH ((RFC822.header) {38}\r\n" + \
|
||||
"Subject: mail subject 3\r\n\r\nbody text\r\n)\r\n" + \
|
||||
data.split()[0] + " OK FETCH completed\r\n"],
|
||||
["^[^ ]* EXAMINE INBOX",
|
||||
"^[^ ]* FETCH 2:20 RFC822.header"],
|
||||
lambda self: \
|
||||
self.assertEquals(self.imap_account.get_mail_list_summary(start_index=2),
|
||||
[('2', 'mail subject 2'),
|
||||
('3', 'mail subject 3')]))
|
||||
test_func()
|
||||
|
||||
def test_get_mail_list_summary_end_index(self):
|
||||
test_func = self.make_test(\
|
||||
[lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
|
||||
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
|
||||
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
|
||||
data.split()[0] + \
|
||||
" OK [READ-WRITE] SELECT completed\r\n",
|
||||
lambda data: "* 1 FETCH ((RFC822.header) {38}\r\n" + \
|
||||
"Subject: mail subject 1\r\n\r\nbody text\r\n)\r\n" + \
|
||||
"* 2 FETCH ((RFC822.header) {38}\r\n" + \
|
||||
"Subject: mail subject 2\r\n\r\nbody text\r\n)\r\n" + \
|
||||
data.split()[0] + " OK FETCH completed\r\n"],
|
||||
["^[^ ]* EXAMINE INBOX",
|
||||
"^[^ ]* FETCH 1:2 RFC822.header"],
|
||||
lambda self: \
|
||||
self.assertEquals(self.imap_account.get_mail_list_summary(end_index=2),
|
||||
[('1', 'mail subject 1'),
|
||||
('2', 'mail subject 2')]))
|
||||
test_func()
|
||||
|
||||
def test_get_new_mail_list(self):
|
||||
test_func = self.make_test(\
|
||||
[lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" + \
|
||||
|
||||
Reference in New Issue
Block a user