Python os 模块,umask() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.umask()。
def generate_or_read_from_file(key_file='.secret_key', key_length=64):
"""Multiprocess-safe secret key file generator.
Useful to replace the default (and thus unsafe) SECRET_KEY in settings.py
upon first start. Save to use, i.e. when multiple Python interpreters
serve the dashboard Django application (e.g. in a mod_wsgi + daemonized
environment). Also checks if file permissions are set correctly and
throws an exception if not.
"""
abspath = os.path.abspath(key_file)
lock = lockutils.external_lock(key_file + ".lock",
lock_path=os.path.dirname(abspath))
with lock:
if not os.path.exists(key_file):
key = generate_key(key_length)
old_umask = os.umask(0o177) # Use '0600' file permissions
with open(key_file, 'w') as f:
f.write(key)
os.umask(old_umask)
else:
if (os.stat(key_file).st_mode & 0o777) != 0o600:
raise FilePermissionError("Insecure key file permissions!")
with open(key_file, 'r') as f:
key = f.readline()
return key
def clearUnMasked(func):
"""Decorator which clears the umask for a method.
The umask is a permissions mask that gets applied
whenever new files or folders are created. For I/O methods
that have a permissions parameter, it is important that the
umask is cleared prior to execution, otherwise the default
umask may alter the resulting permissions
:type func: function
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
# set umask to zero, store old umask
oldMask = os.umask(0)
try:
# execute method payload
return func(*args, **kwargs)
finally:
# set mask back to previous value
os.umask(oldMask)
return wrapper
def drop_privileges(self, uid_name, gid_name):
if os.getuid() != 0:
# We're not root so, like, whatever dude
self.logger.info("Not running as root. Cannot drop permissions.")
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(0o077)
self.logger.info("Changed permissions to: %s: %i, %s, %i"%(uid_name, running_uid, gid_name, running_gid))
def daemonize():
# See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
os.umask(077)
null=os.open('/dev/null', os.O_RDWR)
for i in range(3):
try:
os.dup2(null, i)
except OSError, e:
if e.errno != errno.EBADF:
raise
os.close(null)
def generate(self):
return textwrap.dedent("""
import pupy, os
if os.name == 'posix':
pupy.infos['daemonize']=True
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
os.umask(022) # Don't allow others to write
null=os.open('/dev/null', os.O_RDWR)
for i in range(3):
try:
os.dup2(null, i)
except OSError, e:
if e.errno != errno.EBADF:
raise
os.close(null)
""")
def _perform_single(self, achall):
response, validation = achall.response_and_validation()
root_path = self.full_roots[achall.domain]
validation_path = self._get_validation_path(root_path, achall)
logger.debug("Attempting to save validation to %s", validation_path)
# Change permissions to be world-readable, owner-writable (GH #1795)
old_umask = os.umask(0o022)
try:
with open(validation_path, "wb") as validation_file:
validation_file.write(validation.encode())
finally:
os.umask(old_umask)
self.performed[root_path].add(achall)
return response
def _create_default_file():
"""Creates the default profile file, returning its contents."""
import json
profiles_default_data = {
'active_profile': None,
'profiles': {}
}
os.makedirs(profiles_path, exist_ok=True)
# Populate the file, ensuring that its permissions are restrictive enough.
old_umask = os.umask(0o077)
try:
with open(profiles_file, 'w', encoding='utf8') as outfile:
json.dump(profiles_default_data, outfile)
finally:
os.umask(old_umask)
return profiles_default_data
def become_daemon(self, root_dir='/'):
if os.fork() != 0: # launch child and ...
os._exit(0) # kill off parent
os.setsid()
os.chdir(root_dir)
os.umask(0)
if os.fork() != 0: # fork again so we are not a session leader
os._exit(0)
sys.stdin.close()
sys.__stdin__ = sys.stdin
sys.stdout.close()
sys.stdout = sys.__stdout__ = _NullDevice()
sys.stderr.close()
sys.stderr = sys.__stderr__ = _NullDevice()
for fd in range(1024):
try:
os.close(fd)
except OSError:
pass
def __init__(self, cfg):
old_umask = os.umask(cfg.umask)
fdir = cfg.worker_tmp_dir
if fdir and not os.path.isdir(fdir):
raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir)
fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir)
# allows the process to write to the file
util.chown(name, cfg.uid, cfg.gid)
os.umask(old_umask)
# unlink the file so we don't leak tempory files
try:
if not IS_CYGWIN:
util.unlink(name)
self._tmp = os.fdopen(fd, 'w+b', 1)
except:
os.close(fd)
raise
self.spinner = 0
def generate(self):
return textwrap.dedent("""
import pupy, os
if os.name == 'posix':
pupy.infos['daemonize']=True
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
os.umask(022) # Don't allow others to write
null=os.open('/dev/null', os.O_RDWR)
for i in range(3):
try:
os.dup2(null, i)
except OSError, e:
if e.errno != errno.EBADF:
raise
os.close(null)
""")
def get_process_umask():
result = os.umask(0o22)
os.umask(result)
return result
def current_umask():
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def unzip_file(filename, location, flatten=True):
"""
Unzip the file (with path `filename`) to the destination `location`. All
files are written based on system defaults and umask (i.e. permissions are
not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
for info in zip.infolist():
name = info.filename
data = zip.read(name)
fn = name
if leading:
fn = split_leading_dir(name)[1]
fn = os.path.join(location, fn)
dir = os.path.dirname(fn)
if fn.endswith('/') or fn.endswith('\\'):
# A directory
ensure_dir(fn)
else:
ensure_dir(dir)
fp = open(fn, 'wb')
try:
fp.write(data)
finally:
fp.close()
mode = info.external_attr >> 16
# if mode and regular file and any execute permissions for
# user/group/world?
if mode and stat.S_ISREG(mode) and mode & 0o111:
# make dest file have execute for user/group/world
# (chmod +x) no-op on windows per python docs
os.chmod(fn, (0o777 - current_umask() | 0o111))
finally:
zipfp.close()
def current_umask():
tmp = os.umask(0o022)
os.umask(tmp)
return tmp
def daemonize(self):
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
os.chdir("/")
os.setsid()
os.umask(0)
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
sys.stdout.flush()
sys.stderr.flush()
si = file(self.stdin, 'r')
so = file(self.stdout, 'a+')
se = file(self.stderr, 'a+', 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
atexit.register(self.delpid)
pid = str(os.getpid())
file(self.pidfile,'w+').write("%s\n" % pid)
def daemonize(double_fork=True):
'''Puts process in the background using usual UNIX best practices.'''
try:
os.umask(0o22)
except Exception as e:
raise Exception("Unable to change file creation mask: %s" % e)
os.chdir('/')
# First fork
if double_fork:
try:
pid = os.fork()
if pid > 0:
os._exit(0)
except OSError as e:
raise Exception("Error on first fork: [%d] %s" % (e.errno, e.strerr,))
os.setsid()
# Second fork
try:
pid = os.fork()
if pid > 0:
os._exit(0)
except OSError as e:
raise Exception("Error on second fork: [%d] %s" % (e.errno, e.strerr,))
close_open_files()
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno())
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
def get_process_umask():
result = os.umask(0o22)
os.umask(result)
return result
def current_umask():
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def unzip_file(filename, location, flatten=True):
"""
Unzip the file (with path `filename`) to the destination `location`. All
files are written based on system defaults and umask (i.e. permissions are
not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
for info in zip.infolist():
name = info.filename
data = zip.read(name)
fn = name
if leading:
fn = split_leading_dir(name)[1]
fn = os.path.join(location, fn)
dir = os.path.dirname(fn)
if fn.endswith('/') or fn.endswith('\\'):
# A directory
ensure_dir(fn)
else:
ensure_dir(dir)
fp = open(fn, 'wb')
try:
fp.write(data)
finally:
fp.close()
mode = info.external_attr >> 16
# if mode and regular file and any execute permissions for
# user/group/world?
if mode and stat.S_ISREG(mode) and mode & 0o111:
# make dest file have execute for user/group/world
# (chmod +x) no-op on windows per python docs
os.chmod(fn, (0o777 - current_umask() | 0o111))
finally:
zipfp.close()
def current_umask():
tmp = os.umask(0o022)
os.umask(tmp)
return tmp
def open(file, flag=None, mode=0666):
"""Open the database file, filename, and return corresponding object.
The flag argument, used to control how the database is opened in the
other DBM implementations, is ignored in the dumbdbm module; the
database is always opened for update, and will be created if it does
not exist.
The optional mode argument is the UNIX mode of the file, used only when
the database has to be created. It defaults to octal code 0666 (and
will be modified by the prevailing umask).
"""
# flag argument is currently ignored
# Modify mode depending on the umask
try:
um = _os.umask(0)
_os.umask(um)
except AttributeError:
pass
else:
# Turn off any bits that are set in the umask
mode = mode & (~um)
return _Database(file, mode)
def _create_file_if_needed(self):
"""Create an empty file if necessary.
This method will not initialize the file. Instead it implements a
simple version of "touch" to ensure the file has been created.
"""
if not os.path.exists(self._file.filename()):
old_umask = os.umask(0o177)
try:
open(self._file.filename(), 'a+b').close()
finally:
os.umask(old_umask)
def _create_file_if_needed(self):
"""Create an empty file if necessary.
This method will not initialize the file. Instead it implements a
simple version of "touch" to ensure the file has been created.
"""
if not os.path.exists(self._filename):
old_umask = os.umask(0o177)
try:
open(self._filename, 'a+b').close()
finally:
os.umask(old_umask)
def _setupSocket(self):
"""Creates and binds the socket for communication with the server."""
oldUmask = None
if type(self._bindAddress) is str:
# Unix socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
os.unlink(self._bindAddress)
except OSError:
pass
if self._umask is not None:
oldUmask = os.umask(self._umask)
else:
# INET socket
assert type(self._bindAddress) is tuple
assert len(self._bindAddress) == 2
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(self._bindAddress)
sock.listen(socket.SOMAXCONN)
if oldUmask is not None:
os.umask(oldUmask)
return sock
def get_process_umask():
result = os.umask(0o22)
os.umask(result)
return result
def current_umask():
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def current_umask():
tmp = os.umask(0x12) # 022
os.umask(tmp)
return tmp
def get_process_umask():
result = os.umask(0o22)
os.umask(result)
return result
def current_umask():
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def unzip_file(filename, location, flatten=True):
"""
Unzip the file (with path `filename`) to the destination `location`. All
files are written based on system defaults and umask (i.e. permissions are
not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
for info in zip.infolist():
name = info.filename
data = zip.read(name)
fn = name
if leading:
fn = split_leading_dir(name)[1]
fn = os.path.join(location, fn)
dir = os.path.dirname(fn)
if fn.endswith('/') or fn.endswith('\\'):
# A directory
ensure_dir(fn)
else:
ensure_dir(dir)
fp = open(fn, 'wb')
try:
fp.write(data)
finally:
fp.close()
mode = info.external_attr >> 16
# if mode and regular file and any execute permissions for
# user/group/world?
if mode and stat.S_ISREG(mode) and mode & 0o111:
# make dest file have execute for user/group/world
# (chmod +x) no-op on windows per python docs
os.chmod(fn, (0o777 - current_umask() | 0o111))
finally:
zipfp.close()
def main():
os.umask(0)
create_wishlist()
create_baitlist()
create_oneclick()
names_from_wishlist()
while True:
names_from_wishlist()
time.sleep(60)
print_modellist()
def main():
os.umask(0)
create_wgetrc()
create_wishlist()
names_from_wishlist()
while True:
names_from_wishlist()
print("Currently following Models are being baited:")
print_modellist()
time.sleep(60)
def main():
os.umask(0)
create_wishlist()
create_baitlist()
create_oneclick()
names_from_wishlist()
while True:
names_from_wishlist()
time.sleep(60)
print_modellist()
def daemonize(stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"):
'''
Daemonize current script
'''
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
sys.stderr.write ("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror) )
sys.exit(1)
os.chdir("/")
os.umask(0)
os.setsid()
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
sys.stderr.write ("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror) )
sys.exit(1)
stdin_par = os.path.dirname(stdin)
stdout_par = os.path.dirname(stdout)
stderr_par = os.path.dirname(stderr)
if not stdin_par:
os.path.makedirs(stdin_par)
if not stdout_par:
os.path.makedirs(stdout_par)
if not stderr_par:
os.path.makedirs(stderr_par)
si = open(stdin, 'r')
so = open(stdout, 'a+')
se = open(stderr, 'a+', 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
def get_process_umask():
result = os.umask(0o22)
os.umask(result)
return result
def current_umask():
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def unzip_file(filename, location, flatten=True):
"""
Unzip the file (with path `filename`) to the destination `location`. All
files are written based on system defaults and umask (i.e. permissions are
not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
for info in zip.infolist():
name = info.filename
data = zip.read(name)
fn = name
if leading:
fn = split_leading_dir(name)[1]
fn = os.path.join(location, fn)
dir = os.path.dirname(fn)
if fn.endswith('/') or fn.endswith('\\'):
# A directory
ensure_dir(fn)
else:
ensure_dir(dir)
fp = open(fn, 'wb')
try:
fp.write(data)
finally:
fp.close()
mode = info.external_attr >> 16
# if mode and regular file and any execute permissions for
# user/group/world?
if mode and stat.S_ISREG(mode) and mode & 0o111:
# make dest file have execute for user/group/world
# (chmod +x) no-op on windows per python docs
os.chmod(fn, (0o777 - current_umask() | 0o111))
finally:
zipfp.close()
def current_umask():
tmp = os.umask(0o022)
os.umask(tmp)
return tmp
def _create_file_if_needed(self):
"""Create an empty file if necessary.
This method will not initialize the file. Instead it implements a
simple version of "touch" to ensure the file has been created.
"""
if not os.path.exists(self._filename):
old_umask = os.umask(0177)
try:
open(self._filename, 'a+b').close()
finally:
os.umask(old_umask)
def _create_file_if_needed(self):
"""Create an empty file if necessary.
This method will not initialize the file. Instead it implements a
simple version of "touch" to ensure the file has been created.
"""
if not os.path.exists(self._file.filename()):
old_umask = os.umask(0177)
try:
open(self._file.filename(), 'a+b').close()
finally:
os.umask(old_umask)
def get_process_umask():
result = os.umask(0o22)
os.umask(result)
return result
def current_umask():
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def unzip_file(filename, location, flatten=True):
"""
Unzip the file (with path `filename`) to the destination `location`. All
files are written based on system defaults and umask (i.e. permissions are
not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
for info in zip.infolist():
name = info.filename
data = zip.read(name)
fn = name
if leading:
fn = split_leading_dir(name)[1]
fn = os.path.join(location, fn)
dir = os.path.dirname(fn)
if fn.endswith('/') or fn.endswith('\\'):
# A directory
ensure_dir(fn)
else:
ensure_dir(dir)
fp = open(fn, 'wb')
try:
fp.write(data)
finally:
fp.close()
mode = info.external_attr >> 16
# if mode and regular file and any execute permissions for
# user/group/world?
if mode and stat.S_ISREG(mode) and mode & 0o111:
# make dest file have execute for user/group/world
# (chmod +x) no-op on windows per python docs
os.chmod(fn, (0o777 - current_umask() | 0o111))
finally:
zipfp.close()
def current_umask():
tmp = os.umask(0o022)
os.umask(tmp)
return tmp
def get_process_umask():
result = os.umask(0o22)
os.umask(result)
return result
def unzip_file(filename, location, flatten=True):
"""
Unzip the file (with path `filename`) to the destination `location`. All
files are written based on system defaults and umask (i.e. permissions are
not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
for info in zip.infolist():
name = info.filename
data = zip.read(name)
fn = name
if leading:
fn = split_leading_dir(name)[1]
fn = os.path.join(location, fn)
dir = os.path.dirname(fn)
if fn.endswith('/') or fn.endswith('\\'):
# A directory
ensure_dir(fn)
else:
ensure_dir(dir)
fp = open(fn, 'wb')
try:
fp.write(data)
finally:
fp.close()
mode = info.external_attr >> 16
# if mode and regular file and any execute permissions for
# user/group/world?
if mode and stat.S_ISREG(mode) and mode & 0o111:
# make dest file have execute for user/group/world
# (chmod +x) no-op on windows per python docs
os.chmod(fn, (0o777 - current_umask() | 0o111))
finally:
zipfp.close()
def current_umask():
tmp = os.umask(0o022)
os.umask(tmp)
return tmp