Python os 模块,chown() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.chown()。
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def extract_zip(input_stream, output_path, chown_to_user=True):
try:
zipfile = ZipFile(BytesIO(input_stream))
zipfile.extractall(output_path)
file_list = zipfile.namelist()
if chown_to_user:
# chown the extracted files to the current user, instead of root
for file_name in file_list:
file_path = os.path.join(output_path, file_name)
chown_path_to_user(file_path)
return True
except Exception as ex:
logger.error(ex)
return False
def place_data_on_block_device(blk_device, data_src_dst):
"""Migrate data in data_src_dst to blk_device and then remount."""
# mount block device into /mnt
mount(blk_device, '/mnt')
# copy data to /mnt
copy_files(data_src_dst, '/mnt')
# umount block device
umount('/mnt')
# Grab user/group ID's from original source
_dir = os.stat(data_src_dst)
uid = _dir.st_uid
gid = _dir.st_gid
# re-mount where the data should originally be
# TODO: persist is currently a NO-OP in core.host
mount(blk_device, data_src_dst, persist=True)
# ensure original ownership of new mount.
os.chown(data_src_dst, uid, gid)
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
Returns False if no ceph key is available in relation state.
"""
key = None
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def ensure_ceph_keyring(service, user=None, group=None,
relation='ceph', key=None):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
@returns boolean: Flag to indicate whether a key was successfully written
to disk based on either relation data or a supplied key
"""
if not key:
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def place_data_on_block_device(blk_device, data_src_dst):
"""Migrate data in data_src_dst to blk_device and then remount."""
# mount block device into /mnt
mount(blk_device, '/mnt')
# copy data to /mnt
copy_files(data_src_dst, '/mnt')
# umount block device
umount('/mnt')
# Grab user/group ID's from original source
_dir = os.stat(data_src_dst)
uid = _dir.st_uid
gid = _dir.st_gid
# re-mount where the data should originally be
# TODO: persist is currently a NO-OP in core.host
mount(blk_device, data_src_dst, persist=True)
# ensure original ownership of new mount.
os.chown(data_src_dst, uid, gid)
def ensure_ceph_keyring(service, user=None, group=None,
relation='ceph', key=None):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
@returns boolean: Flag to indicate whether a key was successfully written
to disk based on either relation data or a supplied key
"""
if not key:
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def default():
"""
Remove the .../stack/defaults directory. Preserve available_roles
"""
# Keep yaml human readable/editable
friendly_dumper = yaml.SafeDumper
friendly_dumper.ignore_aliases = lambda self, data: True
preserve = {}
content = None
pathname = "/srv/pillar/ceph/stack/default/{}/cluster.yml".format('ceph')
with open(pathname, "r") as sls_file:
content = yaml.safe_load(sls_file)
preserve['available_roles'] = content['available_roles']
stack_default = "/srv/pillar/ceph/stack/default"
shutil.rmtree(stack_default)
os.makedirs("{}/{}".format(stack_default, 'ceph'))
with open(pathname, "w") as sls_file:
sls_file.write(yaml.dump(preserve, Dumper=friendly_dumper,
default_flow_style=False))
uid = pwd.getpwnam("salt").pw_uid
gid = grp.getgrnam("salt").gr_gid
for path in [stack_default, "{}/{}".format(stack_default, 'ceph'), pathname]:
os.chown(path, uid, gid)
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def place_data_on_block_device(blk_device, data_src_dst):
"""Migrate data in data_src_dst to blk_device and then remount."""
# mount block device into /mnt
mount(blk_device, '/mnt')
# copy data to /mnt
copy_files(data_src_dst, '/mnt')
# umount block device
umount('/mnt')
# Grab user/group ID's from original source
_dir = os.stat(data_src_dst)
uid = _dir.st_uid
gid = _dir.st_gid
# re-mount where the data should originally be
# TODO: persist is currently a NO-OP in core.host
mount(blk_device, data_src_dst, persist=True)
# ensure original ownership of new mount.
os.chown(data_src_dst, uid, gid)
def ensure_ceph_keyring(service, user=None, group=None,
relation='ceph', key=None):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
@returns boolean: Flag to indicate whether a key was successfully written
to disk based on either relation data or a supplied key
"""
if not key:
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def ensure_ceph_keyring(service, user=None, group=None,
relation='ceph', key=None):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
@returns boolean: Flag to indicate whether a key was successfully written
to disk based on either relation data or a supplied key
"""
if not key:
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def place_data_on_block_device(blk_device, data_src_dst):
"""Migrate data in data_src_dst to blk_device and then remount."""
# mount block device into /mnt
mount(blk_device, '/mnt')
# copy data to /mnt
copy_files(data_src_dst, '/mnt')
# umount block device
umount('/mnt')
# Grab user/group ID's from original source
_dir = os.stat(data_src_dst)
uid = _dir.st_uid
gid = _dir.st_gid
# re-mount where the data should originally be
# TODO: persist is currently a NO-OP in core.host
mount(blk_device, data_src_dst, persist=True)
# ensure original ownership of new mount.
os.chown(data_src_dst, uid, gid)
def ensure_ceph_keyring(service, user=None, group=None,
relation='ceph', key=None):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
@returns boolean: Flag to indicate whether a key was successfully written
to disk based on either relation data or a supplied key
"""
if not key:
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def ensure_ceph_keyring(service, user=None, group=None,
relation='ceph', key=None):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
@returns boolean: Flag to indicate whether a key was successfully written
to disk based on either relation data or a supplied key
"""
if not key:
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def place_data_on_block_device(blk_device, data_src_dst):
"""Migrate data in data_src_dst to blk_device and then remount."""
# mount block device into /mnt
mount(blk_device, '/mnt')
# copy data to /mnt
copy_files(data_src_dst, '/mnt')
# umount block device
umount('/mnt')
# Grab user/group ID's from original source
_dir = os.stat(data_src_dst)
uid = _dir.st_uid
gid = _dir.st_gid
# re-mount where the data should originally be
# TODO: persist is currently a NO-OP in core.host
mount(blk_device, data_src_dst, persist=True)
# ensure original ownership of new mount.
os.chown(data_src_dst, uid, gid)
def ensure_ceph_keyring(service, user=None, group=None,
relation='ceph', key=None):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
@returns boolean: Flag to indicate whether a key was successfully written
to disk based on either relation data or a supplied key
"""
if not key:
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def ensure_ceph_keyring(service, user=None, group=None,
relation='ceph', key=None):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
@returns boolean: Flag to indicate whether a key was successfully written
to disk based on either relation data or a supplied key
"""
if not key:
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def get_role_key(user, role):
"""
??role?key???????????? ansible????????600????????????
:param user:
:param role:
:return: self key path
"""
user_role_key_dir = os.path.join(KEY_DIR, 'user')
user_role_key_path = os.path.join(user_role_key_dir, '%s_%s.pem' % (user.username, role.name))
mkdir(user_role_key_dir, mode=777)
if not os.path.isfile(user_role_key_path):
with open(os.path.join(role.key_path, 'id_rsa')) as fk:
with open(user_role_key_path, 'w') as fu:
fu.write(fk.read())
logger.debug(u"????????key %s, Owner: %s" % (user_role_key_path, user.username))
chown(user_role_key_path, user.username)
os.chmod(user_role_key_path, 0600)
return user_role_key_path
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def copystat(cls, src, dest, copy_own=True, copy_xattr=True):
"""
Copy all stat info (mode bits, atime, mtime, flags) from `src` to
`dest`. If `copy_own=True`, the uid and gid are also copied.
If `copy_xattr=True`, the extended attributes are also copied
(only available on Linux).
"""
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
os.chmod(dest, mode=mode)
os.utime(dest, ns=(st.st_atime_ns, st.st_mtime_ns))
if hasattr(st, "st_flags"):
os.chflags(dest, flags=st.st_flags)
if copy_own:
os.chown(dest, uid=st.st_uid, gid=st.st_gid)
if copy_xattr:
cls.copyxattr(src, dest)
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError, e:
raise ExtractError("could not change owner")
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def handle(self):
local_item_tmp_path = self.local_parent_path + get_tmp_filename(self.item_name)
try:
with open(local_item_tmp_path, 'wb') as f:
self.drive.download_file(file=f, size=self._item.size, item_id=self._item.id)
local_sha1 = hasher.hash_value(local_item_tmp_path)
item_sha1 = None
if self._item.file_props is not None and self._item.file_props.hashes is not None:
item_sha1 = self._item.file_props.hashes.sha1
if item_sha1 is None:
self.logger.warn('Remote file %s has not sha1 property, we keep the file but cannot check correctness of it',
self.local_path)
elif local_sha1 != item_sha1:
self.logger.error('Mismatch hash of download file %s : remote:%s,%d local:%s %d', self.local_path,
self._item.file_props.hashes.sha1, self._item.size, local_sha1, os.path.getsize(local_item_tmp_path))
return
os.rename(local_item_tmp_path, self.local_path)
t = datetime_to_timestamp(self._item.modified_time)
os.utime(self.local_path, (t, t))
os.chown(self.local_path, OS_USER_ID, OS_USER_GID)
self.items_store.update_item(self._item, ItemRecordStatuses.DOWNLOADED)
except (IOError, OSError) as e:
self.logger.error('An IO error occurred when downloading "%s":\n%s.', self.local_path, traceback.format_exc())
except errors.OneDriveError as e:
self.logger.error('An API error occurred when downloading "%s":\n%s.', self.local_path, traceback.format_exc())
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def chown(self, path, uid, gid):
"""
Change the owner (C{uid}) and group (C{gid}) of a file. As with
python's C{os.chown} function, you must pass both arguments, so if you
only want to change one, use L{stat} first to retrieve the current
owner and group.
@param path: path of the file to change the owner and group of
@type path: str
@param uid: new owner's uid
@type uid: int
@param gid: new group id
@type gid: int
"""
path = self._adjust_cwd(path)
self._log(DEBUG, 'chown(%r, %r, %r)' % (path, uid, gid))
attr = SFTPAttributes()
attr.st_uid, attr.st_gid = uid, gid
self._request(CMD_SETSTAT, path, attr)
def copystats(from_file, to_file):
"""
Copy stat info from ``from_file`` to ``to_file`` using `shutil.copystat`.
If possible, also copy the user and/or group ownership information.
"""
shutil.copystat(from_file, to_file)
if hasattr(os, 'chown'):
st = os.stat(from_file)
# Based on GNU sed's behavior:
try:
os.chown(to_file, st.st_uid, st.st_gid)
except EnvironmentError:
try:
os.chown(to_file, -1, st.st_gid)
except EnvironmentError:
pass
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
os.makedirs(realpath, perms)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chmod(realpath, perms)
def place_data_on_block_device(blk_device, data_src_dst):
"""Migrate data in data_src_dst to blk_device and then remount."""
# mount block device into /mnt
mount(blk_device, '/mnt')
# copy data to /mnt
copy_files(data_src_dst, '/mnt')
# umount block device
umount('/mnt')
# Grab user/group ID's from original source
_dir = os.stat(data_src_dst)
uid = _dir.st_uid
gid = _dir.st_gid
# re-mount where the data should originally be
# TODO: persist is currently a NO-OP in core.host
mount(blk_device, data_src_dst, persist=True)
# ensure original ownership of new mount.
os.chown(data_src_dst, uid, gid)
def ensure_ceph_keyring(service, user=None, group=None,
relation='ceph', key=None):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
@returns boolean: Flag to indicate whether a key was successfully written
to disk based on either relation data or a supplied key
"""
if not key:
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def place_data_on_block_device(blk_device, data_src_dst):
"""Migrate data in data_src_dst to blk_device and then remount."""
# mount block device into /mnt
mount(blk_device, '/mnt')
# copy data to /mnt
copy_files(data_src_dst, '/mnt')
# umount block device
umount('/mnt')
# Grab user/group ID's from original source
_dir = os.stat(data_src_dst)
uid = _dir.st_uid
gid = _dir.st_gid
# re-mount where the data should originally be
# TODO: persist is currently a NO-OP in core.host
mount(blk_device, data_src_dst, persist=True)
# ensure original ownership of new mount.
os.chown(data_src_dst, uid, gid)
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")