Python os 模块,fdopen() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.fdopen()。
def empty_db(self):
    """Replay the ``DROP`` statements of a schema-only dump against the DB.

    Runs ``mysqldump`` with ``--no-data`` for the database described by
    ``self.db_config`` (keys ``user``, ``host`` and ``name`` are read),
    copies every output line starting with ``DROP`` into a temporary SQL
    script that first disables foreign-key checks, and executes that
    script through ``self._run_mysql_cmd``.

    :raises subprocess.CalledProcessError: if ``mysqldump`` exits non-zero.
    """
    cmd = [
        "mysqldump",
        "-u%(user)s" % self.db_config,
        "-h%(host)s" % self.db_config,
        "--add_drop-table",
        "--no-data",
        "%(name)s" % self.db_config,
    ]
    # Dump before creating the temp file so a failing mysqldump leaves
    # nothing behind.  universal_newlines=True makes the output text on
    # both Python 2 and 3, so splitting on '\n' is valid.
    sql_data = subprocess.check_output(
        cmd, stderr=None, universal_newlines=True).split('\n')
    tmphandle, tmppath = tempfile.mkstemp(text=True)
    try:
        with os.fdopen(tmphandle, "w") as tmpfile:
            tmpfile.write("SET FOREIGN_KEY_CHECKS = 0;\n")
            tmpfile.write("use %(name)s;\n" % self.db_config)
            for line in sql_data:
                if line.startswith("DROP"):
                    tmpfile.write(line + '\n')
        self._run_mysql_cmd("source %s" % tmppath)
    finally:
        # Always remove the script, even when the mysql call fails.
        os.remove(tmppath)
def main():
    '''
    Run code specified by data received over pipe

    The last command-line argument is an OS handle for the read end of a
    pipe.  Two pickled objects are read from it: preparation data, which is
    passed to ``prepare()``, and the object whose ``_bootstrap()`` is run.
    The process then exits with the value ``_bootstrap()`` returned.
    '''
    assert is_forking(sys.argv)
    handle = int(sys.argv[-1])
    fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
    # The context manager closes the pipe even if unpickling or
    # preparation raises.
    with os.fdopen(fd, 'rb') as from_parent:
        process.current_process()._inheriting = True
        try:
            preparation_data = load(from_parent)
            prepare(preparation_data)
            self = load(from_parent)
        finally:
            # Never leave the flag set when loading fails part-way.
            process.current_process()._inheriting = False
    exitcode = self._bootstrap()
    exit(exitcode)
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def __init__(self, cmd, bufsize=-1):
    """Fork a child for *cmd* and connect it to the parent with two pipes.

    In the child, descriptor 0 is replaced by the read end of the first
    pipe and descriptors 1 and 2 by the write end of the second, after
    which ``self._run_child(cmd)`` is invoked.  In the parent,
    ``self.tochild`` wraps the write end of the first pipe and
    ``self.fromchild`` the read end of the second, both opened with
    *bufsize*.
    """
    _cleanup()
    self.cmd = cmd
    child_stdin_fd, to_child_fd = os.pipe()
    from_child_fd, child_output_fd = os.pipe()
    self.pid = os.fork()
    if self.pid == 0:
        # Child: wire descriptors 0, 1 and 2 to the pipe ends.
        wiring = (child_stdin_fd, child_output_fd, child_output_fd)
        for std_fd, pipe_fd in enumerate(wiring):
            os.dup2(pipe_fd, std_fd)
        self._run_child(cmd)
    # Parent: close the child's end of each pipe before wrapping ours.
    os.close(child_stdin_fd)
    self.tochild = os.fdopen(to_child_fd, 'w', bufsize)
    os.close(child_output_fd)
    self.fromchild = os.fdopen(from_child_fd, 'r', bufsize)
