Python shutil 模块,copymode() 实例源码
我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用shutil.copymode()。
def write_file(self, new_text, filename, old_text, encoding):
    """Write the refactored text to *filename*, keeping a ``.bak`` backup.

    Unless ``self.nobackups`` is true, an existing ``<filename>.bak`` is
    removed and the current file is renamed to it before writing.  The write
    itself is delegated to the parent class's ``write_file``; afterwards the
    backup's permission bits are copied onto the new file.
    """
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                # Best effort: report and carry on, the rename below may
                # still succeed or report its own failure.
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
def copymode(src, dst):
    """
    Copy only the permission bits of ``src`` onto ``dst``.

    File contents, owner and group are left alone. Both arguments are
    path names given as strings.

    :Arguments:
        src - path whose mode is read
        dst - path whose mode is overwritten

    :Return:
        True when the mode was copied, False when any exception was raised
        (the exception text is reported through ``print_error``).
    """
    try:
        shutil.copymode(src, dst)
        print_info("mode of src {} copied to dst {} successfully".
                   format(src, dst))
    except Exception as e:
        print_error("copying file mode from {} to file {} raised exception {}".
                    format(src, dst, str(e)))
        return False
    return True
def _CopyFile(self, source, target, copyDependentFiles,
        includeMode = False):
    """Copy ``source`` to ``target`` once, optionally with its dependencies.

    Nothing happens when the normalized target was already copied or when
    source and target normalize to the same path.  Otherwise the target is
    removed, its directory created, and contents plus stat information are
    copied (and the mode as well when ``includeMode`` is true).  When
    ``copyDependentFiles`` is true and the source is not excluded by the
    finder, every dependent file is copied next to the target.
    """
    srcKey = os.path.normcase(os.path.normpath(source))
    tgtKey = os.path.normcase(os.path.normpath(target))
    if tgtKey in self.filesCopied or srcKey == tgtKey:
        return

    self._RemoveFile(target)
    targetDir = os.path.dirname(target)
    self._CreateDirectory(targetDir)
    if not self.silent:
        sys.stdout.write("copying %s -> %s\n" % (source, target))

    shutil.copyfile(source, target)
    shutil.copystat(source, target)
    if includeMode:
        shutil.copymode(source, target)
    # Record the target so later requests for it are skipped.
    self.filesCopied[tgtKey] = None

    if not copyDependentFiles:
        return
    if source in self.finder.excludeDependentFiles:
        return
    for dependency in self._GetDependentFiles(source):
        dependencyTarget = os.path.join(targetDir,
                os.path.basename(dependency))
        self._CopyFile(dependency, dependencyTarget, copyDependentFiles)
def add_file(self, path):
    """Copy the file at ``path`` (contents and mode) into ``self.cdrom_dir``.

    The copy keeps the source file's base name.
    """
    src_name = str(Path(path))
    dest_name = os.path.join(self.cdrom_dir, Path(path).name)
    for copier in (shutil.copyfile, shutil.copymode):
        # Contents first, then permission bits.
        copier(src_name, dest_name)
def __copy(self, src, dst):
    """Internal copy procedure.

    Recursively copies ``src`` (file or directory) to ``dst``, including
    permission bits.  Returns True on success.  On any failure an error is
    recorded through ``self.__errorData`` and False is returned: read access
    to ``src`` denied, write access to the directory of ``dst`` denied,
    ``dst`` already existing, or the copy itself failing.
    """
    dstDir = os.path.dirname(dst)
    if not self.__isAllowed(src, 'read'):
        self.__errorData(src, 'Access denied')
        return False
    if not self.__isAllowed(dstDir, 'write'):
        self.__errorData(dstDir, 'Access denied')
        return False
    if os.path.exists(dst):
        self.__errorData(dst, 'File or folder with the same name already exists')
        return False

    if not os.path.isdir(src):
        try:
            shutil.copyfile(src, dst)
            shutil.copymode(src, dst)
            return True
        except Exception:
            # Catch ordinary errors only, so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            self.__errorData(src, 'Unable to copy files')
            return False
    else:
        try:
            os.mkdir(dst)
            shutil.copymode(src, dst)
        except Exception:
            self.__errorData(src, 'Unable to copy files')
            return False

        for i in os.listdir(src):
            newSrc = os.path.join(src, i)
            newDst = os.path.join(dst, i)
            if not self.__copy(newSrc, newDst):
                self.__errorData(newSrc, 'Unable to copy files')
                return False

    return True
