Python os 模块，execvpe() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.execvpe()。
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    The current pidfile, when there is one, is renamed with an ``.oldbin``
    suffix before forking.  The parent labels itself "Old Master" and
    returns.  The child publishes the listener file descriptors through
    the ``GUNICORN_FD`` environment variable, changes to the directory
    recorded in ``START_CTX``, runs the ``pre_exec`` hook and finally execs
    the command recorded in ``START_CTX``.
    """
    if self.pidfile is not None:
        self.pidfile.rename("%s.oldbin" % self.pidfile.fname)

    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork.
        self.master_name = "Old Master"
        return

    # Child side: start from a copy of the original environment and pass
    # the listener descriptors as a comma-separated list.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)

    os.chdir(self.START_CTX['cwd'])
    self.cfg.pre_exec(self)

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    The current pidfile, when there is one, is renamed with an ``.oldbin``
    suffix before forking.  The parent labels itself "Old Master" and
    returns.  The child publishes the listener file descriptors through
    the ``GUNICORN_FD`` environment variable, changes to the directory
    recorded in ``START_CTX``, runs the ``pre_exec`` hook and finally execs
    the command recorded in ``START_CTX``.
    """
    if self.pidfile is not None:
        self.pidfile.rename("%s.oldbin" % self.pidfile.fname)

    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork.
        self.master_name = "Old Master"
        return

    # Child side: start from a copy of the original environment and pass
    # the listener descriptors as a comma-separated list.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)

    os.chdir(self.START_CTX['cwd'])
    self.cfg.pre_exec(self)

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    The current pidfile, when there is one, is renamed with an ``.oldbin``
    suffix before forking.  The parent labels itself "Old Master" and
    returns.  The child publishes the listener file descriptors through
    the ``GUNICORN_FD`` environment variable, changes to the directory
    recorded in ``START_CTX``, runs the ``pre_exec`` hook and finally execs
    the command recorded in ``START_CTX``.
    """
    if self.pidfile is not None:
        self.pidfile.rename("%s.oldbin" % self.pidfile.fname)

    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork.
        self.master_name = "Old Master"
        return

    # Child side: start from a copy of the original environment and pass
    # the listener descriptors as a comma-separated list.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)

    os.chdir(self.START_CTX['cwd'])
    self.cfg.pre_exec(self)

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def open_terminal(command="bash", columns=80, lines=24):
    """Start *command* in a child process attached to a new pseudo-terminal.

    Args:
        command: shell-style command line; it is split with ``shlex.split``
            and its first word is looked up on ``PATH``.
        columns: terminal width, exported to the child as ``COLUMNS``.
        lines: terminal height, exported to the child as ``LINES``.

    Returns:
        A ``(Terminal(columns, lines), child_pid, p_out)`` tuple where
        ``p_out`` is an unbuffered binary file object wrapping the master
        side of the pty, switched to non-blocking mode.
    """
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        path, *args = shlex.split(command)
        args = [path] + args
        env = dict(TERM="linux", LC_ALL="en_GB.UTF-8",
                   COLUMNS=str(columns), LINES=str(lines))
        try:
            os.execvpe(path, args, env)
        except FileNotFoundError:
            # flush=True because os._exit() below skips the normal
            # interpreter shutdown and would drop buffered output.
            print("Could not find the executable in %s. Press any key to exit." % path,
                  flush=True)
            # Leave the forked child immediately (same status as the former
            # exit()): raising SystemExit here would unwind the copy of the
            # parent's call stack and run its cleanup handlers in the child.
            os._exit(0)

    # Switch the master side to non-blocking reads.  The status flags must
    # be read with F_GETFL: F_GETFD returns the descriptor flags
    # (FD_CLOEXEC), which are a different flag set from what F_SETFL writes.
    status_flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, status_flags | os.O_NONBLOCK)
    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines), p_pid, p_out
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    The current pidfile, when there is one, is renamed with an ``.oldbin``
    suffix before forking.  The parent labels itself "Old Master" and
    returns.  The child publishes the listener file descriptors through
    the ``GUNICORN_FD`` environment variable, changes to the directory
    recorded in ``START_CTX``, runs the ``pre_exec`` hook and finally execs
    the command recorded in ``START_CTX``.
    """
    if self.pidfile is not None:
        self.pidfile.rename("%s.oldbin" % self.pidfile.fname)

    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork.
        self.master_name = "Old Master"
        return

    # Child side: start from a copy of the original environment and pass
    # the listener descriptors as a comma-separated list.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)

    os.chdir(self.START_CTX['cwd'])
    self.cfg.pre_exec(self)

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    The current pidfile, when there is one, is renamed with an ``.oldbin``
    suffix before forking.  The parent labels itself "Old Master" and
    returns.  The child publishes the listener file descriptors through
    the ``GUNICORN_FD`` environment variable, changes to the directory
    recorded in ``START_CTX``, runs the ``pre_exec`` hook and finally execs
    the command recorded in ``START_CTX``.
    """
    if self.pidfile is not None:
        self.pidfile.rename("%s.oldbin" % self.pidfile.fname)

    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork.
        self.master_name = "Old Master"
        return

    # Child side: start from a copy of the original environment and pass
    # the listener descriptors as a comma-separated list.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)

    os.chdir(self.START_CTX['cwd'])
    self.cfg.pre_exec(self)

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    The current pidfile, when there is one, is renamed with an ``.oldbin``
    suffix before forking.  The parent labels itself "Old Master" and
    returns.  The child publishes the listener file descriptors through
    the ``GUNICORN_FD`` environment variable, changes to the directory
    recorded in ``START_CTX``, runs the ``pre_exec`` hook and finally execs
    the command recorded in ``START_CTX``.
    """
    if self.pidfile is not None:
        self.pidfile.rename("%s.oldbin" % self.pidfile.fname)

    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork.
        self.master_name = "Old Master"
        return

    # Child side: start from a copy of the original environment and pass
    # the listener descriptors as a comma-separated list.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)

    os.chdir(self.START_CTX['cwd'])
    self.cfg.pre_exec(self)

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def spawn(self):
    """Fork a pty child that runs ``self.command``.

    ``TERM`` is set to ``linux`` in ``self.env`` before forking.  In the
    child, a callable command is invoked directly (any exception is
    printed to stderr as a traceback, and the child always leaves through
    ``os._exit(0)``); any other command is treated as an argv list and
    exec'ed with that environment.  Afterwards the master descriptor is
    put in non-blocking mode when no main loop is set, and
    ``self.terminate`` is registered to run at interpreter exit.
    """
    child_env = self.env
    child_env['TERM'] = 'linux'

    self.pid, self.master = pty.fork()

    if self.pid == 0:
        # Child side of the pty fork.
        if not callable(self.command):
            os.execvpe(self.command[0], self.command, child_env)
        else:
            try:
                self.command()
            except BaseException:
                # Report every failure, including SystemExit and
                # KeyboardInterrupt, before the child goes away.
                sys.stderr.write(traceback.format_exc())
                sys.stderr.flush()
            finally:
                os._exit(0)

    if self.main_loop is None:
        fcntl.fcntl(self.master, fcntl.F_SETFL, os.O_NONBLOCK)

    atexit.register(self.terminate)
