diff --git a/coverage.py b/coverage.py
index abd95da..66e55e0 100755
--- a/coverage.py
+++ b/coverage.py
@@ -6,6 +6,8 @@
# COVERAGE.PY -- COVERAGE TESTING
#
# Gareth Rees, Ravenbrook Limited, 2001-12-04
+# Ned Batchelder, 2004-12-12
+# http://nedbatchelder.com/code/modules/coverage.html
#
#
# 1. INTRODUCTION
@@ -20,34 +22,49 @@
# interface and limitations. See [GDR 2001-12-04b] for requirements and
# design.
-"""Usage:
+r"""Usage:
-coverage.py -x MODULE.py [ARG1 ARG2 ...]
+coverage.py -x [-p] MODULE.py [ARG1 ARG2 ...]
Execute module, passing the given command-line arguments, collecting
- coverage data.
+ coverage data. With the -p option, write to a temporary file containing
+ the machine name and process ID.
coverage.py -e
Erase collected coverage data.
-coverage.py -r [-m] FILE1 FILE2 ...
+coverage.py -c
+ Collect data from multiple coverage files (as created by -p option above)
+ and store it into a single file representing the union of the coverage.
+
+coverage.py -r [-m] [-o dir1,dir2,...] FILE1 FILE2 ...
Report on the statement coverage for the given files. With the -m
option, show line numbers of the statements that weren't executed.
-coverage.py -a [-d dir] FILE1 FILE2 ...
+coverage.py -a [-d dir] [-o dir1,dir2,...] FILE1 FILE2 ...
Make annotated copies of the given files, marking statements that
are executed with > and statements that are missed with !. With
the -d option, make the copies in that directory. Without the -d
option, make each copy in the same directory as the original.
+-o dir,dir2,...
+ Omit reporting or annotating files when their filename path starts with
+ a directory listed in the omit list.
+ e.g. python coverage.py -i -r -o c:\python23,lib\enthought\traits
+
Coverage data is saved in the file .coverage by default. Set the
COVERAGE_FILE environment variable to save it somewhere else."""
+__version__ = "2.6.20060823" # see detailed history at the end of this file.
+
+import compiler
+import compiler.visitor
import os
import re
import string
import sys
+import threading
import types
-
+from socket import gethostname
# 2. IMPLEMENTATION
#
@@ -69,37 +86,175 @@ import types
# In the bottleneck of this application it's appropriate to abbreviate
# names to increase speed.
-# A dictionary with an entry for (Python source file name, line number
-# in that file) if that line has been executed.
-c = {}
+class StatementFindingAstVisitor(compiler.visitor.ASTVisitor):
+ def __init__(self, statements, excluded, suite_spots):
+ compiler.visitor.ASTVisitor.__init__(self)
+ self.statements = statements
+ self.excluded = excluded
+ self.suite_spots = suite_spots
+ self.excluding_suite = 0
+
+ def doRecursive(self, node):
+ self.recordNodeLine(node)
+ for n in node.getChildNodes():
+ self.dispatch(n)
-# t(f, x, y). This method is passed to sys.settrace as a trace
-# function. See [van Rossum 2001-07-20b, 9.2] for an explanation of
-# sys.settrace and the arguments and return value of the trace function.
-# See [van Rossum 2001-07-20a, 3.2] for a description of frame and code
-# objects.
+ visitStmt = visitModule = doRecursive
+
+ def doCode(self, node):
+ if hasattr(node, 'decorators') and node.decorators:
+ self.dispatch(node.decorators)
+ self.recordAndDispatch(node.code)
+ else:
+ self.doSuite(node, node.code)
+
+ visitFunction = visitClass = doCode
-def t(f, x, y):
- c[(f.f_code.co_filename, f.f_lineno)] = 1
- return t
+ def getFirstLine(self, node):
+ # Find the first line in the tree node.
+ lineno = node.lineno
+ for n in node.getChildNodes():
+ f = self.getFirstLine(n)
+ if lineno and f:
+ lineno = min(lineno, f)
+ else:
+ lineno = lineno or f
+ return lineno
+
+ def getLastLine(self, node):
+ # Find the first line in the tree node.
+ lineno = node.lineno
+ for n in node.getChildNodes():
+ lineno = max(lineno, self.getLastLine(n))
+ return lineno
+
+ def doStatement(self, node):
+ self.recordLine(self.getFirstLine(node))
+
+ visitAssert = visitAssign = visitAssTuple = visitDiscard = visitPrint = \
+ visitPrintnl = visitRaise = visitSubscript = visitDecorators = \
+ doStatement
+
+ def recordNodeLine(self, node):
+ return self.recordLine(node.lineno)
+
+ def recordLine(self, lineno):
+ # Returns a bool, whether the line is included or excluded.
+ if lineno:
+ # Multi-line tests introducing suites have to get charged to their
+ # keyword.
+ if lineno in self.suite_spots:
+ lineno = self.suite_spots[lineno][0]
+ # If we're inside an exluded suite, record that this line was
+ # excluded.
+ if self.excluding_suite:
+ self.excluded[lineno] = 1
+ return 0
+ # If this line is excluded, or suite_spots maps this line to
+ # another line that is exlcuded, then we're excluded.
+ elif self.excluded.has_key(lineno) or \
+ self.suite_spots.has_key(lineno) and \
+ self.excluded.has_key(self.suite_spots[lineno][1]):
+ return 0
+ # Otherwise, this is an executable line.
+ else:
+ self.statements[lineno] = 1
+ return 1
+ return 0
+
+ default = recordNodeLine
+
+ def recordAndDispatch(self, node):
+ self.recordNodeLine(node)
+ self.dispatch(node)
+
+ def doSuite(self, intro, body, exclude=0):
+ exsuite = self.excluding_suite
+ if exclude or (intro and not self.recordNodeLine(intro)):
+ self.excluding_suite = 1
+ self.recordAndDispatch(body)
+ self.excluding_suite = exsuite
+
+ def doPlainWordSuite(self, prevsuite, suite):
+ # Finding the exclude lines for else's is tricky, because they aren't
+ # present in the compiler parse tree. Look at the previous suite,
+ # and find its last line. If any line between there and the else's
+ # first line are excluded, then we exclude the else.
+ lastprev = self.getLastLine(prevsuite)
+ firstelse = self.getFirstLine(suite)
+ for l in range(lastprev+1, firstelse):
+ if self.suite_spots.has_key(l):
+ self.doSuite(None, suite, exclude=self.excluded.has_key(l))
+ break
+ else:
+ self.doSuite(None, suite)
+
+ def doElse(self, prevsuite, node):
+ if node.else_:
+ self.doPlainWordSuite(prevsuite, node.else_)
+
+ def visitFor(self, node):
+ self.doSuite(node, node.body)
+ self.doElse(node.body, node)
+
+ def visitIf(self, node):
+ # The first test has to be handled separately from the rest.
+ # The first test is credited to the line with the "if", but the others
+ # are credited to the line with the test for the elif.
+ self.doSuite(node, node.tests[0][1])
+ for t, n in node.tests[1:]:
+ self.doSuite(t, n)
+ self.doElse(node.tests[-1][1], node)
+
+ def visitWhile(self, node):
+ self.doSuite(node, node.body)
+ self.doElse(node.body, node)
+
+ def visitTryExcept(self, node):
+ self.doSuite(node, node.body)
+ for i in range(len(node.handlers)):
+ a, b, h = node.handlers[i]
+ if not a:
+ # It's a plain "except:". Find the previous suite.
+ if i > 0:
+ prev = node.handlers[i-1][2]
+ else:
+ prev = node.body
+ self.doPlainWordSuite(prev, h)
+ else:
+ self.doSuite(a, h)
+ self.doElse(node.handlers[-1][2], node)
+
+ def visitTryFinally(self, node):
+ self.doSuite(node, node.body)
+ self.doPlainWordSuite(node.body, node.final)
+
+ def visitGlobal(self, node):
+ # "global" statements don't execute like others (they don't call the
+ # trace function), so don't record their line numbers.
+ pass
the_coverage = None
-class coverage:
- error = "coverage error"
+class CoverageException(Exception): pass
+class coverage:
# Name of the cache file (unless environment variable is set).
cache_default = ".coverage"
# Environment variable naming the cache file.
cache_env = "COVERAGE_FILE"
+ # A dictionary with an entry for (Python source file name, line number
+ # in that file) if that line has been executed.
+ c = {}
+
# A map from canonical Python source file name to a dictionary in
# which there's an entry for each line number that has been
# executed.
cexecuted = {}
- # Cache of results of calling the analysis() method, so that you can
+ # Cache of results of calling the analysis2() method, so that you can
# specify both -r and -a without doing double work.
analysis_cache = {}
@@ -110,11 +265,28 @@ class coverage:
def __init__(self):
global the_coverage
if the_coverage:
- raise self.error, "Only one coverage object allowed."
- self.cache = os.environ.get(self.cache_env, self.cache_default)
- self.restore()
- self.analysis_cache = {}
+ raise CoverageException, "Only one coverage object allowed."
+ self.usecache = 1
+ self.cache = None
+ self.exclude_re = ''
+ self.nesting = 0
+ self.cstack = []
+ self.xstack = []
+ self.relative_dir = os.path.normcase(os.path.abspath(os.curdir)+os.path.sep)
+ # t(f, x, y). This method is passed to sys.settrace as a trace function.
+ # See [van Rossum 2001-07-20b, 9.2] for an explanation of sys.settrace and
+ # the arguments and return value of the trace function.
+ # See [van Rossum 2001-07-20a, 3.2] for a description of frame and code
+ # objects.
+
+ def t(self, f, w, a): #pragma: no cover
+ if w == 'line':
+ self.c[(f.f_code.co_filename, f.f_lineno)] = 1
+ for c in self.cstack:
+ c[(f.f_code.co_filename, f.f_lineno)] = 1
+ return self.t
+
def help(self, error=None):
if error:
print error
@@ -122,23 +294,26 @@ class coverage:
print __doc__
sys.exit(1)
- def command_line(self):
+ def command_line(self, argv, help=None):
import getopt
+ help = help or self.help
settings = {}
optmap = {
'-a': 'annotate',
+ '-c': 'collect',
'-d:': 'directory=',
'-e': 'erase',
'-h': 'help',
'-i': 'ignore-errors',
'-m': 'show-missing',
+ '-p': 'parallel-mode',
'-r': 'report',
'-x': 'execute',
+ '-o:': 'omit=',
}
short_opts = string.join(map(lambda o: o[1:], optmap.keys()), '')
long_opts = optmap.values()
- options, args = getopt.getopt(sys.argv[1:], short_opts,
- long_opts)
+ options, args = getopt.getopt(argv, short_opts, long_opts)
for o, a in options:
if optmap.has_key(o):
settings[optmap[o]] = 1
@@ -147,86 +322,167 @@ class coverage:
elif o[2:] in long_opts:
settings[o[2:]] = 1
elif o[2:] + '=' in long_opts:
- settings[o[2:]] = a
- else:
- self.help("Unknown option: '%s'." % o)
+ settings[o[2:]+'='] = a
+ else: #pragma: no cover
+ pass # Can't get here, because getopt won't return anything unknown.
+
if settings.get('help'):
- self.help()
+ help()
+
for i in ['erase', 'execute']:
- for j in ['annotate', 'report']:
+ for j in ['annotate', 'report', 'collect']:
if settings.get(i) and settings.get(j):
- self.help("You can't specify the '%s' and '%s' "
+ help("You can't specify the '%s' and '%s' "
"options at the same time." % (i, j))
+
args_needed = (settings.get('execute')
or settings.get('annotate')
or settings.get('report'))
- action = settings.get('erase') or args_needed
+ action = (settings.get('erase')
+ or settings.get('collect')
+ or args_needed)
if not action:
- self.help("You must specify at least one of -e, -x, -r, "
- "or -a.")
+ help("You must specify at least one of -e, -x, -c, -r, or -a.")
if not args_needed and args:
- self.help("Unexpected arguments %s." % args)
+ help("Unexpected arguments: %s" % " ".join(args))
+
+ self.get_ready(settings.get('parallel-mode'))
+ self.exclude('#pragma[: ]+[nN][oO] [cC][oO][vV][eE][rR]')
+
if settings.get('erase'):
self.erase()
if settings.get('execute'):
if not args:
- self.help("Nothing to do.")
+ help("Nothing to do.")
sys.argv = args
self.start()
import __main__
sys.path[0] = os.path.dirname(sys.argv[0])
execfile(sys.argv[0], __main__.__dict__)
+ if settings.get('collect'):
+ self.collect()
if not args:
args = self.cexecuted.keys()
+
ignore_errors = settings.get('ignore-errors')
show_missing = settings.get('show-missing')
directory = settings.get('directory=')
+
+ omit = settings.get('omit=')
+ if omit is not None:
+ omit = omit.split(',')
+ else:
+ omit = []
+
if settings.get('report'):
- self.report(args, show_missing, ignore_errors)
+ self.report(args, show_missing, ignore_errors, omit_prefixes=omit)
if settings.get('annotate'):
- self.annotate(args, directory, ignore_errors)
-
- def start(self):
- sys.settrace(t)
+ self.annotate(args, directory, ignore_errors, omit_prefixes=omit)
+ def use_cache(self, usecache, cache_file=None):
+ self.usecache = usecache
+ if cache_file and not self.cache:
+ self.cache_default = cache_file
+
+ def get_ready(self, parallel_mode=False):
+ if self.usecache and not self.cache:
+ self.cache = os.environ.get(self.cache_env, self.cache_default)
+ if parallel_mode:
+ self.cache += "." + gethostname() + "." + str(os.getpid())
+ self.restore()
+ self.analysis_cache = {}
+
+ def start(self, parallel_mode=False):
+ self.get_ready(parallel_mode)
+ if self.nesting == 0: #pragma: no cover
+ sys.settrace(self.t)
+ if hasattr(threading, 'settrace'):
+ threading.settrace(self.t)
+ self.nesting += 1
+
def stop(self):
- sys.settrace(None)
+ self.nesting -= 1
+ if self.nesting == 0: #pragma: no cover
+ sys.settrace(None)
+ if hasattr(threading, 'settrace'):
+ threading.settrace(None)
def erase(self):
- global c
- c = {}
+ self.c = {}
self.analysis_cache = {}
self.cexecuted = {}
- if os.path.exists(self.cache):
+ if self.cache and os.path.exists(self.cache):
os.remove(self.cache)
+ self.exclude_re = ""
+
+ def exclude(self, re):
+ if self.exclude_re:
+ self.exclude_re += "|"
+ self.exclude_re += "(" + re + ")"
+
+ def begin_recursive(self):
+ self.cstack.append(self.c)
+ self.xstack.append(self.exclude_re)
+
+ def end_recursive(self):
+ self.c = self.cstack.pop()
+ self.exclude_re = self.xstack.pop()
# save(). Save coverage data to the coverage cache.
def save(self):
- self.canonicalize_filenames()
- cache = open(self.cache, 'wb')
- import marshal
- marshal.dump(self.cexecuted, cache)
- cache.close()
+ if self.usecache and self.cache:
+ self.canonicalize_filenames()
+ cache = open(self.cache, 'wb')
+ import marshal
+ marshal.dump(self.cexecuted, cache)
+ cache.close()
- # restore(). Restore coverage data from the coverage cache (if it
- # exists).
+ # restore(). Restore coverage data from the coverage cache (if it exists).
def restore(self):
- global c
- c = {}
+ self.c = {}
self.cexecuted = {}
- if not os.path.exists(self.cache):
- return
+ assert self.usecache
+ if os.path.exists(self.cache):
+ self.cexecuted = self.restore_file(self.cache)
+
+ def restore_file(self, file_name):
try:
- cache = open(self.cache, 'rb')
+ cache = open(file_name, 'rb')
import marshal
cexecuted = marshal.load(cache)
cache.close()
if isinstance(cexecuted, types.DictType):
- self.cexecuted = cexecuted
+ return cexecuted
+ else:
+ return {}
except:
- pass
+ return {}
+
+ # collect(). Collect data in multiple files produced by parallel mode
+
+ def collect(self):
+ cache_dir, local = os.path.split(self.cache)
+ for file in os.listdir(cache_dir):
+ if not file.startswith(local):
+ continue
+
+ full_path = os.path.join(cache_dir, file)
+ cexecuted = self.restore_file(full_path)
+ self.merge_data(cexecuted)
+
+ def merge_data(self, new_data):
+ for file_name, file_data in new_data.items():
+ if self.cexecuted.has_key(file_name):
+ self.merge_file_data(self.cexecuted[file_name], file_data)
+ else:
+ self.cexecuted[file_name] = file_data
+
+ def merge_file_data(self, cache_data, new_data):
+ for line_number in new_data.keys():
+ if not cache_data.has_key(line_number):
+ cache_data[line_number] = new_data[line_number]
# canonical_filename(filename). Return a canonical filename for the
# file (that is, an absolute path with no redundant components and
@@ -247,25 +503,23 @@ class coverage:
self.canonical_filename_cache[filename] = cf
return self.canonical_filename_cache[filename]
- # canonicalize_filenames(). Copy results from "executed" to
- # "cexecuted", canonicalizing filenames on the way. Clear the
- # "executed" map.
+ # canonicalize_filenames(). Copy results from "c" to "cexecuted",
+ # canonicalizing filenames on the way. Clear the "c" map.
def canonicalize_filenames(self):
- global c
- for filename, lineno in c.keys():
+ for filename, lineno in self.c.keys():
f = self.canonical_filename(filename)
if not self.cexecuted.has_key(f):
self.cexecuted[f] = {}
self.cexecuted[f][lineno] = 1
- c = {}
+ self.c = {}
# morf_filename(morf). Return the filename for a module or file.
def morf_filename(self, morf):
if isinstance(morf, types.ModuleType):
if not hasattr(morf, '__file__'):
- raise self.error, "Module has no __file__ attribute."
+ raise CoverageException, "Module has no __file__ attribute."
file = morf.__file__
else:
file = morf
@@ -273,9 +527,9 @@ class coverage:
# analyze_morf(morf). Analyze the module or filename passed as
# the argument. If the source code can't be found, raise an error.
- # Otherwise, return a pair of (1) the canonical filename of the
- # source code for the module, and (2) a list of lines of statements
- # in the source code.
+ # Otherwise, return a tuple of (1) the canonical filename of the
+ # source code for the module, (2) a list of lines of statements
+ # in the source code, and (3) a list of lines of excluded statements.
def analyze_morf(self, morf):
if self.analysis_cache.has_key(morf):
@@ -284,57 +538,82 @@ class coverage:
ext = os.path.splitext(filename)[1]
if ext == '.pyc':
if not os.path.exists(filename[0:-1]):
- raise self.error, ("No source for compiled code '%s'."
+ raise CoverageException, ("No source for compiled code '%s'."
% filename)
filename = filename[0:-1]
elif ext != '.py':
- raise self.error, "File '%s' not Python source." % filename
+ raise CoverageException, "File '%s' not Python source." % filename
source = open(filename, 'r')
- import parser
- tree = parser.suite(source.read()).totuple(1)
+ lines, excluded_lines = self.find_executable_statements(
+ source.read(), exclude=self.exclude_re
+ )
source.close()
- statements = {}
- self.find_statements(tree, statements)
- lines = statements.keys()
- lines.sort()
- result = filename, lines
+ result = filename, lines, excluded_lines
self.analysis_cache[morf] = result
return result
- # find_statements(tree, dict). Find each statement in the parse
- # tree and record the line on which the statement starts in the
- # dictionary (by assigning it to 1).
- #
- # It works by walking the whole tree depth-first. Every time it
- # comes across a statement (symbol.stmt -- this includes compound
- # statements like 'if' and 'while') it calls find_statement, which
- # descends the tree below the statement to find the first terminal
- # token in that statement and record the lines on which that token
- # was found.
- #
- # This algorithm may find some lines several times (because of the
- # grammar production statement -> compound statement -> statement),
- # but that doesn't matter because we record lines as the keys of the
- # dictionary.
- #
- # See also [GDR 2001-12-04b, 3.2].
-
- def find_statements(self, tree, dict):
+ def get_suite_spots(self, tree, spots):
import symbol, token
- if token.ISNONTERMINAL(tree[0]):
- for t in tree[1:]:
- self.find_statements(t, dict)
- if tree[0] == symbol.stmt:
- self.find_statement(tree[1], dict)
- elif (tree[0] == token.NAME
- and tree[1] in ['elif', 'except', 'finally']):
- dict[tree[2]] = 1
+ for i in range(1, len(tree)):
+ if type(tree[i]) == type(()):
+ if tree[i][0] == symbol.suite:
+ # Found a suite, look back for the colon and keyword.
+ lineno_colon = lineno_word = None
+ for j in range(i-1, 0, -1):
+ if tree[j][0] == token.COLON:
+ lineno_colon = tree[j][2]
+ elif tree[j][0] == token.NAME:
+ if tree[j][1] == 'elif':
+ # Find the line number of the first non-terminal
+ # after the keyword.
+ t = tree[j+1]
+ while t and token.ISNONTERMINAL(t[0]):
+ t = t[1]
+ if t:
+ lineno_word = t[2]
+ else:
+ lineno_word = tree[j][2]
+ break
+ elif tree[j][0] == symbol.except_clause:
+ # "except" clauses look like:
+ # ('except_clause', ('NAME', 'except', lineno), ...)
+ if tree[j][1][0] == token.NAME:
+ lineno_word = tree[j][1][2]
+ break
+ if lineno_colon and lineno_word:
+ # Found colon and keyword, mark all the lines
+ # between the two with the two line numbers.
+ for l in range(lineno_word, lineno_colon+1):
+ spots[l] = (lineno_word, lineno_colon)
+ self.get_suite_spots(tree[i], spots)
- def find_statement(self, tree, dict):
- import token
- while token.ISNONTERMINAL(tree[0]):
- tree = tree[1]
- dict[tree[2]] = 1
+ def find_executable_statements(self, text, exclude=None):
+ # Find lines which match an exclusion pattern.
+ excluded = {}
+ suite_spots = {}
+ if exclude:
+ reExclude = re.compile(exclude)
+ lines = text.split('\n')
+ for i in range(len(lines)):
+ if reExclude.search(lines[i]):
+ excluded[i+1] = 1
+
+ import parser
+ tree = parser.suite(text+'\n\n').totuple(1)
+ self.get_suite_spots(tree, suite_spots)
+
+ # Use the compiler module to parse the text and find the executable
+ # statements. We add newlines to be impervious to final partial lines.
+ statements = {}
+ ast = compiler.parse(text+'\n\n')
+ visitor = StatementFindingAstVisitor(statements, excluded, suite_spots)
+ compiler.walk(ast, visitor, walker=visitor)
+
+ lines = statements.keys()
+ lines.sort()
+ excluded_lines = excluded.keys()
+ excluded_lines.sort()
+ return lines, excluded_lines
# format_lines(statements, lines). Format a list of line numbers
# for printing by coalescing groups of lines as long as the lines
@@ -367,11 +646,15 @@ class coverage:
return "%d" % start
else:
return "%d-%d" % (start, end)
- import string
return string.join(map(stringify, pairs), ", ")
+ # Backward compatibility with version 1.
def analysis(self, morf):
- filename, statements = self.analyze_morf(morf)
+ f, s, _, m, mf = self.analysis2(morf)
+ return f, s, m, mf
+
+ def analysis2(self, morf):
+ filename, statements, excluded = self.analyze_morf(morf)
self.canonicalize_filenames()
if not self.cexecuted.has_key(filename):
self.cexecuted[filename] = {}
@@ -379,18 +662,45 @@ class coverage:
for line in statements:
if not self.cexecuted[filename].has_key(line):
missing.append(line)
- return (filename, statements, missing,
+ return (filename, statements, excluded, missing,
self.format_lines(statements, missing))
+ def relative_filename(self, filename):
+ """ Convert filename to relative filename from self.relative_dir.
+ """
+ return filename.replace(self.relative_dir, "")
+
def morf_name(self, morf):
+ """ Return the name of morf as used in report.
+ """
if isinstance(morf, types.ModuleType):
return morf.__name__
else:
- return os.path.splitext(os.path.basename(morf))[0]
+ return self.relative_filename(os.path.splitext(morf)[0])
- def report(self, morfs, show_missing=1, ignore_errors=0):
+ def filter_by_prefix(self, morfs, omit_prefixes):
+ """ Return list of morfs where the morf name does not begin
+ with any one of the omit_prefixes.
+ """
+ filtered_morfs = []
+ for morf in morfs:
+ for prefix in omit_prefixes:
+ if self.morf_name(morf).startswith(prefix):
+ break
+ else:
+ filtered_morfs.append(morf)
+
+ return filtered_morfs
+
+ def morf_name_compare(self, x, y):
+ return cmp(self.morf_name(x), self.morf_name(y))
+
+ def report(self, morfs, show_missing=1, ignore_errors=0, file=None, omit_prefixes=[]):
if not isinstance(morfs, types.ListType):
morfs = [morfs]
+ morfs = self.filter_by_prefix(morfs, omit_prefixes)
+ morfs.sort(self.morf_name_compare)
+
max_name = max([5,] + map(len, map(self.morf_name, morfs)))
fmt_name = "%%- %ds " % max_name
fmt_err = fmt_name + "%s: %s"
@@ -399,14 +709,16 @@ class coverage:
if show_missing:
header = header + " Missing"
fmt_coverage = fmt_coverage + " %s"
- print header
- print "-" * len(header)
+ if not file:
+ file = sys.stdout
+ print >>file, header
+ print >>file, "-" * len(header)
total_statements = 0
total_executed = 0
for morf in morfs:
name = self.morf_name(morf)
try:
- _, statements, missing, readable = self.analysis(morf)
+ _, statements, _, missing, readable = self.analysis2(morf)
n = len(statements)
m = n - len(missing)
if n > 0:
@@ -416,17 +728,17 @@ class coverage:
args = (name, n, m, pc)
if show_missing:
args = args + (readable,)
- print fmt_coverage % args
+ print >>file, fmt_coverage % args
total_statements = total_statements + n
total_executed = total_executed + m
- except KeyboardInterrupt:
+ except KeyboardInterrupt: #pragma: no cover
raise
except:
if not ignore_errors:
type, msg = sys.exc_info()[0:2]
- print fmt_err % (name, type, msg)
+ print >>file, fmt_err % (name, type, msg)
if len(morfs) > 1:
- print "-" * len(header)
+ print >>file, "-" * len(header)
if total_statements > 0:
pc = 100.0 * total_executed / total_statements
else:
@@ -434,76 +746,88 @@ class coverage:
args = ("TOTAL", total_statements, total_executed, pc)
if show_missing:
args = args + ("",)
- print fmt_coverage % args
+ print >>file, fmt_coverage % args
# annotate(morfs, ignore_errors).
- blank_re = re.compile("\\s*(#|$)")
- else_re = re.compile("\\s*else\\s*:\\s*(#|$)")
+ blank_re = re.compile(r"\s*(#|$)")
+ else_re = re.compile(r"\s*else\s*:\s*(#|$)")
- def annotate(self, morfs, directory=None, ignore_errors=0):
+ def annotate(self, morfs, directory=None, ignore_errors=0, omit_prefixes=[]):
+ morfs = self.filter_by_prefix(morfs, omit_prefixes)
for morf in morfs:
try:
- filename, statements, missing, _ = self.analysis(morf)
- source = open(filename, 'r')
- if directory:
- dest_file = os.path.join(directory,
- os.path.basename(filename)
- + ',cover')
- else:
- dest_file = filename + ',cover'
- dest = open(dest_file, 'w')
- lineno = 0
- i = 0
- j = 0
- covered = 1
- while 1:
- line = source.readline()
- if line == '':
- break
- lineno = lineno + 1
- while i < len(statements) and statements[i] < lineno:
- i = i + 1
- while j < len(missing) and missing[j] < lineno:
- j = j + 1
- if i < len(statements) and statements[i] == lineno:
- covered = j >= len(missing) or missing[j] > lineno
- if self.blank_re.match(line):
- dest.write(' ')
- elif self.else_re.match(line):
- # Special logic for lines containing only
- # 'else:'. See [GDR 2001-12-04b, 3.2].
- if i >= len(statements) and j >= len(missing):
- dest.write('! ')
- elif i >= len(statements) or j >= len(missing):
- dest.write('> ')
- elif statements[i] == missing[j]:
- dest.write('! ')
- else:
- dest.write('> ')
- elif covered:
- dest.write('> ')
- else:
- dest.write('! ')
- dest.write(line)
- source.close()
- dest.close()
+ filename, statements, excluded, missing, _ = self.analysis2(morf)
+ self.annotate_file(filename, statements, excluded, missing, directory)
except KeyboardInterrupt:
raise
except:
if not ignore_errors:
raise
-
+
+ def annotate_file(self, filename, statements, excluded, missing, directory=None):
+ source = open(filename, 'r')
+ if directory:
+ dest_file = os.path.join(directory,
+ os.path.basename(filename)
+ + ',cover')
+ else:
+ dest_file = filename + ',cover'
+ dest = open(dest_file, 'w')
+ lineno = 0
+ i = 0
+ j = 0
+ covered = 1
+ while 1:
+ line = source.readline()
+ if line == '':
+ break
+ lineno = lineno + 1
+ while i < len(statements) and statements[i] < lineno:
+ i = i + 1
+ while j < len(missing) and missing[j] < lineno:
+ j = j + 1
+ if i < len(statements) and statements[i] == lineno:
+ covered = j >= len(missing) or missing[j] > lineno
+ if self.blank_re.match(line):
+ dest.write(' ')
+ elif self.else_re.match(line):
+ # Special logic for lines containing only 'else:'.
+ # See [GDR 2001-12-04b, 3.2].
+ if i >= len(statements) and j >= len(missing):
+ dest.write('! ')
+ elif i >= len(statements) or j >= len(missing):
+ dest.write('> ')
+ elif statements[i] == missing[j]:
+ dest.write('! ')
+ else:
+ dest.write('> ')
+ elif lineno in excluded:
+ dest.write('- ')
+ elif covered:
+ dest.write('> ')
+ else:
+ dest.write('! ')
+ dest.write(line)
+ source.close()
+ dest.close()
# Singleton object.
the_coverage = coverage()
# Module functions call methods in the singleton object.
-def start(*args): return apply(the_coverage.start, args)
-def stop(*args): return apply(the_coverage.stop, args)
-def erase(*args): return apply(the_coverage.erase, args)
-def analysis(*args): return apply(the_coverage.analysis, args)
-def report(*args): return apply(the_coverage.report, args)
+def use_cache(*args, **kw): return the_coverage.use_cache(*args, **kw)
+def start(*args, **kw): return the_coverage.start(*args, **kw)
+def stop(*args, **kw): return the_coverage.stop(*args, **kw)
+def erase(*args, **kw): return the_coverage.erase(*args, **kw)
+def begin_recursive(*args, **kw): return the_coverage.begin_recursive(*args, **kw)
+def end_recursive(*args, **kw): return the_coverage.end_recursive(*args, **kw)
+def exclude(*args, **kw): return the_coverage.exclude(*args, **kw)
+def analysis(*args, **kw): return the_coverage.analysis(*args, **kw)
+def analysis2(*args, **kw): return the_coverage.analysis2(*args, **kw)
+def report(*args, **kw): return the_coverage.report(*args, **kw)
+def annotate(*args, **kw): return the_coverage.annotate(*args, **kw)
+def annotate_file(*args, **kw): return the_coverage.annotate_file(*args, **kw)
# Save coverage data when Python exits. (The atexit module wasn't
# introduced until Python 2.0, so use sys.exitfunc when it's not
@@ -516,18 +840,18 @@ except ImportError:
# Command-line interface.
if __name__ == '__main__':
- the_coverage.command_line()
+ the_coverage.command_line(sys.argv[1:])
# A. REFERENCES
#
# [GDR 2001-12-04a] "Statement coverage for Python"; Gareth Rees;
# Ravenbrook Limited; 2001-12-04;
-# .
+# .
#
# [GDR 2001-12-04b] "Statement coverage for Python: design and
# analysis"; Gareth Rees; Ravenbrook Limited; 2001-12-04;
-# .
+# .
#
# [van Rossum 2001-07-20a] "Python Reference Manual (releae 2.1.1)";
# Guide van Rossum; 2001-07-20;
@@ -561,10 +885,44 @@ if __name__ == '__main__':
# so that it matches the value the program would get if it were run on
# its own.
#
+# 2004-12-12 NMB Significant code changes.
+# - Finding executable statements has been rewritten so that docstrings and
+# other quirks of Python execution aren't mistakenly identified as missing
+# lines.
+# - Lines can be excluded from consideration, even entire suites of lines.
+# - The filesystem cache of covered lines can be disabled programmatically.
+# - Modernized the code.
#
+# 2004-12-14 NMB Minor tweaks. Return 'analysis' to its original behavior
+# and add 'analysis2'. Add a global for 'annotate', and factor it, adding
+# 'annotate_file'.
+#
+# 2004-12-31 NMB Allow for keyword arguments in the module global functions.
+# Thanks, Allen.
+#
+# 2005-12-02 NMB Call threading.settrace so that all threads are measured.
+# Thanks Martin Fuzzey. Add a file argument to report so that reports can be
+# captured to a different destination.
+#
+# 2005-12-03 NMB coverage.py can now measure itself.
+#
+# 2005-12-04 NMB Adapted Greg Rogers' patch for using relative filenames,
+# and sorting and omitting files to report on.
+#
+# 2006-07-23 NMB Applied Joseph Tate's patch for function decorators.
+#
+# 2006-08-21 NMB Applied Sigve Tjora and Mark van der Wal's fixes for argument
+# handling.
+#
+# 2006-08-22 NMB Applied Geoff Bache's parallel mode patch.
+#
+# 2006-08-23 NMB Refactorings to improve testability. Fixes to command-line
+# logic for parallel mode and collect.
+
# C. COPYRIGHT AND LICENCE
#
# Copyright 2001 Gareth Rees. All rights reserved.
+# Copyright 2004-2006 Ned Batchelder. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
@@ -591,6 +949,4 @@ if __name__ == '__main__':
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
#
-#
-#
-# $Id: coverage.py,v 1.1 2005/07/11 20:39:31 dax Exp $
+# $Id: coverage.py 47 2006-08-24 01:08:48Z Ned $
diff --git a/run_test.py b/run_test.py
index ffd7ffe..1823549 100644
--- a/run_test.py
+++ b/run_test.py
@@ -1,10 +1,11 @@
+# -*- coding: utf-8 -*-
##
-## run_test.py
+## run_tests.py
## Login : David Rousselie
-## Started on Wed May 18 13:33:03 2005 David Rousselie
-## $Id: run_test.py,v 1.2 2005/09/18 20:24:07 David Rousselie Exp $
+## Started on Wed Aug 9 21:37:35 2006 David Rousselie
+## $Id$
##
-## Copyright (C) 2005 David Rousselie
+## Copyright (C) 2006 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
@@ -23,7 +24,9 @@
import coverage
coverage.erase()
coverage.start()
+import logging
import unittest
+from test import test_support
import sys
sys.path.append("src")
@@ -32,67 +35,32 @@ sys.setdefaultencoding('utf8')
del sys.setdefaultencoding
import tests
-from tests.test_mailconnection import *
-from tests.test_mailconnection_factory import *
-from tests.test_component import *
-from tests.test_storage import *
-from tests.test_lang import *
-from test import test_support
-import logging
-import jmc
+from tests.jmc.model.test_account import *
+import jmc
+import jmc.jabber
+#import jmc.jabber.component
if __name__ == '__main__':
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
- mail_connection_suite = unittest.makeSuite(MailConnection_TestCase, \
- "test")
- pop3_connection_suite = unittest.makeSuite(POP3Connection_TestCase, \
- "test")
- imap_connection_suite = unittest.makeSuite(IMAPConnection_TestCase, \
- "test")
- mc_factory_suite = unittest.makeSuite(MailConnectionFactory_TestCase, \
- "test")
- component_suite = unittest.makeSuite(MailComponent_TestCase_Basic, \
- "test")
- component2_suite = unittest.makeSuite(MailComponent_TestCase_NoReg, \
- "test")
- storage_suite = unittest.makeSuite(Storage_TestCase, \
- "test")
- dbmstorage_suite = unittest.makeSuite(DBMStorage_TestCase, \
- "test")
- sqlitestorage_suite = unittest.makeSuite(SQLiteStorage_TestCase, \
- "test")
- lang_suite = unittest.makeSuite(Lang_TestCase, \
- "test")
-
- jmc_suite = unittest.TestSuite((mail_connection_suite, \
- pop3_connection_suite, \
- imap_connection_suite, \
- mc_factory_suite, \
- # component_suite, \
- # component2_suite, \
- storage_suite, \
- dbmstorage_suite, \
- sqlitestorage_suite, \
- lang_suite))
- #test_support.run_suite(mail_connection_suite)
- #test_support.run_suite(pop3_connection_suite)
- #test_support.run_suite(imap_connection_suite)
- #test_support.run_suite(mc_factory_suite)
- #test_support.run_suite(component_suite)
- #test_support.run_suite(component2_suite)
- #test_support.run_suite(storage_suite)
- #test_support.run_suite(sqlitestorage_suite)
- #test_support.run_suite(dbmstorage_suite)
+
+ mail_account_suite = unittest.makeSuite(MailAccount_TestCase, "test")
+ imap_account_suite = unittest.makeSuite(IMAPAccount_TestCase, "test")
+ pop3_account_suite = unittest.makeSuite(POP3Account_TestCase, "test")
+
+ jmc_suite = unittest.TestSuite()
+ jmc_suite = unittest.TestSuite((mail_account_suite, \
+ imap_account_suite, \
+ pop3_account_suite))
test_support.run_suite(jmc_suite)
+
coverage.stop()
-coverage.analysis(jmc.email.mailconnection_factory)
-coverage.analysis(jmc.email.mailconnection)
-coverage.analysis(jmc.jabber.component)
-coverage.analysis(jmc.jabber.x)
-coverage.analysis(jmc.utils.lang)
-coverage.report([jmc.email.mailconnection_factory, jmc.email.mailconnection, \
- jmc.jabber.component, jmc.jabber.x, jmc.utils.lang])
+#coverage.analysis(jmc.jabber.component)
+coverage.analysis(jmc.model.account)
+
+coverage.report([
+ jmc.model.account])
+
diff --git a/src/jmc.py b/src/jmc.py
index 84d3989..caf5029 100755
--- a/src/jmc.py
+++ b/src/jmc.py
@@ -1,12 +1,10 @@
-#!/usr/bin/python -u
##
-## Jabber Mail Component
## jmc.py
-## Login : David Rousselie
-## Started on Fri Jan 7 11:06:42 2005
-## $Id: jmc.py,v 1.3 2005/07/11 20:39:31 dax Exp $
+## Login :
+## Started on Fri Jan 19 18:14:41 2007 David Rousselie
+## $Id$
##
-## Copyright (C) 2005
+## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
@@ -22,70 +20,27 @@
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
-import sys
-import os.path
import logging
+import sys
reload(sys)
sys.setdefaultencoding('utf-8')
del sys.setdefaultencoding
-from jmc.email import mailconnection
-from jmc.jabber.component import MailComponent, ComponentFatalError
-from jmc.utils.config import Config
-def main(config_file = "jmc.xml", isDebug = 0):
- try:
- logger = logging.getLogger()
- logger.addHandler(logging.StreamHandler())
- if isDebug > 0:
- logger.setLevel(logging.DEBUG)
- try:
- logger.debug("Loading config file " + config_file)
- config = Config(config_file)
- except:
- print >>sys.stderr, "Couldn't load config file:", \
- str(sys.exc_value)
- sys.exit(1)
+from sqlobject import *
+from pyxmpp.message import Message
- pidfile = open(config.get_content("config/pidfile"), "w")
- pidfile.write(str(os.getpid()))
- pidfile.close()
- mailconnection.default_encoding = config.get_content("config/mail_default_encoding")
- print "creating component..."
- mailcomp = MailComponent(config.get_content("config/jabber/service"), \
- config.get_content("config/jabber/secret"), \
- config.get_content("config/jabber/server"), \
- int(config.get_content("config/jabber/port")), \
- config.get_content("config/jabber/language"), \
- int(config.get_content("config/check_interval")), \
- config.get_content("config/spooldir"), \
- config.get_content("config/storage"), \
- config.get_content("config/jabber/vCard/FN"))
-
- print "starting..."
- mailcomp.run(1)
- os.remove(config.get_content("config/pidfile"))
- except ComponentFatalError,e:
- print e
- print "Aborting."
- sys.exit(1)
-
-if __name__ == "__main__":
- var_option = 0
- file_num = 0
- index = 0
- debug_level = 0
- for opt in sys.argv:
- if var_option == 0 and len(opt) == 2 and opt == "-c":
- var_option += 1
- elif (var_option & 1) == 1 and len(opt) > 0:
- var_option += 1
- file_num = index
- if len(opt) == 2 and opt == "-D":
- debug_level = 1
- index += 1
- if (var_option & 2) == 2:
- main(sys.argv[file_num], debug_level)
- else:
- main("/etc/jabber/jmc.xml", debug_level)
+from jmc.jabber.component import MailComponent
+from jmc.model.account import MailPresenceAccount
+logger = logging.getLogger()
+logger.addHandler(logging.StreamHandler())
+logger.setLevel(logging.DEBUG)
+component = MailComponent("jmc.localhost", \
+ "secret", \
+ "127.0.0.1", \
+ 5349, \
+ "sqlite:///tmp/jmc_test.db")
+component.account_class = MailPresenceAccount
+component.run()
+logger.debug("JMC is exiting")
diff --git a/src/jmc/__init__.py b/src/jmc/__init__.py
index e69de29..7c37883 100644
--- a/src/jmc/__init__.py
+++ b/src/jmc/__init__.py
@@ -0,0 +1,2 @@
+"""JMC module"""
+__revision__ = ""
diff --git a/src/jmc/utils/config.py b/src/jmc/config.py
similarity index 100%
rename from src/jmc/utils/config.py
rename to src/jmc/config.py
diff --git a/src/jmc/email/mailconnection_factory.py b/src/jmc/email/mailconnection_factory.py
deleted file mode 100644
index 1187fb1..0000000
--- a/src/jmc/email/mailconnection_factory.py
+++ /dev/null
@@ -1,142 +0,0 @@
-##
-## mailconnection_factory.py
-## Login : David Rousselie
-## Started on Fri May 20 10:41:46 2005 David Rousselie
-## $Id: mailconnection_factory.py,v 1.2 2005/09/18 20:24:07 dax Exp $
-##
-## Copyright (C) 2005
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-##
-
-import jmc.email.mailconnection as mailconnection
-from jmc.email.mailconnection import IMAPConnection, POP3Connection
-
-def get_new_mail_connection(type):
- """ Static method to return an empty MailConnection object of given type
- :Parameters:
- - 'type': type of connection to return : 'imap', 'imaps', 'pop3', 'pop3s'
-
- :return: MailConnection of given type in parameter, None if unknown type
-
- :returntype: 'MailConnection'
- """
- if type == "imap":
- return IMAPConnection()
- elif type == "imaps":
- return IMAPConnection(ssl = True)
- elif type == "pop3":
- return POP3Connection()
- elif type == "pop3s":
- return POP3Connection(ssl = True)
- raise Exception, "Connection type \"" + type + "\" unknown"
-
-def str_to_mail_connection(connection_string):
- """ Static methode to create a MailConnection object filled from string
-
- :Parameters:
- - 'connection_string': string containing MailConnection parameters separated
- by '#'. ex: 'pop3#login#password#host#110#chat_action#online_action#away_action#xa_action#dnd_action#offline_action#check_interval#liv_email_only(#Mailbox)'
-
- :Types:
- - 'connection_string': string
-
- :return: MailConnection of given type found in string parameter
-
- :returntype: 'MailConnection'
- """
- arg_list = connection_string.split("#")
- # optionals values must be the at the beginning of the list to pop them
- # last
- arg_list.reverse()
- type = arg_list.pop()
- login = arg_list.pop()
- password = arg_list.pop()
- if password == "/\\":
- password = None
- store_password = False
- else:
- store_password = True
- host = arg_list.pop()
- port = int(arg_list.pop())
- chat_action = None
- online_action = None
- away_action = None
- xa_action = None
- dnd_action = None
- offline_action = None
- interval = None
- live_email_only = False
- result = None
- if type[0:4] == "imap":
- if len(arg_list) == 9:
- chat_action = int(arg_list.pop())
- online_action = int(arg_list.pop())
- away_action = int(arg_list.pop())
- xa_action = int(arg_list.pop())
- dnd_action = int(arg_list.pop())
- offline_action = int(arg_list.pop())
- interval = int(arg_list.pop())
- live_email_only = (arg_list.pop().lower() == "true")
- else:
- retrieve = bool(arg_list.pop() == "True")
- if retrieve:
- chat_action = online_action = away_action = xa_action = dnd_action = mailconnection.RETRIEVE
- else:
- chat_action = online_action = away_action = xa_action = dnd_action = mailconnection.DIGEST
- offline_action = mailconnection.DO_NOTHING
- mailbox = arg_list.pop()
- result = IMAPConnection(login = login, \
- password = password, \
- host = host, \
- ssl = (len(type) == 5), \
- port = port, \
- mailbox = mailbox)
- elif type[0:4] == "pop3":
- if len(arg_list) == 8:
- chat_action = int(arg_list.pop())
- online_action = int(arg_list.pop())
- away_action = int(arg_list.pop())
- xa_action = int(arg_list.pop())
- dnd_action = int(arg_list.pop())
- offline_action = int(arg_list.pop())
- interval = int(arg_list.pop())
- live_email_only = (arg_list.pop().lower() == "true")
- else:
- retrieve = bool(arg_list.pop() == "True")
- if retrieve:
- chat_action = online_action = away_action = xa_action = dnd_action = mailconnection.RETRIEVE
- else:
- chat_action = online_action = away_action = xa_action = dnd_action = mailconnection.DIGEST
- offline_action = mailconnection.DO_NOTHING
- result = POP3Connection(login = login, \
- password = password, \
- host = host, \
- port = port, \
- ssl = (len(type) == 5))
- if result is None:
- raise Exception, "Connection type \"" + type + "\" unknown"
- result.store_password = store_password
- result.chat_action = chat_action
- result.online_action = online_action
- result.away_action = away_action
- result.xa_action = xa_action
- result.dnd_action = dnd_action
- result.offline_action = offline_action
- if interval is not None:
- result.interval = interval
- result.live_email_only = live_email_only
- return result
-
-
diff --git a/src/jmc/jabber/__init__.py b/src/jmc/jabber/__init__.py
index e69de29..e90a3dc 100644
--- a/src/jmc/jabber/__init__.py
+++ b/src/jmc/jabber/__init__.py
@@ -0,0 +1,2 @@
+"""Jabber component classes"""
+__revision__ = ""
diff --git a/src/jmc/jabber/x.py b/src/jmc/jabber/x.py
deleted file mode 100644
index a9e41d3..0000000
--- a/src/jmc/jabber/x.py
+++ /dev/null
@@ -1,129 +0,0 @@
-##
-## x.py
-## Login : David Rousselie
-## Started on Fri Jan 7 11:06:42 2005
-## $Id: x.py,v 1.3 2005/09/18 20:24:07 dax Exp $
-##
-## Copyright (C) 2005
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-##
-
-import sys
-from pyxmpp.stanza import common_doc
-
-class Option(object):
- def __init__(self, label, value):
- self.__label = label
- self.__value = value
-
- def get_xml(self, parent):
- if parent is None:
- option = common_doc.newChild(None, "option", None)
- else:
- option = parent.newChild(None, "option", None)
- option.setProp("label", self.__label)
- option.newChild(None, "value", self.__value)
- return option
-
-class Field(object):
- def __init__(self, type, label, var, value):
- self.__type = type
- self.__label = label
- self.__var = var
- self.value = value
- self.__options = []
-
- def add_option(self, label, value):
- option = Option(label, value)
- self.__options.append(option)
- return option
-
- def get_xml(self, parent):
- if parent is None:
- raise Exception, "parent field should not be None"
- else:
- field = parent.newChild(None, "field", None)
- field.setProp("type", self.__type)
- if not self.__label is None:
- field.setProp("label", self.__label)
- if not self.__var is None:
- field.setProp("var", self.__var)
- if self.value:
- field.newChild(None, "value", self.value)
- for option in self.__options:
- option.get_xml(field)
- return field
-
-class X(object):
- def __init__(self):
- self.fields = {}
- self.fields_tab = []
- self.title = None
- self.instructions = None
- self.type = None
- self.xmlns = None
-
- def add_field(self, type = "fixed", label = None, var = None, value = ""):
- field = Field(type, label, var, value)
- self.fields[var] = field
- # fields_tab exist to keep added fields order
- self.fields_tab.append(field)
- return field
-
- def attach_xml(self, iq):
- node = iq.newChild(None, "x", None)
- _ns = node.newNs(self.xmlns, None)
- node.setNs(_ns)
- if not self.title is None:
- node.newTextChild(None, "title", self.title)
- if not self.instructions is None:
- node.newTextChild(None, "instructions", self.instructions)
- for field in self.fields_tab:
- field.get_xml(node)
- return node
-
- def from_xml(self, node):
- ## TODO : test node type and ns and clean that loop !!!!
- while node and node.type != "element":
- node = node.next
- child = node.children
- while child:
- ## TODO : test child type (element) and ns (jabber:x:data)
- if child.type == "element" and child.name == "field":
- if child.hasProp("type"):
- type = child.prop("type")
- else:
- type = ""
-
- if child.hasProp("label"):
- label = child.prop("label")
- else:
- label = ""
-
- if child.hasProp("var"):
- var = child.prop("var")
- else:
- var = ""
-
- xval = child.children
- while xval and xval.name != "value":
- xval = xval.next
- if xval:
- value = xval.getContent()
- else:
- value = ""
- field = Field(type, label, var, value)
- self.fields[var] = field
- child = child.next
diff --git a/src/jmc/utils/lang.py b/src/jmc/lang.py
similarity index 100%
rename from src/jmc/utils/lang.py
rename to src/jmc/lang.py
diff --git a/src/jmc/model/__init__.py b/src/jmc/model/__init__.py
new file mode 100644
index 0000000..7b93e19
--- /dev/null
+++ b/src/jmc/model/__init__.py
@@ -0,0 +1,2 @@
+"""Contains data model classes"""
+__revision__ = ""
diff --git a/src/jmc/email/mailconnection.py b/src/jmc/model/account.py
similarity index 67%
rename from src/jmc/email/mailconnection.py
rename to src/jmc/model/account.py
index 4f68847..496423d 100644
--- a/src/jmc/email/mailconnection.py
+++ b/src/jmc/model/account.py
@@ -1,10 +1,10 @@
##
-## mailconnection.py
-## Login : David Rousselie
-## Started on Fri Jan 7 11:06:42 2005
-## $Id: mailconnection.py,v 1.11 2005/09/18 20:24:07 dax Exp $
+## account.py
+## Login :
+## Started on Fri Jan 19 18:21:44 2007 David Rousselie
+## $Id$
##
-## Copyright (C) 2005
+## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
@@ -20,7 +20,6 @@
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
-
import sys
import logging
import email
@@ -31,16 +30,15 @@ import poplib
import imaplib
import socket
-from jmc.utils.lang import Lang
+from sqlobject.inheritance import InheritableSQLObject
+from sqlobject.col import StringCol, IntCol, BoolCol
+
+from jcl.model.account import PresenceAccount
+from jmc.lang import Lang
IMAP4_TIMEOUT = 10
POP3_TIMEOUT = 10
-DO_NOTHING = 0
-DIGEST = 1
-RETRIEVE = 2
-default_encoding = "iso-8859-1"
-
## All MY* classes are implemented to add a timeout (settimeout)
## while connecting
class MYIMAP4(imaplib.IMAP4):
@@ -127,77 +125,84 @@ class MYPOP3_SSL(poplib.POP3_SSL):
self._debugging = 0
self.welcome = self._getresp()
-class MailConnection(object):
+class MailAccount(PresenceAccount):
""" Wrapper to mail connection and action.
Abstract class, do not represent real mail connection type"""
- _logger = logging.getLogger("jmc.MailConnection")
+ # Define constants
+ DIGEST = 1
+ RETRIEVE = 2
+ default_encoding = "iso-8859-1"
+ possibles_actions = [PresenceAccount.DO_NOTHING, \
+ DIGEST, \
+ RETRIEVE]
- def __init__(self, login = "", password = "", host = "", \
- port = 110, ssl = False):
- """ Initialize MailConnection object for common parameters of all
- connections types
-
- :Parameters:
- - 'login': login used to connect mail server
- - 'password': password associated with 'login' to connect mail server
- - 'host': mail server hostname
- - 'port': mail server port
- - 'ssl': activate ssl to connect server
-
- :Types:
- - 'login': string
- - 'password': string
- - 'host': string
- - 'port': int
- - 'ssl': boolean"""
- self.login = login
- self.password = password
- self.store_password = True
- self.host = host
- self.port = port
- self.ssl = ssl
- self.status = "offline"
- self.connection = None
- self.chat_action = RETRIEVE
- self.online_action = RETRIEVE
- self.away_action = RETRIEVE
- self.xa_action = RETRIEVE
- self.dnd_action = RETRIEVE
- self.offline_action = DO_NOTHING
- self.interval = 5
- self.lastcheck = 0
- self.default_lang_class = Lang.en
- self.waiting_password_reply = False
- self.in_error = False
- self.live_email_only = False
- self.first_check = True
-
- def __eq__(self, other):
- return self.get_type() == other.get_type() \
- and self.login == other.login \
- and (not self.store_password or self.password == other.password) \
- and self.store_password == other.store_password \
- and self.host == other.host \
- and self.port == other.port \
- and self.ssl == other.ssl \
- and self.chat_action == other.chat_action \
- and self.online_action == other.online_action \
- and self.away_action == other.away_action \
- and self.xa_action == other.xa_action \
- and self.dnd_action == other.dnd_action \
- and self.offline_action == other.offline_action \
- and self.interval == other.interval \
- and self.live_email_only == other.live_email_only
+ login = StringCol(default = "")
+ password = StringCol(default = None)
+ host = StringCol(default = "localhost")
+ port = IntCol(default = 110)
+ ssl = BoolCol(default = False)
+ interval = IntCol(default = 5)
+ store_password = BoolCol(default = True)
+ live_email_only = BoolCol(default = False)
- def __str__(self):
- return self.get_type() + "#" + self.login + "#" + \
- (self.store_password and self.password or "/\\") + "#" \
- + self.host + "#" + str(self.port) + "#" + str(self.chat_action) + "#" \
- + str(self.online_action) + "#" + str(self.away_action) + "#" + \
- str(self.xa_action) + "#" + str(self.dnd_action) + "#" + \
- str(self.offline_action) + "#" + str(self.interval) + "#" + str(self.live_email_only)
-
+ lastcheck = IntCol(default = 0)
+ waiting_password_reply = BoolCol(default = False)
+ in_error = BoolCol(default = False)
+ first_check = BoolCol(default = True)
+
+ def _init(self, *args, **kw):
+ """MailAccount init
+ Initialize class attributes"""
+ InheritableSQLObject._init(self, *args, **kw)
+ self.__logger = logging.getLogger("jmc.model.account.MailAccount")
+ self.connection = None
+ self.default_lang_class = Lang.en # TODO: use String
+
+ def _get_register_fields(cls):
+ """See Account._get_register_fields
+ """
+ def password_post_func(password):
+ if password is None or password == "":
+ return None
+ return password
+
+ return Account.get_register_fields() + \
+ [("login", "text-single", None, account.string_not_null_post_func, \
+ account.mandatory_field), \
+ ("password", "text-private", None, password_post_func, \
+ (lambda field_name: None)), \
+ ("host", "text-single", None, account.string_not_null_post_func, \
+ account.mandatory_field), \
+ ("port", "text-single", None, account.int_post_func, \
+ account.mandatory_field), \
+ ("ssl", "boolean", None, account.default_post_func, \
+ (lambda field_name: None)), \
+ ("store_password", "boolean", None, account.default_post_func, \
+ (lambda field_name: True)), \
+ ("live_email_only", "boolean", None, account.default_post_func, \
+ lambda field_name: False)]
+
+ get_register_fields = classmethod(_get_register_fields)
+
+ def _get_presence_actions_fields(cls):
+ """See PresenceAccount._get_presence_actions_fields
+ """
+ return {'chat_action': (cls.possibles_actions, \
+ RETRIEVE), \
+ 'online_action': (cls.possibles_actions, \
+ RETRIEVE), \
+ 'away_action': (cls.possibles_actions, \
+ RETRIEVE), \
+ 'xa_action': (cls.possibles_actions, \
+ RETRIEVE), \
+ 'dnd_action': (cls.possibles_actions, \
+ RETRIEVE), \
+ 'offline_action': (cls.possibles_actions, \
+ PresenceAccount.DO_NOTHING)}
+
+ get_presence_actions_fields = classmethod(_get_presence_actions_fields)
+
def get_decoded_part(self, part, charset_hint):
content_charset = part.get_content_charset()
result = u""
@@ -292,26 +297,26 @@ class MailConnection(object):
unicode(self.port)
def connect(self):
- pass
+ raise NotImplementedError
def disconnect(self):
- pass
+ raise NotImplementedError
def get_mail_list(self):
- return 0
+ raise NotImplementedError
def get_mail(self, index):
- return None
+ raise NotImplementedError
def get_mail_summary(self, index):
- return None
+ raise NotImplementedError
def get_next_mail_index(self, mail_list):
- pass
+ raise NotImplementedError
# Does not modify server state but just internal JMC state
def mark_all_as_read(self):
- pass
+ raise NotImplementedError
def get_action(self):
mapping = {"online": self.online_action,
@@ -322,29 +327,35 @@ class MailConnection(object):
"offline": self.offline_action}
if mapping.has_key(self.status):
return mapping[self.status]
- return DO_NOTHING
+ return PresenceAccount.DO_NOTHING
action = property(get_action)
-
-class IMAPConnection(MailConnection):
- _logger = logging.getLogger("jmc.IMAPConnection")
- def __init__(self, login = "", password = "", host = "", \
- port = None, ssl = False, mailbox = "INBOX"):
- if not port:
- if ssl:
- port = 993
- else:
- port = 143
- MailConnection.__init__(self, login, password, host, port, ssl)
- self.mailbox = mailbox
+class IMAPAccount(MailAccount):
+ mailbox = StringCol(default = "INBOX") # TODO : set default INBOX in reg_form (use get_register_fields last field ?)
- def __eq__(self, other):
- return MailConnection.__eq__(self, other) \
- and self.mailbox == other.mailbox
+ def _get_register_fields(cls):
+ """See Account._get_register_fields
+ """
+ def password_post_func(password):
+ if password is None or password == "":
+ return None
+ return password
+
+ return MailAccount.get_register_fields() + \
+ [("mailbox", "text-single", None, account.string_not_null_post_func, \
+ (lambda field_name: "INBOX")), \
+ ("password", "text-private", None, password_post_func, \
+ (lambda field_name: None)), \
+ ("store_password", "boolean", None, account.boolean_post_func, \
+ lambda field_name: True)]
- def __str__(self):
- return MailConnection.__str__(self) + "#" + self.mailbox
+ get_register_fields = classmethod(_get_register_fields)
+
+
+ def _init(self, *args, **kw):
+ MailAccount._init(self, *args, **kw)
+ self.__logger = logging.getLogger("jmc.IMAPConnection")
def get_type(self):
if self.ssl:
@@ -352,10 +363,10 @@ class IMAPConnection(MailConnection):
return "imap"
def get_status(self):
- return MailConnection.get_status(self) + "/" + self.mailbox
+ return MailAccount.get_status(self) + "/" + self.mailbox
def connect(self):
- IMAPConnection._logger.debug("Connecting to IMAP server " \
+ self.__logger.debug("Connecting to IMAP server " \
+ self.login + "@" + self.host + ":" + str(self.port) \
+ " (" + self.mailbox + "). SSL=" \
+ str(self.ssl))
@@ -366,12 +377,12 @@ class IMAPConnection(MailConnection):
self.connection.login(self.login, self.password)
def disconnect(self):
- IMAPConnection._logger.debug("Disconnecting from IMAP server " \
+ self.__logger.debug("Disconnecting from IMAP server " \
+ self.host)
self.connection.logout()
def get_mail_list(self):
- IMAPConnection._logger.debug("Getting mail list")
+ self.__logger.debug("Getting mail list")
typ, data = self.connection.select(self.mailbox)
typ, data = self.connection.search(None, 'RECENT')
if typ == 'OK':
@@ -379,7 +390,7 @@ class IMAPConnection(MailConnection):
return None
def get_mail(self, index):
- IMAPConnection._logger.debug("Getting mail " + str(index))
+ self.__logger.debug("Getting mail " + str(index))
typ, data = self.connection.select(self.mailbox, True)
typ, data = self.connection.fetch(index, '(RFC822)')
if typ == 'OK':
@@ -387,7 +398,7 @@ class IMAPConnection(MailConnection):
return u"Error while fetching mail " + str(index)
def get_mail_summary(self, index):
- IMAPConnection._logger.debug("Getting mail summary " + str(index))
+ self.__logger.debug("Getting mail summary " + str(index))
typ, data = self.connection.select(self.mailbox, True)
typ, data = self.connection.fetch(index, '(RFC822)')
if typ == 'OK':
@@ -404,29 +415,25 @@ class IMAPConnection(MailConnection):
type = property(get_type)
-class POP3Connection(MailConnection):
- _logger = logging.getLogger("jmc.POP3Connection")
-
- def __init__(self, login = "", password = "", host = "", \
- port = None, ssl = False):
- if not port:
- if ssl:
- port = 995
- else:
- port = 110
- MailConnection.__init__(self, login, password, host, port, ssl)
- self.__nb_mail = 0
- self.__lastmail = 0
+class POP3Account(MailAccount):
+ nb_mail = IntCol(default = 0)
+ lastmail = IntCol(default = 0)
+
+ def _init(self, *args, **kw):
+ MailAccount._init(self, *args, **kw)
+ self.__logger = logging.getLogger("jmc.model.account.POP3Account")
def get_type(self):
if self.ssl:
return "pop3s"
return "pop3"
+ type = property(get_type)
+
def connect(self):
- POP3Connection._logger.debug("Connecting to POP3 server " \
- + self.login + "@" + self.host + ":" + str(self.port)\
- + ". SSL=" + str(self.ssl))
+ self.__logger.debug("Connecting to POP3 server " \
+ + self.login + "@" + self.host + ":" + str(self.port)\
+ + ". SSL=" + str(self.ssl))
if self.ssl:
self.connection = MYPOP3_SSL(self.host, self.port)
else:
@@ -439,18 +446,18 @@ class POP3Connection(MailConnection):
def disconnect(self):
- POP3Connection._logger.debug("Disconnecting from POP3 server " \
- + self.host)
+ self.__logger.debug("Disconnecting from POP3 server " \
+ + self.host)
self.connection.quit()
def get_mail_list(self):
- POP3Connection._logger.debug("Getting mail list")
+ self.__logger.debug("Getting mail list")
count, size = self.connection.stat()
- self.__nb_mail = count
+ self.nb_mail = count
return [str(i) for i in range(1, count + 1)]
def get_mail(self, index):
- POP3Connection._logger.debug("Getting mail " + str(index))
+ self.__logger.debug("Getting mail " + str(index))
ret, data, size = self.connection.retr(index)
try:
self.connection.rset()
@@ -461,7 +468,7 @@ class POP3Connection(MailConnection):
return u"Error while fetching mail " + str(index)
def get_mail_summary(self, index):
- POP3Connection._logger.debug("Getting mail summary " + str(index))
+ self.__logger.debug("Getting mail summary " + str(index))
ret, data, size = self.connection.retr(index)
try:
self.connection.rset()
@@ -472,16 +479,15 @@ class POP3Connection(MailConnection):
return u"Error while fetching mail " + str(index)
def get_next_mail_index(self, mail_list):
- if self.__nb_mail == self.__lastmail:
+ if self.nb_mail == self.lastmail:
return None
- if self.__nb_mail < self.__lastmail:
- self.__lastmail = 0
- result = int(mail_list[self.__lastmail])
- self.__lastmail += 1
+ if self.nb_mail < self.lastmail:
+ self.lastmail = 0
+ result = int(mail_list[self.lastmail])
+ self.lastmail += 1
return result
def mark_all_as_read(self):
self.get_mail_list()
- self.__lastmail = self.__nb_mail
+ self.lastmail = self.nb_mail
- type = property(get_type)
diff --git a/src/jmc/utils/__init__.py b/src/jmc/utils/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/src/jmc/utils/storage.py b/src/jmc/utils/storage.py
deleted file mode 100644
index f12a3fd..0000000
--- a/src/jmc/utils/storage.py
+++ /dev/null
@@ -1,297 +0,0 @@
-##
-## storage.py
-## Login : David Rousselie
-## Started on Wed Jul 20 20:26:53 2005 dax
-## $Id: storage.py,v 1.1 2005/09/18 20:24:07 dax Exp $
-##
-## Copyright (C) 2005 dax
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-##
-
-import os
-import re
-import os.path
-import sys
-import anydbm
-import logging
-from UserDict import UserDict
-
-import jmc.email.mailconnection_factory as mailconnection_factory
-
-
-class Storage(UserDict):
- def __init__(self, nb_pk_fields = 1, spool_dir = ".", db_file = None):
- UserDict.__init__(self)
- self.nb_pk_fields = nb_pk_fields
- if db_file is None:
- self._spool_dir = ""
- self.set_spool_dir(spool_dir)
- self.file = self._spool_dir + "/registered.db"
- else:
- spool_dir = os.path.dirname(db_file) or "."
- self.set_spool_dir(spool_dir)
- self.file = db_file
- self._registered = self.load()
-
- def __setitem__(self, pk_tuple, obj):
-# print "Adding " + "#".join(map(str, pk_tuple)) + " = " + str(obj)
- self._registered[str("#".join(map(str, pk_tuple)))] = obj
-
- def __getitem__(self, pk_tuple):
-# print "Getting " + "#".join(map(str, pk_tuple))
- if len(pk_tuple) == self.nb_pk_fields:
- return self._registered[str("#".join(map(str, pk_tuple)))]
- else:
- partial_key = str("#".join(map(str, pk_tuple)))
- regexp = re.compile(partial_key)
- return [self._registered[key]
- for key in self._registered.keys()
- if regexp.search(key)]
-
- def __delitem__(self, pk_tuple):
- #print "Deleting " + "#".join(map(str, pk_tuple))
- del self._registered[str("#".join(map(str, pk_tuple)))]
-
- def get_spool_dir(self):
- return self._spool_dir
-
- def set_spool_dir(self, spool_dir):
- self._spool_dir = spool_dir
- if not os.path.isdir(self._spool_dir):
- os.makedirs(self._spool_dir)
-
- spool_dir = property(get_spool_dir, set_spool_dir)
-
- def has_key(self, pk_tuple):
- if len(pk_tuple) == self.nb_pk_fields:
- return self._registered.has_key(str("#".join(map(str, pk_tuple))))
- else:
- partial_key = str("#".join(map(str, pk_tuple)))
- regexp = re.compile("^" + partial_key)
- for key in self._registered.keys():
- if regexp.search(key):
- return True
- return False
-
- def keys(self, pk_tuple = None):
- if pk_tuple is None:
- return [tuple(key.split("#")) for key in self._registered.keys()]
- else:
- level = len(pk_tuple)
- partial_key = str("#".join(map(str, pk_tuple)))
- regexp = re.compile("^" + partial_key)
- result = {}
- for key in self._registered.keys():
- if regexp.search(key):
- result[key.split("#")[level]] = None
- return result.keys()
-
- def dump(self):
- for pk in self._registered.keys():
- print pk + " = " + str(self._registered[pk])
-
- def load(self):
- pass
-
-class DBMStorage(Storage):
- _logger = logging.getLogger("jmc.utils.DBMStorage")
-
- def __init__(self, nb_pk_fields = 1, spool_dir = ".", db_file = None):
-# print "DBM INIT"
- Storage.__init__(self, nb_pk_fields, spool_dir, db_file)
-
- def __del__(self):
- # print "DBM STOP"
- self.sync()
-
- def load(self):
- str_registered = anydbm.open(self.file, \
- 'c')
- result = {}
- try:
- for pk in str_registered.keys():
- result[pk] = mailconnection_factory.str_to_mail_connection(str_registered[pk])
- except Exception, e:
- print >>sys.stderr, "Cannot load registered.db : "
- print >>sys.stderr, e
- str_registered.close()
- return result
-
- def sync(self):
- #print "DBM SYNC"
- self.store()
-
- def __store(self, nb_pk_fields, registered, pk):
- if nb_pk_fields > 0:
- for key in registered.keys():
- if pk:
- self.__store(nb_pk_fields - 1, \
- registered[key], pk + "#" + key)
- else:
- self.__store(nb_pk_fields - 1, \
- registered[key], key)
- else:
- self.__str_registered[pk] = str(registered)
-# print "STORING : " + pk + " = " + str(registered)
-
- def store(self):
-# print "DBM STORE"
- try:
- str_registered = anydbm.open(self.file, \
- 'n')
- for pk in self._registered.keys():
- str_registered[pk] = str(self._registered[pk])
- except Exception, e:
- print >>sys.stderr, "Cannot save to registered.db : "
- print >>sys.stderr, e
- str_registered.close()
-
- def __setitem__(self, pk_tuple, obj):
- Storage.__setitem__(self, pk_tuple, obj)
- self.sync()
-
- def __delitem__(self, pk_tuple):
- Storage.__delitem__(self, pk_tuple)
- self.sync()
-
-# Do not fail if pysqlite is not installed
-try:
- from pysqlite2 import dbapi2 as sqlite
-
- class SQLiteStorage(Storage):
- _logger = logging.getLogger("jmc.utils.SQLiteStorage")
-
- def __init__(self, nb_pk_fields = 1, spool_dir = ".", db_file = None):
- self.__connection = None
- Storage.__init__(self, nb_pk_fields, spool_dir, db_file)
-
- def create(self):
- SQLiteStorage._logger.debug("creating new Table")
- cursor = self.__connection.cursor()
- cursor.execute("""
- create table account(
- jid STRING,
- name STRING,
- type STRING,
- login STRING,
- password STRING,
- host STRING,
- port INTEGER,
- chat_action INTEGER,
- online_action INTEGER,
- away_action INTEGER,
- xa_action INTEGER,
- dnd_action INTEGER,
- offline_action INTEGER,
- interval INTEGER,
- live_email_only BOOLEAN,
- mailbox STRING,
- PRIMARY KEY(jid, name)
- )
- """)
- self.__connection.commit()
- cursor.close()
-
- def __del__(self):
- self.__connection.close()
-
- def sync(self):
- pass
-
- def load(self):
- if not os.path.exists(self.file):
- self.__connection = sqlite.connect(self.file)
- self.create()
- else:
- self.__connection = sqlite.connect(self.file)
- cursor = self.__connection.cursor()
- cursor.execute("""select * from account""")
- result = {}
- for row in cursor.fetchall():
- # print "Creating new " + row[self.nb_pk_fields] + " connection."
- account_type = row[self.nb_pk_fields]
- account = result["#".join(row[0:self.nb_pk_fields])] = mailconnection_factory.get_new_mail_connection(account_type)
- account.login = row[self.nb_pk_fields + 1]
- account.password = row[self.nb_pk_fields + 2]
- if account.password is None:
- account.store_password = False
- else:
- account.store_password = True
- account.host = row[self.nb_pk_fields + 3]
- account.port = int(row[self.nb_pk_fields + 4])
- account.chat_action = int(row[self.nb_pk_fields + 5])
- account.online_action = int(row[self.nb_pk_fields + 6])
- account.away_action = int(row[self.nb_pk_fields + 7])
- account.xa_action = int(row[self.nb_pk_fields + 8])
- account.dnd_action = int(row[self.nb_pk_fields + 9])
- account.offline_action = int(row[self.nb_pk_fields + 10])
- account.interval = int(row[self.nb_pk_fields + 11])
- account.live_email_only = (row[self.nb_pk_fields + 12] == 1)
- if account_type[0:4] == "imap":
- account.mailbox = row[self.nb_pk_fields + 13]
- # for field_index in range(self.nb_pk_fields + 1, len(row)):
- # print "\tSetting " + str(cursor.description[field_index][0]) + \
- # " to " + str(row[field_index])
- # setattr(account,
- # cursor.description[field_index][0],
- # row[field_index])
- cursor.close()
- return result
-
- def __setitem__(self, pk_tuple, obj):
- Storage.__setitem__(self, pk_tuple, obj)
- cursor = self.__connection.cursor()
- mailbox = None
- password = None
- if obj.type[0:4] == "imap":
- mailbox = obj.mailbox
- if obj.store_password == True:
- password = obj.password
- cursor.execute("""
- insert or replace into account values
- (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """,
- (pk_tuple[0],
- pk_tuple[1],
- obj.type,
- obj.login,
- password,
- obj.host,
- obj.port,
- obj.chat_action,
- obj.online_action,
- obj.away_action,
- obj.xa_action,
- obj.dnd_action,
- obj.offline_action,
- obj.interval,
- obj.live_email_only,
- mailbox))
- self.__connection.commit()
- cursor.close()
-
- def __delitem__(self, pk_tuple):
- Storage.__delitem__(self, pk_tuple)
- cursor = self.__connection.cursor()
- cursor.execute("""
- delete from account where jid = ? and name = ?
- """,
- (pk_tuple[0],
- pk_tuple[1]))
- self.__connection.commit()
- cursor.close()
-except ImportError:
- pass
-
diff --git a/tests/jmc-test.xml b/tests/jmc-test.xml
deleted file mode 100644
index c90e6a9..0000000
--- a/tests/jmc-test.xml
+++ /dev/null
@@ -1,20 +0,0 @@
-
-
- 127.0.0.1
- 5347
- secret
- jmc.localhost
- 5
- en
-
- Jabber Mail Component
- A Jabber mail server component
- http://people.happycoders.org/dax/jabber/jmc/
-
-
- SQLite
- jmc.pid
- .
- 5
- iso-8859-15
-
diff --git a/tests/jmc/__init__.py b/tests/jmc/__init__.py
new file mode 100644
index 0000000..d424a64
--- /dev/null
+++ b/tests/jmc/__init__.py
@@ -0,0 +1,2 @@
+"""JMC test module"""
+__revision__ = ""
diff --git a/tests/dummy_server.py b/tests/jmc/dummy_server.py
similarity index 82%
rename from tests/dummy_server.py
rename to tests/jmc/dummy_server.py
index 4d9dca7..7f48481 100644
--- a/tests/dummy_server.py
+++ b/tests/jmc/dummy_server.py
@@ -29,9 +29,28 @@ import socket
import types
import select
import xml.dom.minidom
-import utils
from pyxmpp import xmlextra
+def xmldiff(node1, node2):
+ if node1.nodeType == node1.TEXT_NODE:
+ if not node2.nodeType == node2.TEXT_NODE \
+ or re.compile(node2.data + "$").match(node1.data) is None:
+ raise Exception("data in text node " + node1.data + " does not match " + node2.data)
+ elif node1.nodeType == node1.DOCUMENT_NODE:
+ if not node2.nodeType == node2.DOCUMENT_NODE:
+ raise Exception("node1 is Document but not node2 (" + node2.nodeType + ")")
+ elif node1.tagName != node2.tagName:
+ raise Exception("Different tag name : " + node1.tagName + " != " + node2.tagName)
+ else:
+ for attr in node1._get_attributes().keys():
+ if not node2.hasAttribute(attr) \
+ or node1.getAttribute(attr) != node2.getAttribute(attr):
+ raise Exception("(" + node1.tagName + ") Different attributes : " + node1.getAttribute(attr) + " != " + node2.getAttribute(attr))
+ if len(node1.childNodes) != len(node2.childNodes):
+ raise Exception("(" + node1.tagName + ") Different children number : " + str(len(node1.childNodes)) + " != " + str(len(node2.childNodes)))
+ for i in range(len(node1.childNodes)):
+ xmldiff(node1.childNodes[i], node2.childNodes[i])
+
class DummyServer:
def __init__(self, host, port, responses = None):
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
@@ -40,7 +59,7 @@ class DummyServer:
s = socket.socket(af, socktype, proto)
except socket.error, msg:
s = None
- continue
+ raise socket.error
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(sa)
@@ -48,7 +67,7 @@ class DummyServer:
except socket.error, msg:
s.close()
s = None
- continue
+ raise socket.error
break
self.socket = s
self.responses = None
@@ -56,6 +75,7 @@ class DummyServer:
self.real_queries = []
def serve(self):
+ conn = None
try:
conn, addr = self.socket.accept()
rfile = conn.makefile('rb', -1)
@@ -77,9 +97,11 @@ class DummyServer:
else:
self.real_queries.append(data)
#print >>sys.stderr, 'Receive : ', data
- conn.close()
- self.socket.close()
- self.socket = None
+ if conn is not None:
+ conn.close()
+ if self.socket is not None:
+ self.socket.close()
+ self.socket = None
except:
type, value, stack = sys.exc_info()
print >>sys.stderr, "".join(traceback.format_exception
diff --git a/tests/email_generator.py b/tests/jmc/email_generator.py
similarity index 100%
rename from tests/email_generator.py
rename to tests/jmc/email_generator.py
diff --git a/tests/jmc/jabber/__init__.py b/tests/jmc/jabber/__init__.py
new file mode 100644
index 0000000..9ef7a58
--- /dev/null
+++ b/tests/jmc/jabber/__init__.py
@@ -0,0 +1,2 @@
+"""JMC jabber test module"""
+__revision__ = ""
diff --git a/src/jmc/utils/release.py b/tests/jmc/jabber/test_component.py
similarity index 62%
rename from src/jmc/utils/release.py
rename to tests/jmc/jabber/test_component.py
index b8daeeb..118ce52 100644
--- a/src/jmc/utils/release.py
+++ b/tests/jmc/jabber/test_component.py
@@ -1,10 +1,10 @@
##
-## release.py
-## Login : David Rousselie
-## Started on Mon Jul 24 22:37:00 2006 dax
+## test_component.py
+## Login :
+## Started on Wed Feb 14 18:04:49 2007 David Rousselie
## $Id$
##
-## Copyright (C) 2006 dax
+## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
@@ -20,14 +20,7 @@
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
-version = "0.2.2"
-author = "David Rousselie"
-email = "dax@happycoders.org"
-license = "GPL"
-long_description = """Jabber Mail Component
+import unittest
-JMC is a jabber service to check email from POP3 and IMAP4 server and retrieve
-them or just a notification of new emails. Jabber users can register multiple
-email accounts.
-
-"""
+class MailComponent_TestCase(unittest.TestCase):
+ pass
diff --git a/src/jmc/email/__init__.py b/tests/jmc/model/__init__.py
similarity index 100%
rename from src/jmc/email/__init__.py
rename to tests/jmc/model/__init__.py
diff --git a/tests/test_mailconnection.py b/tests/jmc/model/test_account.py
similarity index 65%
rename from tests/test_mailconnection.py
rename to tests/jmc/model/test_account.py
index 599d997..2f7d4c3 100644
--- a/tests/test_mailconnection.py
+++ b/tests/jmc/model/test_account.py
@@ -1,11 +1,11 @@
# -*- coding: utf-8 -*-
##
-## mailconnection_test.py
-## Login : David Rousselie
-## Started on Fri May 13 11:32:51 2005 David Rousselie
-## $Id: test_mailconnection.py,v 1.2 2005/09/18 20:24:07 David Rousselie Exp $
+## test_account.py
+## Login :
+## Started on Wed Feb 14 08:23:17 2007 David Rousselie
+## $Id$
##
-## Copyright (C) 2005 David Rousselie
+## Copyright (C) 2007 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
@@ -22,20 +22,45 @@
##
import unittest
-from jmc.email.mailconnection import IMAPConnection, \
- POP3Connection, \
- MailConnection
-import dummy_server
-import email_generator
+import os
import thread
-import re
-import sys
-import string
-class MailConnection_TestCase(unittest.TestCase):
+from sqlobject import *
+from sqlobject.dbconnection import TheURIOpener
+
+from jcl.model import account
+from jcl.model.account import Account, PresenceAccount
+from jmc.model.account import MailAccount, POP3Account, IMAPAccount
+
+from tests.jmc import email_generator, dummy_server
+
+DB_PATH = "/tmp/jmc_test.db"
+DB_URL = DB_PATH # + "?debug=1&debugThreading=1"
+
+class MailAccount_TestCase(unittest.TestCase):
def setUp(self):
- self.connection = MailConnection()
-
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ Account.createTable(ifNotExists = True)
+ PresenceAccount.createTable(ifNotExists = True)
+ MailAccount.createTable(ifNotExists = True)
+ self.account = MailAccount(user_jid = "user1@test.com", \
+ name = "account1", \
+ jid = "account1@jmc.test.com")
+ del account.hub.threadConnection
+
+ def tearDown(self):
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ MailAccount.dropTable(ifExists = True)
+ PresenceAccount.dropTable(ifExists = True)
+ Account.dropTable(ifExists = True)
+ del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
+ account.hub.threadConnection.close()
+ del account.hub.threadConnection
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+
def make_test(email_type, tested_func, expected_res):
def inner(self):
encoded, multipart, header = email_type
@@ -48,23 +73,23 @@ class MailConnection_TestCase(unittest.TestCase):
test_get_decoded_part_not_encoded = \
make_test((False, False, False), \
- lambda self, email: self.connection.get_decoded_part(email, None), \
+ lambda self, email: self.account.get_decoded_part(email, None), \
u"Not encoded single part")
test_get_decoded_part_encoded = \
make_test((True, False, False), \
- lambda self, email: self.connection.get_decoded_part(email, None), \
+ lambda self, email: self.account.get_decoded_part(email, None), \
u"Encoded single part with 'iso-8859-15' charset (éàê)")
test_format_message_summary_not_encoded = \
make_test((False, False, True), \
- lambda self, email: self.connection.format_message_summary(email), \
+ lambda self, email: self.account.format_message_summary(email), \
(u"From : not encoded from\nSubject : not encoded subject\n\n", \
u"not encoded from"))
test_format_message_summary_encoded = \
make_test((True, False, True), \
- lambda self, email: self.connection.format_message_summary(email), \
+ lambda self, email: self.account.format_message_summary(email), \
(u"From : encoded from (éàê)\nSubject : encoded subject " + \
u"(éàê)\n\n", \
u"encoded from (éàê)"))
@@ -78,21 +103,21 @@ class MailConnection_TestCase(unittest.TestCase):
email.replace_header("From", \
"\"" + str(email["From"]) \
+ "\" not encoded part") or \
- self.connection.format_message_summary(email), \
+ self.account.format_message_summary(email), \
(u"From : \"encoded from (éàê)\" not encoded part\nSubject " + \
u": \"encoded subject (éàê)\" not encoded part\n\n", \
u"\"encoded from (éàê)\" not encoded part"))
test_format_message_single_not_encoded = \
make_test((False, False, True), \
- lambda self, email: self.connection.format_message(email), \
+ lambda self, email: self.account.format_message(email), \
(u"From : not encoded from\nSubject : not encoded subject" + \
u"\n\nNot encoded single part\n", \
u"not encoded from"))
test_format_message_single_encoded = \
make_test((True, False, True), \
- lambda self, email: self.connection.format_message(email), \
+ lambda self, email: self.account.format_message(email), \
(u"From : encoded from (éàê)\nSubject : encoded subject " + \
u"(éàê)\n\nEncoded single part with 'iso-8859-15' charset" + \
u" (éàê)\n", \
@@ -100,14 +125,14 @@ class MailConnection_TestCase(unittest.TestCase):
test_format_message_multi_not_encoded = \
make_test((False, True, True), \
- lambda self, email: self.connection.format_message(email), \
+ lambda self, email: self.account.format_message(email), \
(u"From : not encoded from\nSubject : not encoded subject" + \
u"\n\nNot encoded multipart1\nNot encoded multipart2\n", \
u"not encoded from"))
test_format_message_multi_encoded = \
make_test((True, True, True), \
- lambda self, email: self.connection.format_message(email), \
+ lambda self, email: self.account.format_message(email), \
(u"From : encoded from (éàê)\nSubject : encoded subject (éà" + \
u"ê)\n\nutf-8 multipart1 with no charset (éàê)" + \
u"\nEncoded multipart2 with 'iso-8859-15' charset (éàê)\n" + \
@@ -115,19 +140,40 @@ class MailConnection_TestCase(unittest.TestCase):
u"encoded from (éàê)"))
-class POP3Connection_TestCase(unittest.TestCase):
+class POP3Account_TestCase(unittest.TestCase):
def setUp(self):
self.server = dummy_server.DummyServer("localhost", 1110)
thread.start_new_thread(self.server.serve, ())
- self.pop3connection = POP3Connection("login", \
- "pass", \
- "localhost", \
- 1110, \
- ssl = False)
-
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ Account.createTable(ifNotExists = True)
+ PresenceAccount.createTable(ifNotExists = True)
+ MailAccount.createTable(ifNotExists = True)
+ POP3Account.createTable(ifNotExists = True)
+ self.pop3_account = POP3Account(user_jid = "user1@test.com", \
+ name = "account1", \
+ jid = "account1@jmc.test.com", \
+ login = "login")
+ self.pop3_account.password = "pass"
+ self.pop3_account.host = "localhost"
+ self.pop3_account.port = 1110
+ self.pop3_account.ssl = False
+ del account.hub.threadConnection
+
def tearDown(self):
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ POP3Account.dropTable(ifExists = True)
+ MailAccount.dropTable(ifExists = True)
+ PresenceAccount.dropTable(ifExists = True)
+ Account.dropTable(ifExists = True)
+ del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
+ account.hub.threadConnection.close()
+ del account.hub.threadConnection
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
self.server = None
- self.pop3connection = None
+ self.pop3_account = None
def make_test(responses = None, queries = None, core = None):
def inner(self):
@@ -141,12 +187,14 @@ class POP3Connection_TestCase(unittest.TestCase):
if queries:
self.server.queries += queries
self.server.queries += ["QUIT\r\n"]
- self.pop3connection.connect()
- self.failUnless(self.pop3connection.connection, \
+ self.pop3_account.connect()
+ self.failUnless(self.pop3_account.connection, \
"Cannot establish connection")
if core:
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
core(self)
- self.pop3connection.disconnect()
+ del account.hub.threadConnection
+ self.pop3_account.disconnect()
self.failUnless(self.server.verify_queries(), \
"Sended queries does not match expected queries.")
return inner
@@ -157,7 +205,7 @@ class POP3Connection_TestCase(unittest.TestCase):
make_test(["+OK 2 20\r\n"], \
["STAT\r\n"], \
lambda self: \
- self.assertEquals(self.pop3connection.get_mail_list(), \
+ self.assertEquals(self.pop3_account.get_mail_list(), \
["1", "2"]))
test_get_mail_summary = \
@@ -169,7 +217,7 @@ class POP3Connection_TestCase(unittest.TestCase):
["RETR 1\r\n",
"RSET\r\n"], \
lambda self: \
- self.assertEquals(self.pop3connection.get_mail_summary(1), \
+ self.assertEquals(self.pop3_account.get_mail_summary(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n", \
u"user@test.com")))
@@ -183,7 +231,7 @@ class POP3Connection_TestCase(unittest.TestCase):
["RETR 1\r\n",
"RSET\r\n"], \
lambda self: \
- self.assertEquals(self.pop3connection.get_mail(1), \
+ self.assertEquals(self.pop3_account.get_mail(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n" + \
u"mymessage\n", \
@@ -198,7 +246,7 @@ class POP3Connection_TestCase(unittest.TestCase):
["RETR 1\r\n",
"RSET\r\n"], \
lambda self: \
- self.assertEquals(self.pop3connection.get_mail_summary(1), \
+ self.assertEquals(self.pop3_account.get_mail_summary(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n", \
u"user@test.com")))
@@ -212,26 +260,47 @@ class POP3Connection_TestCase(unittest.TestCase):
["RETR 1\r\n",
"RSET\r\n"], \
lambda self: \
- self.assertEquals(self.pop3connection.get_mail(1), \
+ self.assertEquals(self.pop3_account.get_mail(1), \
(u"From : user@test.com\n" + \
u"Subject : subject test\n\n" + \
u"mymessage\n", \
u"user@test.com")))
-class IMAPConnection_TestCase(unittest.TestCase):
+class IMAPAccount_TestCase(unittest.TestCase):
def setUp(self):
self.server = dummy_server.DummyServer("localhost", 1143)
thread.start_new_thread(self.server.serve, ())
- self.imap_connection = IMAPConnection("login", \
- "pass", \
- "localhost", \
- 1143, \
- ssl = False)
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ Account.createTable(ifNotExists = True)
+ PresenceAccount.createTable(ifNotExists = True)
+ MailAccount.createTable(ifNotExists = True)
+ IMAPAccount.createTable(ifNotExists = True)
+ self.imap_account = IMAPAccount(user_jid = "user1@test.com", \
+ name = "account1", \
+ jid = "account1@jmc.test.com", \
+ login = "login")
+ self.imap_account.password = "pass"
+ self.imap_account.host = "localhost"
+ self.imap_account.port = 1143
+ self.imap_account.ssl = False
+ del account.hub.threadConnection
def tearDown(self):
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ IMAPAccount.dropTable(ifExists = True)
+ MailAccount.dropTable(ifExists = True)
+ PresenceAccount.dropTable(ifExists = True)
+ Account.dropTable(ifExists = True)
+ del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
+ account.hub.threadConnection.close()
+ del account.hub.threadConnection
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
self.server = None
- self.imap_connection = None
+ self.imap_account = None
def make_test(responses = None, queries = None, core = None):
def inner(self):
@@ -250,12 +319,14 @@ class IMAPConnection_TestCase(unittest.TestCase):
if queries:
self.server.queries += queries
self.server.queries += ["^[^ ]* LOGOUT"]
- self.imap_connection.connect()
- self.failUnless(self.imap_connection.connection, \
+ self.imap_account.connect()
+ self.failUnless(self.imap_account.connection, \
"Cannot establish connection")
if core:
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
core(self)
- self.imap_connection.disconnect()
+ del account.hub.threadConnection
+ self.imap_account.disconnect()
self.failUnless(self.server.verify_queries())
return inner
@@ -271,7 +342,7 @@ class IMAPConnection_TestCase(unittest.TestCase):
["^[^ ]* SELECT INBOX", \
"^[^ ]* SEARCH RECENT"], \
lambda self: \
- self.assertEquals(self.imap_connection.get_mail_list(), ['9', '10']))
+ self.assertEquals(self.imap_account.get_mail_list(), ['9', '10']))
test_get_mail_summary = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
" [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
@@ -283,7 +354,7 @@ class IMAPConnection_TestCase(unittest.TestCase):
data.split()[0] + " OK FETCH completed\r\n"], \
["^[^ ]* EXAMINE INBOX", \
"^[^ ]* FETCH 1 \(RFC822\)"], \
- lambda self: self.assertEquals(self.imap_connection.get_mail_summary(1), \
+ lambda self: self.assertEquals(self.imap_account.get_mail_summary(1), \
(u"From : None\nSubject : None\n\n", \
u"None")))
@@ -297,7 +368,7 @@ class IMAPConnection_TestCase(unittest.TestCase):
data.split()[0] + " OK FETCH completed\r\n"], \
["^[^ ]* EXAMINE INBOX", \
"^[^ ]* FETCH 1 \(RFC822\)",], \
- lambda self: self.assertEquals(self.imap_connection.get_mail(1), \
+ lambda self: self.assertEquals(self.imap_account.get_mail(1), \
(u"From : None\nSubject : None\n\nbody text\r\n\n", \
u"None")))
diff --git a/tests/test_lang.py b/tests/jmc/test_lang.py
similarity index 100%
rename from tests/test_lang.py
rename to tests/jmc/test_lang.py
diff --git a/tests/test_component.py b/tests/test_component.py
deleted file mode 100644
index 3ec2f40..0000000
--- a/tests/test_component.py
+++ /dev/null
@@ -1,355 +0,0 @@
-##
-## test_mailconnection_factory.py
-## Login : David Rousselie
-## Started on Fri May 20 10:46:58 2005
-## $Id: test_component.py,v 1.2 2005/09/18 20:24:07 dax Exp $
-##
-## Copyright (C) 2005
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-##
-
-import thread
-import unittest
-import dummy_server
-import time
-import traceback
-from pyxmpp import xmlextra
-from jmc.jabber.component import *
-from jmc.utils.config import Config
-
-class TestStreamHandler(xmlextra.StreamHandler):
- def __init__(self, expected_balises = []):
- xmlextra.StreamHandler.__init__(self)
- self.expected_balises = expected_balises
-
- def stream_start(self, doc):
- pass
-
- def stream_end(self, doc):
- pass
-
- def stanza(self, notused, node):
- pass
-
-class MailComponent_TestCase_NoConnection(unittest.TestCase):
- def setUp(self):
- self.mail_component = MailComponent(Config("tests/jmc-test.xml"))
-
- def tearDown(self):
- self.mail_component = None
-
- def test_get_reg_form(self):
- reg_form = self.mail_component.get_reg_form()
- reg_form2 = self.mail_component.get_reg_form()
- self.assertTrue(reg_form is reg_form2)
-
-#TODO
-class MailComponent_TestCase_Basic(unittest.TestCase):
- def setUp(self):
- self.handler = TestStreamHandler()
- self.mail_component = MailComponent(Config("tests/jmc-test.xml"))
- self.server = dummy_server.XMLDummyServer("localhost", 55555, None, self.handler)
- thread.start_new_thread(self.server.serve, ())
-
- def tearDown(self):
- self.server = None
- self.mail_component = None
- os.remove("./registered.db")
-
- def test_run(self):
- self.server.responses = ["", \
- ""]
- self.server.queries = ["", \
- "[0-9abcdef]*",
- ""]
- self.mail_component.run(1)
- self.failUnless(self.server.verify_queries())
- # TODO : more assertion
-
-class MailComponent_TestCase_NoReg(unittest.TestCase):
- def setUp(self):
- self.handler = TestStreamHandler()
- self.mail_component = MailComponent(Config("tests/jmc-test.xml"))
- self.server = dummy_server.XMLDummyServer("localhost", 55555, None, self.handler)
- thread.start_new_thread(self.server.serve, ())
-
- def tearDown(self):
- self.server = None
- self.mail_component = None
- ## TODO : be storage independant
- os.remove("./registered.db")
-
- def test_disco_get_items(self):
- self.server.responses = ["",
- "",
- ""]
- self.server.queries = ["" + \
- "", \
- "[0-9abcdef]*",
- "",
- ""]
- self.mail_component.run(1)
- self.failUnless(self.server.verify_queries())
-
- def test_get_register(self):
- self.server.responses = ["",
- "",
- ""]
- self.server.queries = ["" + \
- "", \
- "[0-9abcdef]*",
- "" + \
- "" + \
- "" + \
- "Jabber Mail connection registration" + \
- "Enter anything below" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "INBOX" + \
- "" + \
- "" + \
- "2" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "2" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "1" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "1" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "1" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "0" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "5" + \
- "" + \
- "" + \
- "" + \
- "",
- ""]
- self.mail_component.run(1)
- self.failUnless(self.server.verify_queries())
-
-
- def test_disco_get_info(self):
- self.server.responses = ["",
- "",
- ""]
- self.server.queries = ["" + \
- "",
- "[0-9abcdef]*",
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "",
- ""]
- self.mail_component.run(1)
- self.failUnless(self.server.verify_queries())
-
- def test_set_register(self):
- self.server.responses = ["", \
- "" + \
- "" + \
- "" + \
- "" + \
- "" + \
- "test" + \
- "" + \
- "" + \
- "logintest" + \
- "" + \
- "" + \
- "passtest" + \
- "" + \
- "" + \
- "hosttest" + \
- "" + \
- "" + \
- "993" + \
- "" + \
- "" + \
- "imaps" + \
- "" + \
- "" + \
- "INBOX" + \
- "" + \
- "" + \
- "2" + \
- "" + \
- "" + \
- "2" + \
- "" + \
- "" + \
- "1" + \
- "" + \
- "" + \
- "1" + \
- "" + \
- "" + \
- "1" + \
- "" + \
- "" + \
- "0" + \
- "" + \
- "" + \
- "5" + \
- "" + \
- "" + \
- "",
- lambda x: None,
- ""]
- self.server.queries = ["" + \
- "", \
- "[0-9abcdef]*", \
- "",
- "",
- "New imaps connection \\'test\\': Registered with username \\'logintest\\' and password \\'passtest\\' on \\'hosttest:993\\'",
- "",
- ""]
- self.mail_component.run(1)
- self.failUnless(self.server.verify_queries())
-
-class MailComponent_TestCase_Reg(unittest.TestCase):
- def setUp(self):
- self.mail_component = MailComponent(Config("tests/jmc-test.xml"))
- self.mail_component.set_register(iq = None)
-
- def test_get_reg_form_init(self):
- pass
-
- def test_get_reg_form_init_2pass(self):
- pass
-
- def test_disco_get_items(self):
- pass
-
- def test_get_register(self):
- pass
-
- def test_set_register_update(self):
- pass
-
- def test_set_register_remove(self):
- pass
-
- def test_presence_available_transport(self):
- pass
-
- def test_presence_available_mailbox(self):
- pass
-
- def test_presence_unavailable_transport(self):
- pass
-
- def test_presence_unavailable_mailbox(self):
- pass
-
- def test_presence_subscribe(self):
- pass
-
- def test_presence_subscribed_transport(self):
- pass
-
- def test_presence_subscribed_mailbox(self):
- pass
-
- def test_presence_unsubscribe(self):
- pass
-
- def test_presence_unsubscribed(self):
- pass
-
- def test_check_mail(self):
- pass
-
diff --git a/tests/test_mailconnection_factory.py b/tests/test_mailconnection_factory.py
deleted file mode 100644
index 738374e..0000000
--- a/tests/test_mailconnection_factory.py
+++ /dev/null
@@ -1,204 +0,0 @@
-##
-## test_mailconnection_factory.py
-## Login : David Rousselie
-## Started on Fri May 20 10:46:58 2005
-## $Id: test_mailconnection_factory.py,v 1.1 2005/07/11 20:39:31 dax Exp $
-##
-## Copyright (C) 2005
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-##
-
-import unittest
-from jmc.email.mailconnection_factory import *
-from jmc.email.mailconnection import *
-import jmc.email.mailconnection as mailconnection
-
-class MailConnectionFactory_TestCase(unittest.TestCase):
- def test_new_mail_connection_imap(self):
- mc = get_new_mail_connection("imap")
- self.assertEquals(mc, IMAPConnection())
-
- def test_new_mail_connection_imaps(self):
- mc = get_new_mail_connection("imaps")
- self.assertEquals(mc, IMAPConnection(ssl = True))
-
- def test_new_mail_connection_pop3(self):
- mc = get_new_mail_connection("pop3")
- self.assertEquals(mc, POP3Connection())
-
- def test_new_mail_connection_pop3s(self):
- mc = get_new_mail_connection("pop3s")
- self.assertEquals(mc, POP3Connection(ssl = True))
-
- def test_new_mail_connection_unknown(self):
- self.assertRaises(Exception, get_new_mail_connection, "unknown")
-
- def test_str_to_mail_connection_imap_v01_v02(self):
- mc = str_to_mail_connection("imap#login#passwd#host#193#False#INBOX")
- self.assertEquals(mc.get_type(), "imap")
- self.assertEquals(mc.login, "login")
- self.assertEquals(mc.password, "passwd")
- self.assertEquals(mc.store_password, True)
- self.assertEquals(mc.host, "host")
- self.assertEquals(mc.port, 193)
- self.assertEquals(mc.mailbox, "INBOX")
- self.assertEquals(mc.chat_action, mailconnection.DIGEST)
- self.assertEquals(mc.online_action, mailconnection.DIGEST)
- self.assertEquals(mc.away_action, mailconnection.DIGEST)
- self.assertEquals(mc.xa_action, mailconnection.DIGEST)
- self.assertEquals(mc.dnd_action, mailconnection.DIGEST)
- self.assertEquals(mc.offline_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.interval, 5)
- self.assertEquals(mc.live_email_only, False)
-
- def test_str_to_mail_connection_imap_v01_v02_retrieve(self):
- mc = str_to_mail_connection("imap#login#passwd#host#193#True#INBOX")
- self.assertEquals(mc.get_type(), "imap")
- self.assertEquals(mc.login, "login")
- self.assertEquals(mc.password, "passwd")
- self.assertEquals(mc.store_password, True)
- self.assertEquals(mc.host, "host")
- self.assertEquals(mc.port, 193)
- self.assertEquals(mc.mailbox, "INBOX")
- self.assertEquals(mc.chat_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.online_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.away_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.xa_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.dnd_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.offline_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.interval, 5)
- self.assertEquals(mc.live_email_only, False)
-
- def test_str_to_mail_connection_pop3_v01_v02(self):
- mc = str_to_mail_connection("pop3#login#passwd#host#110#False")
- self.assertEquals(mc.get_type(), "pop3")
- self.assertEquals(mc.login, "login")
- self.assertEquals(mc.password, "passwd")
- self.assertEquals(mc.store_password, True)
- self.assertEquals(mc.host, "host")
- self.assertEquals(mc.port, 110)
- self.assertEquals(mc.chat_action, mailconnection.DIGEST)
- self.assertEquals(mc.online_action, mailconnection.DIGEST)
- self.assertEquals(mc.away_action, mailconnection.DIGEST)
- self.assertEquals(mc.xa_action, mailconnection.DIGEST)
- self.assertEquals(mc.dnd_action, mailconnection.DIGEST)
- self.assertEquals(mc.offline_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.interval, 5)
- self.assertEquals(mc.live_email_only, False)
-
- def test_str_to_mail_connection_pop3_v01_v02_retrieve(self):
- mc = str_to_mail_connection("pop3#login#passwd#host#110#True")
- self.assertEquals(mc.get_type(), "pop3")
- self.assertEquals(mc.login, "login")
- self.assertEquals(mc.password, "passwd")
- self.assertEquals(mc.store_password, True)
- self.assertEquals(mc.host, "host")
- self.assertEquals(mc.port, 110)
- self.assertEquals(mc.chat_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.online_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.away_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.xa_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.dnd_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.offline_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.interval, 5)
- self.assertEquals(mc.live_email_only, False)
-
- def test_str_to_mail_connection_imap(self):
- mc = str_to_mail_connection("imap#login#passwd#host#193#0#0#0#1#1#2#4#True#INBOX")
- self.assertEquals(mc.get_type(), "imap")
- self.assertEquals(mc.login, "login")
- self.assertEquals(mc.password, "passwd")
- self.assertEquals(mc.host, "host")
- self.assertEquals(mc.port, 193)
- self.assertEquals(mc.mailbox, "INBOX")
- self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.online_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.away_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.xa_action, mailconnection.DIGEST)
- self.assertEquals(mc.dnd_action, mailconnection.DIGEST)
- self.assertEquals(mc.offline_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.interval, 4)
- self.assertEquals(mc.live_email_only, True)
-
- def test_str_to_mail_connection_no_password(self):
- mc = str_to_mail_connection("imap#login#/\\#host#193#0#0#0#1#1#2#4#False#INBOX")
- self.assertEquals(mc.get_type(), "imap")
- self.assertEquals(mc.login, "login")
- self.assertEquals(mc.password, None)
- self.assertEquals(mc.store_password, False)
- self.assertEquals(mc.host, "host")
- self.assertEquals(mc.port, 193)
- self.assertEquals(mc.mailbox, "INBOX")
- self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.online_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.away_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.xa_action, mailconnection.DIGEST)
- self.assertEquals(mc.dnd_action, mailconnection.DIGEST)
- self.assertEquals(mc.offline_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.interval, 4)
- self.assertEquals(mc.live_email_only, False)
-
- def test_str_to_mail_connection_imaps(self):
- mc = str_to_mail_connection("imaps#login#passwd#host#993#0#0#0#1#1#2#4#True#INBOX.SubDir")
- self.assertEquals(mc.get_type(), "imaps")
- self.assertEquals(mc.login, "login")
- self.assertEquals(mc.password, "passwd")
- self.assertEquals(mc.host, "host")
- self.assertEquals(mc.port, 993)
- self.assertEquals(mc.mailbox, "INBOX.SubDir")
- self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.online_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.away_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.xa_action, mailconnection.DIGEST)
- self.assertEquals(mc.dnd_action, mailconnection.DIGEST)
- self.assertEquals(mc.offline_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.interval, 4)
- self.assertEquals(mc.live_email_only, True)
-
- def test_str_to_mail_connection_pop3(self):
- mc = str_to_mail_connection("pop3#login#passwd#host#110#0#0#0#1#1#2#4#False")
- self.assertEquals(mc.get_type(), "pop3")
- self.assertEquals(mc.login, "login")
- self.assertEquals(mc.password, "passwd")
- self.assertEquals(mc.host, "host")
- self.assertEquals(mc.port, 110)
- self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.online_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.away_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.xa_action, mailconnection.DIGEST)
- self.assertEquals(mc.dnd_action, mailconnection.DIGEST)
- self.assertEquals(mc.offline_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.interval, 4)
- self.assertEquals(mc.live_email_only, False)
-
- def test_str_to_mail_connection_pop3s(self):
- mc = str_to_mail_connection("pop3s#login#passwd#host#995#0#0#0#1#1#2#4#True")
- self.assertEquals(mc.get_type(), "pop3s")
- self.assertEquals(mc.login, "login")
- self.assertEquals(mc.password, "passwd")
- self.assertEquals(mc.host, "host")
- self.assertEquals(mc.port, 995)
- self.assertEquals(mc.chat_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.online_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.away_action, mailconnection.DO_NOTHING)
- self.assertEquals(mc.xa_action, mailconnection.DIGEST)
- self.assertEquals(mc.dnd_action, mailconnection.DIGEST)
- self.assertEquals(mc.offline_action, mailconnection.RETRIEVE)
- self.assertEquals(mc.interval, 4)
- self.assertEquals(mc.live_email_only, True)
-
- def test_str_to_mail_connection_unknown(self):
- self.assertRaises(Exception, str_to_mail_connection, ("unknown#login#passwd#host#995#0#0#0#1#1#2#4#True"))
-
diff --git a/tests/test_storage.py b/tests/test_storage.py
deleted file mode 100644
index f3ff233..0000000
--- a/tests/test_storage.py
+++ /dev/null
@@ -1,224 +0,0 @@
-##
-## test_storage.py
-## Login : David Rousselie
-## Started on Fri May 20 10:46:58 2005 dax
-## $Id: test_component.py,v 1.1 2005/07/11 20:39:31 dax Exp $
-##
-## Copyright (C) 2005 adro8400
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-##
-
-import os
-import unittest
-import dummy_server
-from jmc.utils.storage import *
-from jmc.email.mailconnection import *
-import jmc.email.mailconnection as mailconnection
-
-class Storage_TestCase(unittest.TestCase):
- def test_init(self):
- spool_dir = "./spool/test"
- self._storage = Storage(spool_dir = spool_dir)
- self.assertTrue(os.access(spool_dir, os.F_OK))
- self.assertEquals(self._storage.spool_dir, spool_dir)
- os.removedirs(spool_dir)
-
-class DBMStorage_TestCase(unittest.TestCase):
- def setUp(self):
- spool_dir = "./spool/test"
- self._storage = DBMStorage(nb_pk_fields = 2, spool_dir = spool_dir)
- self._account1 = IMAPConnection(login = "login1",
- password = "password1",
- host = "host1",
- port = 993,
- ssl = True,
- mailbox = "INBOX.box1")
- self._account1.chat_action = mailconnection.DO_NOTHING
- self._account1.online_action = mailconnection.DO_NOTHING
- self._account1.away_action = mailconnection.DO_NOTHING
- self._account1.xa_action = mailconnection.DO_NOTHING
- self._account1.dnd_action = mailconnection.DO_NOTHING
- self._account1.offline_action = mailconnection.DO_NOTHING
- self._account1.interval = 4
- self._account2 = POP3Connection(login = "login2",
- password = "password2",
- host = "host2",
- port = 1110,
- ssl = False)
- self._account2.chat_action = mailconnection.DO_NOTHING
- self._account2.online_action = mailconnection.DO_NOTHING
- self._account2.away_action = mailconnection.DO_NOTHING
- self._account2.xa_action = mailconnection.DO_NOTHING
- self._account2.dnd_action = mailconnection.DO_NOTHING
- self._account2.offline_action = mailconnection.DO_NOTHING
- self._account2.interval = 4
- self._account2.store_password = False
- self._account2.live_email_only = True
-
- def tearDown(self):
- db_file = self._storage.file
- self._storage = None
- os.remove(db_file)
-
- def test_set_get(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- self.assertEquals(self._storage[("test@localhost", "account1")],
- self._account1)
- self.assertEquals(self._storage[("test@localhost", "account2")],
- self._account2)
-
- def test_set_sync_get(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- loaded_storage = DBMStorage(nb_pk_fields = 2, spool_dir = "./spool/test")
- self.assertEquals(loaded_storage[("test@localhost", "account1")],
- self._account1)
- self.assertEquals(loaded_storage[("test@localhost", "account2")],
- self._account2)
-
- def test_set_del_get(self):
- self._storage[("test@localhost", "account2")] = self._account2
- del self._storage[("test@localhost", "account2")]
- try:
- self._storage[("test@localhost", "account2")]
- except KeyError:
- return
- self.fail("KeyError was expected")
-
- def test_haskey(self):
- self._storage[("test@localhost", "account2")] = self._account2
- self.assertTrue(self._storage.has_key((u"test@localhost", u"account2")))
-
- def test_partial_haskey(self):
- self._storage[("test@localhost", "account2")] = self._account2
- self.assertTrue(self._storage.has_key((u"test@localhost",)))
-
- def test_get_filtered(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- result = self._storage[("test@localhost",)]
- self.assertEquals(type(result), list)
- self.assertEquals(len(result), 2)
- self.assertEquals(result[1], self._account1)
- self.assertEquals(result[0], self._account2)
-
- def test_get_filtered2(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- result = self._storage[("account1",)]
- self.assertEquals(type(result), list)
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], self._account1)
-
- def test_keys(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- result = self._storage.keys()
- self.assertEquals(type(result), list)
- self.assertEquals(len(result), 2)
- self.assertEquals(type(result[1]), tuple)
- self.assertEquals(len(result[1]), 2)
- self.assertEquals(result[1][0], "test@localhost")
- self.assertEquals(result[1][1], "account1")
- self.assertEquals(type(result[0]), tuple)
- self.assertEquals(len(result[0]), 2)
- self.assertEquals(result[0][0], "test@localhost")
- self.assertEquals(result[0][1], "account2")
-
- def test_keys_filtered(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- result = self._storage.keys(())
- self.assertEquals(type(result), list)
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], "test@localhost")
-
- def test_keys_filtered2(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- result = self._storage.keys(("test@localhost",))
- self.assertEquals(type(result), list)
- self.assertEquals(len(result), 2)
- self.assertEquals(result[0], "account2")
- self.assertEquals(result[1], "account1")
-
- def test_del_sync_get(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- del self._storage[("test@localhost", "account2")]
- loaded_storage = DBMStorage(nb_pk_fields = 2, spool_dir = "./spool/test")
- self.assertEquals(len(loaded_storage.keys()),
- 1)
- self.assertEquals(loaded_storage[("test@localhost", "account1")],
- self._account1)
-
-class SQLiteStorage_TestCase(DBMStorage_TestCase):
- def setUp(self):
- spool_dir = "./spool/test"
- self._storage = SQLiteStorage(nb_pk_fields = 2, spool_dir = spool_dir)
- self._account1 = IMAPConnection(login = "login1",
- password = "password1",
- host = "host1",
- port = 993,
- ssl = True,
- mailbox = "INBOX.box1")
- self._account1.chat_action = mailconnection.DIGEST
- self._account1.online_action = mailconnection.DIGEST
- self._account1.away_action = mailconnection.DO_NOTHING
- self._account1.xa_action = mailconnection.DO_NOTHING
- self._account1.dnd_action = mailconnection.DO_NOTHING
- self._account1.offline_action = mailconnection.DO_NOTHING
- self._account1.interval = 4
- self._account2 = POP3Connection(login = "login2",
- password = "password2",
- host = "host2",
- port = 1993,
- ssl = False)
- self._account2.chat_action = mailconnection.DO_NOTHING
- self._account2.online_action = mailconnection.DO_NOTHING
- self._account2.away_action = mailconnection.DO_NOTHING
- self._account2.xa_action = mailconnection.DO_NOTHING
- self._account2.dnd_action = mailconnection.DO_NOTHING
- self._account2.offline_action = mailconnection.DO_NOTHING
- self._account2.interval = 4
- self._account2.store_password = False
- self._account2.live_email_only = True
-
-# def tearDown(self):
-# os.remove(self._storage.file)
-# self._storage = None
-
-
- def test_set_sync_get(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- self._account2.password = None
- loaded_storage = SQLiteStorage(nb_pk_fields = 2, spool_dir = "./spool/test")
- self.assertEquals(loaded_storage[("test@localhost", "account1")],
- self._account1)
- self.assertEquals(loaded_storage[("test@localhost", "account2")],
- self._account2)
-
- def test_del_sync_get(self):
- self._storage[("test@localhost", "account1")] = self._account1
- self._storage[("test@localhost", "account2")] = self._account2
- del self._storage[("test@localhost", "account2")]
- loaded_storage = SQLiteStorage(nb_pk_fields = 2, spool_dir = "./spool/test")
- self.assertEquals(len(loaded_storage.keys()),
- 1)
- self.assertEquals(loaded_storage[("test@localhost", "account1")],
- self._account1)
-
diff --git a/tests/test_x.py b/tests/test_x.py
deleted file mode 100644
index 4e45f9e..0000000
--- a/tests/test_x.py
+++ /dev/null
@@ -1,72 +0,0 @@
-##
-## test_x.py
-## Login : David Rousselie
-## Started on Fri May 20 10:46:58 2005
-## $Id: test_x.py,v 1.1 2005/07/11 20:39:31 dax Exp $
-##
-## Copyright (C) 2005
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-##
-
-import unittest
-from jmc.jabber.x import *
-
-class X_TestCase(unittest.TestCase):
- def setUp(self):
- self.mail_component = MailComponent()
-
- def test_get_form(self):
- self.reg_form.add_field(type = "text-single", \
- label = "Connection name", \
- var = "name")
-
- self.reg_form.add_field(type = "text-single", \
- label = "Login", \
- var = "login")
-
- self.reg_form.add_field(type = "text-private", \
- label = "password", \
- var = "password")
-
- self.reg_form.add_field(type = "text-single", \
- label = "Host", \
- var = "host")
-
- self.reg_form.add_field(type = "text-single", \
- label = "Port", \
- var = "port")
-
- field = self.reg_form.add_field(type = "list-single", \
- label = "Mailbox type", \
- var = "type")
- field.add_option(label = "POP3", \
- value = "pop3")
- field.add_option(label = "POP3S", \
- value = "pop3s")
- field.add_option(label = "IMAP", \
- value = "imap")
- field.add_option(label = "IMAPS", \
- value = "imaps")
-
- self.reg_form.add_field(type = "text-single", \
- label = "Mailbox (IMAP)", \
- var = "mailbox", \
- value = "INBOX")
-
- self.reg_form.add_field(type = "boolean", \
- label = "Retrieve mail", \
- var = "retrieve")
- pass
-
diff --git a/tests/utils.py b/tests/utils.py
deleted file mode 100644
index 0c5a619..0000000
--- a/tests/utils.py
+++ /dev/null
@@ -1,95 +0,0 @@
-##
-## utils.py
-## Login : David Rousselie
-## Started on Mon Oct 24 21:44:43 2005 dax
-## $Id$
-##
-## Copyright (C) 2005 dax
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-##
-
-import xml.dom.minidom
-import re
-
-def xmldiff(node1, node2):
- if node1.nodeType == node1.TEXT_NODE:
- if not node2.nodeType == node2.TEXT_NODE \
- or re.compile(node2.data + "$").match(node1.data) is None:
- raise Exception("data in text node " + node1.data + " does not match " + node2.data)
- elif node1.nodeType == node1.DOCUMENT_NODE:
- if not node2.nodeType == node2.DOCUMENT_NODE:
- raise Exception("node1 is Document but not node2 (" + node2.nodeType + ")")
- elif node1.tagName != node2.tagName:
- raise Exception("Different tag name : " + node1.tagName + " != " + node2.tagName)
- else:
- for attr in node1._get_attributes().keys():
- if not node2.hasAttribute(attr) \
- or node1.getAttribute(attr) != node2.getAttribute(attr):
- raise Exception("(" + node1.tagName + ") Different attributes : " + node1.getAttribute(attr) + " != " + node2.getAttribute(attr))
- if len(node1.childNodes) != len(node2.childNodes):
- raise Exception("(" + node1.tagName + ") Different children number : " + str(len(node1.childNodes)) + " != " + str(len(node2.childNodes)))
- for i in range(len(node1.childNodes)):
- xmldiff(node1.childNodes[i], node2.childNodes[i])
-
-# def xmldiff(events1, events2):
-# for (event1, node1) in events1:
-# (event2, node2) = events2.next()
-# print event1 + " " + str(node1)
-# if not (event1 == event2) or not xml_diff_nodes(node1, node2):
-# return False
-# return True
-
-if __name__ == "__main__":
- document1 = """\
-
- Demo slideshow
- Slide title
- This is a demo
- Of a program for processing slides
-
-
- Another demo slide
- It is important
- To have more than
- one slide
-
-
- """
-
- document2 = """\
-
- Demo slideshow
- Slide title
- This is a demo
- Of a program for processing slides
-
-
- Another demo slide
- It is important
- To have more than
- one slide
-
-
- """
-
- dom1 = xml.dom.minidom.parseString(document1)
- dom2 = xml.dom.minidom.parseString(document2)
-
- try:
- xmldiff(dom1, dom2)
- except Exception, msg:
- print msg
-
-