Python os 模块,getpgrp() 实例源码
我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用os.getpgrp()。
def _is_daemon():
    """Return True when the process appears to run without a foreground terminal.

    The process group of a foreground process matches the foreground
    process group of its controlling terminal. If the two differ, if stdout
    has no file descriptor, or if stdout is not a terminal, the process is
    assumed to be running in the background as a daemon.
    http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics

    Raises:
        OSError: for any failure other than ENOTTY.
    """
    try:
        is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
    except UnsupportedOperation:
        # This clause must come before OSError: on Python 3
        # io.UnsupportedOperation is a subclass of OSError, so the reverse
        # order makes this handler unreachable and re-raises the error.
        # Could not get the fileno for stdout, so we must be a daemon.
        is_daemon = True
    except OSError as err:
        if err.errno == errno.ENOTTY:
            # Assume we are a daemon because there is no terminal.
            is_daemon = True
        else:
            raise
    return is_daemon
def _is_daemon():
    """Tell whether this process looks like a background daemon.

    A foreground process shares its process group with the foreground
    process group of the controlling terminal. The process is treated as a
    daemon when those groups differ, when stdout exposes no file
    descriptor, or when stdout is not a terminal (ENOTTY). Any other
    OSError propagates to the caller.
    http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
    """
    try:
        own_group = os.getpgrp()
        terminal_group = os.tcgetpgrp(sys.stdout.fileno())
    except io.UnsupportedOperation:
        # stdout has no underlying file descriptor: treat as a daemon.
        return True
    except OSError as err:
        if err.errno != errno.ENOTTY:
            raise
        # stdout is not attached to a terminal: treat as a daemon.
        return True
    return own_group != terminal_group