def write_file(self, new_text, filename, old_text, encoding):
    """Write the converted text, honouring output-dir, suffix and backups.

    If ``self._output_dir`` is set, *filename* must start with
    ``self._input_base_dir`` and is redirected below the output directory;
    ``self._append_suffix`` (if set) is then appended.  Unless
    ``self.nobackups`` is true, an existing ``<filename>.bak`` is removed
    and the current file is renamed to it before writing.  The write itself
    is delegated to the parent class's ``write_file``.

    Raises:
        ValueError: if an output directory is configured but *filename*
            does not start with ``self._input_base_dir``.
    """
    orig_filename = filename
    if self._output_dir:
        if filename.startswith(self._input_base_dir):
            filename = os.path.join(self._output_dir,
                                    filename[len(self._input_base_dir):])
        else:
            raise ValueError('filename %s does not start with the '
                             'input_base_dir %s' % (
                                 filename, self._input_base_dir))
    if self._append_suffix:
        filename += self._append_suffix
    if orig_filename != filename:
        output_dir = os.path.dirname(filename)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if output_dir and not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        self.log_message('Writing converted %s to %s.', orig_filename,
                         filename)
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
    if orig_filename != filename:
        # Preserve the file mode in the new output directory.
        shutil.copymode(orig_filename, filename)
def write_file(self, new_text, filename, old_text, encoding):
    """Write the converted text, honouring output-dir, suffix and backups.

    If ``self._output_dir`` is set, *filename* must start with
    ``self._input_base_dir`` and is redirected below the output directory;
    ``self._append_suffix`` (if set) is then appended.  Unless
    ``self.nobackups`` is true, an existing ``<filename>.bak`` is removed
    and the current file is renamed to it before writing.  The write itself
    is delegated to the parent class's ``write_file``.

    Raises:
        ValueError: if an output directory is configured but *filename*
            does not start with ``self._input_base_dir``.
    """
    orig_filename = filename
    if self._output_dir:
        if filename.startswith(self._input_base_dir):
            filename = os.path.join(self._output_dir,
                                    filename[len(self._input_base_dir):])
        else:
            raise ValueError('filename %s does not start with the '
                             'input_base_dir %s' % (
                                 filename, self._input_base_dir))
    if self._append_suffix:
        filename += self._append_suffix
    if orig_filename != filename:
        output_dir = os.path.dirname(filename)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if output_dir and not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        self.log_message('Writing converted %s to %s.', orig_filename,
                         filename)
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
    if orig_filename != filename:
        # Preserve the file mode in the new output directory.
        shutil.copymode(orig_filename, filename)
def write_file(self, new_text, filename, old_text, encoding):
    """Write the converted text, honouring output-dir, suffix and backups.

    If ``self._output_dir`` is set, *filename* must start with
    ``self._input_base_dir`` and is redirected below the output directory;
    ``self._append_suffix`` (if set) is then appended.  Unless
    ``self.nobackups`` is true, an existing ``<filename>.bak`` is removed
    and the current file is renamed to it before writing.  The write itself
    is delegated to the parent class's ``write_file``.

    Raises:
        ValueError: if an output directory is configured but *filename*
            does not start with ``self._input_base_dir``.
    """
    orig_filename = filename
    if self._output_dir:
        if filename.startswith(self._input_base_dir):
            filename = os.path.join(self._output_dir,
                                    filename[len(self._input_base_dir):])
        else:
            raise ValueError('filename %s does not start with the '
                             'input_base_dir %s' % (
                                 filename, self._input_base_dir))
    if self._append_suffix:
        filename += self._append_suffix
    if orig_filename != filename:
        output_dir = os.path.dirname(filename)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if output_dir and not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        self.log_message('Writing converted %s to %s.', orig_filename,
                         filename)
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
    if orig_filename != filename:
        # Preserve the file mode in the new output directory.
        shutil.copymode(orig_filename, filename)
def write_file(self, new_text, filename, old_text, encoding):
    """Write the converted text, honouring output-dir, suffix and backups.

    If ``self._output_dir`` is set, *filename* must start with
    ``self._input_base_dir`` and is redirected below the output directory;
    ``self._append_suffix`` (if set) is then appended.  Unless
    ``self.nobackups`` is true, an existing ``<filename>.bak`` is removed
    and the current file is renamed to it before writing.  The write itself
    is delegated to the parent class's ``write_file``.

    Raises:
        ValueError: if an output directory is configured but *filename*
            does not start with ``self._input_base_dir``.
    """
    orig_filename = filename
    if self._output_dir:
        if filename.startswith(self._input_base_dir):
            filename = os.path.join(self._output_dir,
                                    filename[len(self._input_base_dir):])
        else:
            raise ValueError('filename %s does not start with the '
                             'input_base_dir %s' % (
                                 filename, self._input_base_dir))
    if self._append_suffix:
        filename += self._append_suffix
    if orig_filename != filename:
        output_dir = os.path.dirname(filename)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if output_dir and not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        self.log_message('Writing converted %s to %s.', orig_filename,
                         filename)
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
    if orig_filename != filename:
        # Preserve the file mode in the new output directory.
        shutil.copymode(orig_filename, filename)
