From 170f482ae1f29a3ea8735ba8689a2a57b08bc4a8 Mon Sep 17 00:00:00 2001 From: David Rousselie Date: Wed, 21 Feb 2007 18:36:04 +0100 Subject: [PATCH] Migration to JCL JMC migration to JCL: - Use SQLObject for persistence - Use PyXMPP DataForm implementation - test packages reorganisation Need to update component.py and config.py to finish the migration darcs-hash:20070221173604-86b55-17fb4a530f378b51b6b62a117a6f93c73c5be796.gz --- coverage.py | 724 +++++++++++++----- run_test.py | 86 +-- src/jmc.py | 85 +- src/jmc/__init__.py | 2 + src/jmc/{utils => }/config.py | 0 src/jmc/email/mailconnection_factory.py | 142 ---- src/jmc/jabber/__init__.py | 2 + src/jmc/jabber/x.py | 129 ---- src/jmc/{utils => }/lang.py | 0 src/jmc/model/__init__.py | 2 + .../mailconnection.py => model/account.py} | 282 +++---- src/jmc/utils/__init__.py | 0 src/jmc/utils/storage.py | 297 ------- tests/jmc-test.xml | 20 - tests/jmc/__init__.py | 2 + tests/{ => jmc}/dummy_server.py | 34 +- tests/{ => jmc}/email_generator.py | 0 tests/jmc/jabber/__init__.py | 2 + .../jmc/jabber/test_component.py | 21 +- .../jmc/email => tests/jmc/model}/__init__.py | 0 .../model/test_account.py} | 179 +++-- tests/{ => jmc}/test_lang.py | 0 tests/test_component.py | 355 --------- tests/test_mailconnection_factory.py | 204 ----- tests/test_storage.py | 224 ------ tests/test_x.py | 72 -- tests/utils.py | 95 --- 27 files changed, 901 insertions(+), 2058 deletions(-) rename src/jmc/{utils => }/config.py (100%) delete mode 100644 src/jmc/email/mailconnection_factory.py delete mode 100644 src/jmc/jabber/x.py rename src/jmc/{utils => }/lang.py (100%) create mode 100644 src/jmc/model/__init__.py rename src/jmc/{email/mailconnection.py => model/account.py} (67%) delete mode 100644 src/jmc/utils/__init__.py delete mode 100644 src/jmc/utils/storage.py delete mode 100644 tests/jmc-test.xml create mode 100644 tests/jmc/__init__.py rename tests/{ => jmc}/dummy_server.py (82%) rename tests/{ => jmc}/email_generator.py (100%) create mode 100644 tests/jmc/jabber/__init__.py rename src/jmc/utils/release.py => tests/jmc/jabber/test_component.py (62%) rename {src/jmc/email => tests/jmc/model}/__init__.py (100%) rename tests/{test_mailconnection.py => jmc/model/test_account.py} (65%) rename tests/{ => jmc}/test_lang.py (100%) delete mode 100644 tests/test_component.py delete mode 100644 tests/test_mailconnection_factory.py delete mode 100644 tests/test_storage.py delete mode 100644 tests/test_x.py delete mode 100644 tests/utils.py diff --git a/coverage.py b/coverage.py index abd95da..66e55e0 100755 --- a/coverage.py +++ b/coverage.py @@ -6,6 +6,8 @@ # COVERAGE.PY -- COVERAGE TESTING # # Gareth Rees, Ravenbrook Limited, 2001-12-04 +# Ned Batchelder, 2004-12-12 +# http://nedbatchelder.com/code/modules/coverage.html # # # 1. INTRODUCTION @@ -20,34 +22,49 @@ # interface and limitations. See [GDR 2001-12-04b] for requirements and # design. -"""Usage: +r"""Usage: -coverage.py -x MODULE.py [ARG1 ARG2 ...] +coverage.py -x [-p] MODULE.py [ARG1 ARG2 ...] Execute module, passing the given command-line arguments, collecting - coverage data. + coverage data. With the -p option, write to a temporary file containing + the machine name and process ID. coverage.py -e Erase collected coverage data. -coverage.py -r [-m] FILE1 FILE2 ... +coverage.py -c + Collect data from multiple coverage files (as created by -p option above) + and store it into a single file representing the union of the coverage. + +coverage.py -r [-m] [-o dir1,dir2,...] FILE1 FILE2 ... Report on the statement coverage for the given files. With the -m option, show line numbers of the statements that weren't executed. -coverage.py -a [-d dir] FILE1 FILE2 ... +coverage.py -a [-d dir] [-o dir1,dir2,...] FILE1 FILE2 ... Make annotated copies of the given files, marking statements that are executed with > and statements that are missed with !. With the -d option, make the copies in that directory. Without the -d option, make each copy in the same directory as the original. +-o dir,dir2,... + Omit reporting or annotating files when their filename path starts with + a directory listed in the omit list. + e.g. python coverage.py -i -r -o c:\python23,lib\enthought\traits + Coverage data is saved in the file .coverage by default. Set the COVERAGE_FILE environment variable to save it somewhere else.""" +__version__ = "2.6.20060823" # see detailed history at the end of this file. + +import compiler +import compiler.visitor import os import re import string import sys +import threading import types - +from socket import gethostname # 2. IMPLEMENTATION # @@ -69,37 +86,175 @@ import types # In the bottleneck of this application it's appropriate to abbreviate # names to increase speed. -# A dictionary with an entry for (Python source file name, line number -# in that file) if that line has been executed. -c = {} +class StatementFindingAstVisitor(compiler.visitor.ASTVisitor): + def __init__(self, statements, excluded, suite_spots): + compiler.visitor.ASTVisitor.__init__(self) + self.statements = statements + self.excluded = excluded + self.suite_spots = suite_spots + self.excluding_suite = 0 + + def doRecursive(self, node): + self.recordNodeLine(node) + for n in node.getChildNodes(): + self.dispatch(n) -# t(f, x, y). This method is passed to sys.settrace as a trace -# function. See [van Rossum 2001-07-20b, 9.2] for an explanation of -# sys.settrace and the arguments and return value of the trace function. -# See [van Rossum 2001-07-20a, 3.2] for a description of frame and code -# objects. + visitStmt = visitModule = doRecursive + + def doCode(self, node): + if hasattr(node, 'decorators') and node.decorators: + self.dispatch(node.decorators) + self.recordAndDispatch(node.code) + else: + self.doSuite(node, node.code) + + visitFunction = visitClass = doCode -def t(f, x, y): - c[(f.f_code.co_filename, f.f_lineno)] = 1 - return t + def getFirstLine(self, node): + # Find the first line in the tree node. + lineno = node.lineno + for n in node.getChildNodes(): + f = self.getFirstLine(n) + if lineno and f: + lineno = min(lineno, f) + else: + lineno = lineno or f + return lineno + + def getLastLine(self, node): + # Find the first line in the tree node. + lineno = node.lineno + for n in node.getChildNodes(): + lineno = max(lineno, self.getLastLine(n)) + return lineno + + def doStatement(self, node): + self.recordLine(self.getFirstLine(node)) + + visitAssert = visitAssign = visitAssTuple = visitDiscard = visitPrint = \ + visitPrintnl = visitRaise = visitSubscript = visitDecorators = \ + doStatement + + def recordNodeLine(self, node): + return self.recordLine(node.lineno) + + def recordLine(self, lineno): + # Returns a bool, whether the line is included or excluded. + if lineno: + # Multi-line tests introducing suites have to get charged to their + # keyword. + if lineno in self.suite_spots: + lineno = self.suite_spots[lineno][0] + # If we're inside an exluded suite, record that this line was + # excluded. + if self.excluding_suite: + self.excluded[lineno] = 1 + return 0 + # If this line is excluded, or suite_spots maps this line to + # another line that is exlcuded, then we're excluded. + elif self.excluded.has_key(lineno) or \ + self.suite_spots.has_key(lineno) and \ + self.excluded.has_key(self.suite_spots[lineno][1]): + return 0 + # Otherwise, this is an executable line. + else: + self.statements[lineno] = 1 + return 1 + return 0 + + default = recordNodeLine + + def recordAndDispatch(self, node): + self.recordNodeLine(node) + self.dispatch(node) + + def doSuite(self, intro, body, exclude=0): + exsuite = self.excluding_suite + if exclude or (intro and not self.recordNodeLine(intro)): + self.excluding_suite = 1 + self.recordAndDispatch(body) + self.excluding_suite = exsuite + + def doPlainWordSuite(self, prevsuite, suite): + # Finding the exclude lines for else's is tricky, because they aren't + # present in the compiler parse tree. Look at the previous suite, + # and find its last line. If any line between there and the else's + # first line are excluded, then we exclude the else. + lastprev = self.getLastLine(prevsuite) + firstelse = self.getFirstLine(suite) + for l in range(lastprev+1, firstelse): + if self.suite_spots.has_key(l): + self.doSuite(None, suite, exclude=self.excluded.has_key(l)) + break + else: + self.doSuite(None, suite) + + def doElse(self, prevsuite, node): + if node.else_: + self.doPlainWordSuite(prevsuite, node.else_) + + def visitFor(self, node): + self.doSuite(node, node.body) + self.doElse(node.body, node) + + def visitIf(self, node): + # The first test has to be handled separately from the rest. + # The first test is credited to the line with the "if", but the others + # are credited to the line with the test for the elif. + self.doSuite(node, node.tests[0][1]) + for t, n in node.tests[1:]: + self.doSuite(t, n) + self.doElse(node.tests[-1][1], node) + + def visitWhile(self, node): + self.doSuite(node, node.body) + self.doElse(node.body, node) + + def visitTryExcept(self, node): + self.doSuite(node, node.body) + for i in range(len(node.handlers)): + a, b, h = node.handlers[i] + if not a: + # It's a plain "except:". Find the previous suite. + if i > 0: + prev = node.handlers[i-1][2] + else: + prev = node.body + self.doPlainWordSuite(prev, h) + else: + self.doSuite(a, h) + self.doElse(node.handlers[-1][2], node) + + def visitTryFinally(self, node): + self.doSuite(node, node.body) + self.doPlainWordSuite(node.body, node.final) + + def visitGlobal(self, node): + # "global" statements don't execute like others (they don't call the + # trace function), so don't record their line numbers. + pass the_coverage = None -class coverage: - error = "coverage error" +class CoverageException(Exception): pass +class coverage: # Name of the cache file (unless environment variable is set). cache_default = ".coverage" # Environment variable naming the cache file. cache_env = "COVERAGE_FILE" + # A dictionary with an entry for (Python source file name, line number + # in that file) if that line has been executed. + c = {} + # A map from canonical Python source file name to a dictionary in # which there's an entry for each line number that has been # executed. cexecuted = {} - # Cache of results of calling the analysis() method, so that you can + # Cache of results of calling the analysis2() method, so that you can # specify both -r and -a without doing double work. analysis_cache = {} @@ -110,11 +265,28 @@ class coverage: def __init__(self): global the_coverage if the_coverage: - raise self.error, "Only one coverage object allowed." - self.cache = os.environ.get(self.cache_env, self.cache_default) - self.restore() - self.analysis_cache = {} + raise CoverageException, "Only one coverage object allowed." + self.usecache = 1 + self.cache = None + self.exclude_re = '' + self.nesting = 0 + self.cstack = [] + self.xstack = [] + self.relative_dir = os.path.normcase(os.path.abspath(os.curdir)+os.path.sep) + # t(f, x, y). This method is passed to sys.settrace as a trace function. + # See [van Rossum 2001-07-20b, 9.2] for an explanation of sys.settrace and + # the arguments and return value of the trace function. + # See [van Rossum 2001-07-20a, 3.2] for a description of frame and code + # objects. + + def t(self, f, w, a): #pragma: no cover + if w == 'line': + self.c[(f.f_code.co_filename, f.f_lineno)] = 1 + for c in self.cstack: + c[(f.f_code.co_filename, f.f_lineno)] = 1 + return self.t + def help(self, error=None): if error: print error @@ -122,23 +294,26 @@ class coverage: print __doc__ sys.exit(1) - def command_line(self): + def command_line(self, argv, help=None): import getopt + help = help or self.help settings = {} optmap = { '-a': 'annotate', + '-c': 'collect', '-d:': 'directory=', '-e': 'erase', '-h': 'help', '-i': 'ignore-errors', '-m': 'show-missing', + '-p': 'parallel-mode', '-r': 'report', '-x': 'execute', + '-o:': 'omit=', } short_opts = string.join(map(lambda o: o[1:], optmap.keys()), '') long_opts = optmap.values() - options, args = getopt.getopt(sys.argv[1:], short_opts, - long_opts) + options, args = getopt.getopt(argv, short_opts, long_opts) for o, a in options: if optmap.has_key(o): settings[optmap[o]] = 1 @@ -147,86 +322,167 @@ class coverage: elif o[2:] in long_opts: settings[o[2:]] = 1 elif o[2:] + '=' in long_opts: - settings[o[2:]] = a - else: - self.help("Unknown option: '%s'." % o) + settings[o[2:]+'='] = a + else: #pragma: no cover + pass # Can't get here, because getopt won't return anything unknown. + if settings.get('help'): - self.help() + help() + for i in ['erase', 'execute']: - for j in ['annotate', 'report']: + for j in ['annotate', 'report', 'collect']: if settings.get(i) and settings.get(j): - self.help("You can't specify the '%s' and '%s' " + help("You can't specify the '%s' and '%s' " "options at the same time." % (i, j)) + args_needed = (settings.get('execute') or settings.get('annotate') or settings.get('report')) - action = settings.get('erase') or args_needed + action = (settings.get('erase') + or settings.get('collect') + or args_needed) if not action: - self.help("You must specify at least one of -e, -x, -r, " - "or -a.") + help("You must specify at least one of -e, -x, -c, -r, or -a.") if not args_needed and args: - self.help("Unexpected arguments %s." % args) + help("Unexpected arguments: %s" % " ".join(args)) + + self.get_ready(settings.get('parallel-mode')) + self.exclude('#pragma[: ]+[nN][oO] [cC][oO][vV][eE][rR]') + if settings.get('erase'): self.erase() if settings.get('execute'): if not args: - self.help("Nothing to do.") + help("Nothing to do.") sys.argv = args self.start() import __main__ sys.path[0] = os.path.dirname(sys.argv[0]) execfile(sys.argv[0], __main__.__dict__) + if settings.get('collect'): + self.collect() if not args: args = self.cexecuted.keys() + ignore_errors = settings.get('ignore-errors') show_missing = settings.get('show-missing') directory = settings.get('directory=') + + omit = settings.get('omit=') + if omit is not None: + omit = omit.split(',') + else: + omit = [] + if settings.get('report'): - self.report(args, show_missing, ignore_errors) + self.report(args, show_missing, ignore_errors, omit_prefixes=omit) if settings.get('annotate'): - self.annotate(args, directory, ignore_errors) - - def start(self): - sys.settrace(t) + self.annotate(args, directory, ignore_errors, omit_prefixes=omit) + def use_cache(self, usecache, cache_file=None): + self.usecache = usecache + if cache_file and not self.cache: + self.cache_default = cache_file + + def get_ready(self, parallel_mode=False): + if self.usecache and not self.cache: + self.cache = os.environ.get(self.cache_env, self.cache_default) + if parallel_mode: + self.cache += "." + gethostname() + "." + str(os.getpid()) + self.restore() + self.analysis_cache = {} + + def start(self, parallel_mode=False): + self.get_ready(parallel_mode) + if self.nesting == 0: #pragma: no cover + sys.settrace(self.t) + if hasattr(threading, 'settrace'): + threading.settrace(self.t) + self.nesting += 1 + def stop(self): - sys.settrace(None) + self.nesting -= 1 + if self.nesting == 0: #pragma: no cover + sys.settrace(None) + if hasattr(threading, 'settrace'): + threading.settrace(None) def erase(self): - global c - c = {} + self.c = {} self.analysis_cache = {} self.cexecuted = {} - if os.path.exists(self.cache): + if self.cache and os.path.exists(self.cache): os.remove(self.cache) + self.exclude_re = "" + + def exclude(self, re): + if self.exclude_re: + self.exclude_re += "|" + self.exclude_re += "(" + re + ")" + + def begin_recursive(self): + self.cstack.append(self.c) + self.xstack.append(self.exclude_re) + + def end_recursive(self): + self.c = self.cstack.pop() + self.exclude_re = self.xstack.pop() # save(). Save coverage data to the coverage cache. def save(self): - self.canonicalize_filenames() - cache = open(self.cache, 'wb') - import marshal - marshal.dump(self.cexecuted, cache) - cache.close() + if self.usecache and self.cache: + self.canonicalize_filenames() + cache = open(self.cache, 'wb') + import marshal + marshal.dump(self.cexecuted, cache) + cache.close() - # restore(). Restore coverage data from the coverage cache (if it - # exists). + # restore(). Restore coverage data from the coverage cache (if it exists). def restore(self): - global c - c = {} + self.c = {} self.cexecuted = {} - if not os.path.exists(self.cache): - return + assert self.usecache + if os.path.exists(self.cache): + self.cexecuted = self.restore_file(self.cache) + + def restore_file(self, file_name): try: - cache = open(self.cache, 'rb') + cache = open(file_name, 'rb') import marshal cexecuted = marshal.load(cache) cache.close() if isinstance(cexecuted, types.DictType): - self.cexecuted = cexecuted + return cexecuted + else: + return {} except: - pass + return {} + + # collect(). Collect data in multiple files produced by parallel mode + + def collect(self): + cache_dir, local = os.path.split(self.cache) + for file in os.listdir(cache_dir): + if not file.startswith(local): + continue + + full_path = os.path.join(cache_dir, file) + cexecuted = self.restore_file(full_path) + self.merge_data(cexecuted) + + def merge_data(self, new_data): + for file_name, file_data in new_data.items(): + if self.cexecuted.has_key(file_name): + self.merge_file_data(self.cexecuted[file_name], file_data) + else: + self.cexecuted[file_name] = file_data + + def merge_file_data(self, cache_data, new_data): + for line_number in new_data.keys(): + if not cache_data.has_key(line_number): + cache_data[line_number] = new_data[line_number] # canonical_filename(filename). Return a canonical filename for the # file (that is, an absolute path with no redundant components and @@ -247,25 +503,23 @@ class coverage: self.canonical_filename_cache[filename] = cf return self.canonical_filename_cache[filename] - # canonicalize_filenames(). Copy results from "executed" to - # "cexecuted", canonicalizing filenames on the way. Clear the - # "executed" map. + # canonicalize_filenames(). Copy results from "c" to "cexecuted", + # canonicalizing filenames on the way. Clear the "c" map. def canonicalize_filenames(self): - global c - for filename, lineno in c.keys(): + for filename, lineno in self.c.keys(): f = self.canonical_filename(filename) if not self.cexecuted.has_key(f): self.cexecuted[f] = {} self.cexecuted[f][lineno] = 1 - c = {} + self.c = {} # morf_filename(morf). Return the filename for a module or file. def morf_filename(self, morf): if isinstance(morf, types.ModuleType): if not hasattr(morf, '__file__'): - raise self.error, "Module has no __file__ attribute." + raise CoverageException, "Module has no __file__ attribute." file = morf.__file__ else: file = morf @@ -273,9 +527,9 @@ class coverage: # analyze_morf(morf). Analyze the module or filename passed as # the argument. If the source code can't be found, raise an error. - # Otherwise, return a pair of (1) the canonical filename of the - # source code for the module, and (2) a list of lines of statements - # in the source code. + # Otherwise, return a tuple of (1) the canonical filename of the + # source code for the module, (2) a list of lines of statements + # in the source code, and (3) a list of lines of excluded statements. def analyze_morf(self, morf): if self.analysis_cache.has_key(morf): @@ -284,57 +538,82 @@ class coverage: ext = os.path.splitext(filename)[1] if ext == '.pyc': if not os.path.exists(filename[0:-1]): - raise self.error, ("No source for compiled code '%s'." + raise CoverageException, ("No source for compiled code '%s'." % filename) filename = filename[0:-1] elif ext != '.py': - raise self.error, "File '%s' not Python source." % filename + raise CoverageException, "File '%s' not Python source." % filename source = open(filename, 'r') - import parser - tree = parser.suite(source.read()).totuple(1) + lines, excluded_lines = self.find_executable_statements( + source.read(), exclude=self.exclude_re + ) source.close() - statements = {} - self.find_statements(tree, statements) - lines = statements.keys() - lines.sort() - result = filename, lines + result = filename, lines, excluded_lines self.analysis_cache[morf] = result return result - # find_statements(tree, dict). Find each statement in the parse - # tree and record the line on which the statement starts in the - # dictionary (by assigning it to 1). - # - # It works by walking the whole tree depth-first. Every time it - # comes across a statement (symbol.stmt -- this includes compound - # statements like 'if' and 'while') it calls find_statement, which - # descends the tree below the statement to find the first terminal - # token in that statement and record the lines on which that token - # was found. - # - # This algorithm may find some lines several times (because of the - # grammar production statement -> compound statement -> statement), - # but that doesn't matter because we record lines as the keys of the - # dictionary. - # - # See also [GDR 2001-12-04b, 3.2]. - - def find_statements(self, tree, dict): + def get_suite_spots(self, tree, spots): import symbol, token - if token.ISNONTERMINAL(tree[0]): - for t in tree[1:]: - self.find_statements(t, dict) - if tree[0] == symbol.stmt: - self.find_statement(tree[1], dict) - elif (tree[0] == token.NAME - and tree[1] in ['elif', 'except', 'finally']): - dict[tree[2]] = 1 + for i in range(1, len(tree)): + if type(tree[i]) == type(()): + if tree[i][0] == symbol.suite: + # Found a suite, look back for the colon and keyword. + lineno_colon = lineno_word = None + for j in range(i-1, 0, -1): + if tree[j][0] == token.COLON: + lineno_colon = tree[j][2] + elif tree[j][0] == token.NAME: + if tree[j][1] == 'elif': + # Find the line number of the first non-terminal + # after the keyword. + t = tree[j+1] + while t and token.ISNONTERMINAL(t[0]): + t = t[1] + if t: + lineno_word = t[2] + else: + lineno_word = tree[j][2] + break + elif tree[j][0] == symbol.except_clause: + # "except" clauses look like: + # ('except_clause', ('NAME', 'except', lineno), ...) + if tree[j][1][0] == token.NAME: + lineno_word = tree[j][1][2] + break + if lineno_colon and lineno_word: + # Found colon and keyword, mark all the lines + # between the two with the two line numbers. + for l in range(lineno_word, lineno_colon+1): + spots[l] = (lineno_word, lineno_colon) + self.get_suite_spots(tree[i], spots) - def find_statement(self, tree, dict): - import token - while token.ISNONTERMINAL(tree[0]): - tree = tree[1] - dict[tree[2]] = 1 + def find_executable_statements(self, text, exclude=None): + # Find lines which match an exclusion pattern. + excluded = {} + suite_spots = {} + if exclude: + reExclude = re.compile(exclude) + lines = text.split('\n') + for i in range(len(lines)): + if reExclude.search(lines[i]): + excluded[i+1] = 1 + + import parser + tree = parser.suite(text+'\n\n').totuple(1) + self.get_suite_spots(tree, suite_spots) + + # Use the compiler module to parse the text and find the executable + # statements. We add newlines to be impervious to final partial lines. + statements = {} + ast = compiler.parse(text+'\n\n') + visitor = StatementFindingAstVisitor(statements, excluded, suite_spots) + compiler.walk(ast, visitor, walker=visitor) + + lines = statements.keys() + lines.sort() + excluded_lines = excluded.keys() + excluded_lines.sort() + return lines, excluded_lines # format_lines(statements, lines). Format a list of line numbers # for printing by coalescing groups of lines as long as the lines @@ -367,11 +646,15 @@ class coverage: return "%d" % start else: return "%d-%d" % (start, end) - import string return string.join(map(stringify, pairs), ", ") + # Backward compatibility with version 1. def analysis(self, morf): - filename, statements = self.analyze_morf(morf) + f, s, _, m, mf = self.analysis2(morf) + return f, s, m, mf + + def analysis2(self, morf): + filename, statements, excluded = self.analyze_morf(morf) self.canonicalize_filenames() if not self.cexecuted.has_key(filename): self.cexecuted[filename] = {} @@ -379,18 +662,45 @@ class coverage: for line in statements: if not self.cexecuted[filename].has_key(line): missing.append(line) - return (filename, statements, missing, + return (filename, statements, excluded, missing, self.format_lines(statements, missing)) + def relative_filename(self, filename): + """ Convert filename to relative filename from self.relative_dir. + """ + return filename.replace(self.relative_dir, "") + def morf_name(self, morf): + """ Return the name of morf as used in report. + """ if isinstance(morf, types.ModuleType): return morf.__name__ else: - return os.path.splitext(os.path.basename(morf))[0] + return self.relative_filename(os.path.splitext(morf)[0]) - def report(self, morfs, show_missing=1, ignore_errors=0): + def filter_by_prefix(self, morfs, omit_prefixes): + """ Return list of morfs where the morf name does not begin + with any one of the omit_prefixes. + """ + filtered_morfs = [] + for morf in morfs: + for prefix in omit_prefixes: + if self.morf_name(morf).startswith(prefix): + break + else: + filtered_morfs.append(morf) + + return filtered_morfs + + def morf_name_compare(self, x, y): + return cmp(self.morf_name(x), self.morf_name(y)) + + def report(self, morfs, show_missing=1, ignore_errors=0, file=None, omit_prefixes=[]): if not isinstance(morfs, types.ListType): morfs = [morfs] + morfs = self.filter_by_prefix(morfs, omit_prefixes) + morfs.sort(self.morf_name_compare) + max_name = max([5,] + map(len, map(self.morf_name, morfs))) fmt_name = "%%- %ds " % max_name fmt_err = fmt_name + "%s: %s" @@ -399,14 +709,16 @@ class coverage: if show_missing: header = header + " Missing" fmt_coverage = fmt_coverage + " %s" - print header - print "-" * len(header) + if not file: + file = sys.stdout + print >>file, header + print >>file, "-" * len(header) total_statements = 0 total_executed = 0 for morf in morfs: name = self.morf_name(morf) try: - _, statements, missing, readable = self.analysis(morf) + _, statements, _, missing, readable = self.analysis2(morf) n = len(statements) m = n - len(missing) if n > 0: @@ -416,17 +728,17 @@ class coverage: args = (name, n, m, pc) if show_missing: args = args + (readable,) - print fmt_coverage % args + print >>file, fmt_coverage % args total_statements = total_statements + n total_executed = total_executed + m - except KeyboardInterrupt: + except KeyboardInterrupt: #pragma: no cover raise except: if not ignore_errors: type, msg = sys.exc_info()[0:2] - print fmt_err % (name, type, msg) + print >>file, fmt_err % (name, type, msg) if len(morfs) > 1: - print "-" * len(header) + print >>file, "-" * len(header) if total_statements > 0: pc = 100.0 * total_executed / total_statements else: @@ -434,76 +746,88 @@ class coverage: args = ("TOTAL", total_statements, total_executed, pc) if show_missing: args = args + ("",) - print fmt_coverage % args + print >>file, fmt_coverage % args # annotate(morfs, ignore_errors). - blank_re = re.compile("\\s*(#|$)") - else_re = re.compile("\\s*else\\s*:\\s*(#|$)") + blank_re = re.compile(r"\s*(#|$)") + else_re = re.compile(r"\s*else\s*:\s*(#|$)") - def annotate(self, morfs, directory=None, ignore_errors=0): + def annotate(self, morfs, directory=None, ignore_errors=0, omit_prefixes=[]): + morfs = self.filter_by_prefix(morfs, omit_prefixes) for morf in morfs: try: - filename, statements, missing, _ = self.analysis(morf) - source = open(filename, 'r') - if directory: - dest_file = os.path.join(directory, - os.path.basename(filename) - + ',cover') - else: - dest_file = filename + ',cover' - dest = open(dest_file, 'w') - lineno = 0 - i = 0 - j = 0 - covered = 1 - while 1: - line = source.readline() - if line == '': - break - lineno = lineno + 1 - while i < len(statements) and statements[i] < lineno: - i = i + 1 - while j < len(missing) and missing[j] < lineno: - j = j + 1 - if i < len(statements) and statements[i] == lineno: - covered = j >= len(missing) or missing[j] > lineno - if self.blank_re.match(line): - dest.write(' ') - elif self.else_re.match(line): - # Special logic for lines containing only - # 'else:'. See [GDR 2001-12-04b, 3.2]. - if i >= len(statements) and j >= len(missing): - dest.write('! ') - elif i >= len(statements) or j >= len(missing): - dest.write('> ') - elif statements[i] == missing[j]: - dest.write('! ') - else: - dest.write('> ') - elif covered: - dest.write('> ') - else: - dest.write('! ') - dest.write(line) - source.close() - dest.close() + filename, statements, excluded, missing, _ = self.analysis2(morf) + self.annotate_file(filename, statements, excluded, missing, directory) except KeyboardInterrupt: raise except: if not ignore_errors: raise - + + def annotate_file(self, filename, statements, excluded, missing, directory=None): + source = open(filename, 'r') + if directory: + dest_file = os.path.join(directory, + os.path.basename(filename) + + ',cover') + else: + dest_file = filename + ',cover' + dest = open(dest_file, 'w') + lineno = 0 + i = 0 + j = 0 + covered = 1 + while 1: + line = source.readline() + if line == '': + break + lineno = lineno + 1 + while i < len(statements) and statements[i] < lineno: + i = i + 1 + while j < len(missing) and missing[j] < lineno: + j = j + 1 + if i < len(statements) and statements[i] == lineno: + covered = j >= len(missing) or missing[j] > lineno + if self.blank_re.match(line): + dest.write(' ') + elif self.else_re.match(line): + # Special logic for lines containing only 'else:'. + # See [GDR 2001-12-04b, 3.2]. + if i >= len(statements) and j >= len(missing): + dest.write('! ') + elif i >= len(statements) or j >= len(missing): + dest.write('> ') + elif statements[i] == missing[j]: + dest.write('! ') + else: + dest.write('> ') + elif lineno in excluded: + dest.write('- ') + elif covered: + dest.write('> ') + else: + dest.write('! ') + dest.write(line) + source.close() + dest.close() # Singleton object. the_coverage = coverage() # Module functions call methods in the singleton object. -def start(*args): return apply(the_coverage.start, args) -def stop(*args): return apply(the_coverage.stop, args) -def erase(*args): return apply(the_coverage.erase, args) -def analysis(*args): return apply(the_coverage.analysis, args) -def report(*args): return apply(the_coverage.report, args) +def use_cache(*args, **kw): return the_coverage.use_cache(*args, **kw) +def start(*args, **kw): return the_coverage.start(*args, **kw) +def stop(*args, **kw): return the_coverage.stop(*args, **kw) +def erase(*args, **kw): return the_coverage.erase(*args, **kw) +def begin_recursive(*args, **kw): return the_coverage.begin_recursive(*args, **kw) +def end_recursive(*args, **kw): return the_coverage.end_recursive(*args, **kw) +def exclude(*args, **kw): return the_coverage.exclude(*args, **kw) +def analysis(*args, **kw): return the_coverage.analysis(*args, **kw) +def analysis2(*args, **kw): return the_coverage.analysis2(*args, **kw) +def report(*args, **kw): return the_coverage.report(*args, **kw) +def annotate(*args, **kw): return the_coverage.annotate(*args, **kw) +def annotate_file(*args, **kw): return the_coverage.annotate_file(*args, **kw) # Save coverage data when Python exits. (The atexit module wasn't # introduced until Python 2.0, so use sys.exitfunc when it's not @@ -516,18 +840,18 @@ except ImportError: # Command-line interface. if __name__ == '__main__': - the_coverage.command_line() + the_coverage.command_line(sys.argv[1:]) # A. REFERENCES # # [GDR 2001-12-04a] "Statement coverage for Python"; Gareth Rees; # Ravenbrook Limited; 2001-12-04; -# . +# . # # [GDR 2001-12-04b] "Statement coverage for Python: design and # analysis"; Gareth Rees; Ravenbrook Limited; 2001-12-04; -# . +# . # # [van Rossum 2001-07-20a] "Python Reference Manual (releae 2.1.1)"; # Guide van Rossum; 2001-07-20; @@ -561,10 +885,44 @@ if __name__ == '__main__': # so that it matches the value the program would get if it were run on # its own. # +# 2004-12-12 NMB Significant code changes. +# - Finding executable statements has been rewritten so that docstrings and +# other quirks of Python execution aren't mistakenly identified as missing +# lines. +# - Lines can be excluded from consideration, even entire suites of lines. +# - The filesystem cache of covered lines can be disabled programmatically. +# - Modernized the code. # +# 2004-12-14 NMB Minor tweaks. Return 'analysis' to its original behavior +# and add 'analysis2'. Add a global for 'annotate', and factor it, adding +# 'annotate_file'. +# +# 2004-12-31 NMB Allow for keyword arguments in the module global functions. +# Thanks, Allen. +# +# 2005-12-02 NMB Call threading.settrace so that all threads are measured. +# Thanks Martin Fuzzey. Add a file argument to report so that reports can be +# captured to a different destination. +# +# 2005-12-03 NMB coverage.py can now measure itself. +# +# 2005-12-04 NMB Adapted Greg Rogers' patch for using relative filenames, +# and sorting and omitting files to report on. +# +# 2006-07-23 NMB Applied Joseph Tate's patch for function decorators. +# +# 2006-08-21 NMB Applied Sigve Tjora and Mark van der Wal's fixes for argument +# handling. +# +# 2006-08-22 NMB Applied Geoff Bache's parallel mode patch. +# +# 2006-08-23 NMB Refactorings to improve testability. Fixes to command-line +# logic for parallel mode and collect. + # C. COPYRIGHT AND LICENCE # # Copyright 2001 Gareth Rees. All rights reserved. +# Copyright 2004-2006 Ned Batchelder. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are @@ -591,6 +949,4 @@ if __name__ == '__main__': # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH # DAMAGE. # -# -# -# $Id: coverage.py,v 1.1 2005/07/11 20:39:31 dax Exp $ +# $Id: coverage.py 47 2006-08-24 01:08:48Z Ned $ diff --git a/run_test.py b/run_test.py index ffd7ffe..1823549 100644 --- a/run_test.py +++ b/run_test.py @@ -1,10 +1,11 @@ +# -*- coding: utf-8 -*- ## -## run_test.py +## run_tests.py ## Login : David Rousselie -## Started on Wed May 18 13:33:03 2005 David Rousselie -## $Id: run_test.py,v 1.2 2005/09/18 20:24:07 David Rousselie Exp $ +## Started on Wed Aug 9 21:37:35 2006 David Rousselie +## $Id$ ## -## Copyright (C) 2005 David Rousselie +## Copyright (C) 2006 David Rousselie ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or @@ -23,7 +24,9 @@ import coverage coverage.erase() coverage.start() +import logging import unittest +from test import test_support import sys sys.path.append("src") @@ -32,67 +35,32 @@ sys.setdefaultencoding('utf8') del sys.setdefaultencoding import tests -from tests.test_mailconnection import * -from tests.test_mailconnection_factory import * -from tests.test_component import * -from tests.test_storage import * -from tests.test_lang import * -from test import test_support -import logging -import jmc +from tests.jmc.model.test_account import * +import jmc +import jmc.jabber +#import jmc.jabber.component if __name__ == '__main__': logger = logging.getLogger() logger.addHandler(logging.StreamHandler()) logger.setLevel(logging.INFO) - mail_connection_suite = unittest.makeSuite(MailConnection_TestCase, \ - "test") - pop3_connection_suite = unittest.makeSuite(POP3Connection_TestCase, \ - "test") - imap_connection_suite = unittest.makeSuite(IMAPConnection_TestCase, \ - "test") - mc_factory_suite = unittest.makeSuite(MailConnectionFactory_TestCase, \ - "test") - component_suite = unittest.makeSuite(MailComponent_TestCase_Basic, \ - "test") - component2_suite = unittest.makeSuite(MailComponent_TestCase_NoReg, \ - "test") - storage_suite = unittest.makeSuite(Storage_TestCase, \ - "test") - dbmstorage_suite = unittest.makeSuite(DBMStorage_TestCase, \ - "test") - sqlitestorage_suite = unittest.makeSuite(SQLiteStorage_TestCase, \ - "test") - lang_suite = unittest.makeSuite(Lang_TestCase, \ - "test") - - jmc_suite = unittest.TestSuite((mail_connection_suite, \ - pop3_connection_suite, \ - imap_connection_suite, \ - mc_factory_suite, \ - # component_suite, \ - # component2_suite, \ - storage_suite, \ - dbmstorage_suite, \ - sqlitestorage_suite, \ - lang_suite)) - #test_support.run_suite(mail_connection_suite) - #test_support.run_suite(pop3_connection_suite) - #test_support.run_suite(imap_connection_suite) - #test_support.run_suite(mc_factory_suite) - #test_support.run_suite(component_suite) - #test_support.run_suite(component2_suite) - #test_support.run_suite(storage_suite) - #test_support.run_suite(sqlitestorage_suite) - #test_support.run_suite(dbmstorage_suite) + + mail_account_suite = unittest.makeSuite(MailAccount_TestCase, "test") + imap_account_suite = unittest.makeSuite(IMAPAccount_TestCase, "test") + pop3_account_suite = unittest.makeSuite(POP3Account_TestCase, "test") + + jmc_suite = unittest.TestSuite() + jmc_suite = unittest.TestSuite((mail_account_suite, \ + imap_account_suite, \ + pop3_account_suite)) test_support.run_suite(jmc_suite) + coverage.stop() -coverage.analysis(jmc.email.mailconnection_factory) -coverage.analysis(jmc.email.mailconnection) -coverage.analysis(jmc.jabber.component) -coverage.analysis(jmc.jabber.x) -coverage.analysis(jmc.utils.lang) -coverage.report([jmc.email.mailconnection_factory, jmc.email.mailconnection, \ - jmc.jabber.component, jmc.jabber.x, jmc.utils.lang]) +#coverage.analysis(jmc.jabber.component) +coverage.analysis(jmc.model.account) + +coverage.report([ + jmc.model.account]) + diff --git a/src/jmc.py b/src/jmc.py index 84d3989..caf5029 100755 --- a/src/jmc.py +++ b/src/jmc.py @@ -1,12 +1,10 @@ -#!/usr/bin/python -u ## -## Jabber Mail Component ## jmc.py -## Login : David Rousselie -## Started on Fri Jan 7 11:06:42 2005 -## $Id: jmc.py,v 1.3 2005/07/11 20:39:31 dax Exp $ +## Login : +## Started on Fri Jan 19 18:14:41 2007 David Rousselie +## $Id$ ## -## Copyright (C) 2005 +## Copyright (C) 2007 David Rousselie ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or @@ -22,70 +20,27 @@ ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ## -import sys -import os.path import logging +import sys reload(sys) sys.setdefaultencoding('utf-8') del sys.setdefaultencoding -from jmc.email import mailconnection -from jmc.jabber.component import MailComponent, ComponentFatalError -from jmc.utils.config import Config -def main(config_file = "jmc.xml", isDebug = 0): - try: - logger = logging.getLogger() - logger.addHandler(logging.StreamHandler()) - if isDebug > 0: - logger.setLevel(logging.DEBUG) - try: - logger.debug("Loading config file " + config_file) - config = Config(config_file) - except: - print >>sys.stderr, "Couldn't load config file:", \ - str(sys.exc_value) - sys.exit(1) +from sqlobject import * +from pyxmpp.message import Message - pidfile = open(config.get_content("config/pidfile"), "w") - pidfile.write(str(os.getpid())) - pidfile.close() - mailconnection.default_encoding = config.get_content("config/mail_default_encoding") - print "creating component..." - mailcomp = MailComponent(config.get_content("config/jabber/service"), \ - config.get_content("config/jabber/secret"), \ - config.get_content("config/jabber/server"), \ - int(config.get_content("config/jabber/port")), \ - config.get_content("config/jabber/language"), \ - int(config.get_content("config/check_interval")), \ - config.get_content("config/spooldir"), \ - config.get_content("config/storage"), \ - config.get_content("config/jabber/vCard/FN")) - - print "starting..." - mailcomp.run(1) - os.remove(config.get_content("config/pidfile")) - except ComponentFatalError,e: - print e - print "Aborting." - sys.exit(1) - -if __name__ == "__main__": - var_option = 0 - file_num = 0 - index = 0 - debug_level = 0 - for opt in sys.argv: - if var_option == 0 and len(opt) == 2 and opt == "-c": - var_option += 1 - elif (var_option & 1) == 1 and len(opt) > 0: - var_option += 1 - file_num = index - if len(opt) == 2 and opt == "-D": - debug_level = 1 - index += 1 - if (var_option & 2) == 2: - main(sys.argv[file_num], debug_level) - else: - main("/etc/jabber/jmc.xml", debug_level) +from jmc.jabber.component import MailComponent +from jmc.model.account import MailPresenceAccount +logger = logging.getLogger() +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.DEBUG) +component = MailComponent("jmc.localhost", \ + "secret", \ + "127.0.0.1", \ + 5349, \ + "sqlite:///tmp/jmc_test.db") +component.account_class = MailPresenceAccount +component.run() +logger.debug("JMC is exiting") diff --git a/src/jmc/__init__.py b/src/jmc/__init__.py index e69de29..7c37883 100644 --- a/src/jmc/__init__.py +++ b/src/jmc/__init__.py @@ -0,0 +1,2 @@ +"""JMC module""" +__revision__ = "" diff --git a/src/jmc/utils/config.py b/src/jmc/config.py similarity index 100% rename from src/jmc/utils/config.py rename to src/jmc/config.py diff --git a/src/jmc/email/mailconnection_factory.py b/src/jmc/email/mailconnection_factory.py deleted file mode 100644 index 1187fb1..0000000 --- a/src/jmc/email/mailconnection_factory.py +++ /dev/null @@ -1,142 +0,0 @@ -## -## mailconnection_factory.py -## Login : David Rousselie -## Started on Fri May 20 10:41:46 2005 David Rousselie -## $Id: mailconnection_factory.py,v 1.2 2005/09/18 20:24:07 dax Exp $ -## -## Copyright (C) 2005 -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -## - -import jmc.email.mailconnection as mailconnection -from jmc.email.mailconnection import IMAPConnection, POP3Connection - -def get_new_mail_connection(type): - """ Static method to return an empty MailConnection object of given type - :Parameters: - - 'type': type of connection to return : 'imap', 'imaps', 'pop3', 'pop3s' - - :return: MailConnection of given type in parameter, None if unknown type - - :returntype: 'MailConnection' - """ - if type == "imap": - return IMAPConnection() - elif type == "imaps": - return IMAPConnection(ssl = True) - elif type == "pop3": - return POP3Connection() - elif type == "pop3s": - return POP3Connection(ssl = True) - raise Exception, "Connection type \"" + type + "\" unknown" - -def str_to_mail_connection(connection_string): - """ Static methode to create a MailConnection object filled from string - - :Parameters: - - 'connection_string': string containing MailConnection parameters separated - by '#'. ex: 'pop3#login#password#host#110#chat_action#online_action#away_action#xa_action#dnd_action#offline_action#check_interval#liv_email_only(#Mailbox)' - - :Types: - - 'connection_string': string - - :return: MailConnection of given type found in string parameter - - :returntype: 'MailConnection' - """ - arg_list = connection_string.split("#") - # optionals values must be the at the beginning of the list to pop them - # last - arg_list.reverse() - type = arg_list.pop() - login = arg_list.pop() - password = arg_list.pop() - if password == "/\\": - password = None - store_password = False - else: - store_password = True - host = arg_list.pop() - port = int(arg_list.pop()) - chat_action = None - online_action = None - away_action = None - xa_action = None - dnd_action = None - offline_action = None - interval = None - live_email_only = False - result = None - if type[0:4] == "imap": - if len(arg_list) == 9: - chat_action = int(arg_list.pop()) - online_action = int(arg_list.pop()) - away_action = int(arg_list.pop()) - xa_action = int(arg_list.pop()) - dnd_action = int(arg_list.pop()) - offline_action = int(arg_list.pop()) - interval = int(arg_list.pop()) - live_email_only = (arg_list.pop().lower() == "true") - else: - retrieve = bool(arg_list.pop() == "True") - if retrieve: - chat_action = online_action = away_action = xa_action = dnd_action = mailconnection.RETRIEVE - else: - chat_action = online_action = away_action = xa_action = dnd_action = mailconnection.DIGEST - offline_action = mailconnection.DO_NOTHING - mailbox = arg_list.pop() - result = IMAPConnection(login = login, \ - password = password, \ - host = host, \ - ssl = (len(type) == 5), \ - port = port, \ - mailbox = mailbox) - elif type[0:4] == "pop3": - if len(arg_list) == 8: - chat_action = int(arg_list.pop()) - online_action = int(arg_list.pop()) - away_action = int(arg_list.pop()) - xa_action = int(arg_list.pop()) - dnd_action = int(arg_list.pop()) - offline_action = int(arg_list.pop()) - interval = int(arg_list.pop()) - live_email_only = (arg_list.pop().lower() == "true") - else: - retrieve = bool(arg_list.pop() == "True") - if retrieve: - chat_action = online_action = away_action = xa_action = dnd_action = mailconnection.RETRIEVE - else: - chat_action = online_action = away_action = xa_action = dnd_action = mailconnection.DIGEST - offline_action = mailconnection.DO_NOTHING - result = POP3Connection(login = login, \ - password = password, \ - host = host, \ - port = port, \ - ssl = (len(type) == 5)) - if result is None: - raise Exception, "Connection type \"" + type + "\" unknown" - result.store_password = store_password - result.chat_action = chat_action - result.online_action = online_action - result.away_action = away_action - result.xa_action = xa_action - result.dnd_action = dnd_action - result.offline_action = offline_action - if interval is not None: - result.interval = interval - result.live_email_only = live_email_only - return result - - diff --git a/src/jmc/jabber/__init__.py b/src/jmc/jabber/__init__.py index e69de29..e90a3dc 100644 --- a/src/jmc/jabber/__init__.py +++ b/src/jmc/jabber/__init__.py @@ -0,0 +1,2 @@ +"""Jabber component classes""" +__revision__ = "" diff --git a/src/jmc/jabber/x.py b/src/jmc/jabber/x.py deleted file mode 100644 index a9e41d3..0000000 --- a/src/jmc/jabber/x.py +++ /dev/null @@ -1,129 +0,0 @@ -## -## x.py -## Login : David Rousselie -## Started on Fri Jan 7 11:06:42 2005 -## $Id: x.py,v 1.3 2005/09/18 20:24:07 dax Exp $ -## -## Copyright (C) 2005 -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -## - -import sys -from pyxmpp.stanza import common_doc - -class Option(object): - def __init__(self, label, value): - self.__label = label - self.__value = value - - def get_xml(self, parent): - if parent is None: - option = common_doc.newChild(None, "option", None) - else: - option = parent.newChild(None, "option", None) - option.setProp("label", self.__label) - option.newChild(None, "value", self.__value) - return option - -class Field(object): - def __init__(self, type, label, var, value): - self.__type = type - self.__label = label - self.__var = var - self.value = value - self.__options = [] - - def add_option(self, label, value): - option = Option(label, value) - self.__options.append(option) - return option - - def get_xml(self, parent): - if parent is None: - raise Exception, "parent field should not be None" - else: - field = parent.newChild(None, "field", None) - field.setProp("type", self.__type) - if not self.__label is None: - field.setProp("label", self.__label) - if not self.__var is None: - field.setProp("var", self.__var) - if self.value: - field.newChild(None, "value", self.value) - for option in self.__options: - option.get_xml(field) - return field - -class X(object): - def __init__(self): - self.fields = {} - self.fields_tab = [] - self.title = None - self.instructions = None - self.type = None - self.xmlns = None - - def add_field(self, type = "fixed", label = None, var = None, value = ""): - field = Field(type, label, var, value) - self.fields[var] = field - # fields_tab exist to keep added fields order - self.fields_tab.append(field) - return field - - def attach_xml(self, iq): - node = iq.newChild(None, "x", None) - _ns = node.newNs(self.xmlns, None) - node.setNs(_ns) - if not self.title is None: - node.newTextChild(None, "title", self.title) - if not self.instructions is None: - node.newTextChild(None, "instructions", self.instructions) - for field in self.fields_tab: - field.get_xml(node) - return node - - def from_xml(self, node): - ## TODO : test node type and ns and clean that loop !!!! - while node and node.type != "element": - node = node.next - child = node.children - while child: - ## TODO : test child type (element) and ns (jabber:x:data) - if child.type == "element" and child.name == "field": - if child.hasProp("type"): - type = child.prop("type") - else: - type = "" - - if child.hasProp("label"): - label = child.prop("label") - else: - label = "" - - if child.hasProp("var"): - var = child.prop("var") - else: - var = "" - - xval = child.children - while xval and xval.name != "value": - xval = xval.next - if xval: - value = xval.getContent() - else: - value = "" - field = Field(type, label, var, value) - self.fields[var] = field - child = child.next diff --git a/src/jmc/utils/lang.py b/src/jmc/lang.py similarity index 100% rename from src/jmc/utils/lang.py rename to src/jmc/lang.py diff --git a/src/jmc/model/__init__.py b/src/jmc/model/__init__.py new file mode 100644 index 0000000..7b93e19 --- /dev/null +++ b/src/jmc/model/__init__.py @@ -0,0 +1,2 @@ +"""Contains data model classes""" +__revision__ = "" diff --git a/src/jmc/email/mailconnection.py b/src/jmc/model/account.py similarity index 67% rename from src/jmc/email/mailconnection.py rename to src/jmc/model/account.py index 4f68847..496423d 100644 --- a/src/jmc/email/mailconnection.py +++ b/src/jmc/model/account.py @@ -1,10 +1,10 @@ ## -## mailconnection.py -## Login : David Rousselie -## Started on Fri Jan 7 11:06:42 2005 -## $Id: mailconnection.py,v 1.11 2005/09/18 20:24:07 dax Exp $ +## account.py +## Login : +## Started on Fri Jan 19 18:21:44 2007 David Rousselie +## $Id$ ## -## Copyright (C) 2005 +## Copyright (C) 2007 David Rousselie ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or @@ -20,7 +20,6 @@ ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ## - import sys import logging import email @@ -31,16 +30,15 @@ import poplib import imaplib import socket -from jmc.utils.lang import Lang +from sqlobject.inheritance import InheritableSQLObject +from sqlobject.col import StringCol, IntCol, BoolCol + +from jcl.model.account import PresenceAccount +from jmc.lang import Lang IMAP4_TIMEOUT = 10 POP3_TIMEOUT = 10 -DO_NOTHING = 0 -DIGEST = 1 -RETRIEVE = 2 -default_encoding = "iso-8859-1" - ## All MY* classes are implemented to add a timeout (settimeout) ## while connecting class MYIMAP4(imaplib.IMAP4): @@ -127,77 +125,84 @@ class MYPOP3_SSL(poplib.POP3_SSL): self._debugging = 0 self.welcome = self._getresp() -class MailConnection(object): +class MailAccount(PresenceAccount): """ Wrapper to mail connection and action. Abstract class, do not represent real mail connection type""" - _logger = logging.getLogger("jmc.MailConnection") + # Define constants + DIGEST = 1 + RETRIEVE = 2 + default_encoding = "iso-8859-1" + possibles_actions = [PresenceAccount.DO_NOTHING, \ + DIGEST, \ + RETRIEVE] - def __init__(self, login = "", password = "", host = "", \ - port = 110, ssl = False): - """ Initialize MailConnection object for common parameters of all - connections types - - :Parameters: - - 'login': login used to connect mail server - - 'password': password associated with 'login' to connect mail server - - 'host': mail server hostname - - 'port': mail server port - - 'ssl': activate ssl to connect server - - :Types: - - 'login': string - - 'password': string - - 'host': string - - 'port': int - - 'ssl': boolean""" - self.login = login - self.password = password - self.store_password = True - self.host = host - self.port = port - self.ssl = ssl - self.status = "offline" - self.connection = None - self.chat_action = RETRIEVE - self.online_action = RETRIEVE - self.away_action = RETRIEVE - self.xa_action = RETRIEVE - self.dnd_action = RETRIEVE - self.offline_action = DO_NOTHING - self.interval = 5 - self.lastcheck = 0 - self.default_lang_class = Lang.en - self.waiting_password_reply = False - self.in_error = False - self.live_email_only = False - self.first_check = True - - def __eq__(self, other): - return self.get_type() == other.get_type() \ - and self.login == other.login \ - and (not self.store_password or self.password == other.password) \ - and self.store_password == other.store_password \ - and self.host == other.host \ - and self.port == other.port \ - and self.ssl == other.ssl \ - and self.chat_action == other.chat_action \ - and self.online_action == other.online_action \ - and self.away_action == other.away_action \ - and self.xa_action == other.xa_action \ - and self.dnd_action == other.dnd_action \ - and self.offline_action == other.offline_action \ - and self.interval == other.interval \ - and self.live_email_only == other.live_email_only + login = StringCol(default = "") + password = StringCol(default = None) + host = StringCol(default = "localhost") + port = IntCol(default = 110) + ssl = BoolCol(default = False) + interval = IntCol(default = 5) + store_password = BoolCol(default = True) + live_email_only = BoolCol(default = False) - def __str__(self): - return self.get_type() + "#" + self.login + "#" + \ - (self.store_password and self.password or "/\\") + "#" \ - + self.host + "#" + str(self.port) + "#" + str(self.chat_action) + "#" \ - + str(self.online_action) + "#" + str(self.away_action) + "#" + \ - str(self.xa_action) + "#" + str(self.dnd_action) + "#" + \ - str(self.offline_action) + "#" + str(self.interval) + "#" + str(self.live_email_only) - + lastcheck = IntCol(default = 0) + waiting_password_reply = BoolCol(default = False) + in_error = BoolCol(default = False) + first_check = BoolCol(default = True) + + def _init(self, *args, **kw): + """MailAccount init + Initialize class attributes""" + InheritableSQLObject._init(self, *args, **kw) + self.__logger = logging.getLogger("jmc.model.account.MailAccount") + self.connection = None + self.default_lang_class = Lang.en # TODO: use String + + def _get_register_fields(cls): + """See Account._get_register_fields + """ + def password_post_func(password): + if password is None or password == "": + return None + return password + + return Account.get_register_fields() + \ + [("login", "text-single", None, account.string_not_null_post_func, \ + account.mandatory_field), \ + ("password", "text-private", None, password_post_func, \ + (lambda field_name: None)), \ + ("host", "text-single", None, account.string_not_null_post_func, \ + account.mandatory_field), \ + ("port", "text-single", None, account.int_post_func, \ + account.mandatory_field), \ + ("ssl", "boolean", None, account.default_post_func, \ + (lambda field_name: None)), \ + ("store_password", "boolean", None, account.default_post_func, \ + (lambda field_name: True)), \ + ("live_email_only", "boolean", None, account.default_post_func, \ + lambda field_name: False)] + + get_register_fields = classmethod(_get_register_fields) + + def _get_presence_actions_fields(cls): + """See PresenceAccount._get_presence_actions_fields + """ + return {'chat_action': (cls.possibles_actions, \ + RETRIEVE), \ + 'online_action': (cls.possibles_actions, \ + RETRIEVE), \ + 'away_action': (cls.possibles_actions, \ + RETRIEVE), \ + 'xa_action': (cls.possibles_actions, \ + RETRIEVE), \ + 'dnd_action': (cls.possibles_actions, \ + RETRIEVE), \ + 'offline_action': (cls.possibles_actions, \ + PresenceAccount.DO_NOTHING)} + + get_presence_actions_fields = classmethod(_get_presence_actions_fields) + def get_decoded_part(self, part, charset_hint): content_charset = part.get_content_charset() result = u"" @@ -292,26 +297,26 @@ class MailConnection(object): unicode(self.port) def connect(self): - pass + raise NotImplementedError def disconnect(self): - pass + raise NotImplementedError def get_mail_list(self): - return 0 + raise NotImplementedError def get_mail(self, index): - return None + raise NotImplementedError def get_mail_summary(self, index): - return None + raise NotImplementedError def get_next_mail_index(self, mail_list): - pass + raise NotImplementedError # Does not modify server state but just internal JMC state def mark_all_as_read(self): - pass + raise NotImplementedError def get_action(self): mapping = {"online": self.online_action, @@ -322,29 +327,35 @@ class MailConnection(object): "offline": self.offline_action} if mapping.has_key(self.status): return mapping[self.status] - return DO_NOTHING + return PresenceAccount.DO_NOTHING action = property(get_action) - -class IMAPConnection(MailConnection): - _logger = logging.getLogger("jmc.IMAPConnection") - def __init__(self, login = "", password = "", host = "", \ - port = None, ssl = False, mailbox = "INBOX"): - if not port: - if ssl: - port = 993 - else: - port = 143 - MailConnection.__init__(self, login, password, host, port, ssl) - self.mailbox = mailbox +class IMAPAccount(MailAccount): + mailbox = StringCol(default = "INBOX") # TODO : set default INBOX in reg_form (use get_register_fields last field ?) - def __eq__(self, other): - return MailConnection.__eq__(self, other) \ - and self.mailbox == other.mailbox + def _get_register_fields(cls): + """See Account._get_register_fields + """ + def password_post_func(password): + if password is None or password == "": + return None + return password + + return MailAccount.get_register_fields() + \ + [("mailbox", "text-single", None, account.string_not_null_post_func, \ + (lambda field_name: "INBOX")), \ + ("password", "text-private", None, password_post_func, \ + (lambda field_name: None)), \ + ("store_password", "boolean", None, account.boolean_post_func, \ + lambda field_name: True)] - def __str__(self): - return MailConnection.__str__(self) + "#" + self.mailbox + get_register_fields = classmethod(_get_register_fields) + + + def _init(self, *args, **kw): + MailAccount._init(self, *args, **kw) + self.__logger = logging.getLogger("jmc.IMAPConnection") def get_type(self): if self.ssl: @@ -352,10 +363,10 @@ class IMAPConnection(MailConnection): return "imap" def get_status(self): - return MailConnection.get_status(self) + "/" + self.mailbox + return MailAccount.get_status(self) + "/" + self.mailbox def connect(self): - IMAPConnection._logger.debug("Connecting to IMAP server " \ + self.__logger.debug("Connecting to IMAP server " \ + self.login + "@" + self.host + ":" + str(self.port) \ + " (" + self.mailbox + "). SSL=" \ + str(self.ssl)) @@ -366,12 +377,12 @@ class IMAPConnection(MailConnection): self.connection.login(self.login, self.password) def disconnect(self): - IMAPConnection._logger.debug("Disconnecting from IMAP server " \ + self.__logger.debug("Disconnecting from IMAP server " \ + self.host) self.connection.logout() def get_mail_list(self): - IMAPConnection._logger.debug("Getting mail list") + self.__logger.debug("Getting mail list") typ, data = self.connection.select(self.mailbox) typ, data = self.connection.search(None, 'RECENT') if typ == 'OK': @@ -379,7 +390,7 @@ class IMAPConnection(MailConnection): return None def get_mail(self, index): - IMAPConnection._logger.debug("Getting mail " + str(index)) + self.__logger.debug("Getting mail " + str(index)) typ, data = self.connection.select(self.mailbox, True) typ, data = self.connection.fetch(index, '(RFC822)') if typ == 'OK': @@ -387,7 +398,7 @@ class IMAPConnection(MailConnection): return u"Error while fetching mail " + str(index) def get_mail_summary(self, index): - IMAPConnection._logger.debug("Getting mail summary " + str(index)) + self.__logger.debug("Getting mail summary " + str(index)) typ, data = self.connection.select(self.mailbox, True) typ, data = self.connection.fetch(index, '(RFC822)') if typ == 'OK': @@ -404,29 +415,25 @@ class IMAPConnection(MailConnection): type = property(get_type) -class POP3Connection(MailConnection): - _logger = logging.getLogger("jmc.POP3Connection") - - def __init__(self, login = "", password = "", host = "", \ - port = None, ssl = False): - if not port: - if ssl: - port = 995 - else: - port = 110 - MailConnection.__init__(self, login, password, host, port, ssl) - self.__nb_mail = 0 - self.__lastmail = 0 +class POP3Account(MailAccount): + nb_mail = IntCol(default = 0) + lastmail = IntCol(default = 0) + + def _init(self, *args, **kw): + MailAccount._init(self, *args, **kw) + self.__logger = logging.getLogger("jmc.model.account.POP3Account") def get_type(self): if self.ssl: return "pop3s" return "pop3" + type = property(get_type) + def connect(self): - POP3Connection._logger.debug("Connecting to POP3 server " \ - + self.login + "@" + self.host + ":" + str(self.port)\ - + ". SSL=" + str(self.ssl)) + self.__logger.debug("Connecting to POP3 server " \ + + self.login + "@" + self.host + ":" + str(self.port)\ + + ". SSL=" + str(self.ssl)) if self.ssl: self.connection = MYPOP3_SSL(self.host, self.port) else: @@ -439,18 +446,18 @@ class POP3Connection(MailConnection): def disconnect(self): - POP3Connection._logger.debug("Disconnecting from POP3 server " \ - + self.host) + self.__logger.debug("Disconnecting from POP3 server " \ + + self.host) self.connection.quit() def get_mail_list(self): - POP3Connection._logger.debug("Getting mail list") + self.__logger.debug("Getting mail list") count, size = self.connection.stat() - self.__nb_mail = count + self.nb_mail = count return [str(i) for i in range(1, count + 1)] def get_mail(self, index): - POP3Connection._logger.debug("Getting mail " + str(index)) + self.__logger.debug("Getting mail " + str(index)) ret, data, size = self.connection.retr(index) try: self.connection.rset() @@ -461,7 +468,7 @@ class POP3Connection(MailConnection): return u"Error while fetching mail " + str(index) def get_mail_summary(self, index): - POP3Connection._logger.debug("Getting mail summary " + str(index)) + self.__logger.debug("Getting mail summary " + str(index)) ret, data, size = self.connection.retr(index) try: self.connection.rset() @@ -472,16 +479,15 @@ class POP3Connection(MailConnection): return u"Error while fetching mail " + str(index) def get_next_mail_index(self, mail_list): - if self.__nb_mail == self.__lastmail: + if self.nb_mail == self.lastmail: return None - if self.__nb_mail < self.__lastmail: - self.__lastmail = 0 - result = int(mail_list[self.__lastmail]) - self.__lastmail += 1 + if self.nb_mail < self.lastmail: + self.lastmail = 0 + result = int(mail_list[self.lastmail]) + self.lastmail += 1 return result def mark_all_as_read(self): self.get_mail_list() - self.__lastmail = self.__nb_mail + self.lastmail = self.nb_mail - type = property(get_type) diff --git a/src/jmc/utils/__init__.py b/src/jmc/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/jmc/utils/storage.py b/src/jmc/utils/storage.py deleted file mode 100644 index f12a3fd..0000000 --- a/src/jmc/utils/storage.py +++ /dev/null @@ -1,297 +0,0 @@ -## -## storage.py -## Login : David Rousselie -## Started on Wed Jul 20 20:26:53 2005 dax -## $Id: storage.py,v 1.1 2005/09/18 20:24:07 dax Exp $ -## -## Copyright (C) 2005 dax -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -## - -import os -import re -import os.path -import sys -import anydbm -import logging -from UserDict import UserDict - -import jmc.email.mailconnection_factory as mailconnection_factory - - -class Storage(UserDict): - def __init__(self, nb_pk_fields = 1, spool_dir = ".", db_file = None): - UserDict.__init__(self) - self.nb_pk_fields = nb_pk_fields - if db_file is None: - self._spool_dir = "" - self.set_spool_dir(spool_dir) - self.file = self._spool_dir + "/registered.db" - else: - spool_dir = os.path.dirname(db_file) or "." - self.set_spool_dir(spool_dir) - self.file = db_file - self._registered = self.load() - - def __setitem__(self, pk_tuple, obj): -# print "Adding " + "#".join(map(str, pk_tuple)) + " = " + str(obj) - self._registered[str("#".join(map(str, pk_tuple)))] = obj - - def __getitem__(self, pk_tuple): -# print "Getting " + "#".join(map(str, pk_tuple)) - if len(pk_tuple) == self.nb_pk_fields: - return self._registered[str("#".join(map(str, pk_tuple)))] - else: - partial_key = str("#".join(map(str, pk_tuple))) - regexp = re.compile(partial_key) - return [self._registered[key] - for key in self._registered.keys() - if regexp.search(key)] - - def __delitem__(self, pk_tuple): - #print "Deleting " + "#".join(map(str, pk_tuple)) - del self._registered[str("#".join(map(str, pk_tuple)))] - - def get_spool_dir(self): - return self._spool_dir - - def set_spool_dir(self, spool_dir): - self._spool_dir = spool_dir - if not os.path.isdir(self._spool_dir): - os.makedirs(self._spool_dir) - - spool_dir = property(get_spool_dir, set_spool_dir) - - def has_key(self, pk_tuple): - if len(pk_tuple) == self.nb_pk_fields: - return self._registered.has_key(str("#".join(map(str, pk_tuple)))) - else: - partial_key = str("#".join(map(str, pk_tuple))) - regexp = re.compile("^" + partial_key) - for key in self._registered.keys(): - if regexp.search(key): - return True - return False - - def keys(self, pk_tuple = None): - if pk_tuple is None: - return [tuple(key.split("#")) for key in self._registered.keys()] - else: - level = len(pk_tuple) - partial_key = str("#".join(map(str, pk_tuple))) - regexp = re.compile("^" + partial_key) - result = {} - for key in self._registered.keys(): - if regexp.search(key): - result[key.split("#")[level]] = None - return result.keys() - - def dump(self): - for pk in self._registered.keys(): - print pk + " = " + str(self._registered[pk]) - - def load(self): - pass - -class DBMStorage(Storage): - _logger = logging.getLogger("jmc.utils.DBMStorage") - - def __init__(self, nb_pk_fields = 1, spool_dir = ".", db_file = None): -# print "DBM INIT" - Storage.__init__(self, nb_pk_fields, spool_dir, db_file) - - def __del__(self): - # print "DBM STOP" - self.sync() - - def load(self): - str_registered = anydbm.open(self.file, \ - 'c') - result = {} - try: - for pk in str_registered.keys(): - result[pk] = mailconnection_factory.str_to_mail_connection(str_registered[pk]) - except Exception, e: - print >>sys.stderr, "Cannot load registered.db : " - print >>sys.stderr, e - str_registered.close() - return result - - def sync(self): - #print "DBM SYNC" - self.store() - - def __store(self, nb_pk_fields, registered, pk): - if nb_pk_fields > 0: - for key in registered.keys(): - if pk: - self.__store(nb_pk_fields - 1, \ - registered[key], pk + "#" + key) - else: - self.__store(nb_pk_fields - 1, \ - registered[key], key) - else: - self.__str_registered[pk] = str(registered) -# print "STORING : " + pk + " = " + str(registered) - - def store(self): -# print "DBM STORE" - try: - str_registered = anydbm.open(self.file, \ - 'n') - for pk in self._registered.keys(): - str_registered[pk] = str(self._registered[pk]) - except Exception, e: - print >>sys.stderr, "Cannot save to registered.db : " - print >>sys.stderr, e - str_registered.close() - - def __setitem__(self, pk_tuple, obj): - Storage.__setitem__(self, pk_tuple, obj) - self.sync() - - def __delitem__(self, pk_tuple): - Storage.__delitem__(self, pk_tuple) - self.sync() - -# Do not fail if pysqlite is not installed -try: - from pysqlite2 import dbapi2 as sqlite - - class SQLiteStorage(Storage): - _logger = logging.getLogger("jmc.utils.SQLiteStorage") - - def __init__(self, nb_pk_fields = 1, spool_dir = ".", db_file = None): - self.__connection = None - Storage.__init__(self, nb_pk_fields, spool_dir, db_file) - - def create(self): - SQLiteStorage._logger.debug("creating new Table") - cursor = self.__connection.cursor() - cursor.execute(""" - create table account( - jid STRING, - name STRING, - type STRING, - login STRING, - password STRING, - host STRING, - port INTEGER, - chat_action INTEGER, - online_action INTEGER, - away_action INTEGER, - xa_action INTEGER, - dnd_action INTEGER, - offline_action INTEGER, - interval INTEGER, - live_email_only BOOLEAN, - mailbox STRING, - PRIMARY KEY(jid, name) - ) - """) - self.__connection.commit() - cursor.close() - - def __del__(self): - self.__connection.close() - - def sync(self): - pass - - def load(self): - if not os.path.exists(self.file): - self.__connection = sqlite.connect(self.file) - self.create() - else: - self.__connection = sqlite.connect(self.file) - cursor = self.__connection.cursor() - cursor.execute("""select * from account""") - result = {} - for row in cursor.fetchall(): - # print "Creating new " + row[self.nb_pk_fields] + " connection." - account_type = row[self.nb_pk_fields] - account = result["#".join(row[0:self.nb_pk_fields])] = mailconnection_factory.get_new_mail_connection(account_type) - account.login = row[self.nb_pk_fields + 1] - account.password = row[self.nb_pk_fields + 2] - if account.password is None: - account.store_password = False - else: - account.store_password = True - account.host = row[self.nb_pk_fields + 3] - account.port = int(row[self.nb_pk_fields + 4]) - account.chat_action = int(row[self.nb_pk_fields + 5]) - account.online_action = int(row[self.nb_pk_fields + 6]) - account.away_action = int(row[self.nb_pk_fields + 7]) - account.xa_action = int(row[self.nb_pk_fields + 8]) - account.dnd_action = int(row[self.nb_pk_fields + 9]) - account.offline_action = int(row[self.nb_pk_fields + 10]) - account.interval = int(row[self.nb_pk_fields + 11]) - account.live_email_only = (row[self.nb_pk_fields + 12] == 1) - if account_type[0:4] == "imap": - account.mailbox = row[self.nb_pk_fields + 13] - # for field_index in range(self.nb_pk_fields + 1, len(row)): - # print "\tSetting " + str(cursor.description[field_index][0]) + \ - # " to " + str(row[field_index]) - # setattr(account, - # cursor.description[field_index][0], - # row[field_index]) - cursor.close() - return result - - def __setitem__(self, pk_tuple, obj): - Storage.__setitem__(self, pk_tuple, obj) - cursor = self.__connection.cursor() - mailbox = None - password = None - if obj.type[0:4] == "imap": - mailbox = obj.mailbox - if obj.store_password == True: - password = obj.password - cursor.execute(""" - insert or replace into account values - (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - (pk_tuple[0], - pk_tuple[1], - obj.type, - obj.login, - password, - obj.host, - obj.port, - obj.chat_action, - obj.online_action, - obj.away_action, - obj.xa_action, - obj.dnd_action, - obj.offline_action, - obj.interval, - obj.live_email_only, - mailbox)) - self.__connection.commit() - cursor.close() - - def __delitem__(self, pk_tuple): - Storage.__delitem__(self, pk_tuple) - cursor = self.__connection.cursor() - cursor.execute(""" - delete from account where jid = ? and name = ? - """, - (pk_tuple[0], - pk_tuple[1])) - self.__connection.commit() - cursor.close() -except ImportError: - pass - diff --git a/tests/jmc-test.xml b/tests/jmc-test.xml deleted file mode 100644 index c90e6a9..0000000 --- a/tests/jmc-test.xml +++ /dev/null @@ -1,20 +0,0 @@ - - - 127.0.0.1 - 5347 - secret - jmc.localhost - 5 - en - - Jabber Mail Component - A Jabber mail server component - http://people.happycoders.org/dax/jabber/jmc/ - - - SQLite - jmc.pid - . - 5 - iso-8859-15 - diff --git a/tests/jmc/__init__.py b/tests/jmc/__init__.py new file mode 100644 index 0000000..d424a64 --- /dev/null +++ b/tests/jmc/__init__.py @@ -0,0 +1,2 @@ +"""JMC test module""" +__revision__ = "" diff --git a/tests/dummy_server.py b/tests/jmc/dummy_server.py similarity index 82% rename from tests/dummy_server.py rename to tests/jmc/dummy_server.py index 4d9dca7..7f48481 100644 --- a/tests/dummy_server.py +++ b/tests/jmc/dummy_server.py @@ -29,9 +29,28 @@ import socket import types import select import xml.dom.minidom -import utils from pyxmpp import xmlextra +def xmldiff(node1, node2): + if node1.nodeType == node1.TEXT_NODE: + if not node2.nodeType == node2.TEXT_NODE \ + or re.compile(node2.data + "$").match(node1.data) is None: + raise Exception("data in text node " + node1.data + " does not match " + node2.data) + elif node1.nodeType == node1.DOCUMENT_NODE: + if not node2.nodeType == node2.DOCUMENT_NODE: + raise Exception("node1 is Document but not node2 (" + node2.nodeType + ")") + elif node1.tagName != node2.tagName: + raise Exception("Different tag name : " + node1.tagName + " != " + node2.tagName) + else: + for attr in node1._get_attributes().keys(): + if not node2.hasAttribute(attr) \ + or node1.getAttribute(attr) != node2.getAttribute(attr): + raise Exception("(" + node1.tagName + ") Different attributes : " + node1.getAttribute(attr) + " != " + node2.getAttribute(attr)) + if len(node1.childNodes) != len(node2.childNodes): + raise Exception("(" + node1.tagName + ") Different children number : " + str(len(node1.childNodes)) + " != " + str(len(node2.childNodes))) + for i in range(len(node1.childNodes)): + xmldiff(node1.childNodes[i], node2.childNodes[i]) + class DummyServer: def __init__(self, host, port, responses = None): for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE): @@ -40,7 +59,7 @@ class DummyServer: s = socket.socket(af, socktype, proto) except socket.error, msg: s = None - continue + raise socket.error try: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(sa) @@ -48,7 +67,7 @@ class DummyServer: except socket.error, msg: s.close() s = None - continue + raise socket.error break self.socket = s self.responses = None @@ -56,6 +75,7 @@ class DummyServer: self.real_queries = [] def serve(self): + conn = None try: conn, addr = self.socket.accept() rfile = conn.makefile('rb', -1) @@ -77,9 +97,11 @@ class DummyServer: else: self.real_queries.append(data) #print >>sys.stderr, 'Receive : ', data - conn.close() - self.socket.close() - self.socket = None + if conn is not None: + conn.close() + if self.socket is not None: + self.socket.close() + self.socket = None except: type, value, stack = sys.exc_info() print >>sys.stderr, "".join(traceback.format_exception diff --git a/tests/email_generator.py b/tests/jmc/email_generator.py similarity index 100% rename from tests/email_generator.py rename to tests/jmc/email_generator.py diff --git a/tests/jmc/jabber/__init__.py b/tests/jmc/jabber/__init__.py new file mode 100644 index 0000000..9ef7a58 --- /dev/null +++ b/tests/jmc/jabber/__init__.py @@ -0,0 +1,2 @@ +"""JMC jabber test module""" +__revision__ = "" diff --git a/src/jmc/utils/release.py b/tests/jmc/jabber/test_component.py similarity index 62% rename from src/jmc/utils/release.py rename to tests/jmc/jabber/test_component.py index b8daeeb..118ce52 100644 --- a/src/jmc/utils/release.py +++ b/tests/jmc/jabber/test_component.py @@ -1,10 +1,10 @@ ## -## release.py -## Login : David Rousselie -## Started on Mon Jul 24 22:37:00 2006 dax +## test_component.py +## Login : +## Started on Wed Feb 14 18:04:49 2007 David Rousselie ## $Id$ ## -## Copyright (C) 2006 dax +## Copyright (C) 2007 David Rousselie ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or @@ -20,14 +20,7 @@ ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ## -version = "0.2.2" -author = "David Rousselie" -email = "dax@happycoders.org" -license = "GPL" -long_description = """Jabber Mail Component +import unittest -JMC is a jabber service to check email from POP3 and IMAP4 server and retrieve -them or just a notification of new emails. Jabber users can register multiple -email accounts. - -""" +class MailComponent_TestCase(unittest.TestCase): + pass diff --git a/src/jmc/email/__init__.py b/tests/jmc/model/__init__.py similarity index 100% rename from src/jmc/email/__init__.py rename to tests/jmc/model/__init__.py diff --git a/tests/test_mailconnection.py b/tests/jmc/model/test_account.py similarity index 65% rename from tests/test_mailconnection.py rename to tests/jmc/model/test_account.py index 599d997..2f7d4c3 100644 --- a/tests/test_mailconnection.py +++ b/tests/jmc/model/test_account.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- ## -## mailconnection_test.py -## Login : David Rousselie -## Started on Fri May 13 11:32:51 2005 David Rousselie -## $Id: test_mailconnection.py,v 1.2 2005/09/18 20:24:07 David Rousselie Exp $ +## test_account.py +## Login : +## Started on Wed Feb 14 08:23:17 2007 David Rousselie +## $Id$ ## -## Copyright (C) 2005 David Rousselie +## Copyright (C) 2007 David Rousselie ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or @@ -22,20 +22,45 @@ ## import unittest -from jmc.email.mailconnection import IMAPConnection, \ - POP3Connection, \ - MailConnection -import dummy_server -import email_generator +import os import thread -import re -import sys -import string -class MailConnection_TestCase(unittest.TestCase): +from sqlobject import * +from sqlobject.dbconnection import TheURIOpener + +from jcl.model import account +from jcl.model.account import Account, PresenceAccount +from jmc.model.account import MailAccount, POP3Account, IMAPAccount + +from tests.jmc import email_generator, dummy_server + +DB_PATH = "/tmp/jmc_test.db" +DB_URL = DB_PATH # + "?debug=1&debugThreading=1" + +class MailAccount_TestCase(unittest.TestCase): def setUp(self): - self.connection = MailConnection() - + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + Account.createTable(ifNotExists = True) + PresenceAccount.createTable(ifNotExists = True) + MailAccount.createTable(ifNotExists = True) + self.account = MailAccount(user_jid = "user1@test.com", \ + name = "account1", \ + jid = "account1@jmc.test.com") + del account.hub.threadConnection + + def tearDown(self): + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + MailAccount.dropTable(ifExists = True) + PresenceAccount.dropTable(ifExists = True) + Account.dropTable(ifExists = True) + del TheURIOpener.cachedURIs['sqlite://' + DB_URL] + account.hub.threadConnection.close() + del account.hub.threadConnection + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + def make_test(email_type, tested_func, expected_res): def inner(self): encoded, multipart, header = email_type @@ -48,23 +73,23 @@ class MailConnection_TestCase(unittest.TestCase): test_get_decoded_part_not_encoded = \ make_test((False, False, False), \ - lambda self, email: self.connection.get_decoded_part(email, None), \ + lambda self, email: self.account.get_decoded_part(email, None), \ u"Not encoded single part") test_get_decoded_part_encoded = \ make_test((True, False, False), \ - lambda self, email: self.connection.get_decoded_part(email, None), \ + lambda self, email: self.account.get_decoded_part(email, None), \ u"Encoded single part with 'iso-8859-15' charset (éàê)") test_format_message_summary_not_encoded = \ make_test((False, False, True), \ - lambda self, email: self.connection.format_message_summary(email), \ + lambda self, email: self.account.format_message_summary(email), \ (u"From : not encoded from\nSubject : not encoded subject\n\n", \ u"not encoded from")) test_format_message_summary_encoded = \ make_test((True, False, True), \ - lambda self, email: self.connection.format_message_summary(email), \ + lambda self, email: self.account.format_message_summary(email), \ (u"From : encoded from (éàê)\nSubject : encoded subject " + \ u"(éàê)\n\n", \ u"encoded from (éàê)")) @@ -78,21 +103,21 @@ class MailConnection_TestCase(unittest.TestCase): email.replace_header("From", \ "\"" + str(email["From"]) \ + "\" not encoded part") or \ - self.connection.format_message_summary(email), \ + self.account.format_message_summary(email), \ (u"From : \"encoded from (éàê)\" not encoded part\nSubject " + \ u": \"encoded subject (éàê)\" not encoded part\n\n", \ u"\"encoded from (éàê)\" not encoded part")) test_format_message_single_not_encoded = \ make_test((False, False, True), \ - lambda self, email: self.connection.format_message(email), \ + lambda self, email: self.account.format_message(email), \ (u"From : not encoded from\nSubject : not encoded subject" + \ u"\n\nNot encoded single part\n", \ u"not encoded from")) test_format_message_single_encoded = \ make_test((True, False, True), \ - lambda self, email: self.connection.format_message(email), \ + lambda self, email: self.account.format_message(email), \ (u"From : encoded from (éàê)\nSubject : encoded subject " + \ u"(éàê)\n\nEncoded single part with 'iso-8859-15' charset" + \ u" (éàê)\n", \ @@ -100,14 +125,14 @@ class MailConnection_TestCase(unittest.TestCase): test_format_message_multi_not_encoded = \ make_test((False, True, True), \ - lambda self, email: self.connection.format_message(email), \ + lambda self, email: self.account.format_message(email), \ (u"From : not encoded from\nSubject : not encoded subject" + \ u"\n\nNot encoded multipart1\nNot encoded multipart2\n", \ u"not encoded from")) test_format_message_multi_encoded = \ make_test((True, True, True), \ - lambda self, email: self.connection.format_message(email), \ + lambda self, email: self.account.format_message(email), \ (u"From : encoded from (éàê)\nSubject : encoded subject (éà" + \ u"ê)\n\nutf-8 multipart1 with no charset (éàê)" + \ u"\nEncoded multipart2 with 'iso-8859-15' charset (éàê)\n" + \ @@ -115,19 +140,40 @@ class MailConnection_TestCase(unittest.TestCase): u"encoded from (éàê)")) -class POP3Connection_TestCase(unittest.TestCase): +class POP3Account_TestCase(unittest.TestCase): def setUp(self): self.server = dummy_server.DummyServer("localhost", 1110) thread.start_new_thread(self.server.serve, ()) - self.pop3connection = POP3Connection("login", \ - "pass", \ - "localhost", \ - 1110, \ - ssl = False) - + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + Account.createTable(ifNotExists = True) + PresenceAccount.createTable(ifNotExists = True) + MailAccount.createTable(ifNotExists = True) + POP3Account.createTable(ifNotExists = True) + self.pop3_account = POP3Account(user_jid = "user1@test.com", \ + name = "account1", \ + jid = "account1@jmc.test.com", \ + login = "login") + self.pop3_account.password = "pass" + self.pop3_account.host = "localhost" + self.pop3_account.port = 1110 + self.pop3_account.ssl = False + del account.hub.threadConnection + def tearDown(self): + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + POP3Account.dropTable(ifExists = True) + MailAccount.dropTable(ifExists = True) + PresenceAccount.dropTable(ifExists = True) + Account.dropTable(ifExists = True) + del TheURIOpener.cachedURIs['sqlite://' + DB_URL] + account.hub.threadConnection.close() + del account.hub.threadConnection + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) self.server = None - self.pop3connection = None + self.pop3_account = None def make_test(responses = None, queries = None, core = None): def inner(self): @@ -141,12 +187,14 @@ class POP3Connection_TestCase(unittest.TestCase): if queries: self.server.queries += queries self.server.queries += ["QUIT\r\n"] - self.pop3connection.connect() - self.failUnless(self.pop3connection.connection, \ + self.pop3_account.connect() + self.failUnless(self.pop3_account.connection, \ "Cannot establish connection") if core: + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) core(self) - self.pop3connection.disconnect() + del account.hub.threadConnection + self.pop3_account.disconnect() self.failUnless(self.server.verify_queries(), \ "Sended queries does not match expected queries.") return inner @@ -157,7 +205,7 @@ class POP3Connection_TestCase(unittest.TestCase): make_test(["+OK 2 20\r\n"], \ ["STAT\r\n"], \ lambda self: \ - self.assertEquals(self.pop3connection.get_mail_list(), \ + self.assertEquals(self.pop3_account.get_mail_list(), \ ["1", "2"])) test_get_mail_summary = \ @@ -169,7 +217,7 @@ class POP3Connection_TestCase(unittest.TestCase): ["RETR 1\r\n", "RSET\r\n"], \ lambda self: \ - self.assertEquals(self.pop3connection.get_mail_summary(1), \ + self.assertEquals(self.pop3_account.get_mail_summary(1), \ (u"From : user@test.com\n" + \ u"Subject : subject test\n\n", \ u"user@test.com"))) @@ -183,7 +231,7 @@ class POP3Connection_TestCase(unittest.TestCase): ["RETR 1\r\n", "RSET\r\n"], \ lambda self: \ - self.assertEquals(self.pop3connection.get_mail(1), \ + self.assertEquals(self.pop3_account.get_mail(1), \ (u"From : user@test.com\n" + \ u"Subject : subject test\n\n" + \ u"mymessage\n", \ @@ -198,7 +246,7 @@ class POP3Connection_TestCase(unittest.TestCase): ["RETR 1\r\n", "RSET\r\n"], \ lambda self: \ - self.assertEquals(self.pop3connection.get_mail_summary(1), \ + self.assertEquals(self.pop3_account.get_mail_summary(1), \ (u"From : user@test.com\n" + \ u"Subject : subject test\n\n", \ u"user@test.com"))) @@ -212,26 +260,47 @@ class POP3Connection_TestCase(unittest.TestCase): ["RETR 1\r\n", "RSET\r\n"], \ lambda self: \ - self.assertEquals(self.pop3connection.get_mail(1), \ + self.assertEquals(self.pop3_account.get_mail(1), \ (u"From : user@test.com\n" + \ u"Subject : subject test\n\n" + \ u"mymessage\n", \ u"user@test.com"))) -class IMAPConnection_TestCase(unittest.TestCase): +class IMAPAccount_TestCase(unittest.TestCase): def setUp(self): self.server = dummy_server.DummyServer("localhost", 1143) thread.start_new_thread(self.server.serve, ()) - self.imap_connection = IMAPConnection("login", \ - "pass", \ - "localhost", \ - 1143, \ - ssl = False) + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + Account.createTable(ifNotExists = True) + PresenceAccount.createTable(ifNotExists = True) + MailAccount.createTable(ifNotExists = True) + IMAPAccount.createTable(ifNotExists = True) + self.imap_account = IMAPAccount(user_jid = "user1@test.com", \ + name = "account1", \ + jid = "account1@jmc.test.com", \ + login = "login") + self.imap_account.password = "pass" + self.imap_account.host = "localhost" + self.imap_account.port = 1143 + self.imap_account.ssl = False + del account.hub.threadConnection def tearDown(self): + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + IMAPAccount.dropTable(ifExists = True) + MailAccount.dropTable(ifExists = True) + PresenceAccount.dropTable(ifExists = True) + Account.dropTable(ifExists = True) + del TheURIOpener.cachedURIs['sqlite://' + DB_URL] + account.hub.threadConnection.close() + del account.hub.threadConnection + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) self.server = None - self.imap_connection = None + self.imap_account = None def make_test(responses = None, queries = None, core = None): def inner(self): @@ -250,12 +319,14 @@ class IMAPConnection_TestCase(unittest.TestCase): if queries: self.server.queries += queries self.server.queries += ["^[^ ]* LOGOUT"] - self.imap_connection.connect() - self.failUnless(self.imap_connection.connection, \ + self.imap_account.connect() + self.failUnless(self.imap_account.connection, \ "Cannot establish connection") if core: + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) core(self) - self.imap_connection.disconnect() + del account.hub.threadConnection + self.imap_account.disconnect() self.failUnless(self.server.verify_queries()) return inner @@ -271,7 +342,7 @@ class IMAPConnection_TestCase(unittest.TestCase): ["^[^ ]* SELECT INBOX", \ "^[^ ]* SEARCH RECENT"], \ lambda self: \ - self.assertEquals(self.imap_connection.get_mail_list(), ['9', '10'])) + self.assertEquals(self.imap_account.get_mail_list(), ['9', '10'])) test_get_mail_summary = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\ " [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\ @@ -283,7 +354,7 @@ class IMAPConnection_TestCase(unittest.TestCase): data.split()[0] + " OK FETCH completed\r\n"], \ ["^[^ ]* EXAMINE INBOX", \ "^[^ ]* FETCH 1 \(RFC822\)"], \ - lambda self: self.assertEquals(self.imap_connection.get_mail_summary(1), \ + lambda self: self.assertEquals(self.imap_account.get_mail_summary(1), \ (u"From : None\nSubject : None\n\n", \ u"None"))) @@ -297,7 +368,7 @@ class IMAPConnection_TestCase(unittest.TestCase): data.split()[0] + " OK FETCH completed\r\n"], \ ["^[^ ]* EXAMINE INBOX", \ "^[^ ]* FETCH 1 \(RFC822\)",], \ - lambda self: self.assertEquals(self.imap_connection.get_mail(1), \ + lambda self: self.assertEquals(self.imap_account.get_mail(1), \ (u"From : None\nSubject : None\n\nbody text\r\n\n", \ u"None"))) diff --git a/tests/test_lang.py b/tests/jmc/test_lang.py similarity index 100% rename from tests/test_lang.py rename to tests/jmc/test_lang.py diff --git a/tests/test_component.py b/tests/test_component.py deleted file mode 100644 index 3ec2f40..0000000 --- a/tests/test_component.py +++ /dev/null @@ -1,355 +0,0 @@ -## -## test_mailconnection_factory.py -## Login : David Rousselie -## Started on Fri May 20 10:46:58 2005 -## $Id: test_component.py,v 1.2 2005/09/18 20:24:07 dax Exp $ -## -## Copyright (C) 2005 -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -## - -import thread -import unittest -import dummy_server -import time -import traceback -from pyxmpp import xmlextra -from jmc.jabber.component import * -from jmc.utils.config import Config - -class TestStreamHandler(xmlextra.StreamHandler): - def __init__(self, expected_balises = []): - xmlextra.StreamHandler.__init__(self) - self.expected_balises = expected_balises - - def stream_start(self, doc): - pass - - def stream_end(self, doc): - pass - - def stanza(self, notused, node): - pass - -class MailComponent_TestCase_NoConnection(unittest.TestCase): - def setUp(self): - self.mail_component = MailComponent(Config("tests/jmc-test.xml")) - - def tearDown(self): - self.mail_component = None - - def test_get_reg_form(self): - reg_form = self.mail_component.get_reg_form() - reg_form2 = self.mail_component.get_reg_form() - self.assertTrue(reg_form is reg_form2) - -#TODO -class MailComponent_TestCase_Basic(unittest.TestCase): - def setUp(self): - self.handler = TestStreamHandler() - self.mail_component = MailComponent(Config("tests/jmc-test.xml")) - self.server = dummy_server.XMLDummyServer("localhost", 55555, None, self.handler) - thread.start_new_thread(self.server.serve, ()) - - def tearDown(self): - self.server = None - self.mail_component = None - os.remove("./registered.db") - - def test_run(self): - self.server.responses = ["", \ - ""] - self.server.queries = ["", \ - "[0-9abcdef]*", - ""] - self.mail_component.run(1) - self.failUnless(self.server.verify_queries()) - # TODO : more assertion - -class MailComponent_TestCase_NoReg(unittest.TestCase): - def setUp(self): - self.handler = TestStreamHandler() - self.mail_component = MailComponent(Config("tests/jmc-test.xml")) - self.server = dummy_server.XMLDummyServer("localhost", 55555, None, self.handler) - thread.start_new_thread(self.server.serve, ()) - - def tearDown(self): - self.server = None - self.mail_component = None - ## TODO : be storage independant - os.remove("./registered.db") - - def test_disco_get_items(self): - self.server.responses = ["", - "", - ""] - self.server.queries = ["" + \ - "", \ - "[0-9abcdef]*", - "", - ""] - self.mail_component.run(1) - self.failUnless(self.server.verify_queries()) - - def test_get_register(self): - self.server.responses = ["", - "", - ""] - self.server.queries = ["" + \ - "", \ - "[0-9abcdef]*", - "" + \ - "" + \ - "" + \ - "Jabber Mail connection registration" + \ - "Enter anything below" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "INBOX" + \ - "" + \ - "" + \ - "2" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "2" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "1" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "1" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "1" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "0" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "5" + \ - "" + \ - "" + \ - "" + \ - "", - ""] - self.mail_component.run(1) - self.failUnless(self.server.verify_queries()) - - - def test_disco_get_info(self): - self.server.responses = ["", - "", - ""] - self.server.queries = ["" + \ - "", - "[0-9abcdef]*", - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "", - ""] - self.mail_component.run(1) - self.failUnless(self.server.verify_queries()) - - def test_set_register(self): - self.server.responses = ["", \ - "" + \ - "" + \ - "" + \ - "" + \ - "" + \ - "test" + \ - "" + \ - "" + \ - "logintest" + \ - "" + \ - "" + \ - "passtest" + \ - "" + \ - "" + \ - "hosttest" + \ - "" + \ - "" + \ - "993" + \ - "" + \ - "" + \ - "imaps" + \ - "" + \ - "" + \ - "INBOX" + \ - "" + \ - "" + \ - "2" + \ - "" + \ - "" + \ - "2" + \ - "" + \ - "" + \ - "1" + \ - "" + \ - "" + \ - "1" + \ - "" + \ - "" + \ - "1" + \ - "" + \ - "" + \ - "0" + \ - "" + \ - "" + \ - "5" + \ - "" + \ - "" + \ - "", - lambda x: None, - ""] - self.server.queries = ["" + \ - "", \ - "[0-9abcdef]*", \ - "", - "", - "New imaps connection \\'test\\': Registered with username \\'logintest\\' and password \\'passtest\\' on \\'hosttest:993\\'", - "", - ""] - self.mail_component.run(1) - self.failUnless(self.server.verify_queries()) - -class MailComponent_TestCase_Reg(unittest.TestCase): - def setUp(self): - self.mail_component = MailComponent(Config("tests/jmc-test.xml")) - self.mail_component.set_register(iq = None) - - def test_get_reg_form_init(self): - pass - - def test_get_reg_form_init_2pass(self): - pass - - def test_disco_get_items(self): - pass - - def test_get_register(self): - pass - - def test_set_register_update(self): - pass - - def test_set_register_remove(self): - pass - - def test_presence_available_transport(self): - pass - - def test_presence_available_mailbox(self): - pass - - def test_presence_unavailable_transport(self): - pass - - def test_presence_unavailable_mailbox(self): - pass - - def test_presence_subscribe(self): - pass - - def test_presence_subscribed_transport(self): - pass - - def test_presence_subscribed_mailbox(self): - pass - - def test_presence_unsubscribe(self): - pass - - def test_presence_unsubscribed(self): - pass - - def test_check_mail(self): - pass - diff --git a/tests/test_mailconnection_factory.py b/tests/test_mailconnection_factory.py deleted file mode 100644 index 738374e..0000000 --- a/tests/test_mailconnection_factory.py +++ /dev/null @@ -1,204 +0,0 @@ -## -## test_mailconnection_factory.py -## Login : David Rousselie -## Started on Fri May 20 10:46:58 2005 -## $Id: test_mailconnection_factory.py,v 1.1 2005/07/11 20:39:31 dax Exp $ -## -## Copyright (C) 2005 -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -## - -import unittest -from jmc.email.mailconnection_factory import * -from jmc.email.mailconnection import * -import jmc.email.mailconnection as mailconnection - -class MailConnectionFactory_TestCase(unittest.TestCase): - def test_new_mail_connection_imap(self): - mc = get_new_mail_connection("imap") - self.assertEquals(mc, IMAPConnection()) - - def test_new_mail_connection_imaps(self): - mc = get_new_mail_connection("imaps") - self.assertEquals(mc, IMAPConnection(ssl = True)) - - def test_new_mail_connection_pop3(self): - mc = get_new_mail_connection("pop3") - self.assertEquals(mc, POP3Connection()) - - def test_new_mail_connection_pop3s(self): - mc = get_new_mail_connection("pop3s") - self.assertEquals(mc, POP3Connection(ssl = True)) - - def test_new_mail_connection_unknown(self): - self.assertRaises(Exception, get_new_mail_connection, "unknown") - - def test_str_to_mail_connection_imap_v01_v02(self): - mc = str_to_mail_connection("imap#login#passwd#host#193#False#INBOX") - self.assertEquals(mc.get_type(), "imap") - self.assertEquals(mc.login, "login") - self.assertEquals(mc.password, "passwd") - self.assertEquals(mc.store_password, True) - self.assertEquals(mc.host, "host") - self.assertEquals(mc.port, 193) - self.assertEquals(mc.mailbox, "INBOX") - self.assertEquals(mc.chat_action, mailconnection.DIGEST) - self.assertEquals(mc.online_action, mailconnection.DIGEST) - self.assertEquals(mc.away_action, mailconnection.DIGEST) - self.assertEquals(mc.xa_action, mailconnection.DIGEST) - self.assertEquals(mc.dnd_action, mailconnection.DIGEST) - self.assertEquals(mc.offline_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.interval, 5) - self.assertEquals(mc.live_email_only, False) - - def test_str_to_mail_connection_imap_v01_v02_retrieve(self): - mc = str_to_mail_connection("imap#login#passwd#host#193#True#INBOX") - self.assertEquals(mc.get_type(), "imap") - self.assertEquals(mc.login, "login") - self.assertEquals(mc.password, "passwd") - self.assertEquals(mc.store_password, True) - self.assertEquals(mc.host, "host") - self.assertEquals(mc.port, 193) - self.assertEquals(mc.mailbox, "INBOX") - self.assertEquals(mc.chat_action, mailconnection.RETRIEVE) - self.assertEquals(mc.online_action, mailconnection.RETRIEVE) - self.assertEquals(mc.away_action, mailconnection.RETRIEVE) - self.assertEquals(mc.xa_action, mailconnection.RETRIEVE) - self.assertEquals(mc.dnd_action, mailconnection.RETRIEVE) - self.assertEquals(mc.offline_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.interval, 5) - self.assertEquals(mc.live_email_only, False) - - def test_str_to_mail_connection_pop3_v01_v02(self): - mc = str_to_mail_connection("pop3#login#passwd#host#110#False") - self.assertEquals(mc.get_type(), "pop3") - self.assertEquals(mc.login, "login") - self.assertEquals(mc.password, "passwd") - self.assertEquals(mc.store_password, True) - self.assertEquals(mc.host, "host") - self.assertEquals(mc.port, 110) - self.assertEquals(mc.chat_action, mailconnection.DIGEST) - self.assertEquals(mc.online_action, mailconnection.DIGEST) - self.assertEquals(mc.away_action, mailconnection.DIGEST) - self.assertEquals(mc.xa_action, mailconnection.DIGEST) - self.assertEquals(mc.dnd_action, mailconnection.DIGEST) - self.assertEquals(mc.offline_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.interval, 5) - self.assertEquals(mc.live_email_only, False) - - def test_str_to_mail_connection_pop3_v01_v02_retrieve(self): - mc = str_to_mail_connection("pop3#login#passwd#host#110#True") - self.assertEquals(mc.get_type(), "pop3") - self.assertEquals(mc.login, "login") - self.assertEquals(mc.password, "passwd") - self.assertEquals(mc.store_password, True) - self.assertEquals(mc.host, "host") - self.assertEquals(mc.port, 110) - self.assertEquals(mc.chat_action, mailconnection.RETRIEVE) - self.assertEquals(mc.online_action, mailconnection.RETRIEVE) - self.assertEquals(mc.away_action, mailconnection.RETRIEVE) - self.assertEquals(mc.xa_action, mailconnection.RETRIEVE) - self.assertEquals(mc.dnd_action, mailconnection.RETRIEVE) - self.assertEquals(mc.offline_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.interval, 5) - self.assertEquals(mc.live_email_only, False) - - def test_str_to_mail_connection_imap(self): - mc = str_to_mail_connection("imap#login#passwd#host#193#0#0#0#1#1#2#4#True#INBOX") - self.assertEquals(mc.get_type(), "imap") - self.assertEquals(mc.login, "login") - self.assertEquals(mc.password, "passwd") - self.assertEquals(mc.host, "host") - self.assertEquals(mc.port, 193) - self.assertEquals(mc.mailbox, "INBOX") - self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.online_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.away_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.xa_action, mailconnection.DIGEST) - self.assertEquals(mc.dnd_action, mailconnection.DIGEST) - self.assertEquals(mc.offline_action, mailconnection.RETRIEVE) - self.assertEquals(mc.interval, 4) - self.assertEquals(mc.live_email_only, True) - - def test_str_to_mail_connection_no_password(self): - mc = str_to_mail_connection("imap#login#/\\#host#193#0#0#0#1#1#2#4#False#INBOX") - self.assertEquals(mc.get_type(), "imap") - self.assertEquals(mc.login, "login") - self.assertEquals(mc.password, None) - self.assertEquals(mc.store_password, False) - self.assertEquals(mc.host, "host") - self.assertEquals(mc.port, 193) - self.assertEquals(mc.mailbox, "INBOX") - self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.online_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.away_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.xa_action, mailconnection.DIGEST) - self.assertEquals(mc.dnd_action, mailconnection.DIGEST) - self.assertEquals(mc.offline_action, mailconnection.RETRIEVE) - self.assertEquals(mc.interval, 4) - self.assertEquals(mc.live_email_only, False) - - def test_str_to_mail_connection_imaps(self): - mc = str_to_mail_connection("imaps#login#passwd#host#993#0#0#0#1#1#2#4#True#INBOX.SubDir") - self.assertEquals(mc.get_type(), "imaps") - self.assertEquals(mc.login, "login") - self.assertEquals(mc.password, "passwd") - self.assertEquals(mc.host, "host") - self.assertEquals(mc.port, 993) - self.assertEquals(mc.mailbox, "INBOX.SubDir") - self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.online_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.away_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.xa_action, mailconnection.DIGEST) - self.assertEquals(mc.dnd_action, mailconnection.DIGEST) - self.assertEquals(mc.offline_action, mailconnection.RETRIEVE) - self.assertEquals(mc.interval, 4) - self.assertEquals(mc.live_email_only, True) - - def test_str_to_mail_connection_pop3(self): - mc = str_to_mail_connection("pop3#login#passwd#host#110#0#0#0#1#1#2#4#False") - self.assertEquals(mc.get_type(), "pop3") - self.assertEquals(mc.login, "login") - self.assertEquals(mc.password, "passwd") - self.assertEquals(mc.host, "host") - self.assertEquals(mc.port, 110) - self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.online_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.away_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.xa_action, mailconnection.DIGEST) - self.assertEquals(mc.dnd_action, mailconnection.DIGEST) - self.assertEquals(mc.offline_action, mailconnection.RETRIEVE) - self.assertEquals(mc.interval, 4) - self.assertEquals(mc.live_email_only, False) - - def test_str_to_mail_connection_pop3s(self): - mc = str_to_mail_connection("pop3s#login#passwd#host#995#0#0#0#1#1#2#4#True") - self.assertEquals(mc.get_type(), "pop3s") - self.assertEquals(mc.login, "login") - self.assertEquals(mc.password, "passwd") - self.assertEquals(mc.host, "host") - self.assertEquals(mc.port, 995) - self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.online_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.away_action, mailconnection.DO_NOTHING) - self.assertEquals(mc.xa_action, mailconnection.DIGEST) - self.assertEquals(mc.dnd_action, mailconnection.DIGEST) - self.assertEquals(mc.offline_action, mailconnection.RETRIEVE) - self.assertEquals(mc.interval, 4) - self.assertEquals(mc.live_email_only, True) - - def test_str_to_mail_connection_unknown(self): - self.assertRaises(Exception, str_to_mail_connection, ("unknown#login#passwd#host#995#0#0#0#1#1#2#4#True")) - diff --git a/tests/test_storage.py b/tests/test_storage.py deleted file mode 100644 index f3ff233..0000000 --- a/tests/test_storage.py +++ /dev/null @@ -1,224 +0,0 @@ -## -## test_storage.py -## Login : David Rousselie -## Started on Fri May 20 10:46:58 2005 dax -## $Id: test_component.py,v 1.1 2005/07/11 20:39:31 dax Exp $ -## -## Copyright (C) 2005 adro8400 -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -## - -import os -import unittest -import dummy_server -from jmc.utils.storage import * -from jmc.email.mailconnection import * -import jmc.email.mailconnection as mailconnection - -class Storage_TestCase(unittest.TestCase): - def test_init(self): - spool_dir = "./spool/test" - self._storage = Storage(spool_dir = spool_dir) - self.assertTrue(os.access(spool_dir, os.F_OK)) - self.assertEquals(self._storage.spool_dir, spool_dir) - os.removedirs(spool_dir) - -class DBMStorage_TestCase(unittest.TestCase): - def setUp(self): - spool_dir = "./spool/test" - self._storage = DBMStorage(nb_pk_fields = 2, spool_dir = spool_dir) - self._account1 = IMAPConnection(login = "login1", - password = "password1", - host = "host1", - port = 993, - ssl = True, - mailbox = "INBOX.box1") - self._account1.chat_action = mailconnection.DO_NOTHING - self._account1.online_action = mailconnection.DO_NOTHING - self._account1.away_action = mailconnection.DO_NOTHING - self._account1.xa_action = mailconnection.DO_NOTHING - self._account1.dnd_action = mailconnection.DO_NOTHING - self._account1.offline_action = mailconnection.DO_NOTHING - self._account1.interval = 4 - self._account2 = POP3Connection(login = "login2", - password = "password2", - host = "host2", - port = 1110, - ssl = False) - self._account2.chat_action = mailconnection.DO_NOTHING - self._account2.online_action = mailconnection.DO_NOTHING - self._account2.away_action = mailconnection.DO_NOTHING - self._account2.xa_action = mailconnection.DO_NOTHING - self._account2.dnd_action = mailconnection.DO_NOTHING - self._account2.offline_action = mailconnection.DO_NOTHING - self._account2.interval = 4 - self._account2.store_password = False - self._account2.live_email_only = True - - def tearDown(self): - db_file = self._storage.file - self._storage = None - os.remove(db_file) - - def test_set_get(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - self.assertEquals(self._storage[("test@localhost", "account1")], - self._account1) - self.assertEquals(self._storage[("test@localhost", "account2")], - self._account2) - - def test_set_sync_get(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - loaded_storage = DBMStorage(nb_pk_fields = 2, spool_dir = "./spool/test") - self.assertEquals(loaded_storage[("test@localhost", "account1")], - self._account1) - self.assertEquals(loaded_storage[("test@localhost", "account2")], - self._account2) - - def test_set_del_get(self): - self._storage[("test@localhost", "account2")] = self._account2 - del self._storage[("test@localhost", "account2")] - try: - self._storage[("test@localhost", "account2")] - except KeyError: - return - self.fail("KeyError was expected") - - def test_haskey(self): - self._storage[("test@localhost", "account2")] = self._account2 - self.assertTrue(self._storage.has_key((u"test@localhost", u"account2"))) - - def test_partial_haskey(self): - self._storage[("test@localhost", "account2")] = self._account2 - self.assertTrue(self._storage.has_key((u"test@localhost",))) - - def test_get_filtered(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - result = self._storage[("test@localhost",)] - self.assertEquals(type(result), list) - self.assertEquals(len(result), 2) - self.assertEquals(result[1], self._account1) - self.assertEquals(result[0], self._account2) - - def test_get_filtered2(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - result = self._storage[("account1",)] - self.assertEquals(type(result), list) - self.assertEquals(len(result), 1) - self.assertEquals(result[0], self._account1) - - def test_keys(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - result = self._storage.keys() - self.assertEquals(type(result), list) - self.assertEquals(len(result), 2) - self.assertEquals(type(result[1]), tuple) - self.assertEquals(len(result[1]), 2) - self.assertEquals(result[1][0], "test@localhost") - self.assertEquals(result[1][1], "account1") - self.assertEquals(type(result[0]), tuple) - self.assertEquals(len(result[0]), 2) - self.assertEquals(result[0][0], "test@localhost") - self.assertEquals(result[0][1], "account2") - - def test_keys_filtered(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - result = self._storage.keys(()) - self.assertEquals(type(result), list) - self.assertEquals(len(result), 1) - self.assertEquals(result[0], "test@localhost") - - def test_keys_filtered2(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - result = self._storage.keys(("test@localhost",)) - self.assertEquals(type(result), list) - self.assertEquals(len(result), 2) - self.assertEquals(result[0], "account2") - self.assertEquals(result[1], "account1") - - def test_del_sync_get(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - del self._storage[("test@localhost", "account2")] - loaded_storage = DBMStorage(nb_pk_fields = 2, spool_dir = "./spool/test") - self.assertEquals(len(loaded_storage.keys()), - 1) - self.assertEquals(loaded_storage[("test@localhost", "account1")], - self._account1) - -class SQLiteStorage_TestCase(DBMStorage_TestCase): - def setUp(self): - spool_dir = "./spool/test" - self._storage = SQLiteStorage(nb_pk_fields = 2, spool_dir = spool_dir) - self._account1 = IMAPConnection(login = "login1", - password = "password1", - host = "host1", - port = 993, - ssl = True, - mailbox = "INBOX.box1") - self._account1.chat_action = mailconnection.DIGEST - self._account1.online_action = mailconnection.DIGEST - self._account1.away_action = mailconnection.DO_NOTHING - self._account1.xa_action = mailconnection.DO_NOTHING - self._account1.dnd_action = mailconnection.DO_NOTHING - self._account1.offline_action = mailconnection.DO_NOTHING - self._account1.interval = 4 - self._account2 = POP3Connection(login = "login2", - password = "password2", - host = "host2", - port = 1993, - ssl = False) - self._account2.chat_action = mailconnection.DO_NOTHING - self._account2.online_action = mailconnection.DO_NOTHING - self._account2.away_action = mailconnection.DO_NOTHING - self._account2.xa_action = mailconnection.DO_NOTHING - self._account2.dnd_action = mailconnection.DO_NOTHING - self._account2.offline_action = mailconnection.DO_NOTHING - self._account2.interval = 4 - self._account2.store_password = False - self._account2.live_email_only = True - -# def tearDown(self): -# os.remove(self._storage.file) -# self._storage = None - - - def test_set_sync_get(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - self._account2.password = None - loaded_storage = SQLiteStorage(nb_pk_fields = 2, spool_dir = "./spool/test") - self.assertEquals(loaded_storage[("test@localhost", "account1")], - self._account1) - self.assertEquals(loaded_storage[("test@localhost", "account2")], - self._account2) - - def test_del_sync_get(self): - self._storage[("test@localhost", "account1")] = self._account1 - self._storage[("test@localhost", "account2")] = self._account2 - del self._storage[("test@localhost", "account2")] - loaded_storage = SQLiteStorage(nb_pk_fields = 2, spool_dir = "./spool/test") - self.assertEquals(len(loaded_storage.keys()), - 1) - self.assertEquals(loaded_storage[("test@localhost", "account1")], - self._account1) - diff --git a/tests/test_x.py b/tests/test_x.py deleted file mode 100644 index 4e45f9e..0000000 --- a/tests/test_x.py +++ /dev/null @@ -1,72 +0,0 @@ -## -## test_x.py -## Login : David Rousselie -## Started on Fri May 20 10:46:58 2005 -## $Id: test_x.py,v 1.1 2005/07/11 20:39:31 dax Exp $ -## -## Copyright (C) 2005 -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -## - -import unittest -from jmc.jabber.x import * - -class X_TestCase(unittest.TestCase): - def setUp(self): - self.mail_component = MailComponent() - - def test_get_form(self): - self.reg_form.add_field(type = "text-single", \ - label = "Connection name", \ - var = "name") - - self.reg_form.add_field(type = "text-single", \ - label = "Login", \ - var = "login") - - self.reg_form.add_field(type = "text-private", \ - label = "password", \ - var = "password") - - self.reg_form.add_field(type = "text-single", \ - label = "Host", \ - var = "host") - - self.reg_form.add_field(type = "text-single", \ - label = "Port", \ - var = "port") - - field = self.reg_form.add_field(type = "list-single", \ - label = "Mailbox type", \ - var = "type") - field.add_option(label = "POP3", \ - value = "pop3") - field.add_option(label = "POP3S", \ - value = "pop3s") - field.add_option(label = "IMAP", \ - value = "imap") - field.add_option(label = "IMAPS", \ - value = "imaps") - - self.reg_form.add_field(type = "text-single", \ - label = "Mailbox (IMAP)", \ - var = "mailbox", \ - value = "INBOX") - - self.reg_form.add_field(type = "boolean", \ - label = "Retrieve mail", \ - var = "retrieve") - pass - diff --git a/tests/utils.py b/tests/utils.py deleted file mode 100644 index 0c5a619..0000000 --- a/tests/utils.py +++ /dev/null @@ -1,95 +0,0 @@ -## -## utils.py -## Login : David Rousselie -## Started on Mon Oct 24 21:44:43 2005 dax -## $Id$ -## -## Copyright (C) 2005 dax -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -## - -import xml.dom.minidom -import re - -def xmldiff(node1, node2): - if node1.nodeType == node1.TEXT_NODE: - if not node2.nodeType == node2.TEXT_NODE \ - or re.compile(node2.data + "$").match(node1.data) is None: - raise Exception("data in text node " + node1.data + " does not match " + node2.data) - elif node1.nodeType == node1.DOCUMENT_NODE: - if not node2.nodeType == node2.DOCUMENT_NODE: - raise Exception("node1 is Document but not node2 (" + node2.nodeType + ")") - elif node1.tagName != node2.tagName: - raise Exception("Different tag name : " + node1.tagName + " != " + node2.tagName) - else: - for attr in node1._get_attributes().keys(): - if not node2.hasAttribute(attr) \ - or node1.getAttribute(attr) != node2.getAttribute(attr): - raise Exception("(" + node1.tagName + ") Different attributes : " + node1.getAttribute(attr) + " != " + node2.getAttribute(attr)) - if len(node1.childNodes) != len(node2.childNodes): - raise Exception("(" + node1.tagName + ") Different children number : " + str(len(node1.childNodes)) + " != " + str(len(node2.childNodes))) - for i in range(len(node1.childNodes)): - xmldiff(node1.childNodes[i], node2.childNodes[i]) - -# def xmldiff(events1, events2): -# for (event1, node1) in events1: -# (event2, node2) = events2.next() -# print event1 + " " + str(node1) -# if not (event1 == event2) or not xml_diff_nodes(node1, node2): -# return False -# return True - -if __name__ == "__main__": - document1 = """\ - - Demo slideshow - Slide title - This is a demo - Of a program for processing slides - - - Another demo slide - It is important - To have more than - one slide - - - """ - - document2 = """\ - - Demo slideshow - Slide title - This is a demo - Of a program for processing slides - - - Another demo slide - It is important - To have more than - one slide - - - """ - - dom1 = xml.dom.minidom.parseString(document1) - dom2 = xml.dom.minidom.parseString(document2) - - try: - xmldiff(dom1, dom2) - except Exception, msg: - print msg - -