def set(self, key, value, timeout=None):
    """Pickle *value* into the cache file for *key*.

    The expiry timestamp and the value are written to a temporary file in
    ``self._path`` which is then renamed over the final file name, and the
    result is chmod-ed to ``self._mode``.

    :param key: Cache key; mapped to a file name by ``_get_filename``.
    :param value: Any picklable object.
    :param timeout: Seconds until expiry.  ``None`` uses
        ``self.default_timeout``; ``0`` is stored as-is.
    :return: ``True`` on success, ``False`` when an ``IOError``/``OSError``
        prevented the write.
    """
    # NOTE(review): with default_timeout == 0 and timeout=None this stores
    # "now" rather than 0 -- confirm that is intended.
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temp file now lives on as `filename`
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def Set(self, key, data):
    """Store *data* in the cache file for *key*.

    The target directory is created when missing.  The data is written to
    a temporary file which is then renamed onto the final path, replacing
    any existing file.

    :param key: Cache key; mapped to a path by ``self._GetPath``.
    :param data: Text to write.
    :raises _FileCacheError: if the target directory exists but is not a
        directory, or the path is not under ``self._root_directory``.
    """
    path = self._GetPath(key)
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
        os.makedirs(directory)
    if not os.path.isdir(directory):
        raise _FileCacheError('%s exists but is not a directory' % directory)
    # Validate the destination before writing anything, so a rejected
    # path does not leave an orphaned temp file behind.
    if not path.startswith(self._root_directory):
        raise _FileCacheError('%s does not appear to live under %s' %
                              (path, self._root_directory))
    temp_fd, temp_path = tempfile.mkstemp()
    try:
        with os.fdopen(temp_fd, 'w') as temp_fp:
            temp_fp.write(data)
        if os.path.exists(path):
            os.remove(path)
        os.rename(temp_path, path)
    except Exception:
        # The write or rename failed: drop the temp file and re-raise.
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    """Write *data* to the file *f* through a non-inheritable descriptor.

    On Python 3, when *m* is not a binary mode, *data* is encoded with
    *encoding* and the mode is switched to binary.

    :param f: Path of the file to create or truncate.
    :param data: Content to write.
    :param m: Open mode, as for ``open``.
    :param encoding: Encoding applied on Python 3 for text modes.
    :raises IOError: if the file cannot be opened.
    """
    running_py3 = sys.hexversion > 0x3000000
    if running_py3 and 'b' not in m:
        data = data.encode(encoding)
        m += 'b'
    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR
    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)
    with os.fdopen(fd, m) as handle:
        handle.write(data)
def h_file_win32(fname):
    """Return the MD5 digest of the file *fname*.

    The file is opened in binary mode through a non-inheritable
    descriptor and read in chunks of 200000 bytes.

    :raises IOError: if the file cannot be opened.
    """
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    digest = md5()
    with os.fdopen(fd, 'rb') as stream:
        chunk = stream.read(200000)
        while chunk:
            digest.update(chunk)
            chunk = stream.read(200000)
    return digest.digest()
# always save these
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    """Write *data* to the file *f* through a non-inheritable descriptor.

    On Python 3, when *m* is not a binary mode, *data* is encoded with
    *encoding* and the mode is switched to binary.

    :param f: Path of the file to create or truncate.
    :param data: Content to write.
    :param m: Open mode, as for ``open``.
    :param encoding: Encoding applied on Python 3 for text modes.
    :raises IOError: if the file cannot be opened.
    """
    running_py3 = sys.hexversion > 0x3000000
    if running_py3 and 'b' not in m:
        data = data.encode(encoding)
        m += 'b'
    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR
    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)
    with os.fdopen(fd, m) as handle:
        handle.write(data)
def h_file_win32(fname):
    """Return the MD5 digest of the file *fname*.

    The file is opened in binary mode through a non-inheritable
    descriptor and read in chunks of 200000 bytes.

    :raises IOError: if the file cannot be opened.
    """
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    digest = md5()
    with os.fdopen(fd, 'rb') as stream:
        chunk = stream.read(200000)
        while chunk:
            digest.update(chunk)
            chunk = stream.read(200000)
    return digest.digest()
# always save these
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    """Write *data* to the file *f* through a non-inheritable descriptor.

    On Python 3, when *m* is not a binary mode, *data* is encoded with
    *encoding* and the mode is switched to binary.

    :param f: Path of the file to create or truncate.
    :param data: Content to write.
    :param m: Open mode, as for ``open``.
    :param encoding: Encoding applied on Python 3 for text modes.
    :raises IOError: if the file cannot be opened.
    """
    running_py3 = sys.hexversion > 0x3000000
    if running_py3 and 'b' not in m:
        data = data.encode(encoding)
        m += 'b'
    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR
    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)
    with os.fdopen(fd, m) as handle:
        handle.write(data)
