Python shutil 模块，chown() 实例源码
我们从 Python 开源项目中提取了以下 24 个代码示例，用于说明如何使用 shutil.chown()。
def daemote(pid_file, user, group):
    """Drop privileges by switching the process group and then its user.

    Pass None for user or group to leave that identity as it is.
    Ownership of pid_file is handed over first so the file can still be
    cleaned up afterwards.

    Raises:
        OSError: if _SUPPORTED_PLATFORM is false.
    """
    if not _SUPPORTED_PLATFORM:
        raise OSError('Daemotion is unsupported on your platform.')

    # Re-owning the pid file needs no special handling and doubles as
    # validation: shutil.chown rejects unknown user or group names before
    # any identity change is attempted.
    shutil.chown(pid_file, user=user, group=group)

    # Switch the group first, then the user.
    _setgroup(group)
    _setuser(user)
def install_service_dir(path, owner, mode):
    """Merge the directory at path into /var/prologin/<basename of path>.

    After the merge, the destination directory and every directory and file
    below it are given the owner (a 'user:group' string) and the permission
    bits in mode.
    """
    base_dir = '/var/prologin'
    if not os.path.exists(base_dir):
        mkdir(base_dir, mode=0o755, owner='root:root')

    # Only the last path component names the service; the leading part is
    # the service kind (e.g. django).
    name = os.path.basename(path)
    target = '/var/prologin/%s' % name

    # Nothing in Python merges two directories, so rsync(1) is used. Be
    # careful with its arguments: the trailing slash on the source is
    # meaningful when merging directories.
    system('rsync -rv %s/ /var/prologin/%s' % (path, name))

    user, group = owner.split(':')

    def apply_owner_and_mode(entry):
        shutil.chown(entry, user, group)
        os.chmod(entry, mode)

    # os.walk reports only children, so the root is handled explicitly.
    apply_owner_and_mode(target)
    for parent, subdirs, filenames in os.walk(target):
        for child in subdirs + filenames:
            apply_owner_and_mode(os.path.join(parent, child))
def save_data(self, data):
    """Write the entries of data to the file at self.Z_DATA.

    Each entry is written as one 'path|rank|timestamp' line. When data has
    a length greater than self.GROOM_THRESHOLD, every entry is first
    replaced in place by a ZEntry whose rank is scaled by self.GROOM_LEVEL.
    """
    # Age the data once it grows past the grooming threshold.
    needs_grooming = hasattr(data, '__len__') and len(data) > self.GROOM_THRESHOLD
    if needs_grooming:
        for index, entry in enumerate(data):
            aged_rank = int(entry.rank * self.GROOM_LEVEL)
            data[index] = ZEntry(entry.path, aged_rank, entry.time)

    # Stage the output in a temporary file to minimize the time the real
    # file is open and to minimize clobbering.
    from tempfile import NamedTemporaryFile
    with NamedTemporaryFile('wt', encoding=sys.getfilesystemencoding()) as tmp:
        tmp.writelines(
            "{}|{}|{}\n".format(entry.path, int(entry.rank), int(entry.time.timestamp()))
            for entry in data
        )
        tmp.flush()

        if self.Z_OWNER:
            shutil.chown(tmp.name, user=self.Z_OWNER)

        # The existing data file is removed first on Windows only. The
        # stated rationale is that rename() is atomic and clobbers on POSIX
        # but not on Windows; note that the code below copies rather than
        # renames.
        from xonsh.platform import ON_WINDOWS
        if ON_WINDOWS and os.path.exists(self.Z_DATA):
            os.remove(self.Z_DATA)
        # Must happen inside the with block: the temporary file is gone
        # once the block exits.
        shutil.copy(tmp.name, self.Z_DATA)
