first import

first import between version 0.1.3 and 0.2

darcs-hash:20051127110300-684f5-0ed50cd0e86df9195cec2c1df070fdf24a6faeb5.gz
This commit is contained in:
dax
2005-11-27 12:03:00 +01:00
commit 9fa2df5563
27 changed files with 4351 additions and 0 deletions

0
tests/__init__.py Normal file
View File

169
tests/dummy_server.py Normal file
View File

@@ -0,0 +1,169 @@
##
## dummy_server.py
## Login : <adro8400@claralinux>
## Started on Fri May 13 12:53:17 2005 adro8400
## $Id: dummy_server.py,v 1.1 2005/07/11 20:39:31 dax Exp $
##
## Copyright (C) 2005 adro8400
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import sys
import time
import traceback
import re
import socket
import types
import select
import xml.dom.minidom
import utils
from pyxmpp import xmlextra
class DummyServer:
def __init__(self, host, port, responses = None):
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error, msg:
s = None
continue
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(sa)
s.listen(1)
except socket.error, msg:
s.close()
s = None
continue
break
self.socket = s
self.responses = None
self.queries = None
self.real_queries = []
def __del__(self):
self.socket.close()
def serve(self):
conn, addr = self.socket.accept()
rfile = conn.makefile('rb', -1)
if self.responses:
data = None
for idx in range(len(self.responses)):
# if response is a function apply it (it must return a string)
# it is given previous received data
if isinstance(self.responses[idx], types.FunctionType):
response = self.responses[idx](data)
else:
response = self.responses[idx]
if response is not None:
print >>sys.stderr, 'Sending : ', response
conn.send(response)
data = rfile.readline()
if not data:
break
else:
self.real_queries.append(data)
print >>sys.stderr, 'Receive : ', data
conn.close()
def verify_queries(self):
result = True
queries_len = len(self.queries)
if queries_len == len(self.real_queries):
for idx in range(queries_len):
real_query = self.real_queries[idx]
match = (re.compile(self.queries[idx], re.M).match(real_query) is not None)
if not match:
result = False
print >>sys.stderr, "Unexpected query :\n" + \
"Expected query : _" + self.queries[idx] + "_\n" + \
"Receive query : _" + real_query + "_\n"
else:
result = False
print >>sys.stderr, "Expected " + str(queries_len) + \
" queries, got " + str(len(self.real_queries))
return result
class XMLDummyServer(DummyServer):
def __init__(self, host, port, responses, stream_handler):
DummyServer.__init__(self, host, port, responses)
self._reader = xmlextra.StreamReader(stream_handler)
def serve(self):
try:
conn, addr = self.socket.accept()
if self.responses:
data = None
for idx in range(len(self.responses)):
try:
# TODO : this approximation is not clean
# received size is based on the expected size in self.queries
data = conn.recv(1024 + len(self.queries[idx]))
if data:
print "-----------RECEIVE " + data
r = self._reader.feed(data)
except:
type, value, stack = sys.exc_info()
print "".join (traceback.format_exception
(type, value, stack, 5))
raise
# TODO verify got all data </stream>
if data:
self.real_queries.append(data)
# if response is a function apply it (it must return a string)
# it is given previous received data
if isinstance(self.responses[idx], types.FunctionType):
response = self.responses[idx](data)
else:
response = self.responses[idx]
if response is not None:
print >>sys.stderr, '---------SENDING : ', response
conn.send(response)
conn.close()
except:
type, value, stack = sys.exc_info()
print "".join (traceback.format_exception
(type, value, stack, 5))
raise
def verify_queries(self):
result = True
queries_len = len(self.queries)
if queries_len == len(self.real_queries):
for idx in range(queries_len):
real_query = xml.dom.minidom.parseString(self.real_queries[idx])
recv_query = xml.dom.minidom.parseString(self.queries[idx])
if not utils.xmldiff(real_query, recv_query):
result = False
print >>sys.stderr, "Unexpected query :\n" + \
"Expected query : _" + self.queries[idx] + "_\n" + \
"Receive query : _" + self.real_queries[idx] + "_\n"
else:
result = False
print >>sys.stderr, "Expected " + str(queries_len) + \
" queries, got " + str(len(self.real_queries))
return result
def test():
server = DummyServer(("localhost", 4242))
server.responses = ["rep1\n", "rep2\n"]
server.serve()
if __name__ == '__main__':
test()

65
tests/email_generator.py Normal file
View File

@@ -0,0 +1,65 @@
##
# -*- coding: iso-8859-1 -*-
## email_generator.py
## Login : <adro8400@claralinux>
## Started on Tue May 17 15:33:35 2005 adro8400
## $Id: email_generator.py,v 1.1 2005/07/11 20:39:31 dax Exp $
##
## Copyright (C) 2005 adro8400
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
from email.Header import Header
from email.MIMEText import MIMEText
from email.MIMEMultipart import MIMEMultipart
def _create_multipart(encoded):
msg = MIMEMultipart()
if encoded:
part1 = MIMEText("Encoded multipart1 with 'iso-8859-15' charset (<28><><EFBFBD>)", \
_charset = "iso-8859-15")
msg.attach(part1)
part2 = MIMEText("Encoded multipart2 with 'iso-8859-15' charset (<28><><EFBFBD>)", \
_charset = "iso-8859-15")
msg.attach(part2)
else:
part1 = MIMEText("Not encoded multipart1")
msg.attach(part1)
part2 = MIMEText("Not encoded multipart2")
msg.attach(part2)
return msg
def _create_singlepart(encoded):
if encoded:
return MIMEText("Encoded single part with 'iso-8859-15' charset (<28><><EFBFBD>)", \
_charset = "iso-8859-15")
else:
return MIMEText("Not encoded single part")
def generate(encoded, multipart, header):
msg = None
if multipart:
msg = _create_multipart(encoded)
else:
msg = _create_singlepart(encoded)
if header:
if encoded:
msg['Subject'] = Header("encoded subject (<28><><EFBFBD>)", "iso-8859-15")
msg['From'] = Header("encoded from (<28><><EFBFBD>)", "iso-8859-15")
else:
msg['Subject'] = Header("not encoded subject")
msg['From'] = Header("not encoded from")
return msg

