# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

'''
Test case infrastructure for MozZipFile.

This isn't really a unit test, but a test case generator and runner.
For a given set of files, lengths, and number of writes, we create
a testcase for every combination of the three. There are some
symmetries used to reduce the number of test cases, the first file
written is always the first file, the second is either the first or
the second, the third is one of the first three. That is, if we
had 4 files, but only three writes, the fourth file would never even
get tried.

The content written to the jars is pseudorandom with a fixed seed.
'''

import unittest

import shutil
import os
import re
import sys
import random
import copy
from string import letters

if not __file__:
    __file__ = sys.argv[0]
# MozZipFile lives one directory above this test file.
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

from MozZipFile import ZipFile
import zipfile

# Entry names written into the test jars.
leafs = (
    'firstdir/oneleaf',
    'seconddir/twoleaf',
    'thirddir/with/sub/threeleaf')
# Entry sizes in bytes, indexed by the "length" digit of a test id.
_lengths = [n * 64 for n in (16, 64, 80)]
# Number of distinct lengths to iterate over (indices into _lengths).
lengths = 3
# Maximum number of entries written in a single pass.
writes = 5


def givenlength(i):
    '''Return a length given in the _lengths array to allow manual
    tuning of which lengths of zip entries to use.
    '''
    return _lengths[i]


def prod(*iterables):
    '''Tensor product of a list of iterables.

    This generator returns lists of items, one of each given
    iterable. It iterates over all possible combinations.

    Every iterable except the first is iterated once per item of the
    ones before it, so they have to be re-iterable (lists, ranges),
    not one-shot generators.
    '''
    for item in iterables[0]:
        if len(iterables) == 1:
            yield [item]
        else:
            for others in prod(*iterables[1:]):
                yield [item] + others


def getid(descs):
    '''Convert a list of (leaf, length) int pairs to a string.

    Each pair contributes its two numbers back to back, e.g.
    [[0, 1], [2, 0]] becomes '0120'.
    '''
    return ''.join('{0}{1}'.format(*pair) for pair in descs)


def getContent(length):
    '''Get pseudo random content of given length.

    The characters are drawn from the module-level random generator,
    so the result is reproducible once random.seed() has been called.
    '''
    return ''.join(random.choice(letters) for _ in xrange(length))


def createWriter(sizer, *items):
    '''Helper method to fill in tests, one set of writes, one for each item.

    sizer maps the 'length' value of each item dict to the value that is
    actually passed on; items are keyword-argument dicts for _write().
    Returns a function taking the test case instance as its only argument.
    '''
    # Work on a private copy, the callers' dicts stay untouched.
    locitems = copy.deepcopy(items)
    for item in locitems:
        item['length'] = sizer(item.pop('length', 0))

    def helper(self):
        # Append if an earlier writing pass already created the jar.
        mode = 'w'
        if os.path.isfile(self.f):
            mode = 'a'
        zf = ZipFile(self.f, mode, self.compression)
        for item in locitems:
            self._write(zf, **item)
        # Dropping the only reference is what finishes the jar here.
        # NOTE(review): presumably ZipFile finalizes itself when it is
        # deallocated -- confirm before replacing this with close().
        zf = None
    return helper


def createTester(name, *writes):
    '''Helper method to fill in tests, calls into a list of write
    helper methods.

    name becomes the __name__ of the returned test method, writes are
    the attribute names of the write helpers to call in order.
    '''
    _writes = copy.copy(writes)

    def tester(self):
        for w in _writes:
            getattr(self, w)()
        self._verifyZip()
    # unit tests get confused if the method name isn't test...
    tester.__name__ = name
    return tester