def test_copymode_follow_symlinks(self):
    # copymode() with default arguments must act on the files that the
    # symlinks point at, whichever side the link is on.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'quux')
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # follow src link
    os.chmod(dst, stat.S_IRWXO)
    shutil.copymode(src_link, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # follow dst link
    os.chmod(dst, stat.S_IRWXO)
    shutil.copymode(src, dst_link)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # follow both links (both arguments are links here; passing ``dst``
    # would merely repeat the "follow src link" case)
    os.chmod(dst, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copymode_symlink_to_symlink(self):
    # Exercises copymode(..., follow_symlinks=False) for link-to-link,
    # link-to-file and file-to-link argument combinations.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux')]
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)

    # link to link: the links' own modes match afterwards, the targets'
    # modes still differ
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    self.assertEqual(os.lstat(src_link).st_mode,
                     os.lstat(dst_link).st_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # only one side is a link (src link first, then dst link): the
    # underlying files end up with equal modes
    for source, target in ((src_link, dst), (src, dst_link)):
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(source, target, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copymode_symlink_to_symlink_wo_lchmod(self):
    # Only checks that a link-to-link copymode() call with
    # follow_symlinks=False returns without raising.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux')]
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    for target, link in ((src, src_link), (dst, dst_link)):
        os.symlink(target, link)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
def write_file(self, new_text, filename, old_text, encoding):
    """Write the converted text, honouring output-dir, suffix and backups.

    If ``self._output_dir`` is set, *filename* must start with
    ``self._input_base_dir`` and is redirected below the output directory;
    ``self._append_suffix`` (if set) is then appended.  Unless
    ``self.nobackups`` is true, an existing ``<filename>.bak`` is removed
    and the current file is renamed to it before writing.  The write itself
    is delegated to the parent class's ``write_file``.

    Raises:
        ValueError: if an output directory is configured but *filename*
            does not start with ``self._input_base_dir``.
    """
    orig_filename = filename
    if self._output_dir:
        if filename.startswith(self._input_base_dir):
            filename = os.path.join(self._output_dir,
                                    filename[len(self._input_base_dir):])
        else:
            raise ValueError('filename %s does not start with the '
                             'input_base_dir %s' % (
                                 filename, self._input_base_dir))
    if self._append_suffix:
        filename += self._append_suffix
    if orig_filename != filename:
        output_dir = os.path.dirname(filename)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if output_dir and not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        self.log_message('Writing converted %s to %s.', orig_filename,
                         filename)
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
    if orig_filename != filename:
        # Preserve the file mode in the new output directory.
        shutil.copymode(orig_filename, filename)
def write_file(self, new_text, filename, old_text, encoding):
    """Write the converted text, honouring output-dir, suffix and backups.

    If ``self._output_dir`` is set, *filename* must start with
    ``self._input_base_dir`` and is redirected below the output directory;
    ``self._append_suffix`` (if set) is then appended.  Unless
    ``self.nobackups`` is true, an existing ``<filename>.bak`` is removed
    and the current file is renamed to it before writing.  The write itself
    is delegated to the parent class's ``write_file``.

    Raises:
        ValueError: if an output directory is configured but *filename*
            does not start with ``self._input_base_dir``.
    """
    orig_filename = filename
    if self._output_dir:
        if filename.startswith(self._input_base_dir):
            filename = os.path.join(self._output_dir,
                                    filename[len(self._input_base_dir):])
        else:
            raise ValueError('filename %s does not start with the '
                             'input_base_dir %s' % (
                                 filename, self._input_base_dir))
    if self._append_suffix:
        filename += self._append_suffix
    if orig_filename != filename:
        output_dir = os.path.dirname(filename)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if output_dir and not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        self.log_message('Writing converted %s to %s.', orig_filename,
                         filename)
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
    if orig_filename != filename:
        # Preserve the file mode in the new output directory.
        shutil.copymode(orig_filename, filename)
def test_copymode_follow_symlinks(self):
    # copymode() with default arguments acts on the files behind symlinks.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux')]
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)

    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # On Windows, os.chmod does not follow symlinks (issue #15411)
    if os.name != 'nt':
        link_cases = (
            (src_link, dst),       # follow src link
            (src, dst_link),       # follow dst link
            (src_link, dst_link),  # follow both links
        )
        for source, target in link_cases:
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(source, target)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copymode_symlink_to_symlink(self):
    # Exercises copymode(..., follow_symlinks=False) for link-to-link,
    # link-to-file and file-to-link argument combinations.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux')]
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)

    # link to link: the links' own modes match afterwards, the targets'
    # modes still differ
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    self.assertEqual(os.lstat(src_link).st_mode,
                     os.lstat(dst_link).st_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # only one side is a link (src link first, then dst link): the
    # underlying files end up with equal modes
    for source, target in ((src_link, dst), (src, dst_link)):
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(source, target, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copymode_symlink_to_symlink_wo_lchmod(self):
    # Only checks that a link-to-link copymode() call with
    # follow_symlinks=False returns without raising.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux')]
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    for target, link in ((src, src_link), (dst, dst_link)):
        os.symlink(target, link)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
def test_module_all_attribute(self):
    # shutil.__all__ must list exactly the expected public names;
    # 'disk_usage' is expected only where os.statvfs exists or on Windows.
    self.assertTrue(hasattr(shutil, '__all__'))
    expected = [
        'copyfileobj', 'copyfile', 'copymode', 'copystat',
        'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
        'SpecialFileError', 'ExecError', 'make_archive',
        'get_archive_formats', 'register_archive_format',
        'unregister_archive_format', 'get_unpack_formats',
        'register_unpack_format', 'unregister_unpack_format',
        'unpack_archive', 'ignore_patterns', 'chown', 'which',
        'get_terminal_size', 'SameFileError',
    ]
    has_disk_usage = hasattr(os, 'statvfs') or os.name == 'nt'
    if has_disk_usage:
        expected.append('disk_usage')
    self.assertEqual(set(shutil.__all__), set(expected))
def write_file(self, new_text, filename, old_text, encoding):
    """Write the converted text, honouring output-dir, suffix and backups.

    If ``self._output_dir`` is set, *filename* must start with
    ``self._input_base_dir`` and is redirected below the output directory;
    ``self._append_suffix`` (if set) is then appended.  Unless
    ``self.nobackups`` is true, an existing ``<filename>.bak`` is removed
    and the current file is renamed to it before writing.  The write itself
    is delegated to the parent class's ``write_file``.

    Raises:
        ValueError: if an output directory is configured but *filename*
            does not start with ``self._input_base_dir``.
    """
    orig_filename = filename
    if self._output_dir:
        if filename.startswith(self._input_base_dir):
            filename = os.path.join(self._output_dir,
                                    filename[len(self._input_base_dir):])
        else:
            raise ValueError('filename %s does not start with the '
                             'input_base_dir %s' % (
                                 filename, self._input_base_dir))
    if self._append_suffix:
        filename += self._append_suffix
    if orig_filename != filename:
        output_dir = os.path.dirname(filename)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if output_dir and not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        self.log_message('Writing converted %s to %s.', orig_filename,
                         filename)
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
    if orig_filename != filename:
        # Preserve the file mode in the new output directory.
        shutil.copymode(orig_filename, filename)
def write_file(self, new_text, filename, old_text, encoding):
    """Write the converted text, honouring output-dir, suffix and backups.

    If ``self._output_dir`` is set, *filename* must start with
    ``self._input_base_dir`` and is redirected below the output directory;
    ``self._append_suffix`` (if set) is then appended.  Unless
    ``self.nobackups`` is true, an existing ``<filename>.bak`` is removed
    and the current file is renamed to it before writing.  The write itself
    is delegated to the parent class's ``write_file``.

    Raises:
        ValueError: if an output directory is configured but *filename*
            does not start with ``self._input_base_dir``.
    """
    orig_filename = filename
    if self._output_dir:
        if filename.startswith(self._input_base_dir):
            filename = os.path.join(self._output_dir,
                                    filename[len(self._input_base_dir):])
        else:
            raise ValueError('filename %s does not start with the '
                             'input_base_dir %s' % (
                                 filename, self._input_base_dir))
    if self._append_suffix:
        filename += self._append_suffix
    if orig_filename != filename:
        output_dir = os.path.dirname(filename)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if output_dir and not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        self.log_message('Writing converted %s to %s.', orig_filename,
                         filename)
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
    if orig_filename != filename:
        # Preserve the file mode in the new output directory.
        shutil.copymode(orig_filename, filename)
def updateFile(install_path, diff_path, diff_list, conf_file_list, user, group, pkg_type):
    """Apply a list of file diffs from ``diff_path`` onto ``install_path``.

    Each entry of ``diff_list`` is a mapping with a ``'file'`` path and an
    ``'op'`` code:

    * ``"M"`` - copy the file over the installed one and chown it
    * ``"A"`` - record the file in ``public_file_list``, create missing
      parent/target directories and copy the file, chowning what is created
    * ``"X"`` - copy only the permission bits
    * ``"D"`` - delete the installed file, link or directory tree unless the
      file is recorded in ``public_file_list``

    ``package.conf.yaml`` is skipped for ``'conf'`` packages; for any other
    package type, targets whose real path is in ``conf_file_list`` are
    skipped.  Always returns ``(0, 'ok')``.
    """
    uid = pwd.getpwnam(user).pw_uid
    gid = grp.getgrnam(group).gr_gid
    is_conf_pkg = pkg_type == 'conf'

    for diff in diff_list:
        rel_path = diff['file'].lstrip('/')
        if is_conf_pkg and rel_path == "package.conf.yaml":
            continue
        src = os.path.join(diff_path, rel_path)
        dst = os.path.join(install_path, rel_path)
        if not is_conf_pkg and os.path.realpath(dst) in conf_file_list:
            continue

        op = diff['op']
        if op == "M":
            shutil.copy2(src, dst)
            os.chown(dst, uid, gid)
        elif op == "A":
            public_file_list.append(rel_path)
            parent = os.path.dirname(dst)
            if not os.path.isdir(parent):
                mkdir(parent)
                os.chown(parent, uid, gid)
            if os.path.isdir(src):
                mkdir(dst)
            else:
                shutil.copy2(src, dst)
            os.chown(dst, uid, gid)
        elif op == "X":
            shutil.copymode(src, dst)
        elif op == "D":
            if rel_path in public_file_list:
                continue
            if os.path.isfile(dst):
                os.remove(dst)
            elif os.path.isdir(dst):
                # A symlink to a directory is unlinked, not descended into.
                if os.path.islink(dst):
                    os.remove(dst)
                else:
                    shutil.rmtree(dst)
    return 0, 'ok'
def sed_i(files, expr, replace_exp, only_first_occurrence=False):
    """
    Search/replace matching lines in every given file, in place.

    Similar to:
    sed -i "s/expr/replace_expr/g" files...

    Each file is rewritten into ``<name>.pygrep.tmp``, which receives the
    original's permission bits and is then swapped in; the original is
    parked as ``<name>.pygrep.ori`` during the swap and removed afterwards.

    :type files: enumerate or list
    :param files: file names generator
    :type expr: str or pattern
    :type replace_exp: str
    :param only_first_occurrence: replace only first occurrence per line
    """
    pattern = _compiled_re(expr)
    for path in files:
        scratch = path + '.pygrep.tmp'
        with open(path, 'r') as source:
            with open(scratch, 'w') as dest:
                sed(source, pattern, replace_exp, dest, only_first_occurrence)

        shutil.copymode(path, scratch)
        parked = path + '.pygrep.ori'
        os.rename(path, parked)
        os.rename(scratch, path)
        os.remove(parked)
def test_copymode_follow_symlinks(self):
    # copymode() with default arguments acts on the files behind symlinks.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux')]
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)

    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # On Windows, os.chmod does not follow symlinks (issue #15411)
    if os.name != 'nt':
        link_cases = (
            (src_link, dst),       # follow src link
            (src, dst_link),       # follow dst link
            (src_link, dst_link),  # follow both links
        )
        for source, target in link_cases:
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(source, target)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copymode_symlink_to_symlink(self):
    # Exercises copymode(..., follow_symlinks=False) for link-to-link,
    # link-to-file and file-to-link argument combinations.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux')]
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)

    # link to link: the links' own modes match afterwards, the targets'
    # modes still differ
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    self.assertEqual(os.lstat(src_link).st_mode,
                     os.lstat(dst_link).st_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # only one side is a link (src link first, then dst link): the
    # underlying files end up with equal modes
    for source, target in ((src_link, dst), (src, dst_link)):
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(source, target, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copymode_symlink_to_symlink_wo_lchmod(self):
    # Only checks that a link-to-link copymode() call with
    # follow_symlinks=False returns without raising.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux')]
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    for target, link in ((src, src_link), (dst, dst_link)):
        os.symlink(target, link)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
def write_file(self, new_text, filename, old_text, encoding):
    """Write the converted text, honouring output-dir, suffix and backups.

    If ``self._output_dir`` is set, *filename* must start with
    ``self._input_base_dir`` and is redirected below the output directory;
    ``self._append_suffix`` (if set) is then appended.  Unless
    ``self.nobackups`` is true, an existing ``<filename>.bak`` is removed
    and the current file is renamed to it before writing.  The write itself
    is delegated to the parent class's ``write_file``.

    Raises:
        ValueError: if an output directory is configured but *filename*
            does not start with ``self._input_base_dir``.
    """
    orig_filename = filename
    if self._output_dir:
        if filename.startswith(self._input_base_dir):
            filename = os.path.join(self._output_dir,
                                    filename[len(self._input_base_dir):])
        else:
            raise ValueError('filename %s does not start with the '
                             'input_base_dir %s' % (
                                 filename, self._input_base_dir))
    if self._append_suffix:
        filename += self._append_suffix
    if orig_filename != filename:
        output_dir = os.path.dirname(filename)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if output_dir and not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        self.log_message('Writing converted %s to %s.', orig_filename,
                         filename)
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except OSError:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except OSError:
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
    if orig_filename != filename:
        # Preserve the file mode in the new output directory.
        shutil.copymode(orig_filename, filename)
def copy(self, target, mode=False, stat=False):
    """ copy path to target.

        If mode is True, also copy the permission bits from path to target.
        If stat is True, also copy permission, last modification
        time, last access time, and flags from path to target.
    """
    if not self.check(file=1):
        # Not a plain file: walk the tree (without recursing into links)
        # and recreate every entry below target.
        for entry in self.visit(rec=lambda p: p.check(link=0)):
            dest = target.join(entry.relto(self))
            dest.dirpath().ensure(dir=1)
            if entry.check(link=1):
                # Links are recreated as links; no mode/stat copy for them.
                dest.mksymlinkto(entry.readlink())
                continue
            if entry.check(file=1):
                copychunked(entry, dest)
            elif entry.check(dir=1):
                dest.ensure(dir=1)
            if mode:
                copymode(entry.strpath, dest.strpath)
            if stat:
                copystat(entry, dest)
        return

    # Single file: copying "into" a directory keeps the base name.
    if target.check(dir=1):
        target = target.join(self.basename)
    assert self!=target
    copychunked(self, target)
    if mode:
        copymode(self.strpath, target.strpath)
    if stat:
        copystat(self, target)
def copymode(src, dest):
    """ copy the permission bits of src onto dest. """
    from shutil import copymode as shutil_copymode
    shutil_copymode(src, dest)
def install(self, tgtdir, tmplenv):
    """Install ``self.srcfile`` as ``self.dstfile`` below ``tgtdir``.

    The method depends on ``self.installMode``: ``'link'`` creates a
    relative symlink, ``'hardlink'`` a hard link, ``'cinclude'`` writes a
    one-line ``#include`` stub, ``'mako'`` renders the source as a mako
    template with ``tmplenv`` as context and copies the source's mode;
    any other value copies the file with ``shutil.copy2``.  A missing
    target directory is created and an existing target is removed first.
    """
    srcfile = os.path.abspath(self.srcfile)
    tgtfile = os.path.abspath(os.path.join(tgtdir, self.dstfile))
    tgtparent = os.path.dirname(tgtfile)
    how = self.installMode
    logging.debug('installing file %s to %s mode %s from module %s',
                  srcfile, tgtfile, self.installMode, self.module)

    if not os.path.isfile(srcfile):
        # Only warns; installation is still attempted below.
        logging.warning('file %s is missing or not regular file, provided by %s from %s',
                        self.srcfile, self.module, self.module.modulefile)

    if not os.path.exists(tgtparent):
        os.makedirs(tgtparent)

    # TODO ideally we should never overwrite files, reconfig run should delete previously installed files
    if os.path.lexists(tgtfile):
        os.unlink(tgtfile)

    if how == 'link':
        os.symlink(os.path.relpath(srcfile, tgtparent), tgtfile)
    elif how == 'hardlink':
        os.link(srcfile, tgtfile)
    elif how == 'cinclude':
        # NOTE(review): the path is computed relative to tgtfile itself,
        # not its directory as in the 'link' case - confirm this is intended.
        include_line = '#include "'+os.path.relpath(srcfile, tgtfile)+'"\n'
        with open(tgtfile, 'w') as out:
            out.write(include_line)
    elif how == 'mako':
        with open(tgtfile, 'w') as out:
            tmpl = mako.template.Template(filename=self.srcfile,
                                          imports=['import os'])
            tmpl.render_context(mako.runtime.Context(out, **tmplenv))
        shutil.copymode(srcfile, tgtfile)
    else:  # copy the file
        shutil.copy2(srcfile, tgtfile)
def write_hunks(self, srcname, tgtname, hunks):
    """Write the patched version of ``srcname`` to ``tgtname``.

    The source is streamed through ``self.patch_stream`` together with
    ``hunks``; the result is written in binary mode and the source's
    permission bits are copied to the target.  Returns True.
    """
    # Context managers close both files even if patching or writing raises.
    with open(srcname, "rb") as src:
        with open(tgtname, "wb") as tgt:
            debug("processing target file %s" % tgtname)
            tgt.writelines(self.patch_stream(src, hunks))
    # [ ] TODO: add test for permission copy
    shutil.copymode(srcname, tgtname)
    return True
def copydir(args, sourcedir, names):
    """Directory-walk visitor that mirrors ``sourcedir`` below a target dir.

    ``args`` is the tuple ``(targetdir, replacements, uid, gid)``.  Entries
    of ``names`` listed in ``CVS_DIRS`` are removed from ``names``.  Files
    ending in ``.in`` are expanded through ``copyin`` (dropping the
    suffix); other files are copied with their mode; directories are
    created with the source's mode.  Existing targets are left untouched.
    Ownership is set when ``uid`` is not None.
    """
    targetdir, replacements, uid, gid = args
    # Iterate over a copy: CVS directories are removed from ``names``.
    for name in list(names):
        if os.path.normcase(name) in CVS_DIRS:
            # Don't recurse into CVS directories
            names.remove(name)
            continue

        src = os.path.join(sourcedir, name)
        if not os.path.isfile(src):
            # Directory:
            dn = os.path.join(targetdir, sourcedir, name)
            if not os.path.exists(dn):
                os.mkdir(dn)
                shutil.copymode(src, dn)
                if uid is not None:
                    os.chown(dn, uid, gid)
            continue

        # Copy the file:
        stem, ext = os.path.splitext(name)
        is_template = os.path.normcase(ext) == ".in"
        if is_template:
            dst = os.path.join(targetdir, sourcedir, stem)
        else:
            dst = os.path.join(targetdir, src)
        if os.path.exists(dst):
            continue
        if is_template:
            copyin(src, dst, replacements, uid,
                   gid)
        else:
            shutil.copyfile(src, dst)
            shutil.copymode(src, dst)
        if uid is not None:
            os.chown(dst, uid, gid)
def copyin(src, dst, replacements, uid, gid):
    """Copy ``src`` to ``dst``, expanding ``<<key>>`` placeholders.

    Every occurrence of ``<<key>>`` is replaced by ``replacements[key]``.
    The source's permission bits are copied to ``dst`` and, when ``uid`` is
    not None, ownership is set to ``uid``/``gid``.
    """
    # Context managers close the handles even if reading or writing fails.
    with open(src) as ifp:
        text = ifp.read()
    for k in replacements:
        text = text.replace("<<%s>>" % k, replacements[k])
    with open(dst, "w") as ofp:
        ofp.write(text)
    shutil.copymode(src, dst)
    if uid is not None:
        os.chown(dst, uid, gid)
def _copy_file(path_old, path_new, name):
    """Copy a template file, substituting the project-name placeholders.

    A trailing ``.template`` is stripped from ``path_new``.  The four
    ``{{ project_name... }}`` placeholders are replaced with ``name``, its
    upper-case form and two ``_camelize`` variants.  Permission bits are
    copied afterwards; failure to do so only prints a notice to stderr.
    """
    # HACK: use .template suffix to prevent .py file byte compiling
    if path_new.endswith(".template"):
        path_new = path_new[:-9]

    # Read and close the source before touching the target, so the target
    # is not left truncated when the read fails.
    with open(path_old, 'r') as fp_old:
        content = fp_old.read()

    # following django template syntax
    content = content\
        .replace('{{ project_name }}', name)\
        .replace('{{ project_name|upper }}', name.upper())\
        .replace('{{ project_name|camel }}', _camelize(name))\
        .replace('{{ project_name|camel_cmd }}', _camelize(name, fill_char="_"))

    with open(path_new, 'w') as fp_new:
        fp_new.write(content)

    # copy permissions
    try:
        shutil.copymode(path_old, path_new)
    except OSError:
        sys.stderr.write("Notice: Couldn't set permission bits on %s. You're probably using an uncommon filesystem setup. No problem.\n" % path_new)
# based on django.core.management.base.copy_helper()
def _copyFile(self, context, src, dest):
    """Copy the bytes and permission bits of ``src`` to ``dest``.

    Both paths are passed through ``normLongPath`` first.
    """
    src, dest = normLongPath(src), normLongPath(dest)
    with open(src, 'rb') as reader, openForWrite(dest, 'wb') as writer:
        shutil.copyfileobj(reader, writer)
    shutil.copymode(src, dest)
    assert os.path.exists(dest)
def _copyFile(self, context, src, dest):
    """Copy ``src`` to ``dest`` line by line through ``self.mappers``.

    Each mapper may contribute a header before and a footer after the
    content.  Every line is passed through the mappers in order; a mapper
    returning None drops the line and stops the chain for it.  Mappers that
    emit a header/footer or change a line are discarded from the
    unused-mappers set.  Permission bits are copied at the end.
    """
    dest = normLongPath(dest)
    src = normLongPath(src)
    with open(src, 'rb') as reader:
        with openForWrite(dest, 'wb') as writer:
            for mapper in self.mappers:
                header = mapper.getHeader(context)
                if header:
                    self.__unusedMappers.discard(mapper)
                    writer.write(header)

            for line in reader:
                for mapper in self.mappers:
                    before = line
                    line = mapper.mapLine(context, line)
                    if before != line:
                        self.__unusedMappers.discard(mapper)
                    if None == line:
                        # Line dropped: skip the remaining mappers.
                        break
                if None != line:
                    writer.write(line)

            for mapper in self.mappers:
                footer = mapper.getFooter(context)
                if footer:
                    self.__unusedMappers.discard(mapper)
                    writer.write(footer)
    shutil.copymode(src, dest)
    assert os.path.exists(dest)
def retimestamp_and_index_file(inpath, outpath=None, retimestamp=None):
    """Index ``inpath``, optionally retimestamping it first.

    ``retimestamp`` selects the strategy:

    * ``None`` - index directly and return ``index_file``'s result
    * ``'inplace'`` - retimestamp the input in place, then index it
    * ``'atomic'`` - retimestamp into a temporary file (created with the
      input's permission bits) and index that; when no ``outpath`` is
      given the temporary file then replaces the input

    Returns False when a step fails and True when the ``'atomic'`` path
    succeeds.
    """
    # no retimestamping needed
    if retimestamp is None:
        return index_file(inpath, outpath)

    # retimestamp the input in place and index
    elif retimestamp == 'inplace':
        from flvlib.scripts.retimestamp_flv import retimestamp_file_inplace
        log.debug("Retimestamping file `%s' in place", inpath)

        # retimestamp the file inplace
        if not retimestamp_file_inplace(inpath):
            log.error("Failed to retimestamp `%s' in place", inpath)
            return False

        return index_file(inpath, outpath)

    # retimestamp the input into a temporary file
    elif retimestamp == 'atomic':
        from flvlib.scripts.retimestamp_flv import retimestamp_file_atomically
        log.debug("Retimestamping file `%s' atomically", inpath)

        try:
            fd, temppath = tempfile.mkstemp()
            os.close(fd)
            # preserve the permission bits
            shutil.copymode(inpath, temppath)
        except EnvironmentError as e:
            # "except X, (a, b)" is Python 2 only; read strerror instead.
            log.error("Failed to create temporary file: %s", e.strerror)
            return False

        if not retimestamp_file_atomically(inpath, temppath):
            log.error("Failed to retimestamp `%s' atomically", inpath)
            # remove the temporary files
            force_remove(temppath)
            return False

        # index the temporary file
        if not index_file(temppath, outpath):
            force_remove(temppath)
            return False

        if not outpath:
            # If we were not writing directly to the output file
            # we need to overwrite the original
            try:
                shutil.move(temppath, inpath)
            except EnvironmentError as e:
                log.error("Failed to overwrite the original file with the "
                          "retimestamped and indexed version: %s", e.strerror)
                return False
        else:
            # if we were writing directly to the output file we need to remove
            # the retimestamped temporary file
            force_remove(temppath)

        return True
def copy_helper(style, app_or_project, name, directory, other_name=''):
    """
    Copies either a Django application layout template or a Django project
    layout template into the specified directory.

    Raises CommandError when ``name`` is not a valid identifier-like name
    or when the top-level directory cannot be created.
    """
    # style -- A color style object (see django.core.management.color).
    # app_or_project -- The string 'app' or 'project'.
    # name -- The name of the application or project.
    # directory -- The directory to which the layout template should be copied.
    # other_name -- When copying an application layout, this should be the name
    #               of the project.
    import re
    import shutil
    other = {'project': 'app', 'app': 'project'}[app_or_project]
    if not re.search(r'^[_a-zA-Z]\w*$', name): # If it's not a valid directory name.
        # Provide a smart error message, depending on the error.
        if not re.search(r'^[_a-zA-Z]', name):
            message = 'make sure the name begins with a letter or underscore'
        else:
            message = 'use only numbers, letters and underscores'
        raise CommandError("%r is not a valid %s name. Please %s." % (name, app_or_project, message))
    top_dir = os.path.join(directory, name)
    try:
        os.mkdir(top_dir)
    except OSError as e:
        raise CommandError(e)

    # Determine where the app or project templates are. Use
    # django.__path__[0] because we don't know into which directory
    # django has been installed.
    template_dir = os.path.join(django.__path__[0], 'conf', '%s_template' % app_or_project)

    for d, subdirs, files in os.walk(template_dir):
        relative_dir = d[len(template_dir)+1:].replace('%s_name' % app_or_project, name)
        if relative_dir:
            os.mkdir(os.path.join(top_dir, relative_dir))
        # Prune hidden directories in place so os.walk skips them.
        for subdir in subdirs[:]:
            if subdir.startswith('.'):
                subdirs.remove(subdir)
        for f in files:
            if not f.endswith('.py'):
                # Ignore .pyc, .pyo, .py.class etc, as they cause various
                # breakages.
                continue
            path_old = os.path.join(d, f)
            path_new = os.path.join(top_dir, relative_dir, f.replace('%s_name' % app_or_project, name))
            # Context managers close both handles even on error.
            with open(path_old, 'r') as fp_old:
                with open(path_new, 'w') as fp_new:
                    fp_new.write(fp_old.read().replace('{{ %s_name }}' % app_or_project, name).replace('{{ %s_name }}' % other, other_name))
            try:
                shutil.copymode(path_old, path_new)
                _make_writeable(path_new)
            except OSError:
                sys.stderr.write(style.NOTICE("Notice: Couldn't set permission bits on %s. You're probably using an uncommon filesystem setup. No problem.\n" % path_new))
def install_scripts(self, context, path):
    """
    Install scripts into the created environment from a directory.

    :param context: The information for the environment creation request
                    being processed.
    :param path:    Absolute pathname of a directory containing script.
                    Scripts in the 'common' subdirectory of this directory,
                    and those in the directory named for the platform
                    being run on, are installed in the created environment.
                    Placeholder variables are replaced with environment-
                    specific values.
    """
    binpath = context.bin_path
    prefix_len = len(path)
    wanted_dirs = ('common', os.name)
    for root, dirs, files in os.walk(path):
        if root == path:
            # At top level keep only the relevant subdirectories (pruned
            # in place for os.walk) and ignore any files found here.
            dirs[:] = [d for d in dirs if d in wanted_dirs]
            continue
        for name in files:
            srcfile = os.path.join(root, name)
            # Path components below the 'common'/<os.name> directory.
            suffix = root[prefix_len:].split(os.sep)[2:]
            dstdir = os.path.join(binpath, *suffix) if suffix else binpath
            if not os.path.exists(dstdir):
                os.makedirs(dstdir)
            dstfile = os.path.join(dstdir, name)
            with open(srcfile, 'rb') as reader:
                data = reader.read()
            if srcfile.endswith('.exe'):
                # .exe files are written back as raw bytes.
                mode = 'wb'
            else:
                mode = 'w'
                try:
                    data = data.decode('utf-8')
                    data = self.replace_variables(data, context)
                except UnicodeDecodeError as e:
                    data = None
                    logger.warning('unable to copy script %r, '
                                   'may be binary: %s', srcfile, e)
            if data is None:
                continue
            with open(dstfile, mode) as writer:
                writer.write(data)
            shutil.copymode(srcfile, dstfile)