python/mozbuild/mozpack/test/test_copier.py

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 # This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 # License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
michael@0 4
michael@0 5 from mozpack.copier import (
michael@0 6 FileCopier,
michael@0 7 FilePurger,
michael@0 8 FileRegistry,
michael@0 9 Jarrer,
michael@0 10 )
michael@0 11 from mozpack.files import (
michael@0 12 GeneratedFile,
michael@0 13 ExistingFile,
michael@0 14 )
michael@0 15 from mozpack.mozjar import JarReader
michael@0 16 import mozpack.path
michael@0 17 import unittest
michael@0 18 import mozunit
michael@0 19 import os
michael@0 20 import stat
michael@0 21 from mozpack.errors import ErrorMessage
michael@0 22 from mozpack.test.test_files import (
michael@0 23 MockDest,
michael@0 24 MatchTestTemplate,
michael@0 25 TestWithTmpDir,
michael@0 26 )
michael@0 27
michael@0 28
michael@0 29 class TestFileRegistry(MatchTestTemplate, unittest.TestCase):
michael@0 30 def add(self, path):
michael@0 31 self.registry.add(path, GeneratedFile(path))
michael@0 32
michael@0 33 def do_check(self, pattern, result):
michael@0 34 self.checked = True
michael@0 35 if result:
michael@0 36 self.assertTrue(self.registry.contains(pattern))
michael@0 37 else:
michael@0 38 self.assertFalse(self.registry.contains(pattern))
michael@0 39 self.assertEqual(self.registry.match(pattern), result)
michael@0 40
michael@0 41 def test_file_registry(self):
michael@0 42 self.registry = FileRegistry()
michael@0 43 self.registry.add('foo', GeneratedFile('foo'))
michael@0 44 bar = GeneratedFile('bar')
michael@0 45 self.registry.add('bar', bar)
michael@0 46 self.assertEqual(self.registry.paths(), ['foo', 'bar'])
michael@0 47 self.assertEqual(self.registry['bar'], bar)
michael@0 48
michael@0 49 self.assertRaises(ErrorMessage, self.registry.add, 'foo',
michael@0 50 GeneratedFile('foo2'))
michael@0 51
michael@0 52 self.assertRaises(ErrorMessage, self.registry.remove, 'qux')
michael@0 53
michael@0 54 self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
michael@0 55 GeneratedFile('foobar'))
michael@0 56 self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar/baz',
michael@0 57 GeneratedFile('foobar'))
michael@0 58
michael@0 59 self.assertEqual(self.registry.paths(), ['foo', 'bar'])
michael@0 60
michael@0 61 self.registry.remove('foo')
michael@0 62 self.assertEqual(self.registry.paths(), ['bar'])
michael@0 63 self.registry.remove('bar')
michael@0 64 self.assertEqual(self.registry.paths(), [])
michael@0 65
michael@0 66 self.prepare_match_test()
michael@0 67 self.do_match_test()
michael@0 68 self.assertTrue(self.checked)
michael@0 69 self.assertEqual(self.registry.paths(), [
michael@0 70 'bar',
michael@0 71 'foo/bar',
michael@0 72 'foo/baz',
michael@0 73 'foo/qux/1',
michael@0 74 'foo/qux/bar',
michael@0 75 'foo/qux/2/test',
michael@0 76 'foo/qux/2/test2',
michael@0 77 ])
michael@0 78
michael@0 79 self.registry.remove('foo/qux')
michael@0 80 self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz'])
michael@0 81
michael@0 82 self.registry.add('foo/qux', GeneratedFile('fooqux'))
michael@0 83 self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz',
michael@0 84 'foo/qux'])
michael@0 85 self.registry.remove('foo/b*')
michael@0 86 self.assertEqual(self.registry.paths(), ['bar', 'foo/qux'])
michael@0 87
michael@0 88 self.assertEqual([f for f, c in self.registry], ['bar', 'foo/qux'])
michael@0 89 self.assertEqual(len(self.registry), 2)
michael@0 90
michael@0 91 self.add('foo/.foo')
michael@0 92 self.assertTrue(self.registry.contains('foo/.foo'))
michael@0 93
michael@0 94 def test_registry_paths(self):
michael@0 95 self.registry = FileRegistry()
michael@0 96
michael@0 97 # Can't add a file if it requires a directory in place of a
michael@0 98 # file we also require.
michael@0 99 self.registry.add('foo', GeneratedFile('foo'))
michael@0 100 self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
michael@0 101 GeneratedFile('foobar'))
michael@0 102
michael@0 103 # Can't add a file if we already have a directory there.
michael@0 104 self.registry.add('bar/baz', GeneratedFile('barbaz'))
michael@0 105 self.assertRaises(ErrorMessage, self.registry.add, 'bar',
michael@0 106 GeneratedFile('bar'))
michael@0 107
michael@0 108 # Bump the count of things that require bar/ to 2.
michael@0 109 self.registry.add('bar/zot', GeneratedFile('barzot'))
michael@0 110 self.assertRaises(ErrorMessage, self.registry.add, 'bar',
michael@0 111 GeneratedFile('bar'))
michael@0 112
michael@0 113 # Drop the count of things that require bar/ to 1.
michael@0 114 self.registry.remove('bar/baz')
michael@0 115 self.assertRaises(ErrorMessage, self.registry.add, 'bar',
michael@0 116 GeneratedFile('bar'))
michael@0 117
michael@0 118 # Drop the count of things that require bar/ to 0.
michael@0 119 self.registry.remove('bar/zot')
michael@0 120 self.registry.add('bar/zot', GeneratedFile('barzot'))
michael@0 121
michael@0 122 def test_required_directories(self):
michael@0 123 self.registry = FileRegistry()
michael@0 124
michael@0 125 self.registry.add('foo', GeneratedFile('foo'))
michael@0 126 self.assertEqual(self.registry.required_directories(), set())
michael@0 127
michael@0 128 self.registry.add('bar/baz', GeneratedFile('barbaz'))
michael@0 129 self.assertEqual(self.registry.required_directories(), {'bar'})
michael@0 130
michael@0 131 self.registry.add('bar/zot', GeneratedFile('barzot'))
michael@0 132 self.assertEqual(self.registry.required_directories(), {'bar'})
michael@0 133
michael@0 134 self.registry.add('bar/zap/zot', GeneratedFile('barzapzot'))
michael@0 135 self.assertEqual(self.registry.required_directories(), {'bar', 'bar/zap'})
michael@0 136
michael@0 137 self.registry.remove('bar/zap/zot')
michael@0 138 self.assertEqual(self.registry.required_directories(), {'bar'})
michael@0 139
michael@0 140 self.registry.remove('bar/baz')
michael@0 141 self.assertEqual(self.registry.required_directories(), {'bar'})
michael@0 142
michael@0 143 self.registry.remove('bar/zot')
michael@0 144 self.assertEqual(self.registry.required_directories(), set())
michael@0 145
michael@0 146 self.registry.add('x/y/z', GeneratedFile('xyz'))
michael@0 147 self.assertEqual(self.registry.required_directories(), {'x', 'x/y'})
michael@0 148
michael@0 149
michael@0 150 class TestFileCopier(TestWithTmpDir):
michael@0 151 def all_dirs(self, base):
michael@0 152 all_dirs = set()
michael@0 153 for root, dirs, files in os.walk(base):
michael@0 154 if not dirs:
michael@0 155 all_dirs.add(mozpack.path.relpath(root, base))
michael@0 156 return all_dirs
michael@0 157
michael@0 158 def all_files(self, base):
michael@0 159 all_files = set()
michael@0 160 for root, dirs, files in os.walk(base):
michael@0 161 for f in files:
michael@0 162 all_files.add(
michael@0 163 mozpack.path.join(mozpack.path.relpath(root, base), f))
michael@0 164 return all_files
michael@0 165
michael@0 166 def test_file_copier(self):
michael@0 167 copier = FileCopier()
michael@0 168 copier.add('foo/bar', GeneratedFile('foobar'))
michael@0 169 copier.add('foo/qux', GeneratedFile('fooqux'))
michael@0 170 copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
michael@0 171 copier.add('bar', GeneratedFile('bar'))
michael@0 172 copier.add('qux/foo', GeneratedFile('quxfoo'))
michael@0 173 copier.add('qux/bar', GeneratedFile(''))
michael@0 174
michael@0 175 result = copier.copy(self.tmpdir)
michael@0 176 self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
michael@0 177 self.assertEqual(self.all_dirs(self.tmpdir),
michael@0 178 set(['foo/deep/nested/directory', 'qux']))
michael@0 179
michael@0 180 self.assertEqual(result.updated_files, set(self.tmppath(p) for p in
michael@0 181 self.all_files(self.tmpdir)))
michael@0 182 self.assertEqual(result.existing_files, set())
michael@0 183 self.assertEqual(result.removed_files, set())
michael@0 184 self.assertEqual(result.removed_directories, set())
michael@0 185
michael@0 186 copier.remove('foo')
michael@0 187 copier.add('test', GeneratedFile('test'))
michael@0 188 result = copier.copy(self.tmpdir)
michael@0 189 self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
michael@0 190 self.assertEqual(self.all_dirs(self.tmpdir), set(['qux']))
michael@0 191 self.assertEqual(result.removed_files, set(self.tmppath(p) for p in
michael@0 192 ('foo/bar', 'foo/qux', 'foo/deep/nested/directory/file')))
michael@0 193
michael@0 194 def test_symlink_directory_replaced(self):
michael@0 195 """Directory symlinks in destination are replaced if they need to be
michael@0 196 real directories."""
michael@0 197 if not self.symlink_supported:
michael@0 198 return
michael@0 199
michael@0 200 dest = self.tmppath('dest')
michael@0 201
michael@0 202 copier = FileCopier()
michael@0 203 copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))
michael@0 204
michael@0 205 os.makedirs(self.tmppath('dest/foo'))
michael@0 206 dummy = self.tmppath('dummy')
michael@0 207 os.mkdir(dummy)
michael@0 208 link = self.tmppath('dest/foo/bar')
michael@0 209 os.symlink(dummy, link)
michael@0 210
michael@0 211 result = copier.copy(dest)
michael@0 212
michael@0 213 st = os.lstat(link)
michael@0 214 self.assertFalse(stat.S_ISLNK(st.st_mode))
michael@0 215 self.assertTrue(stat.S_ISDIR(st.st_mode))
michael@0 216
michael@0 217 self.assertEqual(self.all_files(dest), set(copier.paths()))
michael@0 218
michael@0 219 self.assertEqual(result.removed_directories, set())
michael@0 220 self.assertEqual(len(result.updated_files), 1)
michael@0 221
michael@0 222 def test_remove_unaccounted_directory_symlinks(self):
michael@0 223 """Directory symlinks in destination that are not in the way are
michael@0 224 deleted according to remove_unaccounted and
michael@0 225 remove_all_directory_symlinks.
michael@0 226 """
michael@0 227 if not self.symlink_supported:
michael@0 228 return
michael@0 229
michael@0 230 dest = self.tmppath('dest')
michael@0 231
michael@0 232 copier = FileCopier()
michael@0 233 copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))
michael@0 234
michael@0 235 os.makedirs(self.tmppath('dest/foo'))
michael@0 236 dummy = self.tmppath('dummy')
michael@0 237 os.mkdir(dummy)
michael@0 238
michael@0 239 os.mkdir(self.tmppath('dest/zot'))
michael@0 240 link = self.tmppath('dest/zot/zap')
michael@0 241 os.symlink(dummy, link)
michael@0 242
michael@0 243 # If not remove_unaccounted but remove_empty_directories, then
michael@0 244 # the symlinked directory remains (as does its containing
michael@0 245 # directory).
michael@0 246 result = copier.copy(dest, remove_unaccounted=False,
michael@0 247 remove_empty_directories=True,
michael@0 248 remove_all_directory_symlinks=False)
michael@0 249
michael@0 250 st = os.lstat(link)
michael@0 251 self.assertTrue(stat.S_ISLNK(st.st_mode))
michael@0 252 self.assertFalse(stat.S_ISDIR(st.st_mode))
michael@0 253
michael@0 254 self.assertEqual(self.all_files(dest), set(copier.paths()))
michael@0 255 self.assertEqual(self.all_dirs(dest), set(['foo/bar']))
michael@0 256
michael@0 257 self.assertEqual(result.removed_directories, set())
michael@0 258 self.assertEqual(len(result.updated_files), 1)
michael@0 259
michael@0 260 # If remove_unaccounted but not remove_empty_directories, then
michael@0 261 # only the symlinked directory is removed.
michael@0 262 result = copier.copy(dest, remove_unaccounted=True,
michael@0 263 remove_empty_directories=False,
michael@0 264 remove_all_directory_symlinks=False)
michael@0 265
michael@0 266 st = os.lstat(self.tmppath('dest/zot'))
michael@0 267 self.assertFalse(stat.S_ISLNK(st.st_mode))
michael@0 268 self.assertTrue(stat.S_ISDIR(st.st_mode))
michael@0 269
michael@0 270 self.assertEqual(result.removed_files, set([link]))
michael@0 271 self.assertEqual(result.removed_directories, set())
michael@0 272
michael@0 273 self.assertEqual(self.all_files(dest), set(copier.paths()))
michael@0 274 self.assertEqual(self.all_dirs(dest), set(['foo/bar', 'zot']))
michael@0 275
michael@0 276 # If remove_unaccounted and remove_empty_directories, then
michael@0 277 # both the symlink and its containing directory are removed.
michael@0 278 link = self.tmppath('dest/zot/zap')
michael@0 279 os.symlink(dummy, link)
michael@0 280
michael@0 281 result = copier.copy(dest, remove_unaccounted=True,
michael@0 282 remove_empty_directories=True,
michael@0 283 remove_all_directory_symlinks=False)
michael@0 284
michael@0 285 self.assertEqual(result.removed_files, set([link]))
michael@0 286 self.assertEqual(result.removed_directories, set([self.tmppath('dest/zot')]))
michael@0 287
michael@0 288 self.assertEqual(self.all_files(dest), set(copier.paths()))
michael@0 289 self.assertEqual(self.all_dirs(dest), set(['foo/bar']))
michael@0 290
michael@0 291 def test_permissions(self):
michael@0 292 """Ensure files without write permission can be deleted."""
michael@0 293 with open(self.tmppath('dummy'), 'a'):
michael@0 294 pass
michael@0 295
michael@0 296 p = self.tmppath('no_perms')
michael@0 297 with open(p, 'a'):
michael@0 298 pass
michael@0 299
michael@0 300 # Make file and directory unwritable. Reminder: making a directory
michael@0 301 # unwritable prevents modifications (including deletes) from the list
michael@0 302 # of files in that directory.
michael@0 303 os.chmod(p, 0400)
michael@0 304 os.chmod(self.tmpdir, 0400)
michael@0 305
michael@0 306 copier = FileCopier()
michael@0 307 copier.add('dummy', GeneratedFile('content'))
michael@0 308 result = copier.copy(self.tmpdir)
michael@0 309 self.assertEqual(result.removed_files_count, 1)
michael@0 310 self.assertFalse(os.path.exists(p))
michael@0 311
michael@0 312 def test_no_remove(self):
michael@0 313 copier = FileCopier()
michael@0 314 copier.add('foo', GeneratedFile('foo'))
michael@0 315
michael@0 316 with open(self.tmppath('bar'), 'a'):
michael@0 317 pass
michael@0 318
michael@0 319 os.mkdir(self.tmppath('emptydir'))
michael@0 320 d = self.tmppath('populateddir')
michael@0 321 os.mkdir(d)
michael@0 322
michael@0 323 with open(self.tmppath('populateddir/foo'), 'a'):
michael@0 324 pass
michael@0 325
michael@0 326 result = copier.copy(self.tmpdir, remove_unaccounted=False)
michael@0 327
michael@0 328 self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar',
michael@0 329 'populateddir/foo']))
michael@0 330 self.assertEqual(self.all_dirs(self.tmpdir), set(['populateddir']))
michael@0 331 self.assertEqual(result.removed_files, set())
michael@0 332 self.assertEqual(result.removed_directories,
michael@0 333 set([self.tmppath('emptydir')]))
michael@0 334
michael@0 335 def test_no_remove_empty_directories(self):
michael@0 336 copier = FileCopier()
michael@0 337 copier.add('foo', GeneratedFile('foo'))
michael@0 338
michael@0 339 with open(self.tmppath('bar'), 'a'):
michael@0 340 pass
michael@0 341
michael@0 342 os.mkdir(self.tmppath('emptydir'))
michael@0 343 d = self.tmppath('populateddir')
michael@0 344 os.mkdir(d)
michael@0 345
michael@0 346 with open(self.tmppath('populateddir/foo'), 'a'):
michael@0 347 pass
michael@0 348
michael@0 349 result = copier.copy(self.tmpdir, remove_unaccounted=False,
michael@0 350 remove_empty_directories=False)
michael@0 351
michael@0 352 self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar',
michael@0 353 'populateddir/foo']))
michael@0 354 self.assertEqual(self.all_dirs(self.tmpdir), set(['emptydir',
michael@0 355 'populateddir']))
michael@0 356 self.assertEqual(result.removed_files, set())
michael@0 357 self.assertEqual(result.removed_directories, set())
michael@0 358
michael@0 359 def test_optional_exists_creates_unneeded_directory(self):
michael@0 360 """Demonstrate that a directory not strictly required, but specified
michael@0 361 as the path to an optional file, will be unnecessarily created.
michael@0 362
michael@0 363 This behaviour is wrong; fixing it is tracked by Bug 972432;
michael@0 364 and this test exists to guard against unexpected changes in
michael@0 365 behaviour.
michael@0 366 """
michael@0 367
michael@0 368 dest = self.tmppath('dest')
michael@0 369
michael@0 370 copier = FileCopier()
michael@0 371 copier.add('foo/bar', ExistingFile(required=False))
michael@0 372
michael@0 373 result = copier.copy(dest)
michael@0 374
michael@0 375 st = os.lstat(self.tmppath('dest/foo'))
michael@0 376 self.assertFalse(stat.S_ISLNK(st.st_mode))
michael@0 377 self.assertTrue(stat.S_ISDIR(st.st_mode))
michael@0 378
michael@0 379 # What's worse, we have no record that dest was created.
michael@0 380 self.assertEquals(len(result.updated_files), 0)
michael@0 381
michael@0 382 # But we do have an erroneous record of an optional file
michael@0 383 # existing when it does not.
michael@0 384 self.assertIn(self.tmppath('dest/foo/bar'), result.existing_files)
michael@0 385
michael@0 386
michael@0 387 class TestFilePurger(TestWithTmpDir):
michael@0 388 def test_file_purger(self):
michael@0 389 existing = os.path.join(self.tmpdir, 'existing')
michael@0 390 extra = os.path.join(self.tmpdir, 'extra')
michael@0 391 empty_dir = os.path.join(self.tmpdir, 'dir')
michael@0 392
michael@0 393 with open(existing, 'a'):
michael@0 394 pass
michael@0 395
michael@0 396 with open(extra, 'a'):
michael@0 397 pass
michael@0 398
michael@0 399 os.mkdir(empty_dir)
michael@0 400 with open(os.path.join(empty_dir, 'foo'), 'a'):
michael@0 401 pass
michael@0 402
michael@0 403 self.assertTrue(os.path.exists(existing))
michael@0 404 self.assertTrue(os.path.exists(extra))
michael@0 405
michael@0 406 purger = FilePurger()
michael@0 407 purger.add('existing')
michael@0 408 result = purger.purge(self.tmpdir)
michael@0 409 self.assertEqual(result.removed_files, set(self.tmppath(p) for p in
michael@0 410 ('extra', 'dir/foo')))
michael@0 411 self.assertEqual(result.removed_files_count, 2)
michael@0 412 self.assertEqual(result.removed_directories_count, 1)
michael@0 413
michael@0 414 self.assertTrue(os.path.exists(existing))
michael@0 415 self.assertFalse(os.path.exists(extra))
michael@0 416 self.assertFalse(os.path.exists(empty_dir))
michael@0 417
michael@0 418
michael@0 419 class TestJarrer(unittest.TestCase):
michael@0 420 def check_jar(self, dest, copier):
michael@0 421 jar = JarReader(fileobj=dest)
michael@0 422 self.assertEqual([f.filename for f in jar], copier.paths())
michael@0 423 for f in jar:
michael@0 424 self.assertEqual(f.uncompressed_data.read(),
michael@0 425 copier[f.filename].content)
michael@0 426
michael@0 427 def test_jarrer(self):
michael@0 428 copier = Jarrer()
michael@0 429 copier.add('foo/bar', GeneratedFile('foobar'))
michael@0 430 copier.add('foo/qux', GeneratedFile('fooqux'))
michael@0 431 copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
michael@0 432 copier.add('bar', GeneratedFile('bar'))
michael@0 433 copier.add('qux/foo', GeneratedFile('quxfoo'))
michael@0 434 copier.add('qux/bar', GeneratedFile(''))
michael@0 435
michael@0 436 dest = MockDest()
michael@0 437 copier.copy(dest)
michael@0 438 self.check_jar(dest, copier)
michael@0 439
michael@0 440 copier.remove('foo')
michael@0 441 copier.add('test', GeneratedFile('test'))
michael@0 442 copier.copy(dest)
michael@0 443 self.check_jar(dest, copier)
michael@0 444
michael@0 445 copier.remove('test')
michael@0 446 copier.add('test', GeneratedFile('replaced-content'))
michael@0 447 copier.copy(dest)
michael@0 448 self.check_jar(dest, copier)
michael@0 449
michael@0 450 copier.copy(dest)
michael@0 451 self.check_jar(dest, copier)
michael@0 452
michael@0 453 preloaded = ['qux/bar', 'bar']
michael@0 454 copier.preload(preloaded)
michael@0 455 copier.copy(dest)
michael@0 456
michael@0 457 dest.seek(0)
michael@0 458 jar = JarReader(fileobj=dest)
michael@0 459 self.assertEqual([f.filename for f in jar], preloaded +
michael@0 460 [p for p in copier.paths() if not p in preloaded])
michael@0 461 self.assertEqual(jar.last_preloaded, preloaded[-1])
michael@0 462
michael@0 463 if __name__ == '__main__':
michael@0 464 mozunit.main()

mercurial