def setup_volumes(self):
    """Copy the required image files into constants.LIBVIRT_VOLUME_PATH.

    The files 'overcloud-full.vmlinuz', 'overcloud-full.initrd' and
    self.image_name are read from self.image_path. The file named
    self.image_name is stored as 'undercloud.qcow2'; the others keep their
    names. Each copy replaces any existing destination, is chowned to
    qemu:qemu and gets mode 0o744.

    Raises:
        ApexUndercloudException: if a source file does not exist.
    """
    volume_dir = constants.LIBVIRT_VOLUME_PATH
    image_files = ('overcloud-full.vmlinuz', 'overcloud-full.initrd',
                   self.image_name)
    for img_file in image_files:
        src_img = os.path.join(self.image_path, img_file)
        dest_name = 'undercloud.qcow2' if img_file == self.image_name else img_file
        dest_img = os.path.join(volume_dir, dest_name)

        if not os.path.isfile(src_img):
            raise ApexUndercloudException(
                "Required source file does not exist:{}".format(src_img))

        # Start from a clean destination rather than overwriting in place.
        if os.path.exists(dest_img):
            os.remove(dest_img)
        shutil.copyfile(src_img, dest_img)
        shutil.chown(dest_img, user='qemu', group='qemu')
        os.chmod(dest_img, 0o0744)
    # TODO(trozet): check whether a resize is needed; right now the size is
    # 50gb. There is a lib called vminspect which has some dependencies and
    # is not yet available in pip. Consider switching to this lib later.
def inject_auth(self):
    """Build the credential-related operations and apply them to self.volume.

    The operations are single-entry dicts keyed by constants.VIRT_PW,
    constants.VIRT_RUN_CMD or constants.VIRT_UPLOAD; the finished list is
    handed to virt_utils.virt_customize together with self.volume.
    """
    virt_ops = []

    # virt-customize keys/pws: only set a root password when one is given.
    if self.root_pw:
        virt_ops.append({constants.VIRT_PW: "password:{}".format(self.root_pw)})

    # ssh key setup
    virt_ops.extend([
        {constants.VIRT_RUN_CMD: 'mkdir -p /root/.ssh'},
        {constants.VIRT_UPLOAD:
            '/root/.ssh/id_rsa.pub:/root/.ssh/authorized_keys'},
    ])

    run_cmds = (
        'chmod 600 /root/.ssh/authorized_keys',
        'restorecon /root/.ssh/authorized_keys',
        'cp /root/.ssh/authorized_keys /home/stack/.ssh/',
        'chown stack:stack /home/stack/.ssh/authorized_keys',
        'chmod 600 /home/stack/.ssh/authorized_keys',
    )
    virt_ops.extend({constants.VIRT_RUN_CMD: cmd} for cmd in run_cmds)

    virt_utils.virt_customize(virt_ops, self.volume)
def init_db():
    """Initialize the database.

    Recreates an empty $SNAP_COMMON/db directory, makes sure
    $SNAP_COMMON/log/postgresql.log exists, gives both to nobody:nogroup,
    and then runs initdb followed by create_db through
    run_with_drop_privileges.
    """
    config_data = get_config_data()
    snap_common = os.environ['SNAP_COMMON']

    # Always start from an empty data directory.
    db_path = os.path.join(snap_common, 'db')
    if os.path.exists(db_path):
        shutil.rmtree(db_path)
    os.mkdir(db_path)
    shutil.chown(db_path, user='nobody', group='nogroup')

    log_path = os.path.join(snap_common, 'log', 'postgresql.log')
    if not os.path.exists(log_path):
        # Create an empty log file.
        with open(log_path, 'a'):
            pass
    shutil.chown(log_path, user='nobody', group='nogroup')

    def _init_db():
        initdb = os.path.join(os.environ['SNAP'], 'bin', 'initdb')
        data_dir = os.path.join(os.environ['SNAP_COMMON'], 'db')
        subprocess.check_output(
            [initdb, '-D', data_dir,
             '-U', 'postgres', '-E', 'UTF8', '--locale=C'],
            stderr=subprocess.STDOUT)
        with with_postgresql():
            create_db(config_data)

    run_with_drop_privileges(_init_db)
def change_owner(full_path, user_name=None, group_name=None):
    """Set the owning user and/or group of full_path via shutil.chown.

    Any exception raised by shutil.chown reaches the caller unchanged.
    """
    shutil.chown(full_path, user=user_name, group=group_name)
