michael@0: #!/usr/bin/env python michael@0: michael@0: import os michael@0: import stat michael@0: import shutil michael@0: import tempfile michael@0: import threading michael@0: import time michael@0: import unittest michael@0: michael@0: import mozfile michael@0: import mozinfo michael@0: michael@0: import stubs michael@0: michael@0: michael@0: def mark_readonly(path): michael@0: """Removes all write permissions from given file/directory. michael@0: michael@0: :param path: path of directory/file of which modes must be changed michael@0: """ michael@0: mode = os.stat(path)[stat.ST_MODE] michael@0: os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) michael@0: michael@0: michael@0: class FileOpenCloseThread(threading.Thread): michael@0: """Helper thread for asynchronous file handling""" michael@0: def __init__(self, path, delay, delete=False): michael@0: threading.Thread.__init__(self) michael@0: self.delay = delay michael@0: self.path = path michael@0: self.delete = delete michael@0: michael@0: def run(self): michael@0: with open(self.path) as f: michael@0: time.sleep(self.delay) michael@0: if self.delete: michael@0: try: michael@0: os.remove(self.path) michael@0: except: michael@0: pass michael@0: michael@0: michael@0: class MozfileRemoveTestCase(unittest.TestCase): michael@0: """Test our ability to remove directories and files""" michael@0: michael@0: def setUp(self): michael@0: # Generate a stub michael@0: self.tempdir = stubs.create_stub() michael@0: michael@0: def tearDown(self): michael@0: if os.path.isdir(self.tempdir): michael@0: shutil.rmtree(self.tempdir) michael@0: michael@0: def test_remove_directory(self): michael@0: """Test the removal of a directory""" michael@0: self.assertTrue(os.path.isdir(self.tempdir)) michael@0: mozfile.remove(self.tempdir) michael@0: self.assertFalse(os.path.exists(self.tempdir)) michael@0: michael@0: def test_remove_directory_with_open_file(self): michael@0: """Test removing a directory with an open file""" michael@0: # Open a file in the generated stub michael@0: filepath = os.path.join(self.tempdir, *stubs.files[1]) michael@0: f = file(filepath, 'w') michael@0: f.write('foo-bar') michael@0: michael@0: # keep file open and then try removing the dir-tree michael@0: if mozinfo.isWin: michael@0: # On the Windows family WindowsError should be raised. michael@0: self.assertRaises(OSError, mozfile.remove, self.tempdir) michael@0: self.assertTrue(os.path.exists(self.tempdir)) michael@0: else: michael@0: # Folder should be deleted on all other platforms michael@0: mozfile.remove(self.tempdir) michael@0: self.assertFalse(os.path.exists(self.tempdir)) michael@0: michael@0: def test_remove_closed_file(self): michael@0: """Test removing a closed file""" michael@0: # Open a file in the generated stub michael@0: filepath = os.path.join(self.tempdir, *stubs.files[1]) michael@0: with open(filepath, 'w') as f: michael@0: f.write('foo-bar') michael@0: michael@0: # Folder should be deleted on all platforms michael@0: mozfile.remove(self.tempdir) michael@0: self.assertFalse(os.path.exists(self.tempdir)) michael@0: michael@0: def test_removing_open_file_with_retry(self): michael@0: """Test removing a file in use with retry""" michael@0: filepath = os.path.join(self.tempdir, *stubs.files[1]) michael@0: michael@0: thread = FileOpenCloseThread(filepath, 1) michael@0: thread.start() michael@0: michael@0: # Wait a bit so we can be sure the file has been opened michael@0: time.sleep(.5) michael@0: mozfile.remove(filepath) michael@0: thread.join() michael@0: michael@0: # Check deletion was successful michael@0: self.assertFalse(os.path.exists(filepath)) michael@0: michael@0: def test_removing_already_deleted_file_with_retry(self): michael@0: """Test removing a meanwhile removed file with retry""" michael@0: filepath = os.path.join(self.tempdir, *stubs.files[1]) michael@0: michael@0: thread = FileOpenCloseThread(filepath, .8, True) michael@0: thread.start() michael@0: michael@0: # Wait a bit so we can be sure the file has been opened and gets deleted michael@0: # while remove() waits for the next retry michael@0: time.sleep(.5) michael@0: mozfile.remove(filepath) michael@0: thread.join() michael@0: michael@0: # Check deletion was successful michael@0: self.assertFalse(os.path.exists(filepath)) michael@0: michael@0: def test_remove_readonly_tree(self): michael@0: """Test removing a read-only directory""" michael@0: michael@0: dirpath = os.path.join(self.tempdir, "nested_tree") michael@0: mark_readonly(dirpath) michael@0: michael@0: # However, mozfile should change write permissions and remove dir. michael@0: mozfile.remove(dirpath) michael@0: michael@0: self.assertFalse(os.path.exists(dirpath)) michael@0: michael@0: def test_remove_readonly_file(self): michael@0: """Test removing read-only files""" michael@0: filepath = os.path.join(self.tempdir, *stubs.files[1]) michael@0: mark_readonly(filepath) michael@0: michael@0: # However, mozfile should change write permission and then remove file. michael@0: mozfile.remove(filepath) michael@0: michael@0: self.assertFalse(os.path.exists(filepath)) michael@0: michael@0: @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows") michael@0: def test_remove_symlink(self): michael@0: """Test removing a symlink""" michael@0: file_path = os.path.join(self.tempdir, *stubs.files[1]) michael@0: symlink_path = os.path.join(self.tempdir, 'symlink') michael@0: michael@0: os.symlink(file_path, symlink_path) michael@0: self.assertTrue(os.path.islink(symlink_path)) michael@0: michael@0: # The linked folder and files should not be deleted michael@0: mozfile.remove(symlink_path) michael@0: self.assertFalse(os.path.exists(symlink_path)) michael@0: self.assertTrue(os.path.exists(file_path)) michael@0: michael@0: @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows") michael@0: def test_remove_symlink_in_subfolder(self): michael@0: """Test removing a folder with an contained symlink""" michael@0: file_path = os.path.join(self.tempdir, *stubs.files[0]) michael@0: dir_path = os.path.dirname(os.path.join(self.tempdir, *stubs.files[1])) michael@0: symlink_path = os.path.join(dir_path, 'symlink') michael@0: michael@0: os.symlink(file_path, symlink_path) michael@0: self.assertTrue(os.path.islink(symlink_path)) michael@0: michael@0: # The folder with the contained symlink will be deleted but not the michael@0: # original linked file michael@0: mozfile.remove(dir_path) michael@0: self.assertFalse(os.path.exists(dir_path)) michael@0: self.assertFalse(os.path.exists(symlink_path)) michael@0: self.assertTrue(os.path.exists(file_path)) michael@0: michael@0: @unittest.skipIf(mozinfo.isWin or not os.geteuid(), michael@0: "Symlinks are not supported on Windows and cannot run test as root") michael@0: def test_remove_symlink_for_system_path(self): michael@0: """Test removing a symlink which points to a system folder""" michael@0: symlink_path = os.path.join(self.tempdir, 'symlink') michael@0: michael@0: os.symlink(os.path.dirname(self.tempdir), symlink_path) michael@0: self.assertTrue(os.path.islink(symlink_path)) michael@0: michael@0: # The folder with the contained symlink will be deleted but not the michael@0: # original linked file michael@0: mozfile.remove(symlink_path) michael@0: self.assertFalse(os.path.exists(symlink_path))