18
tests/jmc-test.xml Normal file
View File

@@ -0,0 +1,18 @@
<config>
<jabber>
<server>127.0.0.1</server>
<port>55555</port>
<secret>secret</secret>
<service>jmc.localhost</service>
<connectsleep>5</connectsleep>
<language>fr</language>
<vCard>
<FN>Jabber Mail Component</FN>
<DESC>A Jabber mail server component</DESC>
<URL>http://people.happycoders.org/dax/jabber/jmc/</URL>
</vCard>
</jabber>
<storage>DBM</storage>
<spooldir>.</spooldir>
<check_interval>1</check_interval> <!-- in minutes -->
</config>

275
tests/test_component.py Normal file
View File

@@ -0,0 +1,275 @@
##
## test_mailconnection_factory.py
## Login : <adro8400@claralinux>
## Started on Fri May 20 10:46:58 2005 adro8400
## $Id: test_component.py,v 1.2 2005/09/18 20:24:07 dax Exp $
##
## Copyright (C) 2005 adro8400
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import thread
import unittest
import dummy_server
import time
import traceback
from pyxmpp import xmlextra
from jabber.component import *
from jabber.config import Config
class TestStreamHandler(xmlextra.StreamHandler):
def __init__(self, expected_balises = []):
xmlextra.StreamHandler.__init__(self)
self.expected_balises = expected_balises
def stream_start(self, doc):
pass
def stream_end(self, doc):
pass
def stanza(self, notused, node):
pass
class MailComponent_TestCase_NoConnection(unittest.TestCase):
def setUp(self):
self.mail_component = MailComponent(Config("tests/jmc-test.xml"))
def tearDown(self):
self.mail_component = None
def test_get_reg_form(self):
reg_form = self.mail_component.get_reg_form()
reg_form2 = self.mail_component.get_reg_form()
self.assertTrue(reg_form is reg_form2)
#TODO
class MailComponent_TestCase_Basic(unittest.TestCase):
def setUp(self):
self.handler = TestStreamHandler()
self.mail_component = MailComponent(Config("tests/jmc-test.xml"))
self.server = dummy_server.XMLDummyServer("localhost", 55555, None, self.handler)
thread.start_new_thread(self.server.serve, ())
def tearDown(self):
self.server = None
self.mail_component = None
def test_run(self):
self.server.responses = ["<?xml version='1.0'?><stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:component:accept' id='4258238724' from='localhost'>", \
"<handshake/></stream:stream>"]
# TODO : concatenate all queries to parse xml
self.server.queries = ["<?xml version='1.0' encoding='UTF-8'?><stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:component:accept' to='jmc.localhost' version='1.0'>", \
"<handshake></handshake>"]
# self.server.queries = ["<\?xml version=\"1.0\" encoding=\"UTF-8\"\?>\s<stream:stream xmlns:stream=\"http://etherx.jabber.org/streams\" xmlns=\"jabber:component:accept\" to=\"jmc.localhost\" version=\"1.0\">", \
# "<handshake>\s*</handshake>"]
self.mail_component.run(1)
self.failUnless(self.server.verify_queries())
# TODO : more assertion
class MailComponent_TestCase_NoReg(unittest.TestCase):
def setUp(self):
self.handler = TestStreamHandler()
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
self.mail_component = MailComponent(Config("tests/jmc-test.xml"))
self.server = dummy_server.XMLDummyServer("localhost", 55555, None, self.handler)
thread.start_new_thread(self.server.serve, ())
def tearDown(self):
self.server = None
self.mail_component = None
def test_disco_get_items(self):
self.server.responses = ["<?xml version='1.0'?><stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:component:accept' id='4258238724' from='localhost'>", \
"<handshake/><iq type='get' to='jmc.localhost' id='aabca'><query xmlns='http://jabber.org/protocol/disco#info'/></iq>", \
"</stream:stream>"]
self.server.queries = ["<\?xml version=\"1.0\" encoding=\"UTF-8\"\?>\s?" + \
"<stream:stream xmlns:stream=\"http://etherx.jabber.org/streams\" xmlns=\"jabber:component:accept\" to=\"jmc.localhost\" version=\"1.0\">", \
"<handshake>\s*</handshake>", \
"<iq from=\"jmc.localhost\" type=\"result\" id=\"aabca\"><query xmlns=\"http://jabber.org/protocol/disco\#info\"><feature var=\"jabber:iq:version\"/><feature var=\"jabber:iq:register\"/><identity name=\"Jabber Mail Component\" category=\"headline\" type=\"mail\"/></query></iq>"]
self.mail_component.run(1)
self.failUnless(self.server.verify_queries())
def test_get_register(self):
pass
self.server.responses = ["<?xml version='1.0'?><stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:component:accept' id='4258238724' from='localhost'>", \
"<handshake/><iq type='get' to='jmc.localhost' from='test@localhost/test' id='aad9a'><query xmlns='jabber:iq:register'/></iq>", \
"</stream:stream>"]
self.server.queries = ["<\?xml version=\"1.0\" encoding=\"UTF-8\"\?>\s?" + \
"<stream:stream xmlns:stream=\"http://etherx.jabber.org/streams\" xmlns=\"jabber:component:accept\" to=\"jmc.localhost\" version=\"1.0\">", \
"<handshake>\s*</handshake>", \
"<iq from=\"jmc.localhost\" to=\"test@localhost/test\" type=\"result\" id=\"aad9a\">\s?" + \
"<query xmlns=\"jabber:iq:register\">\s?" + \
"<x xmlns=\"jabber:x:data\">\s?" + \
"<title>Jabber Mail connection registration</title>\s?" + \
"<instructions>Enter anything below</instructions>\s?" + \
"<field type=\"text-single\" label=\"Connection name\" var=\"name\"/>\s?" + \
"<field type=\"text-single\" label=\"Login\" var=\"login\"/>\s?" + \
"<field type=\"text-private\" label=\"Password\" var=\"password\"/>\s?" + \
"<field type=\"text-single\" label=\"Host\" var=\"host\"/>\s?" + \
"<field type=\"text-single\" label=\"Port\" var=\"port\"/>\s?" + \
"<field type=\"list-single\" label=\"Mailbox type\" var=\"type\">\s?" + \
"<option label=\"POP3\">\s?" + \
"<value>pop3</value>\s?" + \
"</option>\s?" + \
"<option label=\"POP3S\">\s?" + \
"<value>pop3s</value>\s?" + \
"</option>\s?" + \
"<option label=\"IMAP\">\s?" + \
"<value>imap</value>\s?" + \
"</option>\s?" + \
"<option label=\"IMAPS\">\s?" + \
"<value>imaps</value>\s?" + \
"</option>\s?" + \
"</field>\s?" + \
"<field type=\"text-single\" label=\"Mailbox (IMAP)\" var=\"mailbox\">\s?" + \
"<value>INBOX</value>\s?" + \
"</field>\s?" + \
"<field type=\"list-single\" label=\"Action when state is 'Free For Chat'\" var=\"ffc_action\">\s?" + \
"<value>2</value>\s?" + \
"<option label=\"Do nothing\">\s?" + \
"<value>0</value>\s?" + \
"</option>\s?" + \
"<option label=\"Send mail digest\">\s?" + \
"<value>1</value>\s?" + \
"</option>\s?" + \
"<option label=\"Retrieve mail\">\s?" + \
"<value>2</value>\s?" + \
"</option>\s?" + \
"</field>\s?" + \
"<field type=\"list-single\" label=\"Action when state is 'Online'\" var=\"online_action\">\s?" + \
"<value>2</value>\s?" + \
"<option label=\"Do nothing\">\s?" + \
"<value>0</value>\s?" + \
"</option>\s?" + \
"<option label=\"Send mail digest\">\s?" + \
"<value>1</value>\s?" + \
"</option>\s?" + \
"<option label=\"Retrieve mail\">\s?" + \
"<value>2</value>\s?" + \
"</option>\s?" + \
"</field>\s?" + \
"<field type=\"list-single\" label=\"Action when state is 'Away'\" var=\"away_action\">\s?" + \
"<value>1</value>\s?" + \
"<option label=\"Do nothing\">\s?" + \
"<value>0</value>\s?" + \
"</option>\s?" + \
"<option label=\"Send mail digest\">\s?" + \
"<value>1</value>\s?" + \
"</option>\s?" + \
"<option label=\"Retrieve mail\">\s?" + \
"<value>2</value>\s?" + \
"</option>\s?" + \
"</field>\s?" + \
"<field type=\"list-single\" label=\"Action when state is 'Extended Away'\" var=\"ea_action\">\s?" + \
"<value>1</value>\s?" + \
"<option label=\"Do nothing\">\s?" + \
"<value>0</value>\s?" + \
"</option>\s?" + \
"<option label=\"Send mail digest\">\s?" + \
"<value>1</value>\s?" + \
"</option>\s?" + \
"<option label=\"Retrieve mail\">\s?" + \
"<value>2</value>\s?" + \
"</option>\s?" + \
"</field>\s?" + \
"<field type=\"list-single\" label=\"Action when state is 'Offline'\" var=\"offline_action\">\s?" + \
"<value>0</value>\s?" + \
"<option label=\"Do nothing\">\s?" + \
"<value>0</value>\s?" + \
"</option>\s?" + \
"<option label=\"Send mail digest\">\s?" + \
"<value>1</value>\s?" + \
"</option>\s?" + \
"<option label=\"Retrieve mail\">\s?" + \
"<value>2</value>\s?" + \
"</option>\s?" + \
"</field>\s?" + \
"<field type=\"text-single\" label=\"Mail check interval (in minutes)\" var=\"interval\">\s?" + \
"<value>5</value>\s?" + \
"</field>\s?" + \
"</x>\s?" + \
"</query>\s?" + \
"</iq>"]
self.mail_component.run(1)
self.failUnless(self.server.verify_queries())
#self.mail_component.get_version()
# def test_disco_get_info(self):
# pass
# def test_get_register(self):
# pass
# def test_set_register(self):
# pass
class MailComponent_TestCase_Reg(unittest.TestCase):
def setUp(self):
self.mail_component = MailComponent(Config("tests/jmc-test.xml"))
self.mail_component.set_register(iq = None)
def test_get_reg_form_init(self):
pass
def test_get_reg_form_init_2pass(self):
pass
def test_disco_get_items(self):
pass
def test_get_register(self):
pass
def test_set_register_update(self):
pass
def test_set_register_remove(self):
pass
def test_presence_available_transport(self):
pass
def test_presence_available_mailbox(self):
pass
def test_presence_unavailable_transport(self):
pass
def test_presence_unavailable_mailbox(self):
pass
def test_presence_subscribe(self):
pass
def test_presence_subscribed_transport(self):
pass
def test_presence_subscribed_mailbox(self):
pass
def test_presence_unsubscribe(self):
pass
def test_presence_unsubscribed(self):
pass
def test_check_mail(self):
pass