def chown(path, filename):
    """Give the file path/filename the same uid and gid as the directory path."""
    parent = os.stat(path)
    target = os.path.join(path, filename)
    os.chown(target, parent.st_uid, parent.st_gid)
def chown_dir(directory, username):
    """Recursively hand directory and everything below it to username.

    username is used as both the owning user and the owning group. An info
    message is logged once the whole tree has been processed.
    """
    def give(target):
        shutil.chown(target, user=username, group=username)

    give(directory)
    for parent, subdirs, filenames in os.walk(directory):
        for entry in subdirs:
            give(os.path.join(parent, entry))
        for entry in filenames:
            give(os.path.join(parent, entry))
    logger.info("{} chown'd to {}".format(directory, username))
def chown(path, filename):
    """Give the file path/filename the same uid and gid as the directory path."""
    parent = os.stat(path)
    target = os.path.join(path, filename)
    os.chown(target, parent.st_uid, parent.st_gid)
def chown_dir(directory, username):
    """Recursively hand directory and everything below it to username.

    username is used as both the owning user and the owning group. An info
    message is logged once the whole tree has been processed.
    """
    def give(target):
        shutil.chown(target, user=username, group=username)

    give(directory)
    for parent, subdirs, filenames in os.walk(directory):
        for entry in subdirs:
            give(os.path.join(parent, entry))
        for entry in filenames:
            give(os.path.join(parent, entry))
    logger.info("{} chown'd to {}".format(directory, username))
def handle_sonarr(torrent):
    """Extract the torrent's rar files and move the results to drone_factory.

    Every file of torrent whose path ends with 'rar' is passed to extract()
    together with a temporary directory. Each path extract() returns is
    given the group plex_group and mode 0o664 before it is moved.
    """
    for entry in torrent.files().values():
        file_path = os.path.join(torrent.downloadDir, entry['name'])
        if not file_path.endswith('rar'):
            continue
        with tempfile.TemporaryDirectory() as temp_dir:
            extracted = extract(file_path, temp_dir)
            # Move extracted files to the sonarr drone factory; an empty or
            # None result means there is nothing to move.
            for path in extracted or ():
                shutil.chown(path, group=plex_group)
                os.chmod(path, 0o664)
                shutil.move(path, drone_factory)
def mkdir(path, mode, owner='root:root'):
    """Ensure a directory exists at path with the given mode and owner.

    Args:
        path: directory to create, or to adjust if it already exists.
        mode: permission bits the directory must end up with.
        owner: 'user:group' string naming the new owner.

    Raises:
        ValueError: if owner does not contain exactly one ':'.
        LookupError: if the user or group name is unknown (from shutil.chown).
        OSError: if creating, chmod-ing or chown-ing the directory fails.
    """
    if not os.path.exists(path):
        os.mkdir(path, mode)
    # os.mkdir masks the requested mode with the process umask, so the mode
    # is applied explicitly whether the directory is new or already existed;
    # both cases end up with exactly the same permission bits.
    os.chmod(path, mode)
    user, group = owner.split(':')
    shutil.chown(path, user, group)
def copy(old, new, mode=0o600, owner='root:root'):
    """Copy the file old to new, then set the copy's mode and owner.

    Args:
        old: source file.
        new: destination file, or a directory to copy the file into.
        mode: permission bits for the copied file.
        owner: 'user:group' string naming the copy's new owner.

    Raises:
        ValueError: if owner does not contain exactly one ':'.
        LookupError: if the user or group name is unknown (from shutil.chown).
        OSError: if the copy, chmod or chown fails.
    """
    print('Copying %s -> %s (mode: %o) (own: %s)' % (old, new, mode, owner))
    # shutil.copy returns the path it actually wrote. When new is a
    # directory that is the file created inside it, so the mode and owner
    # are applied to the copy and not to the containing directory.
    dest = shutil.copy(old, new)
    os.chmod(dest, mode)
    user, group = owner.split(':')
    shutil.chown(dest, user, group)
