python/mozbuild/mozpack/test/test_copier.py

Fri, 16 Jan 2015 18:13:44 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Fri, 16 Jan 2015 18:13:44 +0100
branch
TOR_BUG_9701
changeset 14
925c144e1f1f
permissions
-rw-r--r--

Integrate suggestion from review to improve consistency with existing code.

     1 # This Source Code Form is subject to the terms of the Mozilla Public
     2 # License, v. 2.0. If a copy of the MPL was not distributed with this
     3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
     5 from mozpack.copier import (
     6     FileCopier,
     7     FilePurger,
     8     FileRegistry,
     9     Jarrer,
    10 )
    11 from mozpack.files import (
    12     GeneratedFile,
    13     ExistingFile,
    14 )
    15 from mozpack.mozjar import JarReader
    16 import mozpack.path
    17 import unittest
    18 import mozunit
    19 import os
    20 import stat
    21 from mozpack.errors import ErrorMessage
    22 from mozpack.test.test_files import (
    23     MockDest,
    24     MatchTestTemplate,
    25     TestWithTmpDir,
    26 )
    29 class TestFileRegistry(MatchTestTemplate, unittest.TestCase):
    30     def add(self, path):
    31         self.registry.add(path, GeneratedFile(path))
    33     def do_check(self, pattern, result):
    34         self.checked = True
    35         if result:
    36             self.assertTrue(self.registry.contains(pattern))
    37         else:
    38             self.assertFalse(self.registry.contains(pattern))
    39         self.assertEqual(self.registry.match(pattern), result)
    41     def test_file_registry(self):
    42         self.registry = FileRegistry()
    43         self.registry.add('foo', GeneratedFile('foo'))
    44         bar = GeneratedFile('bar')
    45         self.registry.add('bar', bar)
    46         self.assertEqual(self.registry.paths(), ['foo', 'bar'])
    47         self.assertEqual(self.registry['bar'], bar)
    49         self.assertRaises(ErrorMessage, self.registry.add, 'foo',
    50                           GeneratedFile('foo2'))
    52         self.assertRaises(ErrorMessage, self.registry.remove, 'qux')
    54         self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
    55                           GeneratedFile('foobar'))
    56         self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar/baz',
    57                           GeneratedFile('foobar'))
    59         self.assertEqual(self.registry.paths(), ['foo', 'bar'])
    61         self.registry.remove('foo')
    62         self.assertEqual(self.registry.paths(), ['bar'])
    63         self.registry.remove('bar')
    64         self.assertEqual(self.registry.paths(), [])
    66         self.prepare_match_test()
    67         self.do_match_test()
    68         self.assertTrue(self.checked)
    69         self.assertEqual(self.registry.paths(), [
    70             'bar',
    71             'foo/bar',
    72             'foo/baz',
    73             'foo/qux/1',
    74             'foo/qux/bar',
    75             'foo/qux/2/test',
    76             'foo/qux/2/test2',
    77         ])
    79         self.registry.remove('foo/qux')
    80         self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz'])
    82         self.registry.add('foo/qux', GeneratedFile('fooqux'))
    83         self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz',
    84                                                  'foo/qux'])
    85         self.registry.remove('foo/b*')
    86         self.assertEqual(self.registry.paths(), ['bar', 'foo/qux'])
    88         self.assertEqual([f for f, c in self.registry], ['bar', 'foo/qux'])
    89         self.assertEqual(len(self.registry), 2)
    91         self.add('foo/.foo')
    92         self.assertTrue(self.registry.contains('foo/.foo'))
    94     def test_registry_paths(self):
    95         self.registry = FileRegistry()
    97         # Can't add a file if it requires a directory in place of a
    98         # file we also require.
    99         self.registry.add('foo', GeneratedFile('foo'))
   100         self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
   101                           GeneratedFile('foobar'))
   103         # Can't add a file if we already have a directory there.
   104         self.registry.add('bar/baz', GeneratedFile('barbaz'))
   105         self.assertRaises(ErrorMessage, self.registry.add, 'bar',
   106                           GeneratedFile('bar'))
   108         # Bump the count of things that require bar/ to 2.
   109         self.registry.add('bar/zot', GeneratedFile('barzot'))
   110         self.assertRaises(ErrorMessage, self.registry.add, 'bar',
   111                           GeneratedFile('bar'))
   113         # Drop the count of things that require bar/ to 1.
   114         self.registry.remove('bar/baz')
   115         self.assertRaises(ErrorMessage, self.registry.add, 'bar',
   116                           GeneratedFile('bar'))
   118         # Drop the count of things that require bar/ to 0.
   119         self.registry.remove('bar/zot')
   120         self.registry.add('bar/zot', GeneratedFile('barzot'))
   122     def test_required_directories(self):
   123         self.registry = FileRegistry()
   125         self.registry.add('foo', GeneratedFile('foo'))
   126         self.assertEqual(self.registry.required_directories(), set())
   128         self.registry.add('bar/baz', GeneratedFile('barbaz'))
   129         self.assertEqual(self.registry.required_directories(), {'bar'})
   131         self.registry.add('bar/zot', GeneratedFile('barzot'))
   132         self.assertEqual(self.registry.required_directories(), {'bar'})
   134         self.registry.add('bar/zap/zot', GeneratedFile('barzapzot'))
   135         self.assertEqual(self.registry.required_directories(), {'bar', 'bar/zap'})
   137         self.registry.remove('bar/zap/zot')
   138         self.assertEqual(self.registry.required_directories(), {'bar'})
   140         self.registry.remove('bar/baz')
   141         self.assertEqual(self.registry.required_directories(), {'bar'})
   143         self.registry.remove('bar/zot')
   144         self.assertEqual(self.registry.required_directories(), set())
   146         self.registry.add('x/y/z', GeneratedFile('xyz'))
   147         self.assertEqual(self.registry.required_directories(), {'x', 'x/y'})
   150 class TestFileCopier(TestWithTmpDir):
   151     def all_dirs(self, base):
   152         all_dirs = set()
   153         for root, dirs, files in os.walk(base):
   154             if not dirs:
   155                 all_dirs.add(mozpack.path.relpath(root, base))
   156         return all_dirs
   158     def all_files(self, base):
   159         all_files = set()
   160         for root, dirs, files in os.walk(base):
   161             for f in files:
   162                 all_files.add(
   163                     mozpack.path.join(mozpack.path.relpath(root, base), f))
   164         return all_files
   166     def test_file_copier(self):
   167         copier = FileCopier()
   168         copier.add('foo/bar', GeneratedFile('foobar'))
   169         copier.add('foo/qux', GeneratedFile('fooqux'))
   170         copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
   171         copier.add('bar', GeneratedFile('bar'))
   172         copier.add('qux/foo', GeneratedFile('quxfoo'))
   173         copier.add('qux/bar', GeneratedFile(''))
   175         result = copier.copy(self.tmpdir)
   176         self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
   177         self.assertEqual(self.all_dirs(self.tmpdir),
   178                          set(['foo/deep/nested/directory', 'qux']))
   180         self.assertEqual(result.updated_files, set(self.tmppath(p) for p in
   181             self.all_files(self.tmpdir)))
   182         self.assertEqual(result.existing_files, set())
   183         self.assertEqual(result.removed_files, set())
   184         self.assertEqual(result.removed_directories, set())
   186         copier.remove('foo')
   187         copier.add('test', GeneratedFile('test'))
   188         result = copier.copy(self.tmpdir)
   189         self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
   190         self.assertEqual(self.all_dirs(self.tmpdir), set(['qux']))
   191         self.assertEqual(result.removed_files, set(self.tmppath(p) for p in
   192             ('foo/bar', 'foo/qux', 'foo/deep/nested/directory/file')))
   194     def test_symlink_directory_replaced(self):
   195         """Directory symlinks in destination are replaced if they need to be
   196         real directories."""
   197         if not self.symlink_supported:
   198             return
   200         dest = self.tmppath('dest')
   202         copier = FileCopier()
   203         copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))
   205         os.makedirs(self.tmppath('dest/foo'))
   206         dummy = self.tmppath('dummy')
   207         os.mkdir(dummy)
   208         link = self.tmppath('dest/foo/bar')
   209         os.symlink(dummy, link)
   211         result = copier.copy(dest)
   213         st = os.lstat(link)
   214         self.assertFalse(stat.S_ISLNK(st.st_mode))
   215         self.assertTrue(stat.S_ISDIR(st.st_mode))
   217         self.assertEqual(self.all_files(dest), set(copier.paths()))
   219         self.assertEqual(result.removed_directories, set())
   220         self.assertEqual(len(result.updated_files), 1)
   222     def test_remove_unaccounted_directory_symlinks(self):
   223         """Directory symlinks in destination that are not in the way are
   224         deleted according to remove_unaccounted and
   225         remove_all_directory_symlinks.
   226         """
   227         if not self.symlink_supported:
   228             return
   230         dest = self.tmppath('dest')
   232         copier = FileCopier()
   233         copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))
   235         os.makedirs(self.tmppath('dest/foo'))
   236         dummy = self.tmppath('dummy')
   237         os.mkdir(dummy)
   239         os.mkdir(self.tmppath('dest/zot'))
   240         link = self.tmppath('dest/zot/zap')
   241         os.symlink(dummy, link)
   243         # If not remove_unaccounted but remove_empty_directories, then
   244         # the symlinked directory remains (as does its containing
   245         # directory).
   246         result = copier.copy(dest, remove_unaccounted=False,
   247             remove_empty_directories=True,
   248             remove_all_directory_symlinks=False)
   250         st = os.lstat(link)
   251         self.assertTrue(stat.S_ISLNK(st.st_mode))
   252         self.assertFalse(stat.S_ISDIR(st.st_mode))
   254         self.assertEqual(self.all_files(dest), set(copier.paths()))
   255         self.assertEqual(self.all_dirs(dest), set(['foo/bar']))
   257         self.assertEqual(result.removed_directories, set())
   258         self.assertEqual(len(result.updated_files), 1)
   260         # If remove_unaccounted but not remove_empty_directories, then
   261         # only the symlinked directory is removed.
   262         result = copier.copy(dest, remove_unaccounted=True,
   263             remove_empty_directories=False,
   264             remove_all_directory_symlinks=False)
   266         st = os.lstat(self.tmppath('dest/zot'))
   267         self.assertFalse(stat.S_ISLNK(st.st_mode))
   268         self.assertTrue(stat.S_ISDIR(st.st_mode))
   270         self.assertEqual(result.removed_files, set([link]))
   271         self.assertEqual(result.removed_directories, set())
   273         self.assertEqual(self.all_files(dest), set(copier.paths()))
   274         self.assertEqual(self.all_dirs(dest), set(['foo/bar', 'zot']))
   276         # If remove_unaccounted and remove_empty_directories, then
   277         # both the symlink and its containing directory are removed.
   278         link = self.tmppath('dest/zot/zap')
   279         os.symlink(dummy, link)
   281         result = copier.copy(dest, remove_unaccounted=True,
   282             remove_empty_directories=True,
   283             remove_all_directory_symlinks=False)
   285         self.assertEqual(result.removed_files, set([link]))
   286         self.assertEqual(result.removed_directories, set([self.tmppath('dest/zot')]))
   288         self.assertEqual(self.all_files(dest), set(copier.paths()))
   289         self.assertEqual(self.all_dirs(dest), set(['foo/bar']))
   291     def test_permissions(self):
   292         """Ensure files without write permission can be deleted."""
   293         with open(self.tmppath('dummy'), 'a'):
   294             pass
   296         p = self.tmppath('no_perms')
   297         with open(p, 'a'):
   298             pass
   300         # Make file and directory unwritable. Reminder: making a directory
   301         # unwritable prevents modifications (including deletes) from the list
   302         # of files in that directory.
   303         os.chmod(p, 0400)
   304         os.chmod(self.tmpdir, 0400)
   306         copier = FileCopier()
   307         copier.add('dummy', GeneratedFile('content'))
   308         result = copier.copy(self.tmpdir)
   309         self.assertEqual(result.removed_files_count, 1)
   310         self.assertFalse(os.path.exists(p))
   312     def test_no_remove(self):
   313         copier = FileCopier()
   314         copier.add('foo', GeneratedFile('foo'))
   316         with open(self.tmppath('bar'), 'a'):
   317             pass
   319         os.mkdir(self.tmppath('emptydir'))
   320         d = self.tmppath('populateddir')
   321         os.mkdir(d)
   323         with open(self.tmppath('populateddir/foo'), 'a'):
   324             pass
   326         result = copier.copy(self.tmpdir, remove_unaccounted=False)
   328         self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar',
   329             'populateddir/foo']))
   330         self.assertEqual(self.all_dirs(self.tmpdir), set(['populateddir']))
   331         self.assertEqual(result.removed_files, set())
   332         self.assertEqual(result.removed_directories,
   333             set([self.tmppath('emptydir')]))
   335     def test_no_remove_empty_directories(self):
   336         copier = FileCopier()
   337         copier.add('foo', GeneratedFile('foo'))
   339         with open(self.tmppath('bar'), 'a'):
   340             pass
   342         os.mkdir(self.tmppath('emptydir'))
   343         d = self.tmppath('populateddir')
   344         os.mkdir(d)
   346         with open(self.tmppath('populateddir/foo'), 'a'):
   347             pass
   349         result = copier.copy(self.tmpdir, remove_unaccounted=False,
   350             remove_empty_directories=False)
   352         self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar',
   353             'populateddir/foo']))
   354         self.assertEqual(self.all_dirs(self.tmpdir), set(['emptydir',
   355             'populateddir']))
   356         self.assertEqual(result.removed_files, set())
   357         self.assertEqual(result.removed_directories, set())
   359     def test_optional_exists_creates_unneeded_directory(self):
   360         """Demonstrate that a directory not strictly required, but specified
   361         as the path to an optional file, will be unnecessarily created.
   363         This behaviour is wrong; fixing it is tracked by Bug 972432;
   364         and this test exists to guard against unexpected changes in
   365         behaviour.
   366         """
   368         dest = self.tmppath('dest')
   370         copier = FileCopier()
   371         copier.add('foo/bar', ExistingFile(required=False))
   373         result = copier.copy(dest)
   375         st = os.lstat(self.tmppath('dest/foo'))
   376         self.assertFalse(stat.S_ISLNK(st.st_mode))
   377         self.assertTrue(stat.S_ISDIR(st.st_mode))
   379         # What's worse, we have no record that dest was created.
   380         self.assertEquals(len(result.updated_files), 0)
   382         # But we do have an erroneous record of an optional file
   383         # existing when it does not.
   384         self.assertIn(self.tmppath('dest/foo/bar'), result.existing_files)
   387 class TestFilePurger(TestWithTmpDir):
   388     def test_file_purger(self):
   389         existing = os.path.join(self.tmpdir, 'existing')
   390         extra = os.path.join(self.tmpdir, 'extra')
   391         empty_dir = os.path.join(self.tmpdir, 'dir')
   393         with open(existing, 'a'):
   394             pass
   396         with open(extra, 'a'):
   397             pass
   399         os.mkdir(empty_dir)
   400         with open(os.path.join(empty_dir, 'foo'), 'a'):
   401             pass
   403         self.assertTrue(os.path.exists(existing))
   404         self.assertTrue(os.path.exists(extra))
   406         purger = FilePurger()
   407         purger.add('existing')
   408         result = purger.purge(self.tmpdir)
   409         self.assertEqual(result.removed_files, set(self.tmppath(p) for p in
   410             ('extra', 'dir/foo')))
   411         self.assertEqual(result.removed_files_count, 2)
   412         self.assertEqual(result.removed_directories_count, 1)
   414         self.assertTrue(os.path.exists(existing))
   415         self.assertFalse(os.path.exists(extra))
   416         self.assertFalse(os.path.exists(empty_dir))
   419 class TestJarrer(unittest.TestCase):
   420     def check_jar(self, dest, copier):
   421         jar = JarReader(fileobj=dest)
   422         self.assertEqual([f.filename for f in jar], copier.paths())
   423         for f in jar:
   424             self.assertEqual(f.uncompressed_data.read(),
   425                              copier[f.filename].content)
   427     def test_jarrer(self):
   428         copier = Jarrer()
   429         copier.add('foo/bar', GeneratedFile('foobar'))
   430         copier.add('foo/qux', GeneratedFile('fooqux'))
   431         copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
   432         copier.add('bar', GeneratedFile('bar'))
   433         copier.add('qux/foo', GeneratedFile('quxfoo'))
   434         copier.add('qux/bar', GeneratedFile(''))
   436         dest = MockDest()
   437         copier.copy(dest)
   438         self.check_jar(dest, copier)
   440         copier.remove('foo')
   441         copier.add('test', GeneratedFile('test'))
   442         copier.copy(dest)
   443         self.check_jar(dest, copier)
   445         copier.remove('test')
   446         copier.add('test', GeneratedFile('replaced-content'))
   447         copier.copy(dest)
   448         self.check_jar(dest, copier)
   450         copier.copy(dest)
   451         self.check_jar(dest, copier)
   453         preloaded = ['qux/bar', 'bar']
   454         copier.preload(preloaded)
   455         copier.copy(dest)
   457         dest.seek(0)
   458         jar = JarReader(fileobj=dest)
   459         self.assertEqual([f.filename for f in jar], preloaded +
   460                          [p for p in copier.paths() if not p in preloaded])
   461         self.assertEqual(jar.last_preloaded, preloaded[-1])
# When this file is executed directly (rather than imported), hand control
# to mozunit.main() -- presumably mozunit's test-runner entry point; confirm
# against the mozunit module.
if __name__ == '__main__':
    mozunit.main()

mercurial