def _check_ioctl_mutate_len(self, nbytes=None):
    """Check a mutating TIOCGPGRP ioctl on /dev/tty with an int array buffer.

    nbytes: when given, the buffer is sized to exactly that many bytes
        (asserted); otherwise the buffer holds a single int.

    Asserts that the ioctl returns 0 and that the first buffer element is
    this process's process group id or session id.
    """
    # Sentinel that is unlikely to collide with a real pgrp/session id.
    filler = -12345
    buf = array.array('i')
    item_bytes = buf.itemsize
    expected_ids = (os.getpgrp(), os.getsid(0))
    if nbytes is None:
        buf.append(filler)
    else:
        # Grow the buffer so that it is exactly `nbytes` bytes long.
        buf.extend([filler] * (nbytes // item_bytes))
        self.assertEqual(len(buf) * item_bytes, nbytes)   # sanity check
    with open("/dev/tty", "rb") as tty:
        status = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, 1)
    reported = buf[0]
    self.assertEqual(status, 0)
    self.assertIn(reported, expected_ids)
def _check_ioctl_mutate_len(self, nbytes=None):
    """Check a mutating TIOCGPGRP ioctl on /dev/tty with an int array buffer.

    nbytes: when given, the buffer is sized to exactly that many bytes
        (asserted); otherwise the buffer holds a single int.

    Asserts that the ioctl returns 0 and that the first buffer element is
    this process's process group id or session id.
    """
    # Sentinel that is unlikely to collide with a real pgrp/session id.
    filler = -12345
    buf = array.array('i')
    item_bytes = buf.itemsize
    expected_ids = (os.getpgrp(), os.getsid(0))
    if nbytes is None:
        buf.append(filler)
    else:
        # Grow the buffer so that it is exactly `nbytes` bytes long.
        buf.extend([filler] * (nbytes // item_bytes))
        self.assertEqual(len(buf) * item_bytes, nbytes)   # sanity check
    with open("/dev/tty", "r") as tty:
        status = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, 1)
    reported = buf[0]
    self.assertEqual(status, 0)
    self.assertIn(reported, expected_ids)
def _check_ioctl_mutate_len(self, nbytes=None):
    """Run TIOCGPGRP against /dev/tty, letting ioctl mutate an int array.

    nbytes: optional exact byte length for the buffer; when omitted the
        buffer contains one int.

    The ioctl must return 0 and leave this process's group id or session
    id in the first element of the buffer.
    """
    candidates = (os.getpgrp(), os.getsid(0))
    # Placeholder value that should never equal one of `candidates`.
    placeholder = -12345
    buf = array.array('i')
    width = buf.itemsize
    if nbytes is None:
        buf.append(placeholder)
    else:
        # Make the buffer exactly `nbytes` bytes long.
        buf.extend([placeholder] * (nbytes // width))
        self.assertEqual(len(buf) * width, nbytes)   # sanity check
    with open("/dev/tty", "r") as tty:
        rc = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, 1)
    group = buf[0]
    self.assertEqual(rc, 0)
    self.assertIn(group, candidates)
def getDebug(self):
    """Collect a snapshot of request, database, system and process state.

    Returns:
        dict with the keys "environment" (self.req.env), "client" (the
        attribute dict of self.req.client), "database" (the result of
        self.db.get_debug()), "system" (os.uname()) and "process"
        (working directory plus process, group and user ids).
    """
    # os.getcwdu() only exists on Python 2. On Python 3 os.getcwd() already
    # returns text, so fall back to it instead of raising AttributeError.
    getcwd = getattr(os, "getcwdu", os.getcwd)
    return {
        "environment": self.req.env,
        "client": self.req.client.__dict__,
        "database": self.db.get_debug(),
        "system": {
            "uname": os.uname()
        },
        "process": {
            "cwd": getcwd(),
            "pid": os.getpid(),
            "ppid": os.getppid(),
            "pgrp": os.getpgrp(),
            "uid": os.getuid(),
            "gid": os.getgid(),
            "euid": os.geteuid(),
            "egid": os.getegid(),
            "groups": os.getgroups()
        }
    }
def _check_ioctl_mutate_len(self, nbytes=None):
    """Run TIOCGPGRP against /dev/tty, letting ioctl mutate an int array.

    nbytes: optional exact byte length for the buffer; when omitted the
        buffer contains one int.

    The ioctl must return 0 and leave this process's group id or session
    id in the first element of the buffer.
    """
    candidates = (os.getpgrp(), os.getsid(0))
    # Placeholder value that should never equal one of `candidates`.
    placeholder = -12345
    buf = array.array('i')
    width = buf.itemsize
    if nbytes is None:
        buf.append(placeholder)
    else:
        # Make the buffer exactly `nbytes` bytes long.
        buf.extend([placeholder] * (nbytes // width))
        self.assertEqual(len(buf) * width, nbytes)   # sanity check
    with open("/dev/tty", "rb") as tty:
        rc = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, 1)
    group = buf[0]
    self.assertEqual(rc, 0)
    self.assertIn(group, candidates)
def _check_ioctl_mutate_len(self, nbytes=None):
    """Exercise fcntl.ioctl() in mutating mode with TIOCGPGRP on /dev/tty.

    nbytes: if not None, the int array passed to ioctl is extended to
        exactly this many bytes (and that size is asserted); if None the
        array holds one element.

    Passes when ioctl returns 0 and the first array element equals the
    process group id or the session id of this process.
    """
    # A fill value unlikely to be a valid id.
    sentinel = -12345
    storage = array.array('i')
    int_bytes = storage.itemsize
    valid_ids = (os.getpgrp(), os.getsid(0))
    if nbytes is None:
        storage.append(sentinel)
    else:
        # Extend the array so that it is exactly `nbytes` bytes long.
        storage.extend([sentinel] * (nbytes // int_bytes))
        self.assertEqual(len(storage) * int_bytes, nbytes)   # sanity check
    with open("/dev/tty", "r") as tty:
        result = fcntl.ioctl(tty, termios.TIOCGPGRP, storage, 1)
    found = storage[0]
    self.assertEqual(result, 0)
    self.assertIn(found, valid_ids)
def _check_ioctl_mutate_len(self, nbytes=None):
    """Exercise fcntl.ioctl() in mutating mode with TIOCGPGRP on /dev/tty.

    nbytes: if not None, the int array passed to ioctl is extended to
        exactly this many bytes (and that size is asserted); if None the
        array holds one element.

    Passes when ioctl returns 0 and the first array element equals the
    process group id or the session id of this process.
    """
    # A fill value unlikely to be a valid id.
    sentinel = -12345
    storage = array.array('i')
    int_bytes = storage.itemsize
    valid_ids = (os.getpgrp(), os.getsid(0))
    if nbytes is None:
        storage.append(sentinel)
    else:
        # Extend the array so that it is exactly `nbytes` bytes long.
        storage.extend([sentinel] * (nbytes // int_bytes))
        self.assertEqual(len(storage) * int_bytes, nbytes)   # sanity check
    with open("/dev/tty", "rb") as tty:
        result = fcntl.ioctl(tty, termios.TIOCGPGRP, storage, 1)
    found = storage[0]
    self.assertEqual(result, 0)
    self.assertIn(found, valid_ids)
def _check_ioctl_mutate_len(self, nbytes=None):
    """Verify that a mutating TIOCGPGRP ioctl fills an int array in place.

    nbytes: requested buffer length in bytes, or None for a one-element
        buffer. A requested length is asserted against the real length.

    The call is made on /dev/tty; it must return 0 and store this
    process's group id or session id in element 0.
    """
    acceptable = (os.getpgrp(), os.getsid(0))
    # A fill value unlikely to be in `acceptable`.
    pad = -12345
    ints = array.array('i')
    size_of_int = ints.itemsize
    if nbytes is None:
        ints.append(pad)
    else:
        # Pad the buffer out to exactly `nbytes` bytes.
        ints.extend([pad] * (nbytes // size_of_int))
        self.assertEqual(len(ints) * size_of_int, nbytes)   # sanity check
    with open("/dev/tty", "r") as tty:
        ret = fcntl.ioctl(tty, termios.TIOCGPGRP, ints, 1)
    first = ints[0]
    self.assertEqual(ret, 0)
    self.assertIn(first, acceptable)
def execute(self,dt):
if self.finished: return "finished"
if not self.running:
self.process = Process(target = executeInProcessGroup, args = (self,))
self.process.start()
print "timeshare child PID:",self.process.pid
os.setpgid(self.process.pid,self.process.pid)
print "timeshare process group",os.getpgid(self.process.pid)
assert os.getpgid(self.process.pid) == self.process.pid
print "my process group",os.getpgrp(),"which should be",os.getpgid(0)
assert os.getpgid(self.process.pid) != os.getpgid(0)
self.running = True
else:
os.killpg(self.process.pid, signal.SIGCONT)
self.process.join(dt)
if self.process.is_alive():
os.killpg(self.process.pid, signal.SIGSTOP)
return "still running"
else:
self.finished = True
return self.q.get()
def _check_ioctl_mutate_len(self, nbytes=None):
    """Verify that a mutating TIOCGPGRP ioctl fills an int array in place.

    nbytes: requested buffer length in bytes, or None for a one-element
        buffer. A requested length is asserted against the real length.

    The call is made on /dev/tty; it must return 0 and store this
    process's group id or session id in element 0.
    """
    acceptable = (os.getpgrp(), os.getsid(0))
    # A fill value unlikely to be in `acceptable`.
    pad = -12345
    ints = array.array('i')
    size_of_int = ints.itemsize
    if nbytes is None:
        ints.append(pad)
    else:
        # Pad the buffer out to exactly `nbytes` bytes.
        ints.extend([pad] * (nbytes // size_of_int))
        self.assertEqual(len(ints) * size_of_int, nbytes)   # sanity check
    with open("/dev/tty", "rb") as tty:
        ret = fcntl.ioctl(tty, termios.TIOCGPGRP, ints, 1)
    first = ints[0]
    self.assertEqual(ret, 0)
    self.assertIn(first, acceptable)
def show_setting_prgrp():
    """Make this process a process group leader and print the new group id.

    Prints the calling PID, calls os.setpgrp(), prints the resulting
    process group id and flushes stdout.
    """
    print('Calling os.setpgrp() from {}'.format(os.getpid()))
    os.setpgrp()
    # The original passed (os.getpid(), os.getpgrp()) to a single-placeholder
    # format string, so the PID was printed and the group id was dropped.
    print('Process group is now {}'.format(os.getpgrp()))
    sys.stdout.flush()
def test_ioctl(self):
    """Check that TIOCGPGRP on /dev/tty reports this process's group.

    The value unpacked from the ioctl result must be the process group id
    or the session id of the current process.
    """
    # If this process has been put into the background, TIOCGPGRP returns
    # the session ID instead of the process group id.
    ids = (os.getpgrp(), os.getsid(0))
    # The ioctl result is as long as the argument, so the argument must
    # span a whole C int for struct.unpack("i", ...) to succeed.
    arg = " " * struct.calcsize("i")
    with open("/dev/tty", "rb") as tty:
        r = fcntl.ioctl(tty, termios.TIOCGPGRP, arg)
    rpgrp = struct.unpack("i", r)[0]
    self.assertIn(rpgrp, ids)
def main(args):
    """Start a new session unless already a group leader, then run the handler.

    Returns whatever run_task_handler(ReleaseUpgrader, args) returns.
    """
    is_group_leader = os.getpgrp() == os.getpid()
    if not is_group_leader:
        os.setsid()
    return run_task_handler(ReleaseUpgrader, args)
def main(args):
    """Start a new session unless already a group leader, then run the handler.

    Returns whatever run_task_handler(PackageChanger, args) returns.
    """
    is_group_leader = os.getpgrp() == os.getpid()
    if not is_group_leader:
        os.setsid()
    return run_task_handler(PackageChanger, args)
def test_ioctl(self):
    """Check that TIOCGPGRP on /dev/tty reports this process's group.

    The value unpacked from the ioctl result must be the process group id
    or the session id of the current process.
    """
    # If this process has been put into the background, TIOCGPGRP returns
    # the session ID instead of the process group id.
    ids = (os.getpgrp(), os.getsid(0))
    # The ioctl result is as long as the argument, so the argument must
    # span a whole C int for struct.unpack("i", ...) to succeed.
    arg = " " * struct.calcsize("i")
    # Use a context manager so the tty handle is closed even on failure.
    with open("/dev/tty", "r") as tty:
        r = fcntl.ioctl(tty, termios.TIOCGPGRP, arg)
    rpgrp = struct.unpack("i", r)[0]
    self.assertIn(rpgrp, ids)
def test_ioctl(self):
    """Check that TIOCGPGRP on /dev/tty reports this process's group.

    The value unpacked from the ioctl result must be the process group id
    or the session id of the current process.
    """
    # If this process has been put into the background, TIOCGPGRP returns
    # the session ID instead of the process group id.
    ids = (os.getpgrp(), os.getsid(0))
    # The ioctl result is as long as the argument, so the argument must
    # span a whole C int for struct.unpack("i", ...) to succeed.
    arg = " " * struct.calcsize("i")
    # Use a context manager so the tty handle is closed even on failure.
    with open("/dev/tty", "r") as tty:
        r = fcntl.ioctl(tty, termios.TIOCGPGRP, arg)
    rpgrp = struct.unpack("i", r)[0]
    self.assertIn(rpgrp, ids)
def setup_handle_sig(subprocess=False):
    """Become a process group leader and install the signal handlers.

    subprocess: when true, SIGINT/SIGTERM/SIGHUP are routed to
        sub_handle_sig; otherwise to main_handle_sig.

    SIGUSR1 is always routed to handle_sig_usr1.
    """
    already_leader = os.getpid() == os.getpgrp()
    if not already_leader:
        os.setpgrp()
    if subprocess:
        sig_handler = sub_handle_sig
    else:
        sig_handler = main_handle_sig
    for signum in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        signal.signal(signum, sig_handler)
    signal.signal(signal.SIGUSR1, handle_sig_usr1)
def test_ioctl(self):
    """Check that TIOCGPGRP on /dev/tty reports this process's group.

    The value unpacked from the ioctl result must be the process group id
    or the session id of the current process.
    """
    # If this process has been put into the background, TIOCGPGRP returns
    # the session ID instead of the process group id.
    ids = (os.getpgrp(), os.getsid(0))
    # The ioctl result is as long as the argument, so the argument must
    # span a whole C int for struct.unpack("i", ...) to succeed.
    arg = " " * struct.calcsize("i")
    with open("/dev/tty", "rb") as tty:
        r = fcntl.ioctl(tty, termios.TIOCGPGRP, arg)
    rpgrp = struct.unpack("i", r)[0]
    self.assertIn(rpgrp, ids)
def term(sig_num, addtion):
    """Signal handler that sends SIGKILL to this process's own process group.

    sig_num, addtion: the handler arguments; neither is used.

    The kill is best-effort: an OSError from the lookup or the kill is
    ignored.
    """
    try:
        os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)
    except OSError:
        # Only swallow OS-level failures; a bare `except` would also hide
        # KeyboardInterrupt, SystemExit and programming errors.
        pass
def test_ioctl(self):
    """Check that TIOCGPGRP on /dev/tty reports this process's group.

    The value unpacked from the ioctl result must be the process group id
    or the session id of the current process.
    """
    # If this process has been put into the background, TIOCGPGRP returns
    # the session ID instead of the process group id.
    ids = (os.getpgrp(), os.getsid(0))
    # The ioctl result is as long as the argument, so the argument must
    # span a whole C int for struct.unpack("i", ...) to succeed.
    arg = " " * struct.calcsize("i")
    # Use a context manager so the tty handle is closed even on failure.
    with open("/dev/tty", "r") as tty:
        r = fcntl.ioctl(tty, termios.TIOCGPGRP, arg)
    rpgrp = struct.unpack("i", r)[0]
    self.assertIn(rpgrp, ids)
def test_ioctl(self):
    """Check that TIOCGPGRP on /dev/tty reports this process's group.

    The value unpacked from the ioctl result must be the process group id
    or the session id of the current process.
    """
    # If this process has been put into the background, TIOCGPGRP returns
    # the session ID instead of the process group id.
    ids = (os.getpgrp(), os.getsid(0))
    # The ioctl result is as long as the argument, so the argument must
    # span a whole C int for struct.unpack("i", ...) to succeed.
    arg = " " * struct.calcsize("i")
    with open("/dev/tty", "rb") as tty:
        r = fcntl.ioctl(tty, termios.TIOCGPGRP, arg)
    rpgrp = struct.unpack("i", r)[0]
    self.assertIn(rpgrp, ids)
def test_ioctl(self):
    """Check that TIOCGPGRP on /dev/tty reports this process's group.

    The value unpacked from the ioctl result must be the process group id
    or the session id of the current process.
    """
    # If this process has been put into the background, TIOCGPGRP returns
    # the session ID instead of the process group id.
    ids = (os.getpgrp(), os.getsid(0))
    # The ioctl result is as long as the argument, so the argument must
    # span a whole C int for struct.unpack("i", ...) to succeed.
    arg = " " * struct.calcsize("i")
    # Use a context manager so the tty handle is closed even on failure.
    with open("/dev/tty", "r") as tty:
        r = fcntl.ioctl(tty, termios.TIOCGPGRP, arg)
    rpgrp = struct.unpack("i", r)[0]
    self.assertIn(rpgrp, ids)
def _GetPidForLock():
    """Return the id used for host_forwarder initialization.

    The value is the process group id of the calling process, so every
    process in the same group obtains the same id. The original note
    describes this as the PID of the "sharder", the initial process that
    forks the others.
    """
    lock_pid = os.getpgrp()
    return lock_pid
def test_ioctl(self):
    """Check that TIOCGPGRP on /dev/tty reports this process's group.

    The value unpacked from the ioctl result must be the process group id
    or the session id of the current process.
    """
    # If this process has been put into the background, TIOCGPGRP returns
    # the session ID instead of the process group id.
    ids = (os.getpgrp(), os.getsid(0))
    # The ioctl result is as long as the argument, so the argument must
    # span a whole C int for struct.unpack("i", ...) to succeed.
    arg = " " * struct.calcsize("i")
    with open("/dev/tty", "rb") as tty:
        r = fcntl.ioctl(tty, termios.TIOCGPGRP, arg)
    rpgrp = struct.unpack("i", r)[0]
    self.assertIn(rpgrp, ids)
def test_pgid():
    """Check that a started ProcessGroup runs in a group of its own.

    Three processes are added; after start() the group must expose a pgid
    that every member shares and that differs from the process group of
    the test process. The group is killed at the end.
    """
    with ProcessGroup() as pg:
        for _ in range(3):
            pg.add(Process(runpy("input()")))
        pg.start()
        group_id = pg.pgid
        assert group_id is not None
        assert all(pg.pgid == p.pgid for p in pg)
        assert pg.pgid != os.getpgrp()
        pg.kill()
def terminateProcess(
        pid, done, *, term_after=0.0, quit_after=5.0, kill_after=10.0):
    """Terminate the given process, escalating the signal over time.

    Three calls are scheduled on the reactor:

    1. After `term_after` seconds, SIGTERM is sent to process `pid`.
    2. After `quit_after` seconds, SIGQUIT is sent to the process *group*
       of `pid`.
    3. After `kill_after` seconds, SIGKILL is sent to the process *group*
       of `pid`.

    Steps 2 and 3 have a safeguard: when `pid` is in the same process group
    as the invoking process, the signal goes only to `pid` and not to the
    group, so the caller does not inadvertently kill itself. For best
    effect, ensure that new processes become process group leaders soon
    after spawning.

    :param pid: The PID to terminate.
    :param done: A `Deferred` that fires when the process exits; any
        scheduled call that is still pending is cancelled at that point.
    :param term_after: Delay in seconds before SIGTERM.
    :param quit_after: Delay in seconds before SIGQUIT.
    :param kill_after: Delay in seconds before SIGKILL.
    """
    own_pgid = os.getpgrp()

    def kill(sig):
        """Attempt to send `sig` to the given `pid`."""
        try:
            _os_kill(pid, sig)
        except ProcessLookupError:
            # The process has already exited.
            pass

    def killpg(sig):
        """Attempt to send `sig` to the process group of `pid`.

        If `pid` shares a process group with the invoking process, only
        `pid` itself is signalled.
        """
        try:
            target_pgid = os.getpgid(pid)
            if target_pgid != own_pgid:
                _os_killpg(target_pgid, sig)
            else:
                _os_kill(pid, sig)
        except ProcessLookupError:
            # The process has already exited.
            pass

    pending = (
        reactor.callLater(term_after, kill, signal.SIGTERM),
        reactor.callLater(quit_after, killpg, signal.SIGQUIT),
        reactor.callLater(kill_after, killpg, signal.SIGKILL),
    )

    def ended():
        """Cancel every scheduled signal that has not fired yet."""
        for delayed_call in pending:
            if delayed_call.active():
                delayed_call.cancel()

    done.addBoth(callOut, ended)
def show_progress(self, done=False):
"""Display progress bar if enabled and if running on correct terminal

The bar is written to stderr. Nothing is drawn unless SHOWPROG and
self.showprog are both true, the call is either final (done) or
self.index is a multiple of 500, and the process group of this process
equals the one os.tcgetpgrp() reports for stderr.

done: marks the final update; self.progdone is set the first time a
final update is drawn, and the overall average rate is used instead of
the rate of the last segment.
"""
# NOTE(review): os.tcgetpgrp() raises OSError when stderr is not a
# terminal -- confirm SHOWPROG/self.showprog are false in that case.
if SHOWPROG and self.showprog and (done or self.index % 500 == 0) \
and (os.getpgrp() == os.tcgetpgrp(sys.stderr.fileno())):
# Ask the terminal on fd 2 for its window size; the first two shorts of
# the reply are unpacked as (rows, columns).
rows, columns = struct.unpack('hh', fcntl.ioctl(2, termios.TIOCGWINSZ, "1234"))
# sps is the width subtracted from the line for the text around the bar.
if columns < 100:
sps = 30
else:
# Terminal is wide enough, include bytes/sec
sps = 42
# Progress bar length
wlen = int(columns) - len(str_units(self.filesize)) - sps
# Number of bytes per progress bar unit
# NOTE(review): a zero self.filesize or wlen leads to a division by zero
# here or on the lines below -- verify callers exclude those cases.
xunit = float(self.filesize)/wlen
# Progress bar units done so far
xdone = int(self.offset/xunit)
xtime = time.time()
progress = 100.0*self.offset/self.filesize
# Display progress only if there is some change in progress
if (done and not self.progdone) or (self.prevdone != xdone or \
int(self.prevtime) != int(xtime) or \
round(self.prevprog) != round(progress)):
if done:
# Do not display progress again when done=True
self.progdone = 1
otime = xtime - self.timestart # Overall time
tdelta = xtime - self.prevtime # Segment time
# Remember what was drawn so the next call can detect a change.
self.prevprog = progress
self.prevdone = xdone
self.prevtime = xtime
# Number of progress bar units for completion
slen = wlen - xdone
if done:
# Overall average bytes/sec
bps = self.offset / otime
else:
# Segment average bytes/sec
# NOTE(review): tdelta is zero if two updates share a timestamp.
bps = (self.offset - self.prevoff) / tdelta
self.prevoff = self.offset
# Progress bar has both foreground and background colors
# as green and in case the terminal does not support colors
# then a "=" is displayed instead of a green block
pbar = " [\033[32m\033[42m%s\033[m%s] " % ("="*xdone, " "*slen)
# Add progress percentage and how many bytes have been
# processed so far relative to the total number of bytes
pbar += "%5.1f%% %9s/%s" % (progress, str_units(self.offset), str_units(self.filesize))
# The trailing "\r" returns the cursor to the start of the line so the
# next update overwrites this one.
if columns < 100:
sys.stderr.write("%s %-6s\r" % (pbar, str_time(otime)))
else:
# Terminal is wide enough, include bytes/sec
sys.stderr.write("%s %9s/s %-6s\r" % (pbar, str_units(bps), str_time(otime)))
if done:
# Final update: move to a fresh line.
sys.stderr.write("\n")