def copytree(old, new, dir_mode=0o700, file_mode=0o600, owner='root:root'):
    """Copy the tree old to new, then set modes and owner on the whole copy.

    Args:
        old: source directory.
        new: destination directory (created by shutil.copytree).
        dir_mode: permission bits for every directory, including new itself.
        file_mode: permission bits for every file.
        owner: 'user:group' string naming the new owner of every entry.

    Raises:
        ValueError: if owner does not contain exactly one ':'.
        LookupError: if the user or group name is unknown (from shutil.chown).
        OSError: if the copy, a chmod or a chown fails.
    """
    print('Copying %s -> %s (file mode: %o) (dir mode: %o) (own: %s)' % (old, new, file_mode, dir_mode, owner))
    shutil.copytree(old, new)
    user, group = owner.split(':')

    # os.walk only reports the children of each directory it visits, so the
    # root of the copy has to be handled explicitly; otherwise it would keep
    # the mode and owner it was created with.
    os.chmod(new, dir_mode)
    shutil.chown(new, user, group)

    for root, dirs, files in os.walk(new):
        for name in dirs:
            path = os.path.join(root, name)
            os.chmod(path, dir_mode)
            shutil.chown(path, user, group)
        for name in files:
            path = os.path.join(root, name)
            os.chmod(path, file_mode)
            shutil.chown(path, user, group)
def touch(path, mode=0o600, owner='root:root'):
    """Create path if needed, refresh its timestamps, then set mode and owner.

    The file is opened in append mode, so existing content is kept. owner
    is a 'user:group' string.
    """
    print('Touching %s' % path)
    with open(path, 'a'):
        # Update access and modification times to the current time.
        os.utime(path)
    os.chmod(path, mode)
    user_name, group_name = owner.split(':')
    shutil.chown(path, user=user_name, group=group_name)
def chown_to_user(path):
    """Give path to the uid and gid returned by get_user_uid() and get_user_gid()."""
    uid = get_user_uid()
    gid = get_user_gid()
    shutil.chown(path, user=uid, group=gid)
def test_module_all_attribute(self):
    """Check that shutil.__all__ exists and lists exactly the expected names."""
    self.assertTrue(hasattr(shutil, '__all__'))
    expected = {
        'copyfileobj', 'copyfile', 'copymode', 'copystat',
        'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
        'SpecialFileError', 'ExecError', 'make_archive',
        'get_archive_formats', 'register_archive_format',
        'unregister_archive_format', 'get_unpack_formats',
        'register_unpack_format', 'unregister_unpack_format',
        'unpack_archive', 'ignore_patterns', 'chown', 'which',
        'get_terminal_size', 'SameFileError',
    }
    # disk_usage is only expected when os.statvfs exists or on Windows.
    if hasattr(os, 'statvfs') or os.name == 'nt':
        expected.add('disk_usage')
    self.assertEqual(set(shutil.__all__), expected)
def changeOwnerAndGrpToLoggedInUser(directory, raiseEx=False):
    """Chown directory to the user returned by getLoggedInUser().

    That name is used as both the owning user and the owning group.
    Failures of shutil.chown are deliberately ignored unless raiseEx is
    true, in which case the exception is re-raised.
    """
    loggedInUser = getLoggedInUser()
    try:
        shutil.chown(directory, user=loggedInUser, group=loggedInUser)
    except Exception:
        # Best effort by default; only surface the error on request.
        if raiseEx:
            raise
def chgrp(fname, grpname):
    """Change the group of fname to grpname.

    Args:
        fname: path whose group is changed.
        grpname: group name or numeric gid, as accepted by shutil.chown.

    Raises:
        RuntimeError: if the underlying chown fails with an OSError; the
            original error is attached as the exception's cause.
        LookupError: if grpname is an unknown group name. shutil.chown
            raises it and it is not an OSError, so it propagates unchanged.
    """
    try:
        shutil.chown(fname, group=grpname)
    except OSError as e:
        # Chain explicitly so the errno and filename of the original error
        # stay available to callers and in tracebacks.
        raise RuntimeError('chown(%s) failed: %s' % (fname, str(e))) from e
def test_chown(self):
    """Exercise shutil.chown argument validation and ownership changes.

    Invalid arguments must raise ValueError, LookupError or TypeError.
    Valid calls use the current process uid/gid (numeric and by name) on
    both a file and its containing directory.
    """
    # cleaned-up automatically by TestShutil.tearDown method
    dirname = self.mkdtemp()
    filename = tempfile.mktemp(dir=dirname)
    write_file(filename, 'testing chown function')

    with self.assertRaises(ValueError):
        shutil.chown(filename)

    with self.assertRaises(LookupError):
        shutil.chown(filename, user='non-exising username')

    with self.assertRaises(LookupError):
        shutil.chown(filename, group='non-exising groupname')

    with self.assertRaises(TypeError):
        shutil.chown(filename, b'spam')

    with self.assertRaises(TypeError):
        shutil.chown(filename, 3.14)

    uid = os.getuid()
    gid = os.getgid()

    def check_chown(path, uid=None, gid=None):
        # Stat the path that was passed in, so that the directory checks
        # below really inspect the directory and not the file.
        s = os.stat(path)
        if uid is not None:
            self.assertEqual(uid, s.st_uid)
        if gid is not None:
            self.assertEqual(gid, s.st_gid)

    # Numeric ids: first the file, then the directory.
    for path in (filename, dirname):
        shutil.chown(path, uid, gid)
        check_chown(path, uid, gid)
        shutil.chown(path, uid)
        check_chown(path, uid)
        shutil.chown(path, user=uid)
        check_chown(path, uid)
        shutil.chown(path, group=gid)
        check_chown(path, gid=gid)

    # The same ids expressed as user and group names.
    user = pwd.getpwuid(uid)[0]
    group = grp.getgrgid(gid)[0]
    for path in (filename, dirname):
        shutil.chown(path, user, group)
        check_chown(path, uid, gid)
def handle_manual(torrent):
    """Process the files of a manually added torrent and send a notification.

    Files accepted by check_extension() (and without 'sample' in their
    path) are handed to guessit and moved as an episode or a movie. Files
    whose path ends with 'rar' are extracted into a temporary directory and
    the extracted paths are processed the same way. A notification is sent
    through pb_notify in every case; its text says whether anything was
    auto-processed.
    """
    auto_processed = False

    def handle_media(path, move):
        # Dispatch on the media type guessit reports; any other type is
        # left untouched.
        nonlocal auto_processed
        guess = guessit.guessit(path)
        if guess['type'] == 'episode':
            move_episode(path, guess, move)
            auto_processed = True
        elif guess['type'] == 'movie':
            move_movie(path, guess, move)
            auto_processed = True

    # Raw string so that \d reaches the regex engine instead of being an
    # invalid string escape. The pattern itself is unchanged.
    # NOTE(review): the '.' before 'rar' is unescaped and matches any
    # character - confirm whether a literal dot was intended.
    part_regex = re.compile(r'.*part(\d+).rar', re.IGNORECASE)

    for file in torrent.files().values():
        file_path = os.path.join(torrent.downloadDir, file['name'])
        if check_extension(file_path) and 'sample' not in file_path.lower():
            # Log and ignore mkv files of less than ~92MiB
            try:
                if os.path.getsize(file_path) >= 96811278:
                    handle_media(file_path, False)
                else:
                    syslog.syslog(
                        syslog.LOG_ERR, 'Detected false media file, skipping'
                    )
            except FileNotFoundError:
                syslog.syslog(syslog.LOG_ERR, 'Torrent file missing, skipping')
        elif file_path.endswith('rar'):
            # Ignore parts beyond the first in a rar series
            match = part_regex.match(file_path)
            if match and int(match.group(1)) > 1:
                continue
            with tempfile.TemporaryDirectory() as temp_dir:
                paths = extract(file_path, temp_dir)
                if paths:
                    for path in paths:
                        shutil.chown(path, group=plex_group)
                        os.chmod(path, 0o664)
                        handle_media(path, True)

    if auto_processed:
        pb_notify(
            textwrap.dedent(
                '''
                Manually added torrent {0} finished downloading
                and was auto-processed
                '''.format(torrent.name)
            ).strip()
        )
    else:
        pb_notify(
            'Manually added torrent {0} finished downloading'.format(
                torrent.name
            )
        )
def test_chown(self):
    """Exercise shutil.chown argument validation and ownership changes.

    Invalid arguments must raise ValueError, LookupError or TypeError.
    Valid calls use the current process uid/gid (numeric and by name) on
    both a file and its containing directory.
    """
    # cleaned-up automatically by TestShutil.tearDown method
    dirname = self.mkdtemp()
    filename = tempfile.mktemp(dir=dirname)
    write_file(filename, 'testing chown function')

    with self.assertRaises(ValueError):
        shutil.chown(filename)

    with self.assertRaises(LookupError):
        shutil.chown(filename, user='non-exising username')

    with self.assertRaises(LookupError):
        shutil.chown(filename, group='non-exising groupname')

    with self.assertRaises(TypeError):
        shutil.chown(filename, b'spam')

    with self.assertRaises(TypeError):
        shutil.chown(filename, 3.14)

    uid = os.getuid()
    gid = os.getgid()

    def check_chown(path, uid=None, gid=None):
        # Stat the path that was passed in, so that the directory checks
        # below really inspect the directory and not the file.
        s = os.stat(path)
        if uid is not None:
            self.assertEqual(uid, s.st_uid)
        if gid is not None:
            self.assertEqual(gid, s.st_gid)

    # Numeric ids: first the file, then the directory.
    for path in (filename, dirname):
        shutil.chown(path, uid, gid)
        check_chown(path, uid, gid)
        shutil.chown(path, uid)
        check_chown(path, uid)
        shutil.chown(path, user=uid)
        check_chown(path, uid)
        shutil.chown(path, group=gid)
        check_chown(path, gid=gid)

    # The same ids expressed as user and group names.
    user = pwd.getpwuid(uid)[0]
    group = grp.getgrgid(gid)[0]
    for path in (filename, dirname):
        shutil.chown(path, user, group)
        check_chown(path, uid, gid)
def test_chown(self):
    """Exercise shutil.chown argument validation and ownership changes.

    Invalid arguments must raise ValueError, LookupError or TypeError.
    Valid calls use the current process uid/gid (numeric and by name) on
    both a file and its containing directory.
    """
    # cleaned-up automatically by TestShutil.tearDown method
    dirname = self.mkdtemp()
    filename = tempfile.mktemp(dir=dirname)
    write_file(filename, 'testing chown function')

    with self.assertRaises(ValueError):
        shutil.chown(filename)

    with self.assertRaises(LookupError):
        shutil.chown(filename, user='non-exising username')

    with self.assertRaises(LookupError):
        shutil.chown(filename, group='non-exising groupname')

    with self.assertRaises(TypeError):
        shutil.chown(filename, b'spam')

    with self.assertRaises(TypeError):
        shutil.chown(filename, 3.14)

    uid = os.getuid()
    gid = os.getgid()

    def check_chown(path, uid=None, gid=None):
        # Stat the path that was passed in, so that the directory checks
        # below really inspect the directory and not the file.
        s = os.stat(path)
        if uid is not None:
            self.assertEqual(uid, s.st_uid)
        if gid is not None:
            self.assertEqual(gid, s.st_gid)

    # Numeric ids: first the file, then the directory.
    for path in (filename, dirname):
        shutil.chown(path, uid, gid)
        check_chown(path, uid, gid)
        shutil.chown(path, uid)
        check_chown(path, uid)
        shutil.chown(path, user=uid)
        check_chown(path, uid)
        shutil.chown(path, group=gid)
        check_chown(path, gid=gid)

    # The same ids expressed as user and group names.
    user = pwd.getpwuid(uid)[0]
    group = grp.getgrgid(gid)[0]
    for path in (filename, dirname):
        shutil.chown(path, user, group)
        check_chown(path, uid, gid)