View File

@@ -0,0 +1,264 @@
# -*- coding: iso-8859-15 -*-
##
## mailconnection_test.py
## Login : <adro8400@claralinux>
## Started on Fri May 13 11:32:51 2005 adro8400
## $Id: test_mailconnection.py,v 1.2 2005/09/18 20:24:07 dax Exp $
##
## Copyright (C) 2005 adro8400
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import unittest
from jabber.mailconnection import IMAPConnection, \
POP3Connection, \
MailConnection
import dummy_server
import email_generator
import thread
import re
import sys
import string
class MailConnection_TestCase(unittest.TestCase):
def setUp(self):
self.connection = MailConnection()
def make_test(email_type, tested_func, expected_res):
def inner(self):
encoded, multipart, header = email_type
email = email_generator.generate(encoded, \
multipart, \
header)
part = tested_func(self, email)
self.assertEquals(part, expected_res)
return inner
test_get_decoded_part_not_encoded = \
make_test((False, False, False), \
lambda self, email: self.connection.get_decoded_part(email), \
u"Not encoded single part")
test_get_decoded_part_encoded = \
make_test((True, False, False), \
lambda self, email: self.connection.get_decoded_part(email), \
u"Encoded single part with 'iso-8859-15' charset (<28><><EFBFBD>)")
test_format_message_summary_not_encoded = \
make_test((False, False, True), \
lambda self, email: self.connection.format_message_summary(email), \
u"From : not encoded from\nSubject : not encoded subject\n\n")
test_format_message_summary_encoded = \
make_test((True, False, True), \
lambda self, email: self.connection.format_message_summary(email), \
u"From : encoded from (<28><><EFBFBD>)\nSubject : encoded subject " + \
u"(<28><><EFBFBD>)\n\n")
test_format_message_summary_partial_encoded = \
make_test((True, False, True), \
lambda self, email: \
email.replace_header("Subject", \
"\"" + str(email["Subject"]) \
+ "\" not encoded part") or \
email.replace_header("From", \
"\"" + str(email["From"]) \
+ "\" not encoded part") or \
self.connection.format_message_summary(email), \
u"From : \"encoded from (<28><><EFBFBD>)\" not encoded part\nSubject " + \
u": \"encoded subject (<28><><EFBFBD>)\" not encoded part\n\n")
test_format_message_single_not_encoded = \
make_test((False, False, True), \
lambda self, email: self.connection.format_message(email), \
u"From : not encoded from\nSubject : not encoded subject" + \
u"\n\nNot encoded single part\n")
test_format_message_single_encoded = \
make_test((True, False, True), \
lambda self, email: self.connection.format_message(email), \
u"From : encoded from (<28><><EFBFBD>)\nSubject : encoded subject " + \
u"(<28><><EFBFBD>)\n\nEncoded single part with 'iso-8859-15' charset" + \
u" (\xe9\xe0\xea)\n")
test_format_message_multi_not_encoded = \
make_test((False, True, True), \
lambda self, email: self.connection.format_message(email), \
u"From : not encoded from\nSubject : not encoded subject" + \
u"\n\nNot encoded multipart1\nNot encoded multipart2\n")
test_format_message_multi_encoded = \
make_test((True, True, True), \
lambda self, email: self.connection.format_message(email), \
u"From : encoded from (<28><><EFBFBD>)\nSubject : encoded subject (<28><>" + \
u"<EFBFBD>)\n\nEncoded multipart1 with 'iso-8859-15' charset (<28><><EFBFBD>" + \
u")\nEncoded multipart2 with 'iso-8859-15' charset (<28><><EFBFBD>)\n")
class POP3Connection_TestCase(unittest.TestCase):
def setUp(self):
self.server = dummy_server.DummyServer("localhost", 1110)
thread.start_new_thread(self.server.serve, ())
self.pop3connection = POP3Connection("login", \
"pass", \
"localhost", \
1110, \
ssl = False)
def tearDown(self):
self.server = None
self.pop3connection = None
def make_test(responses = None, queries = None, core = None):
def inner(self):
self.server.responses = ["+OK connected\r\n", \
"+OK name is a valid mailbox\r\n", \
"+OK pass\r\n"]
if responses:
self.server.responses += responses
self.server.queries = ["USER login\r\n", \
"PASS pass\r\n"]
if queries:
self.server.queries += queries
self.server.queries += ["QUIT\r\n"]
self.pop3connection.connect()
self.failUnless(self.pop3connection.connection, \
"Cannot establish connection")
if core:
core(self)
self.pop3connection.disconnect()
self.failUnless(self.server.verify_queries(), \
"Sended queries does not match expected queries.")
return inner
test_connection = make_test()
test_get_mail_list = \
make_test(["+OK 2 20\r\n"], \
["STAT\r\n"], \
lambda self: \
self.assertEquals(self.pop3connection.get_mail_list(), \
["1", "2"]))
test_get_mail_summary = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n"], \
["RETR 1\r\n"], \
lambda self: \
self.assertEquals(self.pop3connection.get_mail_summary(1), \
"From : user@test.com\n" + \
"Subject : subject test\n\n"))
test_get_mail = \
make_test(["+OK 10 octets\r\n" + \
"From: user@test.com\r\n" + \
"Subject: subject test\r\n\r\n" + \
"mymessage\r\n.\r\n"], \
["RETR 1\r\n"], \
lambda self: \
self.assertEquals(self.pop3connection.get_mail(1), \
"From : user@test.com\n" + \
"Subject : subject test\n\n" + \
"mymessage\n"))
class IMAPConnection_TestCase(unittest.TestCase):
def setUp(self):
self.server = dummy_server.DummyServer("localhost", 1143)
thread.start_new_thread(self.server.serve, ())
self.imap_connection = IMAPConnection("login", \
"pass", \
"localhost", \
1143, \
ssl = False)
def tearDown(self):
self.server = None
self.imap_connection = None
def make_test(responses = None, queries = None, core = None):
def inner(self):
self.server.responses = ["* OK [CAPABILITY IMAP4 LOGIN-REFERRALS " + \
"AUTH=PLAIN]\n", \
lambda data: "* CAPABILITY IMAP4 " + \
"LOGIN-REFERRALS AUTH=PLAIN\n" + \
data.split()[0] + \
" OK CAPABILITY completed\n", \
lambda data: data.split()[0] + \
" OK LOGIN completed\n"]
if responses:
self.server.responses += responses
self.server.queries = ["^[^ ]* CAPABILITY", \
"^[^ ]* LOGIN login \"pass\""]
if queries:
self.server.queries += queries
self.server.queries += ["^[^ ]* LOGOUT"]
self.imap_connection.connect()
self.failUnless(self.imap_connection.connection, \
"Cannot establish connection")
if core:
core(self)
self.imap_connection.disconnect()
self.failUnless(self.server.verify_queries("Sended queries does not match expected queries."))
return inner
test_connection = make_test()
test_get_mail_list = make_test([lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" +\
" [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\n", \
lambda data: "* SEARCH 9 10 \n" + \
data.split()[0] + " OK SEARCH completed\n"], \
["^[^ ]* SELECT INBOX", \
"^[^ ]* SEARCH UNSEEN"], \
lambda self: \
self.assertEquals(self.imap_connection.get_mail_list(), ['9', '10']))
test_get_mail_summary = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n", \
lambda data: "* 1 FETCH ((RFC822) {12}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n", \
lambda data: "* 1 FETCH (FLAGS (\UNSEEN))\r\n" + \
data.split()[0] + " OK STORE completed\r\n"], \
["^[^ ]* SELECT INBOX", \
"^[^ ]* FETCH 1 \(RFC822\)", \
"^[^ ]* STORE 1 FLAGS \(UNSEEN\)"], \
lambda self: self.assertEquals(self.imap_connection.get_mail_summary(1), \
"From : None\nSubject : None\n\n"))
test_get_mail = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n", \
lambda data: "* 1 FETCH ((RFC822) {11}\r\nbody" + \
" text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n", \
lambda data: "* 1 FETCH (FLAGS (\UNSEEN))\r\n" + \
data.split()[0] + " OK STORE completed\r\n"], \
["^[^ ]* SELECT INBOX", \
"^[^ ]* FETCH 1 \(RFC822\)", \
"^[^ ]* STORE 1 FLAGS \(UNSEEN\)"], \
lambda self: self.assertEquals(self.imap_connection.get_mail(1), \
"From : None\nSubject : None\n\nbody text\r\n\n"))

View File

@@ -0,0 +1,96 @@
##
## test_mailconnection_factory.py
## Login : <adro8400@claralinux>
## Started on Fri May 20 10:46:58 2005 adro8400
## $Id: test_mailconnection_factory.py,v 1.1 2005/07/11 20:39:31 dax Exp $
##
## Copyright (C) 2005 adro8400
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import unittest
from jabber.mailconnection_factory import *
class MailConnectionFactory_TestCase(unittest.TestCase):
def test_new_mail_connection_imap(self):
mc = get_new_mail_connection("imap")
self.assertEquals(mc, mc)
def test_new_mail_connection_imaps(self):
mc = get_new_mail_connection("imaps")
self.assertEquals(mc, mc)
def test_new_mail_connection_pop3(self):
mc = get_new_mail_connection("pop3")
self.assertEquals(mc, mc)
def test_new_mail_connection_pop3s(self):
mc = get_new_mail_connection("pop3s")
self.assertEquals(mc, mc)
def test_str_to_mail_connection_imap(self):
mc = str_to_mail_connection("imap#login#passwd#host#193#nothing#nothing#nothing#nothing#retrieve#INBOX")
self.assertEquals(mc.get_type(), "imap")
self.assertEquals(mc.login, "login")
self.assertEquals(mc.password, "passwd")
self.assertEquals(mc.host, "host")
self.assertEquals(mc.port, 193)
self.assertEquals(mc.mailbox, "INBOX")
self.assertEquals(mc.ffc_action, "nothing")
self.assertEquals(mc.online_action, "nothing")
self.assertEquals(mc.away_action, "nothing")
self.assertEquals(mc.ea_action, "nothing")
self.assertEquals(mc.offline_action, "retrieve")
def test_str_to_mail_connection_imaps(self):
mc = str_to_mail_connection("imaps#login#passwd#host#993#nothing#nothing#nothing#nothing#retrieve#INBOX.SubDir")
self.assertEquals(mc.get_type(), "imaps")
self.assertEquals(mc.login, "login")
self.assertEquals(mc.password, "passwd")
self.assertEquals(mc.host, "host")
self.assertEquals(mc.port, 993)
self.assertEquals(mc.mailbox, "INBOX.SubDir")
self.assertEquals(mc.ffc_action, "nothing")
self.assertEquals(mc.online_action, "nothing")
self.assertEquals(mc.away_action, "nothing")
self.assertEquals(mc.ea_action, "nothing")
self.assertEquals(mc.offline_action, "retrieve")
def test_str_to_mail_connection_pop3(self):
mc = str_to_mail_connection("pop3#login#passwd#host#110#nothing#nothing#nothing#nothing#retrieve")
self.assertEquals(mc.get_type(), "pop3")
self.assertEquals(mc.login, "login")
self.assertEquals(mc.password, "passwd")
self.assertEquals(mc.host, "host")
self.assertEquals(mc.port, 110)
self.assertEquals(mc.ffc_action, "nothing")
self.assertEquals(mc.online_action, "nothing")
self.assertEquals(mc.away_action, "nothing")
self.assertEquals(mc.ea_action, "nothing")
self.assertEquals(mc.offline_action, "retrieve")
def test_str_to_mail_connection_pop3s(self):
mc = str_to_mail_connection("pop3s#login#passwd#host#995#nothing#nothing#nothing#nothing#retrieve")
self.assertEquals(mc.get_type(), "pop3s")
self.assertEquals(mc.login, "login")
self.assertEquals(mc.password, "passwd")
self.assertEquals(mc.host, "host")
self.assertEquals(mc.port, 995)
self.assertEquals(mc.ffc_action, "nothing")
self.assertEquals(mc.online_action, "nothing")
self.assertEquals(mc.away_action, "nothing")
self.assertEquals(mc.ea_action, "nothing")
self.assertEquals(mc.offline_action, "retrieve")

150
tests/test_storage.py Normal file
View File

@@ -0,0 +1,150 @@
##
## test_storage.py
## Login : <dax@happycoders.org>
## Started on Fri May 20 10:46:58 2005 dax
## $Id: test_component.py,v 1.1 2005/07/11 20:39:31 dax Exp $
##
## Copyright (C) 2005 adro8400
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import os
import unittest
import dummy_server
from jabber.storage import *
from jabber import mailconnection
from jabber.mailconnection import *
class Storage_TestCase(unittest.TestCase):
def test_init(self):
spool_dir = "./spool/test"
self._storage = Storage(spool_dir = spool_dir)
self.assertTrue(os.access(spool_dir, os.F_OK))
self.assertEquals(self._storage.spool_dir, spool_dir)
os.removedirs(spool_dir)
# TODO
class SQLiteStorage_TestCase(unittest.TestCase):
def setUp(self):
self._storage = SQLiteStorage()
def tearDown(self):
pass
class DBMStorage_TestCase(unittest.TestCase):
def setUp(self):
self._storage = DBMStorage(nb_pk_fields = 2)
self._account1 = IMAPConnection(login = "login1",
password = "password1",
host = "host1",
port = 993,
ssl = True,
mailbox = "INBOX.box1")
self._account1.ffc_action = mailconnection.DO_NOTHING
self._account1.onlline_action = mailconnection.DO_NOTHING
self._account1.away_action = mailconnection.DO_NOTHING
self._account1.ea_action = mailconnection.DO_NOTHING
self._account1.offline_action = mailconnection.DO_NOTHING
self._account2 = IMAPConnection(login = "login2",
password = "password2",
host = "host2",
port = 1993,
ssl = False,
mailbox = "INBOX.box2")
self._account2.ffc_action = mailconnection.DO_NOTHING
self._account2.onlline_action = mailconnection.DO_NOTHING
self._account2.away_action = mailconnection.DO_NOTHING
self._account2.ea_action = mailconnection.DO_NOTHING
self._account2.offline_action = mailconnection.DO_NOTHING
def tearDown(self):
os.remove(self._storage.file)
self._storage = None
def test_set_get(self):
self._storage[("test@localhost", "account1")] = self._account1
self._storage[("test@localhost", "account2")] = self._account2
self.assertEquals(self._storage[("test@localhost", "account1")], self._account1)
self.assertEquals(self._storage[("test@localhost", "account2")], self._account2)
def test_set_sync_get(self):
self._storage[("test@localhost", "account1")] = self._account1
self._storage[("test@localhost", "account2")] = self._account2
loaded_storage = DBMStorage(nb_pk_fields = 2)
self.assertEquals(loaded_storage[("test@localhost", "account1")], self._account1)
self.assertEquals(loaded_storage[("test@localhost", "account2")], self._account2)
def test_set_del_get(self):
self._storage[("test@localhost", "account2")] = self._account2
del self._storage[("test@localhost", "account2")]
try:
self._storage[("test@localhost", "account2")]
except KeyError:
return
self.fail("KeyError was expected")
def test_haskey(self):
self._storage[("test@localhost", "account2")] = self._account2
self.assertTrue(self._storage.has_key(("test@localhost", "account2")))
def test_get_filtered(self):
self._storage[("test@localhost", "account1")] = self._account1
self._storage[("test@localhost", "account2")] = self._account2
result = self._storage[("test@localhost",)]
self.assertEquals(type(result), list)
self.assertEquals(len(result), 2)
self.assertEquals(result[0], self._account1)
self.assertEquals(result[1], self._account2)
def test_get_filtered2(self):
self._storage[("test@localhost", "account1")] = self._account1
self._storage[("test@localhost", "account2")] = self._account2
result = self._storage[("account1",)]
self.assertEquals(type(result), list)
self.assertEquals(len(result), 1)
self.assertEquals(result[0], self._account1)
def test_keys(self):
self._storage[("test@localhost", "account1")] = self._account1
self._storage[("test@localhost", "account2")] = self._account2
result = self._storage.keys()
self.assertEquals(type(result), list)
self.assertEquals(len(result), 2)
self.assertEquals(type(result[0]), tuple)
self.assertEquals(len(result[0]), 2)
self.assertEquals(result[0][0], "test@localhost")
self.assertEquals(result[0][1], "account1")
self.assertEquals(type(result[1]), tuple)
self.assertEquals(len(result[1]), 2)
self.assertEquals(result[1][0], "test@localhost")
self.assertEquals(result[1][1], "account2")
def test_keys_filtered(self):
self._storage[("test@localhost", "account1")] = self._account1
self._storage[("test@localhost", "account2")] = self._account2
result = self._storage.keys(())
self.assertEquals(type(result), list)
self.assertEquals(len(result), 1)
self.assertEquals(result[0], "test@localhost")
def test_keys_filtered2(self):
self._storage[("test@localhost", "account1")] = self._account1
self._storage[("test@localhost", "account2")] = self._account2
result = self._storage.keys(("test@localhost",))
self.assertEquals(type(result), list)
self.assertEquals(len(result), 2)
self.assertEquals(result[0], "account2")
self.assertEquals(result[1], "account1")

72
tests/test_x.py Normal file
View File

@@ -0,0 +1,72 @@
##
## test_x.py
## Login : <adro8400@claralinux>
## Started on Fri May 20 10:46:58 2005 adro8400
## $Id: test_x.py,v 1.1 2005/07/11 20:39:31 dax Exp $
##
## Copyright (C) 2005 adro8400
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import unittest
from jabber.x import *
class X_TestCase(unittest.TestCase):
def setUp(self):
self.mail_component = MailComponent()
def test_get_form(self):
self.reg_form.add_field(type = "text-single", \
label = "Connection name", \
var = "name")
self.reg_form.add_field(type = "text-single", \
label = "Login", \
var = "login")
self.reg_form.add_field(type = "text-private", \
label = "password", \
var = "password")
self.reg_form.add_field(type = "text-single", \
label = "Host", \
var = "host")
self.reg_form.add_field(type = "text-single", \
label = "Port", \
var = "port")
field = self.reg_form.add_field(type = "list-single", \
label = "Mailbox type", \
var = "type")
field.add_option(label = "POP3", \
value = "pop3")
field.add_option(label = "POP3S", \
value = "pop3s")
field.add_option(label = "IMAP", \
value = "imap")
field.add_option(label = "IMAPS", \
value = "imaps")
self.reg_form.add_field(type = "text-single", \
label = "Mailbox (IMAP)", \
var = "mailbox", \
value = "INBOX")
self.reg_form.add_field(type = "boolean", \
label = "Retrieve mail", \
var = "retrieve")
pass

28
tests/tmp.py Normal file
View File

@@ -0,0 +1,28 @@
import dummy_server
server = dummy_server.DummyServer(("localhost", 1143))
server.responses = ["* OK [CAPABILITY IMAP4 LOGIN-REFERRALS " + \
"AUTH=PLAIN]\r\n", \
lambda data: "* CAPABILITY IMAP4 " + \
"LOGIN-REFERRALS AUTH=PLAIN\r\n" + \
data.split()[0] + \
" OK CAPABILITY completed\r\n", \
lambda data: data.split()[0] + \
" OK LOGIN completed\r\n", \
lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
" OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
data.split()[0] + \
" OK [READ-WRITE] SELECT completed\r\n", \
lambda data: "* 1 FETCH ((RFC822) {12}\r\nbody text\r\n)\r\n" + \
data.split()[0] + " OK FETCH completed\r\n", \
lambda data: "* 1 FETCH (FLAGS (\UNSEEN))\r\n" + \
data.split()[0] + " OK STORE completed\r\n"]
server.queries = ["CAPABILITY", \
"LOGIN login \"pass\"", \
"SELECT INBOX", \
"FETCH 1 (RFC822)", \
"STORE 1 FLAGS (UNSEEN)", \
"LOGOUT"]
server.serve()
#server.verify_queries()

121
tests/tmp1 Normal file
View File

@@ -0,0 +1,121 @@
<iq from="jmc.localhost" to="test@localhost/test" type="result" id="aad9a">
<query xmlns="jabber:iq:register">
<x xmlns="jabber:x:data">
<title>
Jabber Mail connection registration</title>
<instructions>
Enter anything below</instructions>
<field type="text-single" label="Connection name" var="name"/>
<field type="text-single" label="Login" var="login"/>
<field type="text-private" label="Password" var="password"/>
<field type="text-single" label="Host" var="host"/>
<field type="text-single" label="Port" var="port"/>
<field type="list-single" label="Mailbox type" var="type">
<option label="POP3">
<value>
pop3</value>
</option>
<option label="POP3S">
<value>
pop3s</value>
</option>
<option label="IMAP">
<value>
imap</value>
</option>
<option label="IMAPS">
<value>
imaps</value>
</option>
</field>
<field type="text-single" label="Mailbox (IMAP)" var="mailbox">
<value>
INBOX</value>
</field>
<field type="list-single" label="Action when state is 'Free For Chat'" var="ffc_action">
<value>
2</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="list-single" label="Action when state is 'Online'" var="online_action">
<value>
2</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="list-single" label="Action when state is 'Away'" var="away_action">
<value>
1</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="list-single" label="Action when state is 'Extended Away'" var="ea_action">
<value>
1</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="list-single" label="Action when state is 'Offline'" var="offline_action">
<value>
0</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="text-single" label="Mail check interval (in minutes)" var="interval">
<value>
5</value>
</field>
</x>
</query>
</iq>

121
tests/tmp2 Normal file
View File

@@ -0,0 +1,121 @@
<iq from="jmc.localhost" to="test@localhost/test" type="result" id="aad9a">
<query xmlns="jabber:iq:register">
<x xmlns="jabber:x:data">
<title>
Jabber Mail connection registration</title>
<instructions>
Enter anything below</instructions>
<field type="text-single" label="Connection name" var="name"/>
<field type="text-single" label="Login" var="login"/>
<field type="text-private" label="Password" var="password"/>
<field type="text-single" label="Host" var="host"/>
<field type="text-single" label="Port" var="port"/>
<field type="list-single" label="Mailbox type" var="type">
<option label="POP3">
<value>
pop3</value>
</option>
<option label="POP3S">
<value>
pop3s</value>
</option>
<option label="IMAP">
<value>
imap</value>
</option>
<option label="IMAPS">
<value>
imaps</value>
</option>
</field>
<field type="text-single" label="Mailbox (IMAP)" var="mailbox">
<value>
INBOX</value>
</field>
<field type="list-single" label="Action when state is 'Free For Chat'" var="ffc_action">
<value>
2</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="list-single" label="Action when state is 'Online'" var="online_action">
<value>
2</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="list-single" label="Action when state is 'Away'" var="away_action">
<value>
1</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="list-single" label="Action when state is 'Extended Away'" var="ea_action">
<value>
1</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="list-single" label="Action when state is 'Offline'" var="offline_action">
<value>
0</value>
<option label="Do nothing">
<value>
0</value>
</option>
<option label="Send mail digest">
<value>
1</value>
</option>
<option label="Retrieve mail">
<value>
2</value>
</option>
</field>
<field type="text-single" label="Mail check interval (in minutes)" var="interval">
<value>
5</value>
</field>
</x>
</query>
</iq>

100
tests/utils.py Normal file
View File

@@ -0,0 +1,100 @@
##
## utils.py
## Login : <dax@happycoders.org>
## Started on Mon Oct 24 21:44:43 2005 dax
## $Id$
##
## Copyright (C) 2005 dax
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import xml.dom.minidom
#document = "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:component:accept' id='4258238724' from='localhost'>"
# document = """\
# <slideshow attr='value'>
# <title>Demo slideshow</title>
# <slide><title>Slide title</title>
# <point>This is a demo</point>
# <point>Of a program for processing slides</point>
# </slide>
# <slide><title>Another demo slide</title>
# <point>It is important</point>
# <point>To have more than</point>
# <point>one slide</point>
# </slide>
# </slideshow>
# """
# document1 = """\
# <slideshow attr='value'>
# <title>Demo slideshow</title>
# <slide><title>Slide title</title>
# <point>This is a demo</point>
# <point>Of a program for processing slides</point>
# </slide>
# <slide><title>Another demo slide</title>
# <point>It is important1</point>
# <point>To have more than</point>
# <point>one slide</point>
# </slide>
# </slideshow>
# """
#dom = xml.dom.minidom.parseString(document)
#dom1 = xml.dom.minidom.parseString(document)
# def getText(nodelist):
# rc = ""
# for node in nodelist:
# if node.nodeType == node.TEXT_NODE:
# rc = rc + node.data
# return rc
def xmldiff(node1, node2):
if node1.nodeType == node1.TEXT_NODE:
if not node2.nodeType == node2.TEXT_NODE \
or node1.data != node2.data:
return False
elif node1.nodeType == node1.DOCUMENT_NODE:
if not node2.nodeType == node2.DOCUMENT_NODE:
return False
elif node1.tagName != node2.tagName:
return False
else:
for attr in node1._get_attributes().keys():
if not node2.hasAttribute(attr) \
or node1.getAttribute(attr) != node2.getAttribute(attr):
return False
for i in range(len(node1.childNodes)):
if not xmldiff(node1.childNodes[i], node2.childNodes[i]):
return False
return True
#print xmldiff(dom, dom1)
# def nodediff(node1, node2):
# if not node1.name == node2.name:
# return False
# for properties in node1.properties:
# if node2.hasAttribute(attr):
# def xmldiff(xpath, node1, node2):
# if not nodediff(node1, node2):
# return False
# for child in node1.children: