darcs-hash:20061127215605-86b55-405c73e023a6a423127628ab4ca22166008dac7f.gz
This commit is contained in:
David Rousselie
2006-11-27 22:56:05 +01:00
parent 5350af9b80
commit 2e5337483d

View File

@@ -6,6 +6,8 @@
# COVERAGE.PY -- COVERAGE TESTING # COVERAGE.PY -- COVERAGE TESTING
# #
# Gareth Rees, Ravenbrook Limited, 2001-12-04 # Gareth Rees, Ravenbrook Limited, 2001-12-04
# Ned Batchelder, 2004-12-12
# http://nedbatchelder.com/code/modules/coverage.html
# #
# #
# 1. INTRODUCTION # 1. INTRODUCTION
@@ -20,34 +22,49 @@
# interface and limitations. See [GDR 2001-12-04b] for requirements and # interface and limitations. See [GDR 2001-12-04b] for requirements and
# design. # design.
"""Usage: r"""Usage:
coverage.py -x MODULE.py [ARG1 ARG2 ...] coverage.py -x [-p] MODULE.py [ARG1 ARG2 ...]
Execute module, passing the given command-line arguments, collecting Execute module, passing the given command-line arguments, collecting
coverage data. coverage data. With the -p option, write to a temporary file containing
the machine name and process ID.
coverage.py -e coverage.py -e
Erase collected coverage data. Erase collected coverage data.
coverage.py -r [-m] FILE1 FILE2 ... coverage.py -c
Collect data from multiple coverage files (as created by -p option above)
and store it into a single file representing the union of the coverage.
coverage.py -r [-m] [-o dir1,dir2,...] FILE1 FILE2 ...
Report on the statement coverage for the given files. With the -m Report on the statement coverage for the given files. With the -m
option, show line numbers of the statements that weren't executed. option, show line numbers of the statements that weren't executed.
coverage.py -a [-d dir] FILE1 FILE2 ... coverage.py -a [-d dir] [-o dir1,dir2,...] FILE1 FILE2 ...
Make annotated copies of the given files, marking statements that Make annotated copies of the given files, marking statements that
are executed with > and statements that are missed with !. With are executed with > and statements that are missed with !. With
the -d option, make the copies in that directory. Without the -d the -d option, make the copies in that directory. Without the -d
option, make each copy in the same directory as the original. option, make each copy in the same directory as the original.
-o dir,dir2,...
Omit reporting or annotating files when their filename path starts with
a directory listed in the omit list.
e.g. python coverage.py -i -r -o c:\python23,lib\enthought\traits
Coverage data is saved in the file .coverage by default. Set the Coverage data is saved in the file .coverage by default. Set the
COVERAGE_FILE environment variable to save it somewhere else.""" COVERAGE_FILE environment variable to save it somewhere else."""
__version__ = "2.6.20060823" # see detailed history at the end of this file.
import compiler
import compiler.visitor
import os import os
import re import re
import string import string
import sys import sys
import threading
import types import types
from socket import gethostname
# 2. IMPLEMENTATION # 2. IMPLEMENTATION
# #
@@ -69,37 +86,175 @@ import types
# In the bottleneck of this application it's appropriate to abbreviate # In the bottleneck of this application it's appropriate to abbreviate
# names to increase speed. # names to increase speed.
# A dictionary with an entry for (Python source file name, line number class StatementFindingAstVisitor(compiler.visitor.ASTVisitor):
# in that file) if that line has been executed. def __init__(self, statements, excluded, suite_spots):
c = {} compiler.visitor.ASTVisitor.__init__(self)
self.statements = statements
self.excluded = excluded
self.suite_spots = suite_spots
self.excluding_suite = 0
# t(f, x, y). This method is passed to sys.settrace as a trace def doRecursive(self, node):
# function. See [van Rossum 2001-07-20b, 9.2] for an explanation of self.recordNodeLine(node)
# sys.settrace and the arguments and return value of the trace function. for n in node.getChildNodes():
# See [van Rossum 2001-07-20a, 3.2] for a description of frame and code self.dispatch(n)
# objects.
def t(f, x, y): visitStmt = visitModule = doRecursive
c[(f.f_code.co_filename, f.f_lineno)] = 1
return t def doCode(self, node):
if hasattr(node, 'decorators') and node.decorators:
self.dispatch(node.decorators)
self.recordAndDispatch(node.code)
else:
self.doSuite(node, node.code)
visitFunction = visitClass = doCode
def getFirstLine(self, node):
# Find the first line in the tree node.
lineno = node.lineno
for n in node.getChildNodes():
f = self.getFirstLine(n)
if lineno and f:
lineno = min(lineno, f)
else:
lineno = lineno or f
return lineno
def getLastLine(self, node):
# Find the first line in the tree node.
lineno = node.lineno
for n in node.getChildNodes():
lineno = max(lineno, self.getLastLine(n))
return lineno
def doStatement(self, node):
self.recordLine(self.getFirstLine(node))
visitAssert = visitAssign = visitAssTuple = visitDiscard = visitPrint = \
visitPrintnl = visitRaise = visitSubscript = visitDecorators = \
doStatement
def recordNodeLine(self, node):
return self.recordLine(node.lineno)
def recordLine(self, lineno):
# Returns a bool, whether the line is included or excluded.
if lineno:
# Multi-line tests introducing suites have to get charged to their
# keyword.
if lineno in self.suite_spots:
lineno = self.suite_spots[lineno][0]
# If we're inside an exluded suite, record that this line was
# excluded.
if self.excluding_suite:
self.excluded[lineno] = 1
return 0
# If this line is excluded, or suite_spots maps this line to
# another line that is exlcuded, then we're excluded.
elif self.excluded.has_key(lineno) or \
self.suite_spots.has_key(lineno) and \
self.excluded.has_key(self.suite_spots[lineno][1]):
return 0
# Otherwise, this is an executable line.
else:
self.statements[lineno] = 1
return 1
return 0
default = recordNodeLine
def recordAndDispatch(self, node):
self.recordNodeLine(node)
self.dispatch(node)
def doSuite(self, intro, body, exclude=0):
exsuite = self.excluding_suite
if exclude or (intro and not self.recordNodeLine(intro)):
self.excluding_suite = 1
self.recordAndDispatch(body)
self.excluding_suite = exsuite
def doPlainWordSuite(self, prevsuite, suite):
# Finding the exclude lines for else's is tricky, because they aren't
# present in the compiler parse tree. Look at the previous suite,
# and find its last line. If any line between there and the else's
# first line are excluded, then we exclude the else.
lastprev = self.getLastLine(prevsuite)
firstelse = self.getFirstLine(suite)
for l in range(lastprev+1, firstelse):
if self.suite_spots.has_key(l):
self.doSuite(None, suite, exclude=self.excluded.has_key(l))
break
else:
self.doSuite(None, suite)
def doElse(self, prevsuite, node):
if node.else_:
self.doPlainWordSuite(prevsuite, node.else_)
def visitFor(self, node):
self.doSuite(node, node.body)
self.doElse(node.body, node)
def visitIf(self, node):
# The first test has to be handled separately from the rest.
# The first test is credited to the line with the "if", but the others
# are credited to the line with the test for the elif.
self.doSuite(node, node.tests[0][1])
for t, n in node.tests[1:]:
self.doSuite(t, n)
self.doElse(node.tests[-1][1], node)
def visitWhile(self, node):
self.doSuite(node, node.body)
self.doElse(node.body, node)
def visitTryExcept(self, node):
self.doSuite(node, node.body)
for i in range(len(node.handlers)):
a, b, h = node.handlers[i]
if not a:
# It's a plain "except:". Find the previous suite.
if i > 0:
prev = node.handlers[i-1][2]
else:
prev = node.body
self.doPlainWordSuite(prev, h)
else:
self.doSuite(a, h)
self.doElse(node.handlers[-1][2], node)
def visitTryFinally(self, node):
self.doSuite(node, node.body)
self.doPlainWordSuite(node.body, node.final)
def visitGlobal(self, node):
# "global" statements don't execute like others (they don't call the
# trace function), so don't record their line numbers.
pass
the_coverage = None the_coverage = None
class coverage: class CoverageException(Exception): pass
error = "coverage error"
class coverage:
# Name of the cache file (unless environment variable is set). # Name of the cache file (unless environment variable is set).
cache_default = ".coverage" cache_default = ".coverage"
# Environment variable naming the cache file. # Environment variable naming the cache file.
cache_env = "COVERAGE_FILE" cache_env = "COVERAGE_FILE"
# A dictionary with an entry for (Python source file name, line number
# in that file) if that line has been executed.
c = {}
# A map from canonical Python source file name to a dictionary in # A map from canonical Python source file name to a dictionary in
# which there's an entry for each line number that has been # which there's an entry for each line number that has been
# executed. # executed.
cexecuted = {} cexecuted = {}
# Cache of results of calling the analysis() method, so that you can # Cache of results of calling the analysis2() method, so that you can
# specify both -r and -a without doing double work. # specify both -r and -a without doing double work.
analysis_cache = {} analysis_cache = {}
@@ -110,10 +265,27 @@ class coverage:
def __init__(self): def __init__(self):
global the_coverage global the_coverage
if the_coverage: if the_coverage:
raise self.error, "Only one coverage object allowed." raise CoverageException, "Only one coverage object allowed."
self.cache = os.environ.get(self.cache_env, self.cache_default) self.usecache = 1
self.restore() self.cache = None
self.analysis_cache = {} self.exclude_re = ''
self.nesting = 0
self.cstack = []
self.xstack = []
self.relative_dir = os.path.normcase(os.path.abspath(os.curdir)+os.path.sep)
# t(f, x, y). This method is passed to sys.settrace as a trace function.
# See [van Rossum 2001-07-20b, 9.2] for an explanation of sys.settrace and
# the arguments and return value of the trace function.
# See [van Rossum 2001-07-20a, 3.2] for a description of frame and code
# objects.
def t(self, f, w, a): #pragma: no cover
if w == 'line':
self.c[(f.f_code.co_filename, f.f_lineno)] = 1
for c in self.cstack:
c[(f.f_code.co_filename, f.f_lineno)] = 1
return self.t
def help(self, error=None): def help(self, error=None):
if error: if error:
@@ -122,23 +294,26 @@ class coverage:
print __doc__ print __doc__
sys.exit(1) sys.exit(1)
def command_line(self): def command_line(self, argv, help=None):
import getopt import getopt
help = help or self.help
settings = {} settings = {}
optmap = { optmap = {
'-a': 'annotate', '-a': 'annotate',
'-c': 'collect',
'-d:': 'directory=', '-d:': 'directory=',
'-e': 'erase', '-e': 'erase',
'-h': 'help', '-h': 'help',
'-i': 'ignore-errors', '-i': 'ignore-errors',
'-m': 'show-missing', '-m': 'show-missing',
'-p': 'parallel-mode',
'-r': 'report', '-r': 'report',
'-x': 'execute', '-x': 'execute',
'-o:': 'omit=',
} }
short_opts = string.join(map(lambda o: o[1:], optmap.keys()), '') short_opts = string.join(map(lambda o: o[1:], optmap.keys()), '')
long_opts = optmap.values() long_opts = optmap.values()
options, args = getopt.getopt(sys.argv[1:], short_opts, options, args = getopt.getopt(argv, short_opts, long_opts)
long_opts)
for o, a in options: for o, a in options:
if optmap.has_key(o): if optmap.has_key(o):
settings[optmap[o]] = 1 settings[optmap[o]] = 1
@@ -147,86 +322,167 @@ class coverage:
elif o[2:] in long_opts: elif o[2:] in long_opts:
settings[o[2:]] = 1 settings[o[2:]] = 1
elif o[2:] + '=' in long_opts: elif o[2:] + '=' in long_opts:
settings[o[2:]] = a settings[o[2:]+'='] = a
else: else: #pragma: no cover
self.help("Unknown option: '%s'." % o) pass # Can't get here, because getopt won't return anything unknown.
if settings.get('help'): if settings.get('help'):
self.help() help()
for i in ['erase', 'execute']: for i in ['erase', 'execute']:
for j in ['annotate', 'report']: for j in ['annotate', 'report', 'collect']:
if settings.get(i) and settings.get(j): if settings.get(i) and settings.get(j):
self.help("You can't specify the '%s' and '%s' " help("You can't specify the '%s' and '%s' "
"options at the same time." % (i, j)) "options at the same time." % (i, j))
args_needed = (settings.get('execute') args_needed = (settings.get('execute')
or settings.get('annotate') or settings.get('annotate')
or settings.get('report')) or settings.get('report'))
action = settings.get('erase') or args_needed action = (settings.get('erase')
or settings.get('collect')
or args_needed)
if not action: if not action:
self.help("You must specify at least one of -e, -x, -r, " help("You must specify at least one of -e, -x, -c, -r, or -a.")
"or -a.")
if not args_needed and args: if not args_needed and args:
self.help("Unexpected arguments %s." % args) help("Unexpected arguments: %s" % " ".join(args))
self.get_ready(settings.get('parallel-mode'))
self.exclude('#pragma[: ]+[nN][oO] [cC][oO][vV][eE][rR]')
if settings.get('erase'): if settings.get('erase'):
self.erase() self.erase()
if settings.get('execute'): if settings.get('execute'):
if not args: if not args:
self.help("Nothing to do.") help("Nothing to do.")
sys.argv = args sys.argv = args
self.start() self.start()
import __main__ import __main__
sys.path[0] = os.path.dirname(sys.argv[0]) sys.path[0] = os.path.dirname(sys.argv[0])
execfile(sys.argv[0], __main__.__dict__) execfile(sys.argv[0], __main__.__dict__)
if settings.get('collect'):
self.collect()
if not args: if not args:
args = self.cexecuted.keys() args = self.cexecuted.keys()
ignore_errors = settings.get('ignore-errors') ignore_errors = settings.get('ignore-errors')
show_missing = settings.get('show-missing') show_missing = settings.get('show-missing')
directory = settings.get('directory=') directory = settings.get('directory=')
if settings.get('report'):
self.report(args, show_missing, ignore_errors)
if settings.get('annotate'):
self.annotate(args, directory, ignore_errors)
def start(self): omit = settings.get('omit=')
sys.settrace(t) if omit is not None:
omit = omit.split(',')
else:
omit = []
if settings.get('report'):
self.report(args, show_missing, ignore_errors, omit_prefixes=omit)
if settings.get('annotate'):
self.annotate(args, directory, ignore_errors, omit_prefixes=omit)
def use_cache(self, usecache, cache_file=None):
self.usecache = usecache
if cache_file and not self.cache:
self.cache_default = cache_file
def get_ready(self, parallel_mode=False):
if self.usecache and not self.cache:
self.cache = os.environ.get(self.cache_env, self.cache_default)
if parallel_mode:
self.cache += "." + gethostname() + "." + str(os.getpid())
self.restore()
self.analysis_cache = {}
def start(self, parallel_mode=False):
self.get_ready(parallel_mode)
if self.nesting == 0: #pragma: no cover
sys.settrace(self.t)
if hasattr(threading, 'settrace'):
threading.settrace(self.t)
self.nesting += 1
def stop(self): def stop(self):
self.nesting -= 1
if self.nesting == 0: #pragma: no cover
sys.settrace(None) sys.settrace(None)
if hasattr(threading, 'settrace'):
threading.settrace(None)
def erase(self): def erase(self):
global c self.c = {}
c = {}
self.analysis_cache = {} self.analysis_cache = {}
self.cexecuted = {} self.cexecuted = {}
if os.path.exists(self.cache): if self.cache and os.path.exists(self.cache):
os.remove(self.cache) os.remove(self.cache)
self.exclude_re = ""
def exclude(self, re):
if self.exclude_re:
self.exclude_re += "|"
self.exclude_re += "(" + re + ")"
def begin_recursive(self):
self.cstack.append(self.c)
self.xstack.append(self.exclude_re)
def end_recursive(self):
self.c = self.cstack.pop()
self.exclude_re = self.xstack.pop()
# save(). Save coverage data to the coverage cache. # save(). Save coverage data to the coverage cache.
def save(self): def save(self):
if self.usecache and self.cache:
self.canonicalize_filenames() self.canonicalize_filenames()
cache = open(self.cache, 'wb') cache = open(self.cache, 'wb')
import marshal import marshal
marshal.dump(self.cexecuted, cache) marshal.dump(self.cexecuted, cache)
cache.close() cache.close()
# restore(). Restore coverage data from the coverage cache (if it # restore(). Restore coverage data from the coverage cache (if it exists).
# exists).
def restore(self): def restore(self):
global c self.c = {}
c = {}
self.cexecuted = {} self.cexecuted = {}
if not os.path.exists(self.cache): assert self.usecache
return if os.path.exists(self.cache):
self.cexecuted = self.restore_file(self.cache)
def restore_file(self, file_name):
try: try:
cache = open(self.cache, 'rb') cache = open(file_name, 'rb')
import marshal import marshal
cexecuted = marshal.load(cache) cexecuted = marshal.load(cache)
cache.close() cache.close()
if isinstance(cexecuted, types.DictType): if isinstance(cexecuted, types.DictType):
self.cexecuted = cexecuted return cexecuted
else:
return {}
except: except:
pass return {}
# collect(). Collect data in multiple files produced by parallel mode
def collect(self):
cache_dir, local = os.path.split(self.cache)
for file in os.listdir(cache_dir):
if not file.startswith(local):
continue
full_path = os.path.join(cache_dir, file)
cexecuted = self.restore_file(full_path)
self.merge_data(cexecuted)
def merge_data(self, new_data):
for file_name, file_data in new_data.items():
if self.cexecuted.has_key(file_name):
self.merge_file_data(self.cexecuted[file_name], file_data)
else:
self.cexecuted[file_name] = file_data
def merge_file_data(self, cache_data, new_data):
for line_number in new_data.keys():
if not cache_data.has_key(line_number):
cache_data[line_number] = new_data[line_number]
# canonical_filename(filename). Return a canonical filename for the # canonical_filename(filename). Return a canonical filename for the
# file (that is, an absolute path with no redundant components and # file (that is, an absolute path with no redundant components and
@@ -247,25 +503,23 @@ class coverage:
self.canonical_filename_cache[filename] = cf self.canonical_filename_cache[filename] = cf
return self.canonical_filename_cache[filename] return self.canonical_filename_cache[filename]
# canonicalize_filenames(). Copy results from "executed" to # canonicalize_filenames(). Copy results from "c" to "cexecuted",
# "cexecuted", canonicalizing filenames on the way. Clear the # canonicalizing filenames on the way. Clear the "c" map.
# "executed" map.
def canonicalize_filenames(self): def canonicalize_filenames(self):
global c for filename, lineno in self.c.keys():
for filename, lineno in c.keys():
f = self.canonical_filename(filename) f = self.canonical_filename(filename)
if not self.cexecuted.has_key(f): if not self.cexecuted.has_key(f):
self.cexecuted[f] = {} self.cexecuted[f] = {}
self.cexecuted[f][lineno] = 1 self.cexecuted[f][lineno] = 1
c = {} self.c = {}
# morf_filename(morf). Return the filename for a module or file. # morf_filename(morf). Return the filename for a module or file.
def morf_filename(self, morf): def morf_filename(self, morf):
if isinstance(morf, types.ModuleType): if isinstance(morf, types.ModuleType):
if not hasattr(morf, '__file__'): if not hasattr(morf, '__file__'):
raise self.error, "Module has no __file__ attribute." raise CoverageException, "Module has no __file__ attribute."
file = morf.__file__ file = morf.__file__
else: else:
file = morf file = morf
@@ -273,9 +527,9 @@ class coverage:
# analyze_morf(morf). Analyze the module or filename passed as # analyze_morf(morf). Analyze the module or filename passed as
# the argument. If the source code can't be found, raise an error. # the argument. If the source code can't be found, raise an error.
# Otherwise, return a pair of (1) the canonical filename of the # Otherwise, return a tuple of (1) the canonical filename of the
# source code for the module, and (2) a list of lines of statements # source code for the module, (2) a list of lines of statements
# in the source code. # in the source code, and (3) a list of lines of excluded statements.
def analyze_morf(self, morf): def analyze_morf(self, morf):
if self.analysis_cache.has_key(morf): if self.analysis_cache.has_key(morf):
@@ -284,57 +538,82 @@ class coverage:
ext = os.path.splitext(filename)[1] ext = os.path.splitext(filename)[1]
if ext == '.pyc': if ext == '.pyc':
if not os.path.exists(filename[0:-1]): if not os.path.exists(filename[0:-1]):
raise self.error, ("No source for compiled code '%s'." raise CoverageException, ("No source for compiled code '%s'."
% filename) % filename)
filename = filename[0:-1] filename = filename[0:-1]
elif ext != '.py': elif ext != '.py':
raise self.error, "File '%s' not Python source." % filename raise CoverageException, "File '%s' not Python source." % filename
source = open(filename, 'r') source = open(filename, 'r')
import parser lines, excluded_lines = self.find_executable_statements(
tree = parser.suite(source.read()).totuple(1) source.read(), exclude=self.exclude_re
)
source.close() source.close()
statements = {} result = filename, lines, excluded_lines
self.find_statements(tree, statements)
lines = statements.keys()
lines.sort()
result = filename, lines
self.analysis_cache[morf] = result self.analysis_cache[morf] = result
return result return result
# find_statements(tree, dict). Find each statement in the parse def get_suite_spots(self, tree, spots):
# tree and record the line on which the statement starts in the
# dictionary (by assigning it to 1).
#
# It works by walking the whole tree depth-first. Every time it
# comes across a statement (symbol.stmt -- this includes compound
# statements like 'if' and 'while') it calls find_statement, which
# descends the tree below the statement to find the first terminal
# token in that statement and record the lines on which that token
# was found.
#
# This algorithm may find some lines several times (because of the
# grammar production statement -> compound statement -> statement),
# but that doesn't matter because we record lines as the keys of the
# dictionary.
#
# See also [GDR 2001-12-04b, 3.2].
def find_statements(self, tree, dict):
import symbol, token import symbol, token
if token.ISNONTERMINAL(tree[0]): for i in range(1, len(tree)):
for t in tree[1:]: if type(tree[i]) == type(()):
self.find_statements(t, dict) if tree[i][0] == symbol.suite:
if tree[0] == symbol.stmt: # Found a suite, look back for the colon and keyword.
self.find_statement(tree[1], dict) lineno_colon = lineno_word = None
elif (tree[0] == token.NAME for j in range(i-1, 0, -1):
and tree[1] in ['elif', 'except', 'finally']): if tree[j][0] == token.COLON:
dict[tree[2]] = 1 lineno_colon = tree[j][2]
elif tree[j][0] == token.NAME:
if tree[j][1] == 'elif':
# Find the line number of the first non-terminal
# after the keyword.
t = tree[j+1]
while t and token.ISNONTERMINAL(t[0]):
t = t[1]
if t:
lineno_word = t[2]
else:
lineno_word = tree[j][2]
break
elif tree[j][0] == symbol.except_clause:
# "except" clauses look like:
# ('except_clause', ('NAME', 'except', lineno), ...)
if tree[j][1][0] == token.NAME:
lineno_word = tree[j][1][2]
break
if lineno_colon and lineno_word:
# Found colon and keyword, mark all the lines
# between the two with the two line numbers.
for l in range(lineno_word, lineno_colon+1):
spots[l] = (lineno_word, lineno_colon)
self.get_suite_spots(tree[i], spots)
def find_statement(self, tree, dict): def find_executable_statements(self, text, exclude=None):
import token # Find lines which match an exclusion pattern.
while token.ISNONTERMINAL(tree[0]): excluded = {}
tree = tree[1] suite_spots = {}
dict[tree[2]] = 1 if exclude:
reExclude = re.compile(exclude)
lines = text.split('\n')
for i in range(len(lines)):
if reExclude.search(lines[i]):
excluded[i+1] = 1
import parser
tree = parser.suite(text+'\n\n').totuple(1)
self.get_suite_spots(tree, suite_spots)
# Use the compiler module to parse the text and find the executable
# statements. We add newlines to be impervious to final partial lines.
statements = {}
ast = compiler.parse(text+'\n\n')
visitor = StatementFindingAstVisitor(statements, excluded, suite_spots)
compiler.walk(ast, visitor, walker=visitor)
lines = statements.keys()
lines.sort()
excluded_lines = excluded.keys()
excluded_lines.sort()
return lines, excluded_lines
# format_lines(statements, lines). Format a list of line numbers # format_lines(statements, lines). Format a list of line numbers
# for printing by coalescing groups of lines as long as the lines # for printing by coalescing groups of lines as long as the lines
@@ -367,11 +646,15 @@ class coverage:
return "%d" % start return "%d" % start
else: else:
return "%d-%d" % (start, end) return "%d-%d" % (start, end)
import string
return string.join(map(stringify, pairs), ", ") return string.join(map(stringify, pairs), ", ")
# Backward compatibility with version 1.
def analysis(self, morf): def analysis(self, morf):
filename, statements = self.analyze_morf(morf) f, s, _, m, mf = self.analysis2(morf)
return f, s, m, mf
def analysis2(self, morf):
filename, statements, excluded = self.analyze_morf(morf)
self.canonicalize_filenames() self.canonicalize_filenames()
if not self.cexecuted.has_key(filename): if not self.cexecuted.has_key(filename):
self.cexecuted[filename] = {} self.cexecuted[filename] = {}
@@ -379,18 +662,45 @@ class coverage:
for line in statements: for line in statements:
if not self.cexecuted[filename].has_key(line): if not self.cexecuted[filename].has_key(line):
missing.append(line) missing.append(line)
return (filename, statements, missing, return (filename, statements, excluded, missing,
self.format_lines(statements, missing)) self.format_lines(statements, missing))
def relative_filename(self, filename):
""" Convert filename to relative filename from self.relative_dir.
"""
return filename.replace(self.relative_dir, "")
def morf_name(self, morf): def morf_name(self, morf):
""" Return the name of morf as used in report.
"""
if isinstance(morf, types.ModuleType): if isinstance(morf, types.ModuleType):
return morf.__name__ return morf.__name__
else: else:
return os.path.splitext(os.path.basename(morf))[0] return self.relative_filename(os.path.splitext(morf)[0])
def report(self, morfs, show_missing=1, ignore_errors=0): def filter_by_prefix(self, morfs, omit_prefixes):
""" Return list of morfs where the morf name does not begin
with any one of the omit_prefixes.
"""
filtered_morfs = []
for morf in morfs:
for prefix in omit_prefixes:
if self.morf_name(morf).startswith(prefix):
break
else:
filtered_morfs.append(morf)
return filtered_morfs
def morf_name_compare(self, x, y):
return cmp(self.morf_name(x), self.morf_name(y))
def report(self, morfs, show_missing=1, ignore_errors=0, file=None, omit_prefixes=[]):
if not isinstance(morfs, types.ListType): if not isinstance(morfs, types.ListType):
morfs = [morfs] morfs = [morfs]
morfs = self.filter_by_prefix(morfs, omit_prefixes)
morfs.sort(self.morf_name_compare)
max_name = max([5,] + map(len, map(self.morf_name, morfs))) max_name = max([5,] + map(len, map(self.morf_name, morfs)))
fmt_name = "%%- %ds " % max_name fmt_name = "%%- %ds " % max_name
fmt_err = fmt_name + "%s: %s" fmt_err = fmt_name + "%s: %s"
@@ -399,14 +709,16 @@ class coverage:
if show_missing: if show_missing:
header = header + " Missing" header = header + " Missing"
fmt_coverage = fmt_coverage + " %s" fmt_coverage = fmt_coverage + " %s"
print header if not file:
print "-" * len(header) file = sys.stdout
print >>file, header
print >>file, "-" * len(header)
total_statements = 0 total_statements = 0
total_executed = 0 total_executed = 0
for morf in morfs: for morf in morfs:
name = self.morf_name(morf) name = self.morf_name(morf)
try: try:
_, statements, missing, readable = self.analysis(morf) _, statements, _, missing, readable = self.analysis2(morf)
n = len(statements) n = len(statements)
m = n - len(missing) m = n - len(missing)
if n > 0: if n > 0:
@@ -416,17 +728,17 @@ class coverage:
args = (name, n, m, pc) args = (name, n, m, pc)
if show_missing: if show_missing:
args = args + (readable,) args = args + (readable,)
print fmt_coverage % args print >>file, fmt_coverage % args
total_statements = total_statements + n total_statements = total_statements + n
total_executed = total_executed + m total_executed = total_executed + m
except KeyboardInterrupt: except KeyboardInterrupt: #pragma: no cover
raise raise
except: except:
if not ignore_errors: if not ignore_errors:
type, msg = sys.exc_info()[0:2] type, msg = sys.exc_info()[0:2]
print fmt_err % (name, type, msg) print >>file, fmt_err % (name, type, msg)
if len(morfs) > 1: if len(morfs) > 1:
print "-" * len(header) print >>file, "-" * len(header)
if total_statements > 0: if total_statements > 0:
pc = 100.0 * total_executed / total_statements pc = 100.0 * total_executed / total_statements
else: else:
@@ -434,17 +746,26 @@ class coverage:
args = ("TOTAL", total_statements, total_executed, pc) args = ("TOTAL", total_statements, total_executed, pc)
if show_missing: if show_missing:
args = args + ("",) args = args + ("",)
print fmt_coverage % args print >>file, fmt_coverage % args
# annotate(morfs, ignore_errors). # annotate(morfs, ignore_errors).
blank_re = re.compile("\\s*(#|$)") blank_re = re.compile(r"\s*(#|$)")
else_re = re.compile("\\s*else\\s*:\\s*(#|$)") else_re = re.compile(r"\s*else\s*:\s*(#|$)")
def annotate(self, morfs, directory=None, ignore_errors=0): def annotate(self, morfs, directory=None, ignore_errors=0, omit_prefixes=[]):
morfs = self.filter_by_prefix(morfs, omit_prefixes)
for morf in morfs: for morf in morfs:
try: try:
filename, statements, missing, _ = self.analysis(morf) filename, statements, excluded, missing, _ = self.analysis2(morf)
self.annotate_file(filename, statements, excluded, missing, directory)
except KeyboardInterrupt:
raise
except:
if not ignore_errors:
raise
def annotate_file(self, filename, statements, excluded, missing, directory=None):
source = open(filename, 'r') source = open(filename, 'r')
if directory: if directory:
dest_file = os.path.join(directory, dest_file = os.path.join(directory,
@@ -471,8 +792,8 @@ class coverage:
if self.blank_re.match(line): if self.blank_re.match(line):
dest.write(' ') dest.write(' ')
elif self.else_re.match(line): elif self.else_re.match(line):
# Special logic for lines containing only # Special logic for lines containing only 'else:'.
# 'else:'. See [GDR 2001-12-04b, 3.2]. # See [GDR 2001-12-04b, 3.2].
if i >= len(statements) and j >= len(missing): if i >= len(statements) and j >= len(missing):
dest.write('! ') dest.write('! ')
elif i >= len(statements) or j >= len(missing): elif i >= len(statements) or j >= len(missing):
@@ -481,6 +802,8 @@ class coverage:
dest.write('! ') dest.write('! ')
else: else:
dest.write('> ') dest.write('> ')
elif lineno in excluded:
dest.write('- ')
elif covered: elif covered:
dest.write('> ') dest.write('> ')
else: else:
@@ -488,22 +811,23 @@ class coverage:
dest.write(line) dest.write(line)
source.close() source.close()
dest.close() dest.close()
except KeyboardInterrupt:
raise
except:
if not ignore_errors:
raise
# Singleton object. # Singleton object.
the_coverage = coverage() the_coverage = coverage()
# Module functions call methods in the singleton object. # Module functions call methods in the singleton object.
def start(*args): return apply(the_coverage.start, args) def use_cache(*args, **kw): return the_coverage.use_cache(*args, **kw)
def stop(*args): return apply(the_coverage.stop, args) def start(*args, **kw): return the_coverage.start(*args, **kw)
def erase(*args): return apply(the_coverage.erase, args) def stop(*args, **kw): return the_coverage.stop(*args, **kw)
def analysis(*args): return apply(the_coverage.analysis, args) def erase(*args, **kw): return the_coverage.erase(*args, **kw)
def report(*args): return apply(the_coverage.report, args) def begin_recursive(*args, **kw): return the_coverage.begin_recursive(*args, **kw)
def end_recursive(*args, **kw): return the_coverage.end_recursive(*args, **kw)
def exclude(*args, **kw): return the_coverage.exclude(*args, **kw)
def analysis(*args, **kw): return the_coverage.analysis(*args, **kw)
def analysis2(*args, **kw): return the_coverage.analysis2(*args, **kw)
def report(*args, **kw): return the_coverage.report(*args, **kw)
def annotate(*args, **kw): return the_coverage.annotate(*args, **kw)
def annotate_file(*args, **kw): return the_coverage.annotate_file(*args, **kw)
# Save coverage data when Python exits. (The atexit module wasn't # Save coverage data when Python exits. (The atexit module wasn't
# introduced until Python 2.0, so use sys.exitfunc when it's not # introduced until Python 2.0, so use sys.exitfunc when it's not
@@ -516,18 +840,18 @@ except ImportError:
# Command-line interface. # Command-line interface.
if __name__ == '__main__': if __name__ == '__main__':
the_coverage.command_line() the_coverage.command_line(sys.argv[1:])
# A. REFERENCES # A. REFERENCES
# #
# [GDR 2001-12-04a] "Statement coverage for Python"; Gareth Rees; # [GDR 2001-12-04a] "Statement coverage for Python"; Gareth Rees;
# Ravenbrook Limited; 2001-12-04; # Ravenbrook Limited; 2001-12-04;
# <http://www.garethrees.org/2001/12/04/python-coverage/>. # <http://www.nedbatchelder.com/code/modules/rees-coverage.html>.
# #
# [GDR 2001-12-04b] "Statement coverage for Python: design and # [GDR 2001-12-04b] "Statement coverage for Python: design and
# analysis"; Gareth Rees; Ravenbrook Limited; 2001-12-04; # analysis"; Gareth Rees; Ravenbrook Limited; 2001-12-04;
# <http://www.garethrees.org/2001/12/04/python-coverage/design.html>. # <http://www.nedbatchelder.com/code/modules/rees-design.html>.
# #
# [van Rossum 2001-07-20a] "Python Reference Manual (releae 2.1.1)"; # [van Rossum 2001-07-20a] "Python Reference Manual (releae 2.1.1)";
# Guide van Rossum; 2001-07-20; # Guide van Rossum; 2001-07-20;
@@ -561,10 +885,44 @@ if __name__ == '__main__':
# so that it matches the value the program would get if it were run on # so that it matches the value the program would get if it were run on
# its own. # its own.
# #
# 2004-12-12 NMB Significant code changes.
# - Finding executable statements has been rewritten so that docstrings and
# other quirks of Python execution aren't mistakenly identified as missing
# lines.
# - Lines can be excluded from consideration, even entire suites of lines.
# - The filesystem cache of covered lines can be disabled programmatically.
# - Modernized the code.
# #
# 2004-12-14 NMB Minor tweaks. Return 'analysis' to its original behavior
# and add 'analysis2'. Add a global for 'annotate', and factor it, adding
# 'annotate_file'.
#
# 2004-12-31 NMB Allow for keyword arguments in the module global functions.
# Thanks, Allen.
#
# 2005-12-02 NMB Call threading.settrace so that all threads are measured.
# Thanks Martin Fuzzey. Add a file argument to report so that reports can be
# captured to a different destination.
#
# 2005-12-03 NMB coverage.py can now measure itself.
#
# 2005-12-04 NMB Adapted Greg Rogers' patch for using relative filenames,
# and sorting and omitting files to report on.
#
# 2006-07-23 NMB Applied Joseph Tate's patch for function decorators.
#
# 2006-08-21 NMB Applied Sigve Tjora and Mark van der Wal's fixes for argument
# handling.
#
# 2006-08-22 NMB Applied Geoff Bache's parallel mode patch.
#
# 2006-08-23 NMB Refactorings to improve testability. Fixes to command-line
# logic for parallel mode and collect.
# C. COPYRIGHT AND LICENCE # C. COPYRIGHT AND LICENCE
# #
# Copyright 2001 Gareth Rees. All rights reserved. # Copyright 2001 Gareth Rees. All rights reserved.
# Copyright 2004-2006 Ned Batchelder. All rights reserved.
# #
# Redistribution and use in source and binary forms, with or without # Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are # modification, are permitted provided that the following conditions are
@@ -591,6 +949,4 @@ if __name__ == '__main__':
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE. # DAMAGE.
# #
# # $Id: coverage.py 47 2006-08-24 01:08:48Z Ned $
#
# $Id: coverage.py,v 1.1 2005/07/11 20:39:31 dax Exp $