def _execChild(self, path, uid, gid, executable, args, environment):
"""
The exec() which is done in the forked child.
"""
if path:
os.chdir(path)
if uid is not None or gid is not None:
if uid is None:
uid = os.geteuid()
if gid is None:
gid = os.getegid()
# set the UID before I actually exec the process
os.setuid(0)
os.setgid(0)
switchUID(uid, gid)
os.execvpe(executable, args, environment)
def test_executionError(self):
    """
    Make C{os.execvpe} raise while a process is being spawned and check
    that the error text ends up in the data received from the child.
    """
    command = self.getCommand('false')

    finished = defer.Deferred()
    protocol = TrivialProcessProtocol(finished)

    def buggyexecvpe(command, args, environment):
        raise RuntimeError("Ouch")

    def check(ignored):
        received = b"".join(protocol.errData + protocol.outData)
        self.assertIn(b"Upon execvpe", received)
        self.assertIn(b"Ouch", received)

    # Swap in the failing execvpe only for the duration of the spawn.
    real_execvpe = os.execvpe
    os.execvpe = buggyexecvpe
    try:
        reactor.spawnProcess(protocol, command, [b'false'], env=None,
                             usePTY=self.usePTY)
        finished.addCallback(check)
    finally:
        os.execvpe = real_execvpe
    return finished
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    The current pidfile, when there is one, is renamed with an ``.oldbin``
    suffix before forking.  The parent labels itself "Old Master" and
    returns.  The child publishes the listener file descriptors through
    the ``GUNICORN_FD`` environment variable, changes to the directory
    recorded in ``START_CTX``, runs the ``pre_exec`` hook and finally execs
    the command recorded in ``START_CTX``.
    """
    if self.pidfile is not None:
        self.pidfile.rename("%s.oldbin" % self.pidfile.fname)

    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork.
        self.master_name = "Old Master"
        return

    # Child side: start from a copy of the original environment and pass
    # the listener descriptors as a comma-separated list.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)

    os.chdir(self.START_CTX['cwd'])
    self.cfg.pre_exec(self)

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def _execChild(self, path, settingUID, uid, gid,
command, args, environment):
if path:
os.chdir(path)
# set the UID before I actually exec the process
if settingUID:
switchUID(uid, gid)
os.execvpe(command, args, environment)
def test_execvpe_with_bad_program(self):
    """os.execvpe() must raise OSError for a program that does not exist."""
    missing_program = 'no such app-'
    self.assertRaises(OSError, os.execvpe, missing_program,
                      [missing_program], None)
def test_execvpe_with_bad_arglist(self):
    """os.execvpe() must raise ValueError for an empty argument list."""
    empty_argv = []
    self.assertRaises(ValueError, os.execvpe, 'notepad', empty_argv, None)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    Nothing happens (beyond a warning) when a re-exec child or a parent
    master is already recorded.  Otherwise the process forks; the parent
    returns at once.  The child runs the ``pre_exec`` hook, then builds
    the new environment from a copy of the original one: ``GUNICORN_PID``
    carries the pid of the forking master, and the listeners are announced
    either with ``LISTEN_PID``/``LISTEN_FDS`` (when ``self.systemd`` is
    set) or with the comma-separated descriptors in ``GUNICORN_FD``.  It
    then changes to the start directory and execs the original command.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists.")
        return

    # Taken before the fork so the child can report its parent's pid.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        return

    self.cfg.pre_exec(self)

    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_PID'] = str(parent_pid)

    if not self.systemd:
        child_env['GUNICORN_FD'] = ','.join(
            str(sock.fileno()) for sock in self.LISTENERS)
    else:
        child_env['LISTEN_PID'] = str(os.getpid())
        child_env['LISTEN_FDS'] = str(len(self.LISTENERS))

    os.chdir(self.START_CTX['cwd'])

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def execvpe(self):
    """Convenience method exec's the command replacing the current process.

    With ``self._shell`` set, the (single) command string is run through
    ``/bin/sh -c`` on Unix; on Windows the command is started with
    ``self.popen()`` and the current process then exits with status 0.
    The exec happens from ``self._cwd`` with ``self._env`` as environment.

    Returns:
        Does not return. May raise an OSError though.
    """
    args = copy(self._args)
    if self._shell:
        assert len(args) == 1
        if _is_windows():
            # The issue here is that in Lib/subprocess.py in
            # the Python distribution, if shell=True the code
            # jumps through some funky hoops setting flags on
            # the Windows API calls. We need to do that, rather
            # than calling os.execvpe which doesn't let us set those
            # flags. So we spawn the child and then exit.
            self.popen()
            sys.exit(0)
        else:
            # this is all shell=True does on unix
            args = ['/bin/sh', '-c'] + args

    # Record the directory *before* entering the try block: if getcwd()
    # itself fails, the finally clause would otherwise run with old_dir
    # unbound and replace the real error with an UnboundLocalError.
    old_dir = os.getcwd()
    try:
        os.chdir(self._cwd)
        # Push out anything still buffered before the process image is
        # replaced.
        sys.stderr.flush()
        sys.stdout.flush()
        _verbose_logger().info("$ %s", " ".join(args))
        os.execvpe(args[0], args, self._env)
    finally:
        # avoid side effect if exec fails (or is mocked in tests)
        os.chdir(old_dir)
def ExecExternalProgram(argv, environ):
    """Replace the current process with the external program ``argv[0]``.

    Args:
        argv: complete argument vector; ``argv[0]`` names the program.
        environ: environment handed to the new program.

    Does not return.  If the exec fails, the error is logged and the
    process exits with a shell-style status: 126 when the program was found
    but could not be executed (EACCES), 127 otherwise (e.g. command not
    found).
    """
    import errno  # local import: the file's import block is not in view

    # TODO: If there is an error, like the file isn't executable, then we should
    # exit, and the parent will reap it.  Should it capture stderr?
    try:
        os.execvpe(argv[0], argv, environ)
    except OSError as e:
        log('Unexpected error in execvpe(%r, %r, ...): %s', argv[0], argv, e)
        # POSIX reserves 127 for "command not found" and 126 for "found but
        # not executable"; every other failure keeps the former 127.
        sys.exit(126 if e.errno == errno.EACCES else 127)
    # no return
def test_execvpe_with_bad_arglist(self):
    """os.execvpe() must raise ValueError for an empty argument list."""
    empty_argv = []
    self.assertRaises(ValueError, os.execvpe, 'notepad', empty_argv, None)
def test_execvpe_with_bad_arglist(self):
    """os.execvpe() must raise ValueError for an empty argument list."""
    empty_argv = []
    self.assertRaises(ValueError, os.execvpe, 'notepad', empty_argv, None)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    Nothing happens (beyond a warning) when a re-exec child or a parent
    master is already recorded.  Otherwise the process forks; the parent
    returns at once.  The child runs the ``pre_exec`` hook, copies the
    original environment, adds the listener descriptors
    (``GUNICORN_FD``, comma-separated) and the pid of the forking master
    (``GUNICORN_PID``), changes to the start directory and execs the
    original command.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Taken before the fork so the child can report its parent's pid.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        return

    self.cfg.pre_exec(self)

    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def _execChild(self, path, settingUID, uid, gid,
command, args, environment):
if path:
os.chdir(path)
# set the UID before I actually exec the process
if settingUID:
switchUID(uid, gid)
os.execvpe(command, args, environment)
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def create_execve(original_name):
    """
    Build a wrapper with the signature shared by:

    os.execve(path, args, env)
    os.execvpe(file, args, env)

    When called, the wrapper sends the process-created message and then
    forwards to the ``os`` attribute named ``original_name`` (looked up at
    call time) with the arguments passed through ``patch_args``.
    """
    def new_execve(path, args, env):
        import os
        send_process_created_message()
        wrapped = getattr(os, original_name)
        return wrapped(path, patch_args(args), env)

    return new_execve
def patch_new_process_functions_with_warning():
    """Patch the process-creating entry points with the "warn" wrappers.

    Every ``os.exec*`` and ``os.spawn*`` variant is patched with
    ``create_warn_multiproc``.  Off Windows, ``os.fork`` and (when it can
    be imported) ``_posixsubprocess.fork_exec`` are patched as well; on
    Windows, ``CreateProcess`` of ``_subprocess`` (or ``_winapi`` when the
    former is missing) is patched instead.
    """
    os_functions = (
        'execl', 'execle', 'execlp', 'execlpe',
        'execv', 'execve', 'execvp', 'execvpe',
        'spawnl', 'spawnle', 'spawnlp', 'spawnlpe',
        'spawnv', 'spawnve', 'spawnvp', 'spawnvpe',
    )
    for function_name in os_functions:
        monkey_patch_os(function_name, create_warn_multiproc)

    if sys.platform == 'win32':
        # Windows
        try:
            import _subprocess
        except ImportError:
            import _winapi as _subprocess
        monkey_patch_module(_subprocess, 'CreateProcess', create_CreateProcessWarnMultiproc)
        return

    monkey_patch_os('fork', create_warn_multiproc)
    try:
        import _posixsubprocess
        monkey_patch_module(_posixsubprocess, 'fork_exec', create_warn_fork_exec)
    except ImportError:
        # Without the module there is nothing to patch.
        pass
def setup_lib_paths(fbdir, libdir):
    """Make sure *libdir* is an entry of ``LD_LIBRARY_PATH``.

    This is a little bit of a hack, but it should work.  On non-Windows
    platforms, *libdir* is prepended to ``os.environ['LD_LIBRARY_PATH']``
    unless it is already one of its entries.  The restart that would let
    the running program pick up the new value is currently disabled (see
    the commented-out call at the end).

    Args:
        fbdir: not used by the active code; only the disabled restart
            code refers to it.
        libdir: directory to add to ``LD_LIBRARY_PATH``.
    """
    if sys.platform == "win32":
        return

    libpath = os.environ.get('LD_LIBRARY_PATH', "")

    # Compare whole entries rather than substrings: "/opt/lib" must not be
    # treated as present just because "/opt/lib64" is listed.
    if libdir in libpath.split(":"):
        return

    # To get the Fuzzbunch environment setup properly, we need to modify LD_LIBRARY_PATH.
    # When the variable was empty or unset, do not leave a trailing ":"
    # behind: the dynamic loader reads an empty entry as the current
    # directory.
    if libpath:
        os.environ['LD_LIBRARY_PATH'] = "%s:%s" % (libdir, libpath)
    else:
        os.environ['LD_LIBRARY_PATH'] = libdir

    # Disabled: rerun Fuzzbunch so that it picks up the new LD_LIBRARY_PATH.
    #path = os.path.abspath(fbdir)
    #args = ['"' + path + '"'] + sys.argv[1:]
    #os.execvpe( 'python', ['python']+sys.argv, os.environ)
def test_execvpe_with_bad_program(self):
    """os.execvpe() must raise OSError for a program that does not exist."""
    missing_program = 'no such app-'
    self.assertRaises(OSError, os.execvpe, missing_program,
                      [missing_program], None)
def test_execvpe_with_bad_arglist(self):
    """os.execvpe() must raise ValueError for an empty argument list."""
    empty_argv = []
    self.assertRaises(ValueError, os.execvpe, 'notepad', empty_argv, None)
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    Nothing happens (beyond a warning) when a re-exec child or a parent
    master is already recorded.  Otherwise the process forks; the parent
    returns at once.  The child runs the ``pre_exec`` hook, copies the
    original environment, adds the listener descriptors
    (``GUNICORN_FD``, comma-separated) and the pid of the forking master
    (``GUNICORN_PID``), changes to the start directory and execs the
    original command.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Taken before the fork so the child can report its parent's pid.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        return

    self.cfg.pre_exec(self)

    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def test_execvpe_with_bad_arglist(self):
    """os.execvpe() must raise ValueError for an empty argument list."""
    empty_argv = []
    self.assertRaises(ValueError, os.execvpe, 'notepad', empty_argv, None)
def test_execvpe_with_bad_program(self):
    """os.execvpe() must raise OSError for a program that does not exist."""
    missing_program = 'no such app-'
    self.assertRaises(OSError, os.execvpe, missing_program,
                      [missing_program], None)
def test_execvpe_with_bad_arglist(self):
    """os.execvpe() must raise ValueError for an empty argument list."""
    empty_argv = []
    self.assertRaises(ValueError, os.execvpe, 'notepad', empty_argv, None)
def test_execvpe_with_bad_arglist(self):
    """os.execvpe() must raise ValueError for an empty argument list."""
    empty_argv = []
    self.assertRaises(ValueError, os.execvpe, 'notepad', empty_argv, None)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    Nothing happens (beyond a warning) when a re-exec child or a parent
    master is already recorded.  Otherwise the process forks; the parent
    returns at once.  The child runs the ``pre_exec`` hook, copies the
    original environment, adds the listener descriptors
    (``GUNICORN_FD``, comma-separated) and the pid of the forking master
    (``GUNICORN_PID``), changes to the start directory and execs the
    original command.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Taken before the fork so the child can report its parent's pid.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        return

    self.cfg.pre_exec(self)

    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    Nothing happens (beyond a warning) when a re-exec child or a parent
    master is already recorded.  Otherwise the process forks; the parent
    returns at once.  The child runs the ``pre_exec`` hook, copies the
    original environment, adds the listener descriptors
    (``GUNICORN_FD``, comma-separated) and the pid of the forking master
    (``GUNICORN_PID``), changes to the start directory and execs the
    original command.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Taken before the fork so the child can report its parent's pid.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        return

    self.cfg.pre_exec(self)

    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    Nothing happens (beyond a warning) when a re-exec child or a parent
    master is already recorded.  Otherwise the process forks; the parent
    returns at once.  The child runs the ``pre_exec`` hook, copies the
    original environment, adds the listener descriptors
    (``GUNICORN_FD``, comma-separated) and the pid of the forking master
    (``GUNICORN_PID``), changes to the start directory and execs the
    original command.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Taken before the fork so the child can report its parent's pid.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        return

    self.cfg.pre_exec(self)

    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    Nothing happens (beyond a warning) when a re-exec child or a parent
    master is already recorded.  Otherwise the process forks; the parent
    returns at once.  The child runs the ``pre_exec`` hook, copies the
    original environment, adds the listener descriptors
    (``GUNICORN_FD``, comma-separated) and the pid of the forking master
    (``GUNICORN_PID``), changes to the start directory and execs the
    original command.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Taken before the fork so the child can report its parent's pid.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        return

    self.cfg.pre_exec(self)

    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def xexec(args, stdin=None):
    """Exec the command given in *args*, replacing the current process.

    The new program receives the detyped form of the current xonsh
    environment.  Returns an error message string when *args* is empty or
    when the program cannot be found; *stdin* is not used.
    """
    detyped_env = builtins.__xonsh_env__.detype()

    if len(args) == 0:
        return 'xonsh: exec: no args specified\n'

    try:
        os.execvpe(args[0], args, detyped_env)
    except FileNotFoundError as err:
        # err.args[1] is the text that came with the OS error.
        return ''.join(('xonsh: ', err.args[1], ': ', args[0], '\n'))
def setup_lib_paths(fbdir, libdir):
    """Make sure *libdir* is an entry of ``LD_LIBRARY_PATH``.

    This is a little bit of a hack, but it should work.  On non-Windows
    platforms, *libdir* is prepended to ``os.environ['LD_LIBRARY_PATH']``
    unless it is already one of its entries.  The restart that would let
    the running program pick up the new value is currently disabled (see
    the commented-out call at the end).

    Args:
        fbdir: not used by the active code; only the disabled restart
            code refers to it.
        libdir: directory to add to ``LD_LIBRARY_PATH``.
    """
    if sys.platform == "win32":
        return

    libpath = os.environ.get('LD_LIBRARY_PATH', "")

    # Compare whole entries rather than substrings: "/opt/lib" must not be
    # treated as present just because "/opt/lib64" is listed.
    if libdir in libpath.split(":"):
        return

    # To get the Fuzzbunch environment setup properly, we need to modify LD_LIBRARY_PATH.
    # When the variable was empty or unset, do not leave a trailing ":"
    # behind: the dynamic loader reads an empty entry as the current
    # directory.
    if libpath:
        os.environ['LD_LIBRARY_PATH'] = "%s:%s" % (libdir, libpath)
    else:
        os.environ['LD_LIBRARY_PATH'] = libdir

    # Disabled: rerun Fuzzbunch so that it picks up the new LD_LIBRARY_PATH.
    #path = os.path.abspath(fbdir)
    #args = ['"' + path + '"'] + sys.argv[1:]
    #os.execvpe( 'python', ['python']+sys.argv, os.environ)
def reexec(self):
    """Relaunch the master and workers in a freshly exec'ed process.

    Nothing happens (beyond a warning) when a re-exec child or a parent
    master is already recorded.  Otherwise the process forks; the parent
    returns at once.  The child runs the ``pre_exec`` hook, then builds
    the new environment from a copy of the original one: ``GUNICORN_PID``
    carries the pid of the forking master, and the listeners are announced
    either with ``LISTEN_PID``/``LISTEN_FDS`` (when ``self.systemd`` is
    set) or with the comma-separated descriptors in ``GUNICORN_FD``.  It
    then changes to the start directory and execs the original command.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists.")
        return

    # Taken before the fork so the child can report its parent's pid.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        return

    self.cfg.pre_exec(self)

    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_PID'] = str(parent_pid)

    if not self.systemd:
        child_env['GUNICORN_FD'] = ','.join(
            str(sock.fileno()) for sock in self.LISTENERS)
    else:
        child_env['LISTEN_PID'] = str(os.getpid())
        child_env['LISTEN_FDS'] = str(len(self.LISTENERS))

    os.chdir(self.START_CTX['cwd'])

    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def test_errorDuringExec(self):
    """
    An exception raised by L{os.execvpe} is reported on the child's
    stderr encoded as UTF-8, even when the filesystem encoding is
    reported as ASCII.
    """
    def execvpe(*args, **kw):
        # Give the failing frame a non-ASCII filename so that the
        # formatted traceback is guaranteed to contain non-ASCII text.
        filename = u"<\N{SNOWMAN}>"
        if not isinstance(filename, str):
            filename = filename.encode("utf-8")
        failing_code = compile("1/0", filename, "single")
        eval(failing_code)

    self.patch(os, "execvpe", execvpe)
    self.patch(sys, "getfilesystemencoding", lambda: "ascii")

    reactor = self.buildReactor()
    captured = io.BytesIO()

    @reactor.callWhenRunning
    def whenRunning():
        class TracebackCatcher(ProcessProtocol, object):
            # Everything the child writes to stderr lands in the buffer.
            errReceived = captured.write

            def processEnded(self, reason):
                reactor.stop()

        reactor.spawnProcess(TracebackCatcher(), pyExe,
                             [pyExe, b"-c", b""])

    self.runReactor(reactor, timeout=30)
    self.assertIn(u"\N{SNOWMAN}".encode("utf-8"), captured.getvalue())