Fri, 16 Jan 2015 18:13:44 +0100
Integrate suggestion from review to improve consistency with existing code.
1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 from mozpack.copier import (
6 FileCopier,
7 FilePurger,
8 FileRegistry,
9 Jarrer,
10 )
11 from mozpack.files import (
12 GeneratedFile,
13 ExistingFile,
14 )
15 from mozpack.mozjar import JarReader
16 import mozpack.path
17 import unittest
18 import mozunit
19 import os
20 import stat
21 from mozpack.errors import ErrorMessage
22 from mozpack.test.test_files import (
23 MockDest,
24 MatchTestTemplate,
25 TestWithTmpDir,
26 )
class TestFileRegistry(MatchTestTemplate, unittest.TestCase):
    """Tests for FileRegistry: registration, removal, pattern matching and
    the set of directories its entries require."""

    def add(self, path):
        """Register a GeneratedFile under path, using path as its content."""
        self.registry.add(path, GeneratedFile(path))

    def do_check(self, pattern, result):
        """Assert that contains() and match() agree with result for pattern.

        Sets self.checked so that test_file_registry can verify this method
        was actually invoked during the match tests.
        """
        self.checked = True
        # A non-empty expected match list means the pattern must be
        # contained; an empty one means it must not be.
        expectation = self.assertTrue if result else self.assertFalse
        expectation(self.registry.contains(pattern))
        self.assertEqual(self.registry.match(pattern), result)

    def test_file_registry(self):
        """Walk a registry through adds, rejected operations, removals,
        the shared match tests, wildcard removal, iteration and len()."""
        self.registry = FileRegistry()
        self.registry.add('foo', GeneratedFile('foo'))
        bar_file = GeneratedFile('bar')
        self.registry.add('bar', bar_file)
        self.assertEqual(self.registry.paths(), ['foo', 'bar'])
        self.assertEqual(self.registry['bar'], bar_file)

        # Registering an already registered path is rejected.
        self.assertRaises(ErrorMessage, self.registry.add, 'foo',
                          GeneratedFile('foo2'))

        # So is removing a path that was never registered.
        self.assertRaises(ErrorMessage, self.registry.remove, 'qux')

        # 'foo' is registered as a file, so nothing can be added below it.
        for nested in ('foo/bar', 'foo/bar/baz'):
            self.assertRaises(ErrorMessage, self.registry.add, nested,
                              GeneratedFile('foobar'))

        # The rejected operations left the registry untouched.
        self.assertEqual(self.registry.paths(), ['foo', 'bar'])

        for path, remaining in (('foo', ['bar']), ('bar', [])):
            self.registry.remove(path)
            self.assertEqual(self.registry.paths(), remaining)

        self.prepare_match_test()
        self.do_match_test()
        self.assertTrue(self.checked)
        self.assertEqual(self.registry.paths(), [
            'bar',
            'foo/bar',
            'foo/baz',
            'foo/qux/1',
            'foo/qux/bar',
            'foo/qux/2/test',
            'foo/qux/2/test2',
        ])

        # Removing a directory path drops every entry beneath it.
        self.registry.remove('foo/qux')
        self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz'])

        self.registry.add('foo/qux', GeneratedFile('fooqux'))
        self.assertEqual(self.registry.paths(),
                         ['bar', 'foo/bar', 'foo/baz', 'foo/qux'])

        # Removal accepts wildcard patterns too.
        self.registry.remove('foo/b*')
        self.assertEqual(self.registry.paths(), ['bar', 'foo/qux'])

        # Iterating yields pairs whose first element is the path.
        self.assertEqual([path for path, _ in self.registry],
                         ['bar', 'foo/qux'])
        self.assertEqual(len(self.registry), 2)

        self.add('foo/.foo')
        self.assertTrue(self.registry.contains('foo/.foo'))

    def test_registry_paths(self):
        """A path cannot be registered as a file while it is needed as a
        directory, nor the other way around."""
        self.registry = FileRegistry()

        def assert_bar_rejected():
            # 'bar' must not be accepted as a file at this point.
            self.assertRaises(ErrorMessage, self.registry.add, 'bar',
                              GeneratedFile('bar'))

        # Can't add a file if it requires a directory in place of a
        # file we also require.
        self.registry.add('foo', GeneratedFile('foo'))
        self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
                          GeneratedFile('foobar'))

        # Can't add a file if we already have a directory there.
        self.registry.add('bar/baz', GeneratedFile('barbaz'))
        assert_bar_rejected()

        # Bump the count of things that require bar/ to 2.
        self.registry.add('bar/zot', GeneratedFile('barzot'))
        assert_bar_rejected()

        # Drop the count of things that require bar/ to 1.
        self.registry.remove('bar/baz')
        assert_bar_rejected()

        # Drop the count of things that require bar/ to 0.
        self.registry.remove('bar/zot')
        self.registry.add('bar/zot', GeneratedFile('barzot'))

    def test_required_directories(self):
        """required_directories() follows the registry's contents as
        entries are added and removed."""
        self.registry = FileRegistry()

        def expect_required(expected):
            self.assertEqual(self.registry.required_directories(), expected)

        # A top-level file requires no directory.
        self.registry.add('foo', GeneratedFile('foo'))
        expect_required(set())

        self.registry.add('bar/baz', GeneratedFile('barbaz'))
        expect_required({'bar'})

        # A second file in the same directory changes nothing.
        self.registry.add('bar/zot', GeneratedFile('barzot'))
        expect_required({'bar'})

        self.registry.add('bar/zap/zot', GeneratedFile('barzapzot'))
        expect_required({'bar', 'bar/zap'})

        self.registry.remove('bar/zap/zot')
        expect_required({'bar'})

        # 'bar' stays required while bar/zot is still registered.
        self.registry.remove('bar/baz')
        expect_required({'bar'})

        self.registry.remove('bar/zot')
        expect_required(set())

        # Every intermediate directory of a nested path is required.
        self.registry.add('x/y/z', GeneratedFile('xyz'))
        expect_required({'x', 'x/y'})
class TestFileCopier(TestWithTmpDir):
    """Tests for FileCopier.copy() into a temporary destination tree."""

    def all_dirs(self, base):
        """Return the directories found while walking base (base itself
        included) that have no subdirectories, relative to base."""
        all_dirs = set()
        # Only directory structure matters here; the file list is ignored.
        for root, dirs, _ in os.walk(base):
            if not dirs:
                all_dirs.add(mozpack.path.relpath(root, base))
        return all_dirs

    def all_files(self, base):
        """Return the set of all files found under base, as paths relative
        to base."""
        all_files = set()
        for root, dirs, files in os.walk(base):
            for f in files:
                all_files.add(
                    mozpack.path.join(mozpack.path.relpath(root, base), f))
        return all_files

    def test_file_copier(self):
        """Copy a set of files, then copy again after removing a subtree
        and adding a file, checking the destination and the result."""
        copier = FileCopier()
        copier.add('foo/bar', GeneratedFile('foobar'))
        copier.add('foo/qux', GeneratedFile('fooqux'))
        copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
        copier.add('bar', GeneratedFile('bar'))
        copier.add('qux/foo', GeneratedFile('quxfoo'))
        copier.add('qux/bar', GeneratedFile(''))

        result = copier.copy(self.tmpdir)
        self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
        self.assertEqual(self.all_dirs(self.tmpdir),
                         {'foo/deep/nested/directory', 'qux'})

        self.assertEqual(result.updated_files,
                         {self.tmppath(p)
                          for p in self.all_files(self.tmpdir)})
        self.assertEqual(result.existing_files, set())
        self.assertEqual(result.removed_files, set())
        self.assertEqual(result.removed_directories, set())

        # Everything under foo/ is dropped from the copier, so the second
        # copy must delete it from the destination.
        copier.remove('foo')
        copier.add('test', GeneratedFile('test'))
        result = copier.copy(self.tmpdir)
        self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
        self.assertEqual(self.all_dirs(self.tmpdir), {'qux'})
        self.assertEqual(result.removed_files,
                         {self.tmppath(p) for p in
                          ('foo/bar', 'foo/qux',
                           'foo/deep/nested/directory/file')})

    def test_symlink_directory_replaced(self):
        """Directory symlinks in destination are replaced if they need to be
        real directories."""
        if not self.symlink_supported:
            return

        dest = self.tmppath('dest')

        copier = FileCopier()
        copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))

        # Put a symlink to an unrelated directory where foo/bar must go.
        os.makedirs(self.tmppath('dest/foo'))
        dummy = self.tmppath('dummy')
        os.mkdir(dummy)
        link = self.tmppath('dest/foo/bar')
        os.symlink(dummy, link)

        result = copier.copy(dest)

        # lstat does not follow symlinks, so this inspects the entry itself.
        st = os.lstat(link)
        self.assertFalse(stat.S_ISLNK(st.st_mode))
        self.assertTrue(stat.S_ISDIR(st.st_mode))

        self.assertEqual(self.all_files(dest), set(copier.paths()))

        self.assertEqual(result.removed_directories, set())
        self.assertEqual(len(result.updated_files), 1)

    def test_remove_unaccounted_directory_symlinks(self):
        """Directory symlinks in destination that are not in the way are
        deleted according to remove_unaccounted and
        remove_all_directory_symlinks.
        """
        if not self.symlink_supported:
            return

        dest = self.tmppath('dest')

        copier = FileCopier()
        copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))

        os.makedirs(self.tmppath('dest/foo'))
        dummy = self.tmppath('dummy')
        os.mkdir(dummy)

        os.mkdir(self.tmppath('dest/zot'))
        link = self.tmppath('dest/zot/zap')
        os.symlink(dummy, link)

        # If not remove_unaccounted but remove_empty_directories, then
        # the symlinked directory remains (as does its containing
        # directory).
        result = copier.copy(dest, remove_unaccounted=False,
                             remove_empty_directories=True,
                             remove_all_directory_symlinks=False)

        st = os.lstat(link)
        self.assertTrue(stat.S_ISLNK(st.st_mode))
        self.assertFalse(stat.S_ISDIR(st.st_mode))

        self.assertEqual(self.all_files(dest), set(copier.paths()))
        self.assertEqual(self.all_dirs(dest), {'foo/bar'})

        self.assertEqual(result.removed_directories, set())
        self.assertEqual(len(result.updated_files), 1)

        # If remove_unaccounted but not remove_empty_directories, then
        # only the symlinked directory is removed.
        result = copier.copy(dest, remove_unaccounted=True,
                             remove_empty_directories=False,
                             remove_all_directory_symlinks=False)

        st = os.lstat(self.tmppath('dest/zot'))
        self.assertFalse(stat.S_ISLNK(st.st_mode))
        self.assertTrue(stat.S_ISDIR(st.st_mode))

        self.assertEqual(result.removed_files, {link})
        self.assertEqual(result.removed_directories, set())

        self.assertEqual(self.all_files(dest), set(copier.paths()))
        self.assertEqual(self.all_dirs(dest), {'foo/bar', 'zot'})

        # If remove_unaccounted and remove_empty_directories, then
        # both the symlink and its containing directory are removed.
        link = self.tmppath('dest/zot/zap')
        os.symlink(dummy, link)

        result = copier.copy(dest, remove_unaccounted=True,
                             remove_empty_directories=True,
                             remove_all_directory_symlinks=False)

        self.assertEqual(result.removed_files, {link})
        self.assertEqual(result.removed_directories,
                         {self.tmppath('dest/zot')})

        self.assertEqual(self.all_files(dest), set(copier.paths()))
        self.assertEqual(self.all_dirs(dest), {'foo/bar'})

    def test_permissions(self):
        """Ensure files without write permission can be deleted."""
        with open(self.tmppath('dummy'), 'a'):
            pass

        p = self.tmppath('no_perms')
        with open(p, 'a'):
            pass

        # Make file and directory unwritable. Reminder: making a directory
        # unwritable prevents modifications (including deletes) from the list
        # of files in that directory.
        # The 0o prefix is the octal spelling accepted by both Python 2.6+
        # and Python 3; a bare leading zero is a syntax error on Python 3.
        os.chmod(p, 0o400)
        os.chmod(self.tmpdir, 0o400)

        copier = FileCopier()
        copier.add('dummy', GeneratedFile('content'))
        result = copier.copy(self.tmpdir)
        self.assertEqual(result.removed_files_count, 1)
        self.assertFalse(os.path.exists(p))

    def test_no_remove(self):
        """With remove_unaccounted=False, unknown files are kept but empty
        directories are still removed."""
        copier = FileCopier()
        copier.add('foo', GeneratedFile('foo'))

        with open(self.tmppath('bar'), 'a'):
            pass

        os.mkdir(self.tmppath('emptydir'))
        os.mkdir(self.tmppath('populateddir'))

        with open(self.tmppath('populateddir/foo'), 'a'):
            pass

        result = copier.copy(self.tmpdir, remove_unaccounted=False)

        self.assertEqual(self.all_files(self.tmpdir),
                         {'foo', 'bar', 'populateddir/foo'})
        self.assertEqual(self.all_dirs(self.tmpdir), {'populateddir'})
        self.assertEqual(result.removed_files, set())
        self.assertEqual(result.removed_directories,
                         {self.tmppath('emptydir')})

    def test_no_remove_empty_directories(self):
        """With remove_unaccounted=False and remove_empty_directories=False,
        nothing already in the destination is removed."""
        copier = FileCopier()
        copier.add('foo', GeneratedFile('foo'))

        with open(self.tmppath('bar'), 'a'):
            pass

        os.mkdir(self.tmppath('emptydir'))
        os.mkdir(self.tmppath('populateddir'))

        with open(self.tmppath('populateddir/foo'), 'a'):
            pass

        result = copier.copy(self.tmpdir, remove_unaccounted=False,
                             remove_empty_directories=False)

        self.assertEqual(self.all_files(self.tmpdir),
                         {'foo', 'bar', 'populateddir/foo'})
        self.assertEqual(self.all_dirs(self.tmpdir),
                         {'emptydir', 'populateddir'})
        self.assertEqual(result.removed_files, set())
        self.assertEqual(result.removed_directories, set())

    def test_optional_exists_creates_unneeded_directory(self):
        """Demonstrate that a directory not strictly required, but specified
        as the path to an optional file, will be unnecessarily created.

        This behaviour is wrong; fixing it is tracked by Bug 972432;
        and this test exists to guard against unexpected changes in
        behaviour.
        """

        dest = self.tmppath('dest')

        copier = FileCopier()
        copier.add('foo/bar', ExistingFile(required=False))

        result = copier.copy(dest)

        st = os.lstat(self.tmppath('dest/foo'))
        self.assertFalse(stat.S_ISLNK(st.st_mode))
        self.assertTrue(stat.S_ISDIR(st.st_mode))

        # What's worse, we have no record that dest was created.
        self.assertEqual(len(result.updated_files), 0)

        # But we do have an erroneous record of an optional file
        # existing when it does not.
        self.assertIn(self.tmppath('dest/foo/bar'), result.existing_files)
class TestFilePurger(TestWithTmpDir):
    """Tests for FilePurger.purge()."""

    def test_file_purger(self):
        """Purging keeps the registered file and removes the other file
        and the directory together with the file inside it."""
        tmp = self.tmpdir
        existing = os.path.join(tmp, 'existing')
        extra = os.path.join(tmp, 'extra')
        subdir = os.path.join(tmp, 'dir')

        # Create two empty top-level files ...
        for path in (existing, extra):
            with open(path, 'a'):
                pass

        # ... and a directory holding one more empty file.
        os.mkdir(subdir)
        with open(os.path.join(subdir, 'foo'), 'a'):
            pass

        for path in (existing, extra):
            self.assertTrue(os.path.exists(path))

        # Only 'existing' is registered with the purger.
        purger = FilePurger()
        purger.add('existing')
        result = purger.purge(tmp)

        expected_removed = set(self.tmppath(p) for p in ('extra', 'dir/foo'))
        self.assertEqual(result.removed_files, expected_removed)
        self.assertEqual(result.removed_files_count, 2)
        self.assertEqual(result.removed_directories_count, 1)

        self.assertTrue(os.path.exists(existing))
        for path in (extra, subdir):
            self.assertFalse(os.path.exists(path))
class TestJarrer(unittest.TestCase):
    """Tests for Jarrer: writing registered files into a jar destination."""

    def check_jar(self, dest, copier):
        """Assert that the jar read from dest matches copier.

        The jar's member names must equal copier.paths(), in order, and
        each member's uncompressed data must equal the content of the file
        registered under that name.
        """
        jar = JarReader(fileobj=dest)
        self.assertEqual([f.filename for f in jar], copier.paths())
        for f in jar:
            self.assertEqual(f.uncompressed_data.read(),
                             copier[f.filename].content)

    def test_jarrer(self):
        """Write a jar, rewrite it after several changes to the registered
        files, then check the member order produced by preload()."""
        copier = Jarrer()
        copier.add('foo/bar', GeneratedFile('foobar'))
        copier.add('foo/qux', GeneratedFile('fooqux'))
        copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
        copier.add('bar', GeneratedFile('bar'))
        copier.add('qux/foo', GeneratedFile('quxfoo'))
        copier.add('qux/bar', GeneratedFile(''))

        dest = MockDest()
        copier.copy(dest)
        self.check_jar(dest, copier)

        # Drop a subtree and add a new member.
        copier.remove('foo')
        copier.add('test', GeneratedFile('test'))
        copier.copy(dest)
        self.check_jar(dest, copier)

        # Replace the content of an existing member.
        copier.remove('test')
        copier.add('test', GeneratedFile('replaced-content'))
        copier.copy(dest)
        self.check_jar(dest, copier)

        # Copy again with nothing changed.
        copier.copy(dest)
        self.check_jar(dest, copier)

        preloaded = ['qux/bar', 'bar']
        copier.preload(preloaded)
        copier.copy(dest)

        # Preloaded members come first, in the order given, followed by the
        # remaining paths in registry order.
        dest.seek(0)
        jar = JarReader(fileobj=dest)
        self.assertEqual([f.filename for f in jar], preloaded +
                         [p for p in copier.paths() if p not in preloaded])
        self.assertEqual(jar.last_preloaded, preloaded[-1])
# Hand control to mozunit's runner when this file is executed as a script.
if __name__ == '__main__':
    mozunit.main()