class TestExtensiveStored(unittest.TestCase):
    '''Unit tests for MozZipFile

    The testcase are actually populated by code following the class
    definition.
    '''

    # Scratch directory, recreated for every test in setUp().
    stage = "mozzipfilestage"
    compression = zipfile.ZIP_STORED

    def leaf(self, *leafs):
        '''Return the given path segments joined below the stage dir.'''
        return os.path.join(self.stage, *leafs)

    def setUp(self):
        '''Start from an empty stage directory and an empty reference.'''
        if os.path.exists(self.stage):
            shutil.rmtree(self.stage)
        os.mkdir(self.stage)
        self.f = self.leaf('test.jar')
        # Maps entry name to the content last written for it.
        self.ref = {}
        # Next seed handed out by _write() when none is given.
        self.seed = 0

    def tearDown(self):
        self.f = None
        self.ref = None

    def _verifyZip(self):
        '''Check the jar on disk against the recorded reference content.

        The jar is opened with the stock zipfile module, has to pass
        testzip(), and has to contain exactly the entries in self.ref
        with exactly the recorded content.
        '''
        zf = zipfile.ZipFile(self.f)
        try:
            badEntry = zf.testzip()
            self.assertFalse(badEntry, badEntry)
            self.assertEqual(sorted(zf.namelist()), sorted(self.ref.keys()))
            for leaf, content in self.ref.iteritems():
                self.assertEqual(content, zf.read(leaf))
        finally:
            # Always release the file handle, even on a failed assertion.
            zf.close()

    def _write(self, zf, seed=None, leaf=0, length=0):
        '''Write one pseudo random entry to zf and record it in self.ref.

        seed   -- seed for the random generator; when None, the running
                  per-test seed is used and advanced.
        leaf   -- index into the module-level leafs tuple.
        length -- number of characters to write.

        The same content is also written to a plain file below
        <stage>/stage/.
        '''
        if seed is None:
            seed = self.seed
            self.seed += 1
        random.seed(seed)
        leaf = leafs[leaf]
        content = getContent(length)
        self.ref[leaf] = content
        zf.writestr(leaf, content)
        target = self.leaf('stage', leaf)
        parent = os.path.dirname(target)
        if not os.path.isdir(parent):
            os.makedirs(parent)
        with open(target, 'w') as out:
            out.write(content)


# all leafs in all lengths
atomics = list(prod(xrange(len(leafs)), xrange(lengths)))

# populate TestExtensiveStored with testcases
for w in xrange(writes):
    # Don't iterate over all files for the first n passes,
    # those are redundant as long as w < lengths.
    # There are symmetries in the trailing end, too, but I don't know
    # how to reduce those out right now.
    nonatomics = [list(prod(range(min(i, len(leafs))), xrange(lengths)))
                  for i in xrange(1, w + 1)] + [atomics]
    for descs in prod(*nonatomics):
        suffix = getid(descs)
        dicts = [dict(leaf=leaf, length=length) for leaf, length in descs]
        setattr(TestExtensiveStored, '_write' + suffix,
                createWriter(givenlength, *dicts))
        setattr(TestExtensiveStored, 'test' + suffix,
                createTester('test' + suffix, '_write' + suffix))

# now create another round of tests, with two writing passes
# first, write all file combinations into the jar, close it,
# and then write all atomics again.
# This should catch more or less all artifacts generated
# by the final ordering step when closing the jar.
files = [list(prod([i], xrange(lengths))) for i in xrange(len(leafs))]
# All length combinations for the first one, two, ... len(leafs) files.
allfiles = [combo
            for i in xrange(len(leafs))
            for combo in prod(*files[:(i + 1)])]

for first in allfiles:
    firstid = getid(first)
    for second in atomics:
        secondid = getid([second])
        testname = 'test{0}_{1}'.format(firstid, secondid)
        # Both write helpers were already installed by the loop above.
        setattr(TestExtensiveStored, testname,
                createTester(testname,
                             '_write' + firstid,
                             '_write' + secondid))


class TestExtensiveDeflated(TestExtensiveStored):
    'Test all that has been tested with ZIP_STORED with DEFLATED, too.'
    compression = zipfile.ZIP_DEFLATED


if __name__ == '__main__':
    unittest.main()