michael@0: #!/usr/bin/env python michael@0: michael@0: # This Source Code Form is subject to the terms of the Mozilla Public michael@0: # License, v. 2.0. If a copy of the MPL was not distributed with this file, michael@0: # You can obtain one at http://mozilla.org/MPL/2.0/. michael@0: michael@0: """ michael@0: Mozilla universal manifest parser michael@0: """ michael@0: michael@0: __all__ = ['read_ini', # .ini reader michael@0: 'ManifestParser', 'TestManifest', 'convert', # manifest handling michael@0: 'parse', 'ParseError', 'ExpressionParser'] # conditional expression parser michael@0: michael@0: import fnmatch michael@0: import os michael@0: import re michael@0: import shutil michael@0: import sys michael@0: michael@0: from optparse import OptionParser michael@0: from StringIO import StringIO michael@0: michael@0: relpath = os.path.relpath michael@0: string = (basestring,) michael@0: michael@0: michael@0: # expr.py michael@0: # from: michael@0: # http://k0s.org/mozilla/hg/expressionparser michael@0: # http://hg.mozilla.org/users/tmielczarek_mozilla.com/expressionparser michael@0: michael@0: # Implements a top-down parser/evaluator for simple boolean expressions. michael@0: # ideas taken from http://effbot.org/zone/simple-top-down-parsing.htm michael@0: # michael@0: # Rough grammar: michael@0: # expr := literal michael@0: # | '(' expr ')' michael@0: # | expr '&&' expr michael@0: # | expr '||' expr michael@0: # | expr '==' expr michael@0: # | expr '!=' expr michael@0: # literal := BOOL michael@0: # | INT michael@0: # | STRING michael@0: # | IDENT michael@0: # BOOL := true|false michael@0: # INT := [0-9]+ michael@0: # STRING := "[^"]*" michael@0: # IDENT := [A-Za-z_]\w* michael@0: michael@0: # Identifiers take their values from a mapping dictionary passed as the second michael@0: # argument. michael@0: michael@0: # Glossary (see above URL for details): michael@0: # - nud: null denotation michael@0: # - led: left detonation michael@0: # - lbp: left binding power michael@0: # - rbp: right binding power michael@0: michael@0: class ident_token(object): michael@0: def __init__(self, value): michael@0: self.value = value michael@0: def nud(self, parser): michael@0: # identifiers take their value from the value mappings passed michael@0: # to the parser michael@0: return parser.value(self.value) michael@0: michael@0: class literal_token(object): michael@0: def __init__(self, value): michael@0: self.value = value michael@0: def nud(self, parser): michael@0: return self.value michael@0: michael@0: class eq_op_token(object): michael@0: "==" michael@0: def led(self, parser, left): michael@0: return left == parser.expression(self.lbp) michael@0: michael@0: class neq_op_token(object): michael@0: "!=" michael@0: def led(self, parser, left): michael@0: return left != parser.expression(self.lbp) michael@0: michael@0: class not_op_token(object): michael@0: "!" michael@0: def nud(self, parser): michael@0: return not parser.expression(100) michael@0: michael@0: class and_op_token(object): michael@0: "&&" michael@0: def led(self, parser, left): michael@0: right = parser.expression(self.lbp) michael@0: return left and right michael@0: michael@0: class or_op_token(object): michael@0: "||" michael@0: def led(self, parser, left): michael@0: right = parser.expression(self.lbp) michael@0: return left or right michael@0: michael@0: class lparen_token(object): michael@0: "(" michael@0: def nud(self, parser): michael@0: expr = parser.expression() michael@0: parser.advance(rparen_token) michael@0: return expr michael@0: michael@0: class rparen_token(object): michael@0: ")" michael@0: michael@0: class end_token(object): michael@0: """always ends parsing""" michael@0: michael@0: ### derived literal tokens michael@0: michael@0: class bool_token(literal_token): michael@0: def __init__(self, value): michael@0: value = {'true':True, 'false':False}[value] michael@0: literal_token.__init__(self, value) michael@0: michael@0: class int_token(literal_token): michael@0: def __init__(self, value): michael@0: literal_token.__init__(self, int(value)) michael@0: michael@0: class string_token(literal_token): michael@0: def __init__(self, value): michael@0: literal_token.__init__(self, value[1:-1]) michael@0: michael@0: precedence = [(end_token, rparen_token), michael@0: (or_op_token,), michael@0: (and_op_token,), michael@0: (eq_op_token, neq_op_token), michael@0: (lparen_token,), michael@0: ] michael@0: for index, rank in enumerate(precedence): michael@0: for token in rank: michael@0: token.lbp = index # lbp = lowest left binding power michael@0: michael@0: class ParseError(Exception): michael@0: """error parsing conditional expression""" michael@0: michael@0: class ExpressionParser(object): michael@0: """ michael@0: A parser for a simple expression language. michael@0: michael@0: The expression language can be described as follows:: michael@0: michael@0: EXPRESSION ::= LITERAL | '(' EXPRESSION ')' | '!' EXPRESSION | EXPRESSION OP EXPRESSION michael@0: OP ::= '==' | '!=' | '&&' | '||' michael@0: LITERAL ::= BOOL | INT | IDENT | STRING michael@0: BOOL ::= 'true' | 'false' michael@0: INT ::= [0-9]+ michael@0: IDENT ::= [a-zA-Z_]\w* michael@0: STRING ::= '"' [^\"] '"' | ''' [^\'] ''' michael@0: michael@0: At its core, expressions consist of booleans, integers, identifiers and. michael@0: strings. Booleans are one of *true* or *false*. Integers are a series michael@0: of digits. Identifiers are a series of English letters and underscores. michael@0: Strings are a pair of matching quote characters (single or double) with michael@0: zero or more characters inside. michael@0: michael@0: Expressions can be combined with operators: the equals (==) and not michael@0: equals (!=) operators compare two expressions and produce a boolean. The michael@0: and (&&) and or (||) operators take two expressions and produce the logical michael@0: AND or OR value of them, respectively. An expression can also be prefixed michael@0: with the not (!) operator, which produces its logical negation. michael@0: michael@0: Finally, any expression may be contained within parentheses for grouping. michael@0: michael@0: Identifiers take their values from the mapping provided. michael@0: """ michael@0: def __init__(self, text, valuemapping, strict=False): michael@0: """ michael@0: Initialize the parser michael@0: :param text: The expression to parse as a string. michael@0: :param valuemapping: A dict mapping identifier names to values. michael@0: :param strict: If true, referencing an identifier that was not michael@0: provided in :valuemapping: will raise an error. michael@0: """ michael@0: self.text = text michael@0: self.valuemapping = valuemapping michael@0: self.strict = strict michael@0: michael@0: def _tokenize(self): michael@0: """ michael@0: Lex the input text into tokens and yield them in sequence. michael@0: """ michael@0: # scanner callbacks michael@0: def bool_(scanner, t): return bool_token(t) michael@0: def identifier(scanner, t): return ident_token(t) michael@0: def integer(scanner, t): return int_token(t) michael@0: def eq(scanner, t): return eq_op_token() michael@0: def neq(scanner, t): return neq_op_token() michael@0: def or_(scanner, t): return or_op_token() michael@0: def and_(scanner, t): return and_op_token() michael@0: def lparen(scanner, t): return lparen_token() michael@0: def rparen(scanner, t): return rparen_token() michael@0: def string_(scanner, t): return string_token(t) michael@0: def not_(scanner, t): return not_op_token() michael@0: michael@0: scanner = re.Scanner([ michael@0: # Note: keep these in sync with the class docstring above. michael@0: (r"true|false", bool_), michael@0: (r"[a-zA-Z_]\w*", identifier), michael@0: (r"[0-9]+", integer), michael@0: (r'("[^"]*")|(\'[^\']*\')', string_), michael@0: (r"==", eq), michael@0: (r"!=", neq), michael@0: (r"\|\|", or_), michael@0: (r"!", not_), michael@0: (r"&&", and_), michael@0: (r"\(", lparen), michael@0: (r"\)", rparen), michael@0: (r"\s+", None), # skip whitespace michael@0: ]) michael@0: tokens, remainder = scanner.scan(self.text) michael@0: for t in tokens: michael@0: yield t michael@0: yield end_token() michael@0: michael@0: def value(self, ident): michael@0: """ michael@0: Look up the value of |ident| in the value mapping passed in the michael@0: constructor. michael@0: """ michael@0: if self.strict: michael@0: return self.valuemapping[ident] michael@0: else: michael@0: return self.valuemapping.get(ident, None) michael@0: michael@0: def advance(self, expected): michael@0: """ michael@0: Assert that the next token is an instance of |expected|, and advance michael@0: to the next token. michael@0: """ michael@0: if not isinstance(self.token, expected): michael@0: raise Exception, "Unexpected token!" michael@0: self.token = self.iter.next() michael@0: michael@0: def expression(self, rbp=0): michael@0: """ michael@0: Parse and return the value of an expression until a token with michael@0: right binding power greater than rbp is encountered. michael@0: """ michael@0: t = self.token michael@0: self.token = self.iter.next() michael@0: left = t.nud(self) michael@0: while rbp < self.token.lbp: michael@0: t = self.token michael@0: self.token = self.iter.next() michael@0: left = t.led(self, left) michael@0: return left michael@0: michael@0: def parse(self): michael@0: """ michael@0: Parse and return the value of the expression in the text michael@0: passed to the constructor. Raises a ParseError if the expression michael@0: could not be parsed. michael@0: """ michael@0: try: michael@0: self.iter = self._tokenize() michael@0: self.token = self.iter.next() michael@0: return self.expression() michael@0: except: michael@0: raise ParseError("could not parse: %s; variables: %s" % (self.text, self.valuemapping)) michael@0: michael@0: __call__ = parse michael@0: michael@0: def parse(text, **values): michael@0: """ michael@0: Parse and evaluate a boolean expression. michael@0: :param text: The expression to parse, as a string. michael@0: :param values: A dict containing a name to value mapping for identifiers michael@0: referenced in *text*. michael@0: :rtype: the final value of the expression. michael@0: :raises: :py:exc::ParseError: will be raised if parsing fails. michael@0: """ michael@0: return ExpressionParser(text, values).parse() michael@0: michael@0: michael@0: ### path normalization michael@0: michael@0: def normalize_path(path): michael@0: """normalize a relative path""" michael@0: if sys.platform.startswith('win'): michael@0: return path.replace('/', os.path.sep) michael@0: return path michael@0: michael@0: def denormalize_path(path): michael@0: """denormalize a relative path""" michael@0: if sys.platform.startswith('win'): michael@0: return path.replace(os.path.sep, '/') michael@0: return path michael@0: michael@0: michael@0: ### .ini reader michael@0: michael@0: def read_ini(fp, variables=None, default='DEFAULT', michael@0: comments=';#', separators=('=', ':'), michael@0: strict=True): michael@0: """ michael@0: read an .ini file and return a list of [(section, values)] michael@0: - fp : file pointer or path to read michael@0: - variables : default set of variables michael@0: - default : name of the section for the default section michael@0: - comments : characters that if they start a line denote a comment michael@0: - separators : strings that denote key, value separation in order michael@0: - strict : whether to be strict about parsing michael@0: """ michael@0: michael@0: # variables michael@0: variables = variables or {} michael@0: sections = [] michael@0: key = value = None michael@0: section_names = set() michael@0: if isinstance(fp, basestring): michael@0: fp = file(fp) michael@0: michael@0: # read the lines michael@0: for (linenum, line) in enumerate(fp.readlines(), start=1): michael@0: michael@0: stripped = line.strip() michael@0: michael@0: # ignore blank lines michael@0: if not stripped: michael@0: # reset key and value to avoid continuation lines michael@0: key = value = None michael@0: continue michael@0: michael@0: # ignore comment lines michael@0: if stripped[0] in comments: michael@0: continue michael@0: michael@0: # check for a new section michael@0: if len(stripped) > 2 and stripped[0] == '[' and stripped[-1] == ']': michael@0: section = stripped[1:-1].strip() michael@0: key = value = None michael@0: michael@0: # deal with DEFAULT section michael@0: if section.lower() == default.lower(): michael@0: if strict: michael@0: assert default not in section_names michael@0: section_names.add(default) michael@0: current_section = variables michael@0: continue michael@0: michael@0: if strict: michael@0: # make sure this section doesn't already exist michael@0: assert section not in section_names, "Section '%s' already found in '%s'" % (section, section_names) michael@0: michael@0: section_names.add(section) michael@0: current_section = {} michael@0: sections.append((section, current_section)) michael@0: continue michael@0: michael@0: # if there aren't any sections yet, something bad happen michael@0: if not section_names: michael@0: raise Exception('No sections found') michael@0: michael@0: # (key, value) pair michael@0: for separator in separators: michael@0: if separator in stripped: michael@0: key, value = stripped.split(separator, 1) michael@0: key = key.strip() michael@0: value = value.strip() michael@0: michael@0: if strict: michael@0: # make sure this key isn't already in the section or empty michael@0: assert key michael@0: if current_section is not variables: michael@0: assert key not in current_section michael@0: michael@0: current_section[key] = value michael@0: break michael@0: else: michael@0: # continuation line ? michael@0: if line[0].isspace() and key: michael@0: value = '%s%s%s' % (value, os.linesep, stripped) michael@0: current_section[key] = value michael@0: else: michael@0: # something bad happened! michael@0: if hasattr(fp, 'name'): michael@0: filename = fp.name michael@0: else: michael@0: filename = 'unknown' michael@0: raise Exception("Error parsing manifest file '%s', line %s" % michael@0: (filename, linenum)) michael@0: michael@0: # interpret the variables michael@0: def interpret_variables(global_dict, local_dict): michael@0: variables = global_dict.copy() michael@0: if 'skip-if' in local_dict and 'skip-if' in variables: michael@0: local_dict['skip-if'] = "(%s) || (%s)" % (variables['skip-if'].split('#')[0], local_dict['skip-if'].split('#')[0]) michael@0: variables.update(local_dict) michael@0: michael@0: return variables michael@0: michael@0: sections = [(i, interpret_variables(variables, j)) for i, j in sections] michael@0: return sections michael@0: michael@0: michael@0: ### objects for parsing manifests michael@0: michael@0: class ManifestParser(object): michael@0: """read .ini manifests""" michael@0: michael@0: def __init__(self, manifests=(), defaults=None, strict=True): michael@0: self._defaults = defaults or {} michael@0: self.tests = [] michael@0: self.manifest_defaults = {} michael@0: self.strict = strict michael@0: self.rootdir = None michael@0: self.relativeRoot = None michael@0: if manifests: michael@0: self.read(*manifests) michael@0: michael@0: def getRelativeRoot(self, root): michael@0: return root michael@0: michael@0: ### methods for reading manifests michael@0: michael@0: def _read(self, root, filename, defaults): michael@0: michael@0: # get directory of this file if not file-like object michael@0: if isinstance(filename, string): michael@0: filename = os.path.abspath(filename) michael@0: fp = open(filename) michael@0: here = os.path.dirname(filename) michael@0: else: michael@0: fp = filename michael@0: filename = here = None michael@0: defaults['here'] = here michael@0: michael@0: # Rootdir is needed for relative path calculation. Precompute it for michael@0: # the microoptimization used below. michael@0: if self.rootdir is None: michael@0: rootdir = "" michael@0: else: michael@0: assert os.path.isabs(self.rootdir) michael@0: rootdir = self.rootdir + os.path.sep michael@0: michael@0: # read the configuration michael@0: sections = read_ini(fp=fp, variables=defaults, strict=self.strict) michael@0: self.manifest_defaults[filename] = defaults michael@0: michael@0: # get the tests michael@0: for section, data in sections: michael@0: subsuite = '' michael@0: if 'subsuite' in data: michael@0: subsuite = data['subsuite'] michael@0: michael@0: # a file to include michael@0: # TODO: keep track of included file structure: michael@0: # self.manifests = {'manifest.ini': 'relative/path.ini'} michael@0: if section.startswith('include:'): michael@0: include_file = section.split('include:', 1)[-1] michael@0: include_file = normalize_path(include_file) michael@0: if not os.path.isabs(include_file): michael@0: include_file = os.path.join(self.getRelativeRoot(here), include_file) michael@0: if not os.path.exists(include_file): michael@0: message = "Included file '%s' does not exist" % include_file michael@0: if self.strict: michael@0: raise IOError(message) michael@0: else: michael@0: sys.stderr.write("%s\n" % message) michael@0: continue michael@0: include_defaults = data.copy() michael@0: self._read(root, include_file, include_defaults) michael@0: continue michael@0: michael@0: # otherwise an item michael@0: test = data michael@0: test['name'] = section michael@0: michael@0: # Will be None if the manifest being read is a file-like object. michael@0: test['manifest'] = filename michael@0: michael@0: # determine the path michael@0: path = test.get('path', section) michael@0: _relpath = path michael@0: if '://' not in path: # don't futz with URLs michael@0: path = normalize_path(path) michael@0: if here and not os.path.isabs(path): michael@0: path = os.path.normpath(os.path.join(here, path)) michael@0: michael@0: # Microoptimization, because relpath is quite expensive. michael@0: # We know that rootdir is an absolute path or empty. If path michael@0: # starts with rootdir, then path is also absolute and the tail michael@0: # of the path is the relative path (possibly non-normalized, michael@0: # when here is unknown). michael@0: # For this to work rootdir needs to be terminated with a path michael@0: # separator, so that references to sibling directories with michael@0: # a common prefix don't get misscomputed (e.g. /root and michael@0: # /rootbeer/file). michael@0: # When the rootdir is unknown, the relpath needs to be left michael@0: # unchanged. We use an empty string as rootdir in that case, michael@0: # which leaves relpath unchanged after slicing. michael@0: if path.startswith(rootdir): michael@0: _relpath = path[len(rootdir):] michael@0: else: michael@0: _relpath = relpath(path, rootdir) michael@0: michael@0: test['subsuite'] = subsuite michael@0: test['path'] = path michael@0: test['relpath'] = _relpath michael@0: michael@0: # append the item michael@0: self.tests.append(test) michael@0: michael@0: def read(self, *filenames, **defaults): michael@0: """ michael@0: read and add manifests from file paths or file-like objects michael@0: michael@0: filenames -- file paths or file-like objects to read as manifests michael@0: defaults -- default variables michael@0: """ michael@0: michael@0: # ensure all files exist michael@0: missing = [filename for filename in filenames michael@0: if isinstance(filename, string) and not os.path.exists(filename) ] michael@0: if missing: michael@0: raise IOError('Missing files: %s' % ', '.join(missing)) michael@0: michael@0: # default variables michael@0: _defaults = defaults.copy() or self._defaults.copy() michael@0: _defaults.setdefault('here', None) michael@0: michael@0: # process each file michael@0: for filename in filenames: michael@0: # set the per file defaults michael@0: defaults = _defaults.copy() michael@0: here = None michael@0: if isinstance(filename, string): michael@0: here = os.path.dirname(os.path.abspath(filename)) michael@0: defaults['here'] = here # directory of master .ini file michael@0: michael@0: if self.rootdir is None: michael@0: # set the root directory michael@0: # == the directory of the first manifest given michael@0: self.rootdir = here michael@0: michael@0: self._read(here, filename, defaults) michael@0: michael@0: michael@0: ### methods for querying manifests michael@0: michael@0: def query(self, *checks, **kw): michael@0: """ michael@0: general query function for tests michael@0: - checks : callable conditions to test if the test fulfills the query michael@0: """ michael@0: tests = kw.get('tests', None) michael@0: if tests is None: michael@0: tests = self.tests michael@0: retval = [] michael@0: for test in tests: michael@0: for check in checks: michael@0: if not check(test): michael@0: break michael@0: else: michael@0: retval.append(test) michael@0: return retval michael@0: michael@0: def get(self, _key=None, inverse=False, tags=None, tests=None, **kwargs): michael@0: # TODO: pass a dict instead of kwargs since you might hav michael@0: # e.g. 'inverse' as a key in the dict michael@0: michael@0: # TODO: tags should just be part of kwargs with None values michael@0: # (None == any is kinda weird, but probably still better) michael@0: michael@0: # fix up tags michael@0: if tags: michael@0: tags = set(tags) michael@0: else: michael@0: tags = set() michael@0: michael@0: # make some check functions michael@0: if inverse: michael@0: has_tags = lambda test: not tags.intersection(test.keys()) michael@0: def dict_query(test): michael@0: for key, value in kwargs.items(): michael@0: if test.get(key) == value: michael@0: return False michael@0: return True michael@0: else: michael@0: has_tags = lambda test: tags.issubset(test.keys()) michael@0: def dict_query(test): michael@0: for key, value in kwargs.items(): michael@0: if test.get(key) != value: michael@0: return False michael@0: return True michael@0: michael@0: # query the tests michael@0: tests = self.query(has_tags, dict_query, tests=tests) michael@0: michael@0: # if a key is given, return only a list of that key michael@0: # useful for keys like 'name' or 'path' michael@0: if _key: michael@0: return [test[_key] for test in tests] michael@0: michael@0: # return the tests michael@0: return tests michael@0: michael@0: def manifests(self, tests=None): michael@0: """ michael@0: return manifests in order in which they appear in the tests michael@0: """ michael@0: if tests is None: michael@0: # Make sure to return all the manifests, even ones without tests. michael@0: return self.manifest_defaults.keys() michael@0: michael@0: manifests = [] michael@0: for test in tests: michael@0: manifest = test.get('manifest') michael@0: if not manifest: michael@0: continue michael@0: if manifest not in manifests: michael@0: manifests.append(manifest) michael@0: return manifests michael@0: michael@0: def paths(self): michael@0: return [i['path'] for i in self.tests] michael@0: michael@0: michael@0: ### methods for auditing michael@0: michael@0: def missing(self, tests=None): michael@0: """return list of tests that do not exist on the filesystem""" michael@0: if tests is None: michael@0: tests = self.tests michael@0: return [test for test in tests michael@0: if not os.path.exists(test['path'])] michael@0: michael@0: def verifyDirectory(self, directories, pattern=None, extensions=None): michael@0: """ michael@0: checks what is on the filesystem vs what is in a manifest michael@0: returns a 2-tuple of sets: michael@0: (missing_from_filesystem, missing_from_manifest) michael@0: """ michael@0: michael@0: files = set([]) michael@0: if isinstance(directories, basestring): michael@0: directories = [directories] michael@0: michael@0: # get files in directories michael@0: for directory in directories: michael@0: for dirpath, dirnames, filenames in os.walk(directory, topdown=True): michael@0: michael@0: # only add files that match a pattern michael@0: if pattern: michael@0: filenames = fnmatch.filter(filenames, pattern) michael@0: michael@0: # only add files that have one of the extensions michael@0: if extensions: michael@0: filenames = [filename for filename in filenames michael@0: if os.path.splitext(filename)[-1] in extensions] michael@0: michael@0: files.update([os.path.join(dirpath, filename) for filename in filenames]) michael@0: michael@0: paths = set(self.paths()) michael@0: missing_from_filesystem = paths.difference(files) michael@0: missing_from_manifest = files.difference(paths) michael@0: return (missing_from_filesystem, missing_from_manifest) michael@0: michael@0: michael@0: ### methods for output michael@0: michael@0: def write(self, fp=sys.stdout, rootdir=None, michael@0: global_tags=None, global_kwargs=None, michael@0: local_tags=None, local_kwargs=None): michael@0: """ michael@0: write a manifest given a query michael@0: global and local options will be munged to do the query michael@0: globals will be written to the top of the file michael@0: locals (if given) will be written per test michael@0: """ michael@0: michael@0: # open file if `fp` given as string michael@0: close = False michael@0: if isinstance(fp, string): michael@0: fp = file(fp, 'w') michael@0: close = True michael@0: michael@0: # root directory michael@0: if rootdir is None: michael@0: rootdir = self.rootdir michael@0: michael@0: # sanitize input michael@0: global_tags = global_tags or set() michael@0: local_tags = local_tags or set() michael@0: global_kwargs = global_kwargs or {} michael@0: local_kwargs = local_kwargs or {} michael@0: michael@0: # create the query michael@0: tags = set([]) michael@0: tags.update(global_tags) michael@0: tags.update(local_tags) michael@0: kwargs = {} michael@0: kwargs.update(global_kwargs) michael@0: kwargs.update(local_kwargs) michael@0: michael@0: # get matching tests michael@0: tests = self.get(tags=tags, **kwargs) michael@0: michael@0: # print the .ini manifest michael@0: if global_tags or global_kwargs: michael@0: print >> fp, '[DEFAULT]' michael@0: for tag in global_tags: michael@0: print >> fp, '%s =' % tag michael@0: for key, value in global_kwargs.items(): michael@0: print >> fp, '%s = %s' % (key, value) michael@0: print >> fp michael@0: michael@0: for test in tests: michael@0: test = test.copy() # don't overwrite michael@0: michael@0: path = test['name'] michael@0: if not os.path.isabs(path): michael@0: path = test['path'] michael@0: if self.rootdir: michael@0: path = relpath(test['path'], self.rootdir) michael@0: path = denormalize_path(path) michael@0: print >> fp, '[%s]' % path michael@0: michael@0: # reserved keywords: michael@0: reserved = ['path', 'name', 'here', 'manifest', 'relpath'] michael@0: for key in sorted(test.keys()): michael@0: if key in reserved: michael@0: continue michael@0: if key in global_kwargs: michael@0: continue michael@0: if key in global_tags and not test[key]: michael@0: continue michael@0: print >> fp, '%s = %s' % (key, test[key]) michael@0: print >> fp michael@0: michael@0: if close: michael@0: # close the created file michael@0: fp.close() michael@0: michael@0: def __str__(self): michael@0: fp = StringIO() michael@0: self.write(fp=fp) michael@0: value = fp.getvalue() michael@0: return value michael@0: michael@0: def copy(self, directory, rootdir=None, *tags, **kwargs): michael@0: """ michael@0: copy the manifests and associated tests michael@0: - directory : directory to copy to michael@0: - rootdir : root directory to copy to (if not given from manifests) michael@0: - tags : keywords the tests must have michael@0: - kwargs : key, values the tests must match michael@0: """ michael@0: # XXX note that copy does *not* filter the tests out of the michael@0: # resulting manifest; it just stupidly copies them over. michael@0: # ideally, it would reread the manifests and filter out the michael@0: # tests that don't match *tags and **kwargs michael@0: michael@0: # destination michael@0: if not os.path.exists(directory): michael@0: os.path.makedirs(directory) michael@0: else: michael@0: # sanity check michael@0: assert os.path.isdir(directory) michael@0: michael@0: # tests to copy michael@0: tests = self.get(tags=tags, **kwargs) michael@0: if not tests: michael@0: return # nothing to do! michael@0: michael@0: # root directory michael@0: if rootdir is None: michael@0: rootdir = self.rootdir michael@0: michael@0: # copy the manifests + tests michael@0: manifests = [relpath(manifest, rootdir) for manifest in self.manifests()] michael@0: for manifest in manifests: michael@0: destination = os.path.join(directory, manifest) michael@0: dirname = os.path.dirname(destination) michael@0: if not os.path.exists(dirname): michael@0: os.makedirs(dirname) michael@0: else: michael@0: # sanity check michael@0: assert os.path.isdir(dirname) michael@0: shutil.copy(os.path.join(rootdir, manifest), destination) michael@0: for test in tests: michael@0: if os.path.isabs(test['name']): michael@0: continue michael@0: source = test['path'] michael@0: if not os.path.exists(source): michael@0: print >> sys.stderr, "Missing test: '%s' does not exist!" % source michael@0: continue michael@0: # TODO: should err on strict michael@0: destination = os.path.join(directory, relpath(test['path'], rootdir)) michael@0: shutil.copy(source, destination) michael@0: # TODO: ensure that all of the tests are below the from_dir michael@0: michael@0: def update(self, from_dir, rootdir=None, *tags, **kwargs): michael@0: """ michael@0: update the tests as listed in a manifest from a directory michael@0: - from_dir : directory where the tests live michael@0: - rootdir : root directory to copy to (if not given from manifests) michael@0: - tags : keys the tests must have michael@0: - kwargs : key, values the tests must match michael@0: """ michael@0: michael@0: # get the tests michael@0: tests = self.get(tags=tags, **kwargs) michael@0: michael@0: # get the root directory michael@0: if not rootdir: michael@0: rootdir = self.rootdir michael@0: michael@0: # copy them! michael@0: for test in tests: michael@0: if not os.path.isabs(test['name']): michael@0: _relpath = relpath(test['path'], rootdir) michael@0: source = os.path.join(from_dir, _relpath) michael@0: if not os.path.exists(source): michael@0: # TODO err on strict michael@0: print >> sys.stderr, "Missing test: '%s'; skipping" % test['name'] michael@0: continue michael@0: destination = os.path.join(rootdir, _relpath) michael@0: shutil.copy(source, destination) michael@0: michael@0: ### directory importers michael@0: michael@0: @classmethod michael@0: def _walk_directories(cls, directories, function, pattern=None, ignore=()): michael@0: """ michael@0: internal function to import directories michael@0: """ michael@0: michael@0: class FilteredDirectoryContents(object): michael@0: """class to filter directory contents""" michael@0: michael@0: sort = sorted michael@0: michael@0: def __init__(self, pattern=pattern, ignore=ignore, cache=None): michael@0: if pattern is None: michael@0: pattern = set() michael@0: if isinstance(pattern, basestring): michael@0: pattern = [pattern] michael@0: self.patterns = pattern michael@0: self.ignore = set(ignore) michael@0: michael@0: # cache of (dirnames, filenames) keyed on directory real path michael@0: # assumes volume is frozen throughout scope michael@0: self._cache = cache or {} michael@0: michael@0: def __call__(self, directory): michael@0: """returns 2-tuple: dirnames, filenames""" michael@0: directory = os.path.realpath(directory) michael@0: if directory not in self._cache: michael@0: dirnames, filenames = self.contents(directory) michael@0: michael@0: # filter out directories without progeny michael@0: # XXX recursive: should keep track of seen directories michael@0: dirnames = [ dirname for dirname in dirnames michael@0: if not self.empty(os.path.join(directory, dirname)) ] michael@0: michael@0: self._cache[directory] = (tuple(dirnames), filenames) michael@0: michael@0: # return cached values michael@0: return self._cache[directory] michael@0: michael@0: def empty(self, directory): michael@0: """ michael@0: returns if a directory and its descendents are empty michael@0: """ michael@0: return self(directory) == ((), ()) michael@0: michael@0: def contents(self, directory, sort=None): michael@0: """ michael@0: return directory contents as (dirnames, filenames) michael@0: with `ignore` and `pattern` applied michael@0: """ michael@0: michael@0: if sort is None: michael@0: sort = self.sort michael@0: michael@0: # split directories and files michael@0: dirnames = [] michael@0: filenames = [] michael@0: for item in os.listdir(directory): michael@0: path = os.path.join(directory, item) michael@0: if os.path.isdir(path): michael@0: dirnames.append(item) michael@0: else: michael@0: # XXX not sure what to do if neither a file or directory michael@0: # (if anything) michael@0: assert os.path.isfile(path) michael@0: filenames.append(item) michael@0: michael@0: # filter contents; michael@0: # this could be done in situ re the above for loop michael@0: # but it is really disparate in intent michael@0: # and could conceivably go to a separate method michael@0: dirnames = [dirname for dirname in dirnames michael@0: if dirname not in self.ignore] michael@0: filenames = set(filenames) michael@0: # we use set functionality to filter filenames michael@0: if self.patterns: michael@0: matches = set() michael@0: matches.update(*[fnmatch.filter(filenames, pattern) michael@0: for pattern in self.patterns]) michael@0: filenames = matches michael@0: michael@0: if sort is not None: michael@0: # sort dirnames, filenames michael@0: dirnames = sort(dirnames) michael@0: filenames = sort(filenames) michael@0: michael@0: return (tuple(dirnames), tuple(filenames)) michael@0: michael@0: # make a filtered directory object michael@0: directory_contents = FilteredDirectoryContents(pattern=pattern, ignore=ignore) michael@0: michael@0: # walk the directories, generating manifests michael@0: for index, directory in enumerate(directories): michael@0: michael@0: for dirpath, dirnames, filenames in os.walk(directory): michael@0: michael@0: # get the directory contents from the caching object michael@0: _dirnames, filenames = directory_contents(dirpath) michael@0: # filter out directory names michael@0: dirnames[:] = _dirnames michael@0: michael@0: # call callback function michael@0: function(directory, dirpath, dirnames, filenames) michael@0: michael@0: @classmethod michael@0: def populate_directory_manifests(cls, directories, filename, pattern=None, ignore=(), overwrite=False): michael@0: """ michael@0: walks directories and writes manifests of name `filename` in-place; returns `cls` instance populated michael@0: with the given manifests michael@0: michael@0: filename -- filename of manifests to write michael@0: pattern -- shell pattern (glob) or patterns of filenames to match michael@0: ignore -- directory names to ignore michael@0: overwrite -- whether to overwrite existing files of given name michael@0: """ michael@0: michael@0: manifest_dict = {} michael@0: seen = [] # top-level directories seen michael@0: michael@0: if os.path.basename(filename) != filename: michael@0: raise IOError("filename should not include directory name") michael@0: michael@0: # no need to hit directories more than once michael@0: _directories = directories michael@0: directories = [] michael@0: for directory in _directories: michael@0: if directory not in directories: michael@0: directories.append(directory) michael@0: michael@0: def callback(directory, dirpath, dirnames, filenames): michael@0: """write a manifest for each directory""" michael@0: michael@0: manifest_path = os.path.join(dirpath, filename) michael@0: if (dirnames or filenames) and not (os.path.exists(manifest_path) and overwrite): michael@0: with file(manifest_path, 'w') as manifest: michael@0: for dirname in dirnames: michael@0: print >> manifest, '[include:%s]' % os.path.join(dirname, filename) michael@0: for _filename in filenames: michael@0: print >> manifest, '[%s]' % _filename michael@0: michael@0: # add to list of manifests michael@0: manifest_dict.setdefault(directory, manifest_path) michael@0: michael@0: # walk the directories to gather files michael@0: cls._walk_directories(directories, callback, pattern=pattern, ignore=ignore) michael@0: # get manifests michael@0: manifests = [manifest_dict[directory] for directory in _directories] michael@0: michael@0: # create a `cls` instance with the manifests michael@0: return cls(manifests=manifests) michael@0: michael@0: @classmethod michael@0: def from_directories(cls, directories, pattern=None, ignore=(), write=None, relative_to=None): michael@0: """ michael@0: convert directories to a simple manifest; returns ManifestParser instance michael@0: michael@0: pattern -- shell pattern (glob) or patterns of filenames to match michael@0: ignore -- directory names to ignore michael@0: write -- filename or file-like object of manifests to write; michael@0: if `None` then a StringIO instance will be created michael@0: relative_to -- write paths relative to this path; michael@0: if false then the paths are absolute michael@0: """ michael@0: michael@0: michael@0: # determine output michael@0: opened_manifest_file = None # name of opened manifest file michael@0: absolute = not relative_to # whether to output absolute path names as names michael@0: if isinstance(write, string): michael@0: opened_manifest_file = write michael@0: write = file(write, 'w') michael@0: if write is None: michael@0: write = StringIO() michael@0: michael@0: # walk the directories, generating manifests michael@0: def callback(directory, dirpath, dirnames, filenames): michael@0: michael@0: # absolute paths michael@0: filenames = [os.path.join(dirpath, filename) michael@0: for filename in filenames] michael@0: # ensure new manifest isn't added michael@0: filenames = [filename for filename in filenames michael@0: if filename != opened_manifest_file] michael@0: # normalize paths michael@0: if not absolute and relative_to: michael@0: filenames = [relpath(filename, relative_to) michael@0: for filename in filenames] michael@0: michael@0: # write to manifest michael@0: print >> write, '\n'.join(['[%s]' % denormalize_path(filename) michael@0: for filename in filenames]) michael@0: michael@0: michael@0: cls._walk_directories(directories, callback, pattern=pattern, ignore=ignore) michael@0: michael@0: if opened_manifest_file: michael@0: # close file michael@0: write.close() michael@0: manifests = [opened_manifest_file] michael@0: else: michael@0: # manifests/write is a file-like object; michael@0: # rewind buffer michael@0: write.flush() michael@0: write.seek(0) michael@0: manifests = [write] michael@0: michael@0: michael@0: # make a ManifestParser instance michael@0: return cls(manifests=manifests) michael@0: michael@0: convert = ManifestParser.from_directories michael@0: michael@0: michael@0: class TestManifest(ManifestParser): michael@0: """ michael@0: apply logic to manifests; this is your integration layer :) michael@0: specific harnesses may subclass from this if they need more logic michael@0: """ michael@0: michael@0: def filter(self, values, tests): michael@0: """ michael@0: filter on a specific list tag, e.g.: michael@0: run-if = os == win linux michael@0: skip-if = os == mac michael@0: """ michael@0: michael@0: # tags: michael@0: run_tag = 'run-if' michael@0: skip_tag = 'skip-if' michael@0: fail_tag = 'fail-if' michael@0: michael@0: # loop over test michael@0: for test in tests: michael@0: reason = None # reason to disable michael@0: michael@0: # tagged-values to run michael@0: if run_tag in test: michael@0: condition = test[run_tag] michael@0: if not parse(condition, **values): michael@0: reason = '%s: %s' % (run_tag, condition) michael@0: michael@0: # tagged-values to skip michael@0: if skip_tag in test: michael@0: condition = test[skip_tag] michael@0: if parse(condition, **values): michael@0: reason = '%s: %s' % (skip_tag, condition) michael@0: michael@0: # mark test as disabled if there's a reason michael@0: if reason: michael@0: test.setdefault('disabled', reason) michael@0: michael@0: # mark test as a fail if so indicated michael@0: if fail_tag in test: michael@0: condition = test[fail_tag] michael@0: if parse(condition, **values): michael@0: test['expected'] = 'fail' michael@0: michael@0: def active_tests(self, exists=True, disabled=True, options=None, **values): michael@0: """ michael@0: - exists : return only existing tests michael@0: - disabled : whether to return disabled tests michael@0: - tags : keys and values to filter on (e.g. `os = linux mac`) michael@0: """ michael@0: tests = [i.copy() for i in self.tests] # shallow copy michael@0: michael@0: # Filter on current subsuite michael@0: if options: michael@0: if options.subsuite: michael@0: tests = [test for test in tests if options.subsuite == test['subsuite']] michael@0: else: michael@0: tests = [test for test in tests if not test['subsuite']] michael@0: michael@0: # mark all tests as passing unless indicated otherwise michael@0: for test in tests: michael@0: test['expected'] = test.get('expected', 'pass') michael@0: michael@0: # ignore tests that do not exist michael@0: if exists: michael@0: tests = [test for test in tests if os.path.exists(test['path'])] michael@0: michael@0: # filter by tags michael@0: self.filter(values, tests) michael@0: michael@0: # ignore disabled tests if specified michael@0: if not disabled: michael@0: tests = [test for test in tests michael@0: if not 'disabled' in test] michael@0: michael@0: # return active tests michael@0: return tests michael@0: michael@0: def test_paths(self): michael@0: return [test['path'] for test in self.active_tests()] michael@0: michael@0: michael@0: ### command line attributes michael@0: michael@0: class ParserError(Exception): michael@0: """error for exceptions while parsing the command line""" michael@0: michael@0: def parse_args(_args): michael@0: """ michael@0: parse and return: michael@0: --keys=value (or --key value) michael@0: -tags michael@0: args michael@0: """ michael@0: michael@0: # return values michael@0: _dict = {} michael@0: tags = [] michael@0: args = [] michael@0: michael@0: # parse the arguments michael@0: key = None michael@0: for arg in _args: michael@0: if arg.startswith('---'): michael@0: raise ParserError("arguments should start with '-' or '--' only") michael@0: elif arg.startswith('--'): michael@0: if key: michael@0: raise ParserError("Key %s still open" % key) michael@0: key = arg[2:] michael@0: if '=' in key: michael@0: key, value = key.split('=', 1) michael@0: _dict[key] = value michael@0: key = None michael@0: continue michael@0: elif arg.startswith('-'): michael@0: if key: michael@0: raise ParserError("Key %s still open" % key) michael@0: tags.append(arg[1:]) michael@0: continue michael@0: else: michael@0: if key: michael@0: _dict[key] = arg michael@0: continue michael@0: args.append(arg) michael@0: michael@0: # return values michael@0: return (_dict, tags, args) michael@0: michael@0: michael@0: ### classes for subcommands michael@0: michael@0: class CLICommand(object): michael@0: usage = '%prog [options] command' michael@0: def __init__(self, parser): michael@0: self._parser = parser # master parser michael@0: def parser(self): michael@0: return OptionParser(usage=self.usage, description=self.__doc__, michael@0: add_help_option=False) michael@0: michael@0: class Copy(CLICommand): michael@0: usage = '%prog [options] copy manifest directory -tag1 -tag2 --key1=value1 --key2=value2 ...' michael@0: def __call__(self, options, args): michael@0: # parse the arguments michael@0: try: michael@0: kwargs, tags, args = parse_args(args) michael@0: except ParserError, e: michael@0: self._parser.error(e.message) michael@0: michael@0: # make sure we have some manifests, otherwise it will michael@0: # be quite boring michael@0: if not len(args) == 2: michael@0: HelpCLI(self._parser)(options, ['copy']) michael@0: return michael@0: michael@0: # read the manifests michael@0: # TODO: should probably ensure these exist here michael@0: manifests = ManifestParser() michael@0: manifests.read(args[0]) michael@0: michael@0: # print the resultant query michael@0: manifests.copy(args[1], None, *tags, **kwargs) michael@0: michael@0: michael@0: class CreateCLI(CLICommand): michael@0: """ michael@0: create a manifest from a list of directories michael@0: """ michael@0: usage = '%prog [options] create directory <...>' michael@0: michael@0: def parser(self): michael@0: parser = CLICommand.parser(self) michael@0: parser.add_option('-p', '--pattern', dest='pattern', michael@0: help="glob pattern for files") michael@0: parser.add_option('-i', '--ignore', dest='ignore', michael@0: default=[], action='append', michael@0: help='directories to ignore') michael@0: parser.add_option('-w', '--in-place', dest='in_place', michael@0: help='Write .ini files in place; filename to write to') michael@0: return parser michael@0: michael@0: def __call__(self, _options, args): michael@0: parser = self.parser() michael@0: options, args = parser.parse_args(args) michael@0: michael@0: # need some directories michael@0: if not len(args): michael@0: parser.print_usage() michael@0: return michael@0: michael@0: # add the directories to the manifest michael@0: for arg in args: michael@0: assert os.path.exists(arg) michael@0: assert os.path.isdir(arg) michael@0: manifest = convert(args, pattern=options.pattern, ignore=options.ignore, michael@0: write=options.in_place) michael@0: if manifest: michael@0: print manifest michael@0: michael@0: michael@0: class WriteCLI(CLICommand): michael@0: """ michael@0: write a manifest based on a query michael@0: """ michael@0: usage = '%prog [options] write manifest -tag1 -tag2 --key1=value1 --key2=value2 ...' michael@0: def __call__(self, options, args): michael@0: michael@0: # parse the arguments michael@0: try: michael@0: kwargs, tags, args = parse_args(args) michael@0: except ParserError, e: michael@0: self._parser.error(e.message) michael@0: michael@0: # make sure we have some manifests, otherwise it will michael@0: # be quite boring michael@0: if not args: michael@0: HelpCLI(self._parser)(options, ['write']) michael@0: return michael@0: michael@0: # read the manifests michael@0: # TODO: should probably ensure these exist here michael@0: manifests = ManifestParser() michael@0: manifests.read(*args) michael@0: michael@0: # print the resultant query michael@0: manifests.write(global_tags=tags, global_kwargs=kwargs) michael@0: michael@0: michael@0: class HelpCLI(CLICommand): michael@0: """ michael@0: get help on a command michael@0: """ michael@0: usage = '%prog [options] help [command]' michael@0: michael@0: def __call__(self, options, args): michael@0: if len(args) == 1 and args[0] in commands: michael@0: commands[args[0]](self._parser).parser().print_help() michael@0: else: michael@0: self._parser.print_help() michael@0: print '\nCommands:' michael@0: for command in sorted(commands): michael@0: print ' %s : %s' % (command, commands[command].__doc__.strip()) michael@0: michael@0: class UpdateCLI(CLICommand): michael@0: """ michael@0: update the tests as listed in a manifest from a directory michael@0: """ michael@0: usage = '%prog [options] update manifest directory -tag1 -tag2 --key1=value1 --key2=value2 ...' michael@0: michael@0: def __call__(self, options, args): michael@0: # parse the arguments michael@0: try: michael@0: kwargs, tags, args = parse_args(args) michael@0: except ParserError, e: michael@0: self._parser.error(e.message) michael@0: michael@0: # make sure we have some manifests, otherwise it will michael@0: # be quite boring michael@0: if not len(args) == 2: michael@0: HelpCLI(self._parser)(options, ['update']) michael@0: return michael@0: michael@0: # read the manifests michael@0: # TODO: should probably ensure these exist here michael@0: manifests = ManifestParser() michael@0: manifests.read(args[0]) michael@0: michael@0: # print the resultant query michael@0: manifests.update(args[1], None, *tags, **kwargs) michael@0: michael@0: michael@0: # command -> class mapping michael@0: commands = { 'create': CreateCLI, michael@0: 'help': HelpCLI, michael@0: 'update': UpdateCLI, michael@0: 'write': WriteCLI } michael@0: michael@0: def main(args=sys.argv[1:]): michael@0: """console_script entry point""" michael@0: michael@0: # set up an option parser michael@0: usage = '%prog [options] [command] ...' michael@0: description = "%s. Use `help` to display commands" % __doc__.strip() michael@0: parser = OptionParser(usage=usage, description=description) michael@0: parser.add_option('-s', '--strict', dest='strict', michael@0: action='store_true', default=False, michael@0: help='adhere strictly to errors') michael@0: parser.disable_interspersed_args() michael@0: michael@0: options, args = parser.parse_args(args) michael@0: michael@0: if not args: michael@0: HelpCLI(parser)(options, args) michael@0: parser.exit() michael@0: michael@0: # get the command michael@0: command = args[0] michael@0: if command not in commands: michael@0: parser.error("Command must be one of %s (you gave '%s')" % (', '.join(sorted(commands.keys())), command)) michael@0: michael@0: handler = commands[command](parser) michael@0: handler(options, args[1:]) michael@0: michael@0: if __name__ == '__main__': michael@0: main()