def h_file_win32(fname):
    """Return the MD5 digest of the file *fname*.

    The file is opened in binary mode through a non-inheritable
    descriptor and read in chunks of 200000 bytes.

    :raises IOError: if the file cannot be opened.
    """
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    digest = md5()
    with os.fdopen(fd, 'rb') as stream:
        chunk = stream.read(200000)
        while chunk:
            digest.update(chunk)
            chunk = stream.read(200000)
    return digest.digest()
# always save these
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def setup_environment():
    """Configure PATH, git and ssh from the process environment.

    Reads ``LAMBDA_TASK_ROOT`` and ``SSH_IDENTITY`` (base64-encoded) from
    the environment, then:

    * appends ``<root>/bin`` to ``PATH`` and sets ``GIT_EXEC_PATH`` to it;
    * writes the decoded identity to a new temporary directory with mode
      ``0o600``, next to an ssh ``config`` file referencing it and
      ``<root>/known_hosts``;
    * sets ``GIT_SSH_COMMAND`` to use that config file.
    """
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()
    ssh_identity = os.path.join(ssh_dir, 'identity')
    identity_fd = os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600)
    # b64decode() returns bytes, so the file must be opened in binary
    # mode; a text-mode file raises TypeError on Python 3.
    with os.fdopen(identity_fd, 'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))
    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def convert(cls, report, data):
    """Convert the report data to another mimetype if necessary.

    When the requested output format is one of ``MIMETYPES`` the data is
    returned untouched.  Otherwise it is written to a temporary file and
    converted by running ``unoconv``, whose standard output becomes the
    result.

    :param report: Object providing ``template_extension`` and
        ``extension``.
    :param data: Raw report content (bytes).
    :return: Tuple ``(extension, converted_data)``.
    :raises Exception: if ``unoconv`` exits non-zero; the argument is the
        captured standard error.
    """
    input_format = report.template_extension
    output_format = report.extension or report.template_extension
    if output_format in MIMETYPES:
        return output_format, data

    oext = FORMAT2EXT.get(output_format, output_format)
    fd, path = tempfile.mkstemp(suffix=(os.extsep + input_format),
                                prefix='trytond_')
    # Everything after mkstemp() sits inside the try block so the temp
    # file is removed even when the write itself fails.
    try:
        with os.fdopen(fd, 'wb+') as fp:
            fp.write(data)
        cmd = ['unoconv', '--connection=%s' % config.get('report', 'unoconv'),
               '-f', oext, '--stdout', path]
        # stderr must be piped, otherwise communicate() yields None for
        # it and the exception below carries no diagnostic.
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        stdoutdata, stderrdata = proc.communicate()
        if proc.wait() != 0:
            raise Exception(stderrdata)
        return oext, stdoutdata
    finally:
        os.remove(path)
def test_basic_config():
    """Check the generated supervisor config contains the sshuttle command.

    Dumps ``testcfg`` as YAML into a temporary file, loads it through
    ``ny.get_config``, has ``ny`` write the supervisor configuration, and
    asserts on the ``command`` of the ``program:testtunnel`` section.
    """
    fd, path = tempfile.mkstemp()
    try:
        # Closing the file (rather than only flushing it) releases the
        # descriptor before the config is read back.
        with os.fdopen(fd, 'w') as f:
            f.write(yaml.dump(testcfg))
        cfg = ny.get_config(path)
        ny.write_supervisor_conf()
        config = ConfigParser.ConfigParser()
        with open(cfg['supervisor.conf']) as conf_file:
            config.readfp(conf_file)
    finally:
        os.remove(path)
    command = config.get('program:testtunnel', 'command')
    print(command)
    assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in command
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def set(self, key, value, timeout=None):
    """Pickle *value* into the cache file for *key*.

    The expiry timestamp and the value are written to a temporary file in
    ``self._path`` which is then renamed over the final file name, and the
    result is chmod-ed to ``self._mode``.

    :param key: Cache key; mapped to a file name by ``_get_filename``.
    :param value: Any picklable object.
    :param timeout: Seconds until expiry.  ``None`` uses
        ``self.default_timeout``; ``0`` is stored as-is.
    :return: ``True`` on success, ``False`` when an ``IOError``/``OSError``
        prevented the write.
    """
    # NOTE(review): with default_timeout == 0 and timeout=None this stores
    # "now" rather than 0 -- confirm that is intended.
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temp file now lives on as `filename`
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
def stdout_redirected(to):
    """Redirect the stdout file descriptor to *to* while the caller runs.

    Lifted from: https://stackoverflow.com/questions/4675728/redirect-stdout-to-a-file-in-python

    This is the only way I've found to redirect stdout with curses. This way
    the output from questionnaire can be piped to another program, without
    piping what's written to the terminal by the prompters.

    This is a generator that yields once; presumably it is wrapped by a
    context-manager decorator outside this view (TODO confirm).

    :param to: A target accepted by ``fileno()``, or a file name, which is
        opened for binary writing.
    """
    stream = sys.stdout
    stream_fd = fileno(stream)
    # Duplicate the descriptor first so the original can be restored.
    with os.fdopen(os.dup(stream_fd), 'wb') as saved:
        stream.flush()  # flush library buffers that dup2 knows nothing about
        try:
            os.dup2(fileno(to), stream_fd)  # $ exec >&to
        except ValueError:  # `to` is a filename
            with open(to, 'wb') as to_file:
                os.dup2(to_file.fileno(), stream_fd)  # $ exec > to
        try:
            yield stream  # allow code to be run with the redirected stdout
        finally:
            # Put the saved descriptor back in place.
            stream.flush()
            os.dup2(saved.fileno(), stream_fd)
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def main():
    '''
    Run code specified by data received over pipe

    The last command-line argument is an OS handle for the read end of a
    pipe.  Two pickled objects are read from it: preparation data, which is
    passed to ``prepare()``, and the object whose ``_bootstrap()`` is run.
    The process then exits with the value ``_bootstrap()`` returned.
    '''
    assert is_forking(sys.argv)
    handle = int(sys.argv[-1])
    fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
    # The context manager closes the pipe even if unpickling or
    # preparation raises.
    with os.fdopen(fd, 'rb') as from_parent:
        process.current_process()._inheriting = True
        try:
            preparation_data = load(from_parent)
            prepare(preparation_data)
            self = load(from_parent)
        finally:
            # Never leave the flag set when loading fails part-way.
            process.current_process()._inheriting = False
    exitcode = self._bootstrap()
    exit(exitcode)
def __init__(self, cmd, bufsize=-1):
    """Fork a child for *cmd* and connect it to the parent with two pipes.

    In the child, descriptor 0 is replaced by the read end of the first
    pipe and descriptors 1 and 2 by the write end of the second, after
    which ``self._run_child(cmd)`` is invoked.  In the parent,
    ``self.tochild`` wraps the write end of the first pipe and
    ``self.fromchild`` the read end of the second, both opened with
    *bufsize*.
    """
    _cleanup()
    self.cmd = cmd
    child_stdin_fd, to_child_fd = os.pipe()
    from_child_fd, child_output_fd = os.pipe()
    self.pid = os.fork()
    if self.pid == 0:
        # Child: wire descriptors 0, 1 and 2 to the pipe ends.
        wiring = (child_stdin_fd, child_output_fd, child_output_fd)
        for std_fd, pipe_fd in enumerate(wiring):
            os.dup2(pipe_fd, std_fd)
        self._run_child(cmd)
    # Parent: close the child's end of each pipe before wrapping ours.
    os.close(child_stdin_fd)
    self.tochild = os.fdopen(to_child_fd, 'w', bufsize)
    os.close(child_output_fd)
    self.fromchild = os.fdopen(from_child_fd, 'r', bufsize)
def create_file(self, name, excl=False, mode="wb", **kwargs):
    """Create a file called *name* inside this storage.

    :param name: the name for the new file.
    :param excl: if True, open the file with ``O_CREAT | O_EXCL`` so the
        call fails when the file already exists.
    :param mode: the mode flags with which to open the file. The default is
        ``"wb"``.
    :return: a :class:`whoosh.filedb.structfile.StructFile` instance.
    :raises ReadOnlyError: if the storage is read-only.
    """
    if self.readonly:
        raise ReadOnlyError

    path = self._fpath(name)
    if not excl:
        fileobj = open(path, mode)
    else:
        # O_BINARY only exists on some platforms; fall back to 0 elsewhere.
        flags = os.O_CREAT | os.O_EXCL | os.O_RDWR | getattr(os, "O_BINARY", 0)
        fileobj = os.fdopen(os.open(path, flags), mode)
    return StructFile(fileobj, name=name, **kwargs)
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def test_FILE_stored_explicitly():
    """Store a Python file object in a C ``FILE *`` global and print to it.

    A buffered writer on the write end of a pipe is cast to ``FILE *`` and
    assigned to ``lib.myfile``; the C helper then ``fprintf``s through it.
    The bytes read from the other end must contain both the Python-level
    write and the C-level output, in either order.
    """
    ffi = FFI()
    ffi.cdef("int myprintf11(const char *, int); FILE *myfile;")
    lib = ffi.verify("""
#include <stdio.h>
FILE *myfile;
int myprintf11(const char *out, int value) {
return fprintf(myfile, out, value);
}
""")
    import os
    read_fd, write_fd = os.pipe()
    writer = os.fdopen(write_fd, 'wb', 256)
    lib.myfile = ffi.cast("FILE *", writer)
    #
    writer.write(b"X")
    written = lib.myprintf11(b"hello, %d!\n", ffi.cast("int", 42))
    writer.close()
    assert written == len("hello, 42!\n")
    #
    received = os.read(read_fd, 256)
    os.close(read_fd)
    # the 'X' might remain in the user-level buffer of 'writer' and
    # end up showing up after the 'hello, 42!\n'
    assert received == b"Xhello, 42!\n" or received == b"hello, 42!\nX"
def test_ffi_buffer_with_file(self):
    """Round-trip an ``int[]`` through a file using ``ffi.buffer``.

    Writes the first 1000 ints of a 1005-element array to a temporary
    file, checks the bytes against ``array.array('i', ...)``, then reads
    them back into a fresh array with ``readinto``.  The test is skipped
    when ``ffi.buffer`` raises ``NotImplementedError``.
    """
    import tempfile
    import os
    import array
    fd, filename = tempfile.mkstemp()
    try:
        # The context manager closes the file before the unlink below,
        # including when an assertion fails or the test is skipped.
        with os.fdopen(fd, 'r+b') as f:
            a = ffi.new("int[]", list(range(1005)))
            try:
                ffi.buffer(a, 512)
            except NotImplementedError as e:
                py.test.skip(str(e))
            f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
            f.seek(0)
            expected = array.array('i', range(1000))
            # array.tostring() was removed in Python 3.9; prefer
            # tobytes() and fall back to tostring() on Python 2.
            to_bytes = getattr(expected, 'tobytes', None) or expected.tostring
            assert f.read() == to_bytes()
            f.seek(0)
            b = ffi.new("int[]", 1005)
            f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
            assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
    finally:
        os.unlink(filename)
def test_ffi_buffer_with_file(self):
    """Round-trip an ``int[]`` through a file using ``ffi.buffer``.

    Writes the first 1000 ints of a 1005-element array to a temporary
    file, checks the bytes against ``array.array('i', ...)``, then reads
    them back into a fresh array with ``readinto``.  The test is skipped
    when ``ffi.buffer`` raises ``NotImplementedError``.
    """
    ffi = FFI(backend=self.Backend())
    import tempfile
    import os
    import array
    fd, filename = tempfile.mkstemp()
    try:
        # The context manager closes the file before the unlink below,
        # including when an assertion fails or the test is skipped.
        with os.fdopen(fd, 'r+b') as f:
            a = ffi.new("int[]", list(range(1005)))
            try:
                ffi.buffer(a, 512)
            except NotImplementedError as e:
                py.test.skip(str(e))
            f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
            f.seek(0)
            expected = array.array('i', range(1000))
            # array.tostring() was removed in Python 3.9; prefer
            # tobytes() and fall back to tostring() on Python 2.
            to_bytes = getattr(expected, 'tobytes', None) or expected.tostring
            assert f.read() == to_bytes()
            f.seek(0)
            b = ffi.new("int[]", 1005)
            f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
            assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
    finally:
        os.unlink(filename)
def set(self, key, value, timeout=None):
    """Pickle *value* into the cache file for *key*.

    The expiry timestamp and the value are written to a temporary file in
    ``self._path`` which is then renamed over the final file name, and the
    result is chmod-ed to ``self._mode``.

    :param key: Cache key; mapped to a file name by ``_get_filename``.
    :param value: Any picklable object.
    :param timeout: Seconds until expiry.  ``None`` uses
        ``self.default_timeout``; ``0`` is stored as-is.
    :return: ``True`` on success, ``False`` when an ``IOError``/``OSError``
        prevented the write.
    """
    # NOTE(review): with default_timeout == 0 and timeout=None this stores
    # "now" rather than 0 -- confirm that is intended.
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temp file now lives on as `filename`
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
def set(self, key, value, timeout=None):
    """Pickle *value* into the cache file for *key*.

    The expiry timestamp and the value are written to a temporary file in
    ``self._path`` which is then renamed over the final file name, and the
    result is chmod-ed to ``self._mode``.

    :param key: Cache key; mapped to a file name by ``_get_filename``.
    :param value: Any picklable object.
    :param timeout: Seconds until expiry.  ``None`` uses
        ``self.default_timeout``; ``0`` is stored as-is.
    :return: ``True`` on success, ``False`` when an ``IOError``/``OSError``
        prevented the write.
    """
    # NOTE(review): with default_timeout == 0 and timeout=None this stores
    # "now" rather than 0 -- confirm that is intended.
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temp file now lives on as `filename`
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
def _decompress_bz2(filename, blocksize=900*1024):
    """
    Decompress .tar.bz2 to .tar on disk (for faster access)

    Uses ``mkstemp`` to guarantee write access.

    :param filename: Path of the archive.  Anything not ending in
        ``.tar.bz2`` is returned unchanged.
    :param blocksize: Number of compressed bytes read per iteration.
    :return: Path of the decompressed temporary file, or *filename* itself
        when no decompression was needed.  The caller owns the temp file.
    """
    if not filename.endswith('.tar.bz2'):
        return filename
    fd, path = mkstemp()
    try:
        with os.fdopen(fd, 'wb') as fo:
            with open(filename, 'rb') as fi:
                z = bz2.BZ2Decompressor()
                for block in iter(lambda: fi.read(blocksize), b''):
                    fo.write(z.decompress(block))
    except BaseException:
        # Do not leave a partial temp file behind when reading or
        # decompressing fails.
        os.remove(path)
        raise
    return path
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

    Get the numeric process ID (“PID”) of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file already exists (it is opened with
        ``O_CREAT | O_EXCL``) or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def spawn(argv, stdout=False):
    """Asynchronously run a program. argv[0] is the executable name, which
    must be fully qualified or in the path. If stdout is True, return
    a file object corresponding to the child's standard output; otherwise,
    return the child's process ID.

    argv must be strictly str objects to avoid encoding confusion.

    :raises ValueError: if *argv* is empty.
    :raises TypeError: if any element of *argv* is not exactly ``str``.
    """
    if not argv:
        # The previous min()/max() check raised ValueError on an empty
        # sequence; keep that contract explicit.
        raise ValueError("argv must not be empty")
    # Checking each element directly avoids ordering type objects and
    # exhausting a one-shot map() iterator, both of which fail on
    # Python 3.
    if not all(type(arg) is str for arg in argv):
        raise TypeError("executables and arguments must be str objects")
    logger.debug("Running %r" % " ".join(argv))
    args = GLib.spawn_async(argv=argv, flags=GLib.SpawnFlags.SEARCH_PATH,
                            standard_output=stdout)

    if stdout:
        return os.fdopen(args[2])
    else:
        return args[0]
def set(self, key, value, timeout=None):
    """Pickle *value* into the cache file for *key*.

    The expiry timestamp and the value are written to a temporary file in
    ``self._path`` which is then renamed over the final file name, and the
    result is chmod-ed to ``self._mode``.

    :param key: Cache key; mapped to a file name by ``_get_filename``.
    :param value: Any picklable object.
    :param timeout: Seconds until expiry.  ``None`` uses
        ``self.default_timeout``; ``0`` is stored as-is.
    :return: ``True`` on success, ``False`` when an ``IOError``/``OSError``
        prevented the write.
    """
    # NOTE(review): with default_timeout == 0 and timeout=None this stores
    # "now" rather than 0 -- confirm that is intended.
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temp file now lives on as `filename`
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
def main():
    """Run ``update.sh`` next to this script, forwarding the CLI arguments.

    :return: ``0`` immediately on ``win32``/``cygwin``; otherwise the exit
        status of ``update.sh``.
    """
    if sys.platform in ['win32', 'cygwin']:
        return 0

    # This script is called by gclient. gclient opens its hooks subprocesses with
    # (stdout=subprocess.PIPE, stderr=subprocess.STDOUT) and then does custom
    # output processing that breaks printing '\r' characters for single-line
    # updating status messages as printed by curl and wget.
    # Work around this by setting stderr of the update.sh process to stdin (!):
    # gclient doesn't redirect stdin, and while stdin itself is read-only, a
    # dup()ed sys.stdin is writable, try
    #   fd2 = os.dup(sys.stdin.fileno()); os.write(fd2, 'hi')
    # TODO: Fix gclient instead, http://crbug.com/95350
    #
    # The context manager closes the duplicated descriptor once the
    # child has finished.
    with os.fdopen(os.dup(sys.stdin.fileno())) as stderr_target:
        return subprocess.call(
            [os.path.join(os.path.dirname(__file__), 'update.sh')] + sys.argv[1:],
            stderr=stderr_target)
def close(self):
    """ Implements GVSvgDoc.close()

    Appends ``.svg`` to ``self._filename`` when missing, writes the
    accumulated dot source to a temporary ``.gv`` file and runs ``dot``
    on it to produce the SVG output.
    """
    GVDocBase.close(self)

    # Make sure the extension is correct
    if self._filename[-4:] != ".svg":
        self._filename += ".svg"

    # Create a temporary dot file
    (handle, tmp_dot) = tempfile.mkstemp(".gv")
    try:
        with os.fdopen(handle, "wb") as dotfile:
            dotfile.write(self._dot.getvalue())

        # Generate the SVG file.
        # NOTE(review): the names are interpolated into a shell command;
        # a file name containing a double quote would break it.
        os.system('dot -Tsvg:cairo -o"%s" "%s"' % (self._filename, tmp_dot))
    finally:
        # Delete the temporary dot file, even if writing it failed.
        os.remove(tmp_dot)
#-------------------------------------------------------------------------------
#
# GVSvgzDoc
#
#-------------------------------------------------------------------------------
def close(self):
    """ Implements GVSvgzDoc.close()

    Appends ``.svgz`` to ``self._filename`` when missing, writes the
    accumulated dot source to a temporary ``.gv`` file and runs ``dot``
    on it to produce the SVGZ output.
    """
    GVDocBase.close(self)

    # Make sure the extension is correct
    if self._filename[-5:] != ".svgz":
        self._filename += ".svgz"

    # Create a temporary dot file
    (handle, tmp_dot) = tempfile.mkstemp(".gv")
    try:
        with os.fdopen(handle, "wb") as dotfile:
            dotfile.write(self._dot.getvalue())

        # Generate the SVGZ file.
        # NOTE(review): the names are interpolated into a shell command;
        # a file name containing a double quote would break it.
        os.system('dot -Tsvgz -o"%s" "%s"' % (self._filename, tmp_dot))
    finally:
        # Delete the temporary dot file, even if writing it failed.
        os.remove(tmp_dot)
#-------------------------------------------------------------------------------
#
# GVPngDoc
#
#-------------------------------------------------------------------------------
def close(self):
    """ Implements GVPngDoc.close()

    Appends ``.png`` to ``self._filename`` when missing, writes the
    accumulated dot source to a temporary ``.gv`` file and runs ``dot``
    on it to produce the PNG output.
    """
    GVDocBase.close(self)

    # Make sure the extension is correct
    if self._filename[-4:] != ".png":
        self._filename += ".png"

    # Create a temporary dot file
    (handle, tmp_dot) = tempfile.mkstemp(".gv")
    try:
        with os.fdopen(handle, "wb") as dotfile:
            dotfile.write(self._dot.getvalue())

        # Generate the PNG file.
        # NOTE(review): the names are interpolated into a shell command;
        # a file name containing a double quote would break it.
        os.system('dot -Tpng -o"%s" "%s"' % (self._filename, tmp_dot))
    finally:
        # Delete the temporary dot file, even if writing it failed.
        os.remove(tmp_dot)
#-------------------------------------------------------------------------------
#
# GVJpegDoc
#
#-------------------------------------------------------------------------------
def close(self):
    """ Implements GVGifDoc.close()

    Appends ``.gif`` to ``self._filename`` when missing, writes the
    accumulated dot source to a temporary ``.gv`` file and runs ``dot``
    on it to produce the GIF output.
    """
    GVDocBase.close(self)

    # Make sure the extension is correct
    if self._filename[-4:] != ".gif":
        self._filename += ".gif"

    # Create a temporary dot file
    (handle, tmp_dot) = tempfile.mkstemp(".gv")
    try:
        with os.fdopen(handle, "wb") as dotfile:
            dotfile.write(self._dot.getvalue())

        # Generate the GIF file.
        # NOTE(review): the names are interpolated into a shell command;
        # a file name containing a double quote would break it.
        os.system('dot -Tgif -o"%s" "%s"' % (self._filename, tmp_dot))
    finally:
        # Delete the temporary dot file, even if writing it failed.
        os.remove(tmp_dot)
#-------------------------------------------------------------------------------
#
# GVPdfGvDoc
#
#-------------------------------------------------------------------------------
def set(self, key, value, timeout=None):
    """Pickle *value* into the cache file for *key* (best effort).

    The expiry timestamp (now + *timeout*) and the value are written to a
    temporary file in ``self._path`` which is then renamed over the final
    file name and chmod-ed to ``self._mode``.  ``IOError``/``OSError`` are
    deliberately swallowed: a failed write leaves the cache unchanged.

    :param key: Cache key; mapped to a file name by ``_get_filename``.
    :param value: Any picklable object.
    :param timeout: Seconds until expiry; ``None`` uses
        ``self.default_timeout``.
    :return: ``None`` in all cases.
    """
    if timeout is None:
        timeout = self.default_timeout
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(int(time() + timeout), f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temp file now lives on as `filename`
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        # Best-effort cache: a failed write is not an error for callers.
        pass
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    The file is created with ``O_CREAT | O_EXCL``, so the call fails if
    *path* already exists.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.
    :return: The opened file object.
    :raises OSError: if *path* already exists or cannot be created.
    """
    # pylint: disable=star-args
    open_args = () if chmod is None else (chmod,)
    fdopen_args = () if buffering is None else (buffering,)
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args)
    try:
        return os.fdopen(fd, mode, *fdopen_args)
    except Exception:
        # fdopen() rejected its arguments (e.g. an invalid mode): release
        # the raw descriptor instead of leaking it.
        try:
            os.close(fd)
        except OSError:
            # The descriptor may already have been closed by fdopen().
            pass
        raise
def _init_dirs(self):
    """Create a temporary tree of config directories for a test.

    Makes a fresh temporary root, changes the working directory to it,
    and creates the sub-directories ``a a``, ``b``, ``c\\c`` and ``D_``,
    each holding an ``improbable.toml`` file with a single ``key`` entry.

    :return: Tuple ``(root, config, cleanup)``: the root path, the config
        base name, and a callable that deletes the whole tree.
    """
    import shutil

    # '\\c' spells out the backslash the original invalid escape '\c'
    # produced, without the invalid-escape warning.
    test_dirs = ['a a', 'b', 'c\\c', 'D_']
    config = 'improbable'
    root = tempfile.mkdtemp()

    def cleanup():
        # os.removedirs() only removes empty directories, so it never
        # removed this populated tree; rmtree() actually deletes it.
        shutil.rmtree(root, ignore_errors=True)

    os.chdir(root)
    for dir_ in test_dirs:
        os.mkdir(dir_, 0o0750)
        f = '{0}.toml'.format(config)
        flags = os.O_WRONLY | os.O_CREAT
        rel_path = '{0}/{1}'.format(dir_, f)
        abs_file_path = os.path.join(root, rel_path)
        with os.fdopen(os.open(abs_file_path, flags, 0o0640), 'w') as fp:
            fp.write("key = \"value is {0}\"\n".format(dir_))
    return root, config, cleanup
def set(self, key, value, timeout=None):
    """Pickle *value* into the cache file for *key*.

    The expiry timestamp (now + *timeout*) and the value are written to a
    temporary file in ``self._path`` which is then renamed over the final
    file name, and the result is chmod-ed to ``self._mode``.

    :param key: Cache key; mapped to a file name by ``_get_filename``.
    :param value: Any picklable object.
    :param timeout: Seconds until expiry; ``None`` uses
        ``self.default_timeout``.
    :return: ``True`` on success, ``False`` when an ``IOError``/``OSError``
        prevented the write.
    """
    if timeout is None:
        timeout = self.default_timeout
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(int(time() + timeout), f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temp file now lives on as `filename`
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass