michael@0: # This Source Code Form is subject to the terms of the Mozilla Public michael@0: # License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: # file, You can obtain one at http://mozilla.org/MPL/2.0/. michael@0: michael@0: from mozpack.copier import ( michael@0: FileCopier, michael@0: FilePurger, michael@0: FileRegistry, michael@0: Jarrer, michael@0: ) michael@0: from mozpack.files import ( michael@0: GeneratedFile, michael@0: ExistingFile, michael@0: ) michael@0: from mozpack.mozjar import JarReader michael@0: import mozpack.path michael@0: import unittest michael@0: import mozunit michael@0: import os michael@0: import stat michael@0: from mozpack.errors import ErrorMessage michael@0: from mozpack.test.test_files import ( michael@0: MockDest, michael@0: MatchTestTemplate, michael@0: TestWithTmpDir, michael@0: ) michael@0: michael@0: michael@0: class TestFileRegistry(MatchTestTemplate, unittest.TestCase): michael@0: def add(self, path): michael@0: self.registry.add(path, GeneratedFile(path)) michael@0: michael@0: def do_check(self, pattern, result): michael@0: self.checked = True michael@0: if result: michael@0: self.assertTrue(self.registry.contains(pattern)) michael@0: else: michael@0: self.assertFalse(self.registry.contains(pattern)) michael@0: self.assertEqual(self.registry.match(pattern), result) michael@0: michael@0: def test_file_registry(self): michael@0: self.registry = FileRegistry() michael@0: self.registry.add('foo', GeneratedFile('foo')) michael@0: bar = GeneratedFile('bar') michael@0: self.registry.add('bar', bar) michael@0: self.assertEqual(self.registry.paths(), ['foo', 'bar']) michael@0: self.assertEqual(self.registry['bar'], bar) michael@0: michael@0: self.assertRaises(ErrorMessage, self.registry.add, 'foo', michael@0: GeneratedFile('foo2')) michael@0: michael@0: self.assertRaises(ErrorMessage, self.registry.remove, 'qux') michael@0: michael@0: self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar', michael@0: GeneratedFile('foobar')) michael@0: self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar/baz', michael@0: GeneratedFile('foobar')) michael@0: michael@0: self.assertEqual(self.registry.paths(), ['foo', 'bar']) michael@0: michael@0: self.registry.remove('foo') michael@0: self.assertEqual(self.registry.paths(), ['bar']) michael@0: self.registry.remove('bar') michael@0: self.assertEqual(self.registry.paths(), []) michael@0: michael@0: self.prepare_match_test() michael@0: self.do_match_test() michael@0: self.assertTrue(self.checked) michael@0: self.assertEqual(self.registry.paths(), [ michael@0: 'bar', michael@0: 'foo/bar', michael@0: 'foo/baz', michael@0: 'foo/qux/1', michael@0: 'foo/qux/bar', michael@0: 'foo/qux/2/test', michael@0: 'foo/qux/2/test2', michael@0: ]) michael@0: michael@0: self.registry.remove('foo/qux') michael@0: self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz']) michael@0: michael@0: self.registry.add('foo/qux', GeneratedFile('fooqux')) michael@0: self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz', michael@0: 'foo/qux']) michael@0: self.registry.remove('foo/b*') michael@0: self.assertEqual(self.registry.paths(), ['bar', 'foo/qux']) michael@0: michael@0: self.assertEqual([f for f, c in self.registry], ['bar', 'foo/qux']) michael@0: self.assertEqual(len(self.registry), 2) michael@0: michael@0: self.add('foo/.foo') michael@0: self.assertTrue(self.registry.contains('foo/.foo')) michael@0: michael@0: def test_registry_paths(self): michael@0: self.registry = FileRegistry() michael@0: michael@0: # Can't add a file if it requires a directory in place of a michael@0: # file we also require. michael@0: self.registry.add('foo', GeneratedFile('foo')) michael@0: self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar', michael@0: GeneratedFile('foobar')) michael@0: michael@0: # Can't add a file if we already have a directory there. michael@0: self.registry.add('bar/baz', GeneratedFile('barbaz')) michael@0: self.assertRaises(ErrorMessage, self.registry.add, 'bar', michael@0: GeneratedFile('bar')) michael@0: michael@0: # Bump the count of things that require bar/ to 2. michael@0: self.registry.add('bar/zot', GeneratedFile('barzot')) michael@0: self.assertRaises(ErrorMessage, self.registry.add, 'bar', michael@0: GeneratedFile('bar')) michael@0: michael@0: # Drop the count of things that require bar/ to 1. michael@0: self.registry.remove('bar/baz') michael@0: self.assertRaises(ErrorMessage, self.registry.add, 'bar', michael@0: GeneratedFile('bar')) michael@0: michael@0: # Drop the count of things that require bar/ to 0. michael@0: self.registry.remove('bar/zot') michael@0: self.registry.add('bar/zot', GeneratedFile('barzot')) michael@0: michael@0: def test_required_directories(self): michael@0: self.registry = FileRegistry() michael@0: michael@0: self.registry.add('foo', GeneratedFile('foo')) michael@0: self.assertEqual(self.registry.required_directories(), set()) michael@0: michael@0: self.registry.add('bar/baz', GeneratedFile('barbaz')) michael@0: self.assertEqual(self.registry.required_directories(), {'bar'}) michael@0: michael@0: self.registry.add('bar/zot', GeneratedFile('barzot')) michael@0: self.assertEqual(self.registry.required_directories(), {'bar'}) michael@0: michael@0: self.registry.add('bar/zap/zot', GeneratedFile('barzapzot')) michael@0: self.assertEqual(self.registry.required_directories(), {'bar', 'bar/zap'}) michael@0: michael@0: self.registry.remove('bar/zap/zot') michael@0: self.assertEqual(self.registry.required_directories(), {'bar'}) michael@0: michael@0: self.registry.remove('bar/baz') michael@0: self.assertEqual(self.registry.required_directories(), {'bar'}) michael@0: michael@0: self.registry.remove('bar/zot') michael@0: self.assertEqual(self.registry.required_directories(), set()) michael@0: michael@0: self.registry.add('x/y/z', GeneratedFile('xyz')) michael@0: self.assertEqual(self.registry.required_directories(), {'x', 'x/y'}) michael@0: michael@0: michael@0: class TestFileCopier(TestWithTmpDir): michael@0: def all_dirs(self, base): michael@0: all_dirs = set() michael@0: for root, dirs, files in os.walk(base): michael@0: if not dirs: michael@0: all_dirs.add(mozpack.path.relpath(root, base)) michael@0: return all_dirs michael@0: michael@0: def all_files(self, base): michael@0: all_files = set() michael@0: for root, dirs, files in os.walk(base): michael@0: for f in files: michael@0: all_files.add( michael@0: mozpack.path.join(mozpack.path.relpath(root, base), f)) michael@0: return all_files michael@0: michael@0: def test_file_copier(self): michael@0: copier = FileCopier() michael@0: copier.add('foo/bar', GeneratedFile('foobar')) michael@0: copier.add('foo/qux', GeneratedFile('fooqux')) michael@0: copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz')) michael@0: copier.add('bar', GeneratedFile('bar')) michael@0: copier.add('qux/foo', GeneratedFile('quxfoo')) michael@0: copier.add('qux/bar', GeneratedFile('')) michael@0: michael@0: result = copier.copy(self.tmpdir) michael@0: self.assertEqual(self.all_files(self.tmpdir), set(copier.paths())) michael@0: self.assertEqual(self.all_dirs(self.tmpdir), michael@0: set(['foo/deep/nested/directory', 'qux'])) michael@0: michael@0: self.assertEqual(result.updated_files, set(self.tmppath(p) for p in michael@0: self.all_files(self.tmpdir))) michael@0: self.assertEqual(result.existing_files, set()) michael@0: self.assertEqual(result.removed_files, set()) michael@0: self.assertEqual(result.removed_directories, set()) michael@0: michael@0: copier.remove('foo') michael@0: copier.add('test', GeneratedFile('test')) michael@0: result = copier.copy(self.tmpdir) michael@0: self.assertEqual(self.all_files(self.tmpdir), set(copier.paths())) michael@0: self.assertEqual(self.all_dirs(self.tmpdir), set(['qux'])) michael@0: self.assertEqual(result.removed_files, set(self.tmppath(p) for p in michael@0: ('foo/bar', 'foo/qux', 'foo/deep/nested/directory/file'))) michael@0: michael@0: def test_symlink_directory_replaced(self): michael@0: """Directory symlinks in destination are replaced if they need to be michael@0: real directories.""" michael@0: if not self.symlink_supported: michael@0: return michael@0: michael@0: dest = self.tmppath('dest') michael@0: michael@0: copier = FileCopier() michael@0: copier.add('foo/bar/baz', GeneratedFile('foobarbaz')) michael@0: michael@0: os.makedirs(self.tmppath('dest/foo')) michael@0: dummy = self.tmppath('dummy') michael@0: os.mkdir(dummy) michael@0: link = self.tmppath('dest/foo/bar') michael@0: os.symlink(dummy, link) michael@0: michael@0: result = copier.copy(dest) michael@0: michael@0: st = os.lstat(link) michael@0: self.assertFalse(stat.S_ISLNK(st.st_mode)) michael@0: self.assertTrue(stat.S_ISDIR(st.st_mode)) michael@0: michael@0: self.assertEqual(self.all_files(dest), set(copier.paths())) michael@0: michael@0: self.assertEqual(result.removed_directories, set()) michael@0: self.assertEqual(len(result.updated_files), 1) michael@0: michael@0: def test_remove_unaccounted_directory_symlinks(self): michael@0: """Directory symlinks in destination that are not in the way are michael@0: deleted according to remove_unaccounted and michael@0: remove_all_directory_symlinks. michael@0: """ michael@0: if not self.symlink_supported: michael@0: return michael@0: michael@0: dest = self.tmppath('dest') michael@0: michael@0: copier = FileCopier() michael@0: copier.add('foo/bar/baz', GeneratedFile('foobarbaz')) michael@0: michael@0: os.makedirs(self.tmppath('dest/foo')) michael@0: dummy = self.tmppath('dummy') michael@0: os.mkdir(dummy) michael@0: michael@0: os.mkdir(self.tmppath('dest/zot')) michael@0: link = self.tmppath('dest/zot/zap') michael@0: os.symlink(dummy, link) michael@0: michael@0: # If not remove_unaccounted but remove_empty_directories, then michael@0: # the symlinked directory remains (as does its containing michael@0: # directory). michael@0: result = copier.copy(dest, remove_unaccounted=False, michael@0: remove_empty_directories=True, michael@0: remove_all_directory_symlinks=False) michael@0: michael@0: st = os.lstat(link) michael@0: self.assertTrue(stat.S_ISLNK(st.st_mode)) michael@0: self.assertFalse(stat.S_ISDIR(st.st_mode)) michael@0: michael@0: self.assertEqual(self.all_files(dest), set(copier.paths())) michael@0: self.assertEqual(self.all_dirs(dest), set(['foo/bar'])) michael@0: michael@0: self.assertEqual(result.removed_directories, set()) michael@0: self.assertEqual(len(result.updated_files), 1) michael@0: michael@0: # If remove_unaccounted but not remove_empty_directories, then michael@0: # only the symlinked directory is removed. michael@0: result = copier.copy(dest, remove_unaccounted=True, michael@0: remove_empty_directories=False, michael@0: remove_all_directory_symlinks=False) michael@0: michael@0: st = os.lstat(self.tmppath('dest/zot')) michael@0: self.assertFalse(stat.S_ISLNK(st.st_mode)) michael@0: self.assertTrue(stat.S_ISDIR(st.st_mode)) michael@0: michael@0: self.assertEqual(result.removed_files, set([link])) michael@0: self.assertEqual(result.removed_directories, set()) michael@0: michael@0: self.assertEqual(self.all_files(dest), set(copier.paths())) michael@0: self.assertEqual(self.all_dirs(dest), set(['foo/bar', 'zot'])) michael@0: michael@0: # If remove_unaccounted and remove_empty_directories, then michael@0: # both the symlink and its containing directory are removed. michael@0: link = self.tmppath('dest/zot/zap') michael@0: os.symlink(dummy, link) michael@0: michael@0: result = copier.copy(dest, remove_unaccounted=True, michael@0: remove_empty_directories=True, michael@0: remove_all_directory_symlinks=False) michael@0: michael@0: self.assertEqual(result.removed_files, set([link])) michael@0: self.assertEqual(result.removed_directories, set([self.tmppath('dest/zot')])) michael@0: michael@0: self.assertEqual(self.all_files(dest), set(copier.paths())) michael@0: self.assertEqual(self.all_dirs(dest), set(['foo/bar'])) michael@0: michael@0: def test_permissions(self): michael@0: """Ensure files without write permission can be deleted.""" michael@0: with open(self.tmppath('dummy'), 'a'): michael@0: pass michael@0: michael@0: p = self.tmppath('no_perms') michael@0: with open(p, 'a'): michael@0: pass michael@0: michael@0: # Make file and directory unwritable. Reminder: making a directory michael@0: # unwritable prevents modifications (including deletes) from the list michael@0: # of files in that directory. michael@0: os.chmod(p, 0400) michael@0: os.chmod(self.tmpdir, 0400) michael@0: michael@0: copier = FileCopier() michael@0: copier.add('dummy', GeneratedFile('content')) michael@0: result = copier.copy(self.tmpdir) michael@0: self.assertEqual(result.removed_files_count, 1) michael@0: self.assertFalse(os.path.exists(p)) michael@0: michael@0: def test_no_remove(self): michael@0: copier = FileCopier() michael@0: copier.add('foo', GeneratedFile('foo')) michael@0: michael@0: with open(self.tmppath('bar'), 'a'): michael@0: pass michael@0: michael@0: os.mkdir(self.tmppath('emptydir')) michael@0: d = self.tmppath('populateddir') michael@0: os.mkdir(d) michael@0: michael@0: with open(self.tmppath('populateddir/foo'), 'a'): michael@0: pass michael@0: michael@0: result = copier.copy(self.tmpdir, remove_unaccounted=False) michael@0: michael@0: self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar', michael@0: 'populateddir/foo'])) michael@0: self.assertEqual(self.all_dirs(self.tmpdir), set(['populateddir'])) michael@0: self.assertEqual(result.removed_files, set()) michael@0: self.assertEqual(result.removed_directories, michael@0: set([self.tmppath('emptydir')])) michael@0: michael@0: def test_no_remove_empty_directories(self): michael@0: copier = FileCopier() michael@0: copier.add('foo', GeneratedFile('foo')) michael@0: michael@0: with open(self.tmppath('bar'), 'a'): michael@0: pass michael@0: michael@0: os.mkdir(self.tmppath('emptydir')) michael@0: d = self.tmppath('populateddir') michael@0: os.mkdir(d) michael@0: michael@0: with open(self.tmppath('populateddir/foo'), 'a'): michael@0: pass michael@0: michael@0: result = copier.copy(self.tmpdir, remove_unaccounted=False, michael@0: remove_empty_directories=False) michael@0: michael@0: self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar', michael@0: 'populateddir/foo'])) michael@0: self.assertEqual(self.all_dirs(self.tmpdir), set(['emptydir', michael@0: 'populateddir'])) michael@0: self.assertEqual(result.removed_files, set()) michael@0: self.assertEqual(result.removed_directories, set()) michael@0: michael@0: def test_optional_exists_creates_unneeded_directory(self): michael@0: """Demonstrate that a directory not strictly required, but specified michael@0: as the path to an optional file, will be unnecessarily created. michael@0: michael@0: This behaviour is wrong; fixing it is tracked by Bug 972432; michael@0: and this test exists to guard against unexpected changes in michael@0: behaviour. michael@0: """ michael@0: michael@0: dest = self.tmppath('dest') michael@0: michael@0: copier = FileCopier() michael@0: copier.add('foo/bar', ExistingFile(required=False)) michael@0: michael@0: result = copier.copy(dest) michael@0: michael@0: st = os.lstat(self.tmppath('dest/foo')) michael@0: self.assertFalse(stat.S_ISLNK(st.st_mode)) michael@0: self.assertTrue(stat.S_ISDIR(st.st_mode)) michael@0: michael@0: # What's worse, we have no record that dest was created. michael@0: self.assertEquals(len(result.updated_files), 0) michael@0: michael@0: # But we do have an erroneous record of an optional file michael@0: # existing when it does not. michael@0: self.assertIn(self.tmppath('dest/foo/bar'), result.existing_files) michael@0: michael@0: michael@0: class TestFilePurger(TestWithTmpDir): michael@0: def test_file_purger(self): michael@0: existing = os.path.join(self.tmpdir, 'existing') michael@0: extra = os.path.join(self.tmpdir, 'extra') michael@0: empty_dir = os.path.join(self.tmpdir, 'dir') michael@0: michael@0: with open(existing, 'a'): michael@0: pass michael@0: michael@0: with open(extra, 'a'): michael@0: pass michael@0: michael@0: os.mkdir(empty_dir) michael@0: with open(os.path.join(empty_dir, 'foo'), 'a'): michael@0: pass michael@0: michael@0: self.assertTrue(os.path.exists(existing)) michael@0: self.assertTrue(os.path.exists(extra)) michael@0: michael@0: purger = FilePurger() michael@0: purger.add('existing') michael@0: result = purger.purge(self.tmpdir) michael@0: self.assertEqual(result.removed_files, set(self.tmppath(p) for p in michael@0: ('extra', 'dir/foo'))) michael@0: self.assertEqual(result.removed_files_count, 2) michael@0: self.assertEqual(result.removed_directories_count, 1) michael@0: michael@0: self.assertTrue(os.path.exists(existing)) michael@0: self.assertFalse(os.path.exists(extra)) michael@0: self.assertFalse(os.path.exists(empty_dir)) michael@0: michael@0: michael@0: class TestJarrer(unittest.TestCase): michael@0: def check_jar(self, dest, copier): michael@0: jar = JarReader(fileobj=dest) michael@0: self.assertEqual([f.filename for f in jar], copier.paths()) michael@0: for f in jar: michael@0: self.assertEqual(f.uncompressed_data.read(), michael@0: copier[f.filename].content) michael@0: michael@0: def test_jarrer(self): michael@0: copier = Jarrer() michael@0: copier.add('foo/bar', GeneratedFile('foobar')) michael@0: copier.add('foo/qux', GeneratedFile('fooqux')) michael@0: copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz')) michael@0: copier.add('bar', GeneratedFile('bar')) michael@0: copier.add('qux/foo', GeneratedFile('quxfoo')) michael@0: copier.add('qux/bar', GeneratedFile('')) michael@0: michael@0: dest = MockDest() michael@0: copier.copy(dest) michael@0: self.check_jar(dest, copier) michael@0: michael@0: copier.remove('foo') michael@0: copier.add('test', GeneratedFile('test')) michael@0: copier.copy(dest) michael@0: self.check_jar(dest, copier) michael@0: michael@0: copier.remove('test') michael@0: copier.add('test', GeneratedFile('replaced-content')) michael@0: copier.copy(dest) michael@0: self.check_jar(dest, copier) michael@0: michael@0: copier.copy(dest) michael@0: self.check_jar(dest, copier) michael@0: michael@0: preloaded = ['qux/bar', 'bar'] michael@0: copier.preload(preloaded) michael@0: copier.copy(dest) michael@0: michael@0: dest.seek(0) michael@0: jar = JarReader(fileobj=dest) michael@0: self.assertEqual([f.filename for f in jar], preloaded + michael@0: [p for p in copier.paths() if not p in preloaded]) michael@0: self.assertEqual(jar.last_preloaded, preloaded[-1]) michael@0: michael@0: if __name__ == '__main__': michael@0: mozunit.main()