我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 psutil.POSIX。
def get_default_session():
    """
    Locate the nearest ancestor process which is a shell.

    Walks up the process tree starting from the parent of the current
    process. A process counts as a shell when its name ends with "sh"
    on POSIX, or is "cmd.exe" / "powershell.exe" on Windows. The walk
    stops at the first ancestor whose pid is 0 or when no further
    parent is reported.

    Returns:
        The pid of the first matching ancestor, or None if none is
        found or the platform is neither POSIX nor Windows.
    """
    if psutil.POSIX:
        def predicate(name):
            return name.endswith("sh")
    elif psutil.WINDOWS:
        def predicate(name):
            return name in ("cmd.exe", "powershell.exe")
    else:
        return None

    proc = psutil.Process().parent()
    # parent() yields None when no parent is known, so check for that
    # before reading .pid; a pid of 0 also ends the walk.
    while proc is not None and proc.pid:
        if predicate(proc.name()):
            return proc.pid
        proc = proc.parent()
    return None
def test_wait_timeout_0(self):
    """Exercise Process.wait() with a timeout of 0.

    While the child is alive wait(0) must raise TimeoutExpired. After
    kill(), wait(0) is polled for up to 2 seconds; once it returns, the
    exit code is compared with signal.SIGKILL on POSIX and 0 elsewhere.
    """
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0)
    proc.kill()
    deadline = time.time() + 2
    while True:
        try:
            exit_code = proc.wait(0)
            break
        except psutil.TimeoutExpired:
            # Still running: keep polling until the deadline expires.
            if time.time() >= deadline:
                raise
    expected = signal.SIGKILL if POSIX else 0
    self.assertEqual(exit_code, expected)
    self.assertFalse(proc.is_running())
def test_username(self):
    """Check Process.username() for a freshly spawned test subprocess.

    On POSIX the name must match the passwd entry of the current uid,
    and when pwd.getpwuid raises KeyError the result must be the real
    uid rendered as a string. On Windows (when USERNAME is set) the
    value must be "<USERDOMAIN>\\<USERNAME>". On any other platform the
    call is only required not to raise.
    """
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    if POSIX:
        import pwd
        self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name)
        with mock.patch("psutil.pwd.getpwuid",
                        side_effect=KeyError) as fun:
            # This used to be a bare `==` expression whose result was
            # thrown away, so the fallback value was never verified.
            self.assertEqual(p.username(), str(p.uids().real))
            assert fun.called
    elif WINDOWS and 'USERNAME' in os.environ:
        expected_username = os.environ['USERNAME']
        expected_domain = os.environ['USERDOMAIN']
        domain, username = p.username().split('\\')
        self.assertEqual(domain, expected_domain)
        self.assertEqual(username, expected_username)
    else:
        p.username()
def test_net_if_addrs_mac_null_bytes(self):
    """Check that a truncated MAC address is padded with null bytes."""
    # Simulate that the underlying C function returns an incomplete
    # MAC address. psutil is supposed to fill it with null bytes.
    # https://github.com/giampaolo/psutil/issues/786
    if POSIX:
        family = psutil.AF_LINK
        truncated = '06:3d:29'
        padded = '06:3d:29:00:00:00'
    else:
        family = -1
        truncated = '06-3d-29'
        padded = '06-3d-29-00-00-00'
    fake_result = [('em1', family, truncated, None, None, None)]
    patcher = mock.patch('psutil._psplatform.net_if_addrs',
                         return_value=fake_result)
    with patcher as m:
        addr = psutil.net_if_addrs()['em1'][0]
        assert m.called
    self.assertEqual(addr.address, padded)
def test_wait_timeout_0(self):
    """Exercise Process.wait() with a timeout of 0.

    wait(0) must raise TimeoutExpired while the child is alive. After
    kill(), wait(0) is retried for at most 2 seconds; the value it
    finally returns is compared with -signal.SIGKILL on POSIX and with
    0 on other platforms.
    """
    subp = get_test_subprocess()
    target = psutil.Process(subp.pid)
    self.assertRaises(psutil.TimeoutExpired, target.wait, 0)
    target.kill()
    give_up_at = time.time() + 2
    while True:
        try:
            status = target.wait(0)
            break
        except psutil.TimeoutExpired:
            # Re-raise only once the 2 second budget is exhausted.
            if time.time() >= give_up_at:
                raise
    if POSIX:
        wanted = -signal.SIGKILL
    else:
        wanted = 0
    self.assertEqual(status, wanted)
    self.assertFalse(target.is_running())
def test_kill(self):
    """Kill a test subprocess and verify it no longer exists.

    On POSIX the value returned by wait() is additionally compared
    with signal.SIGKILL.
    """
    child = get_test_subprocess()
    child_pid = child.pid
    proc = psutil.Process(child_pid)
    proc.kill()
    exit_sig = proc.wait()
    self.assertFalse(psutil.pid_exists(child_pid))
    if not POSIX:
        return
    self.assertEqual(exit_sig, signal.SIGKILL)
def test_terminate(self):
    """Terminate a test subprocess and verify it no longer exists.

    On POSIX the value returned by wait() is additionally compared
    with signal.SIGTERM.
    """
    child = get_test_subprocess()
    child_pid = child.pid
    proc = psutil.Process(child_pid)
    proc.terminate()
    exit_sig = proc.wait()
    self.assertFalse(psutil.pid_exists(child_pid))
    if not POSIX:
        return
    self.assertEqual(exit_sig, signal.SIGTERM)
def test_send_signal(self):
    """Exercise Process.send_signal() and its error translation.

    Covers: a real signal delivery (SIGKILL on POSIX, SIGTERM
    elsewhere), os.kill failing with ESRCH (expects NoSuchProcess),
    os.kill failing with EPERM (expects AccessDenied) and signalling
    PID 0 (expects ValueError).
    """
    if POSIX:
        sig = signal.SIGKILL
    else:
        sig = signal.SIGTERM

    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.send_signal(sig)
    exit_sig = proc.wait()
    self.assertFalse(psutil.pid_exists(proc.pid))
    if POSIX:
        self.assertEqual(exit_sig, sig)

    # os.kill reporting "no such process"
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.send_signal(sig)
    no_such = mock.patch('psutil.os.kill',
                         side_effect=OSError(errno.ESRCH, ""))
    with no_such, self.assertRaises(psutil.NoSuchProcess):
        proc.send_signal(sig)

    # os.kill reporting "operation not permitted"
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.send_signal(sig)
    not_permitted = mock.patch('psutil.os.kill',
                               side_effect=OSError(errno.EPERM, ""))
    with not_permitted, self.assertRaises(psutil.AccessDenied):
        psutil.Process().send_signal(sig)

    # Sending a signal to process with PID 0 is not allowed as
    # it would affect every process in the process group of
    # the calling process (os.getpid()) instead of PID 0").
    if 0 in psutil.pids():
        proc = psutil.Process(0)
        self.assertRaises(ValueError, proc.send_signal, signal.SIGTERM)
def test_memory_maps(self):
    """Sanity-check Process.memory_maps() for the current process.

    Verifies that the grouped result contains no duplicate entries,
    that every path not starting with '[' is absolute and refers to
    something on disk (with platform-specific exceptions, see inline
    comments), and that the numeric fields of the ungrouped result are
    non-negative integers.
    """
    p = psutil.Process()
    maps = p.memory_maps()
    paths = [x for x in maps]
    # No entry may appear twice in the grouped listing.
    self.assertEqual(len(paths), len(set(paths)))
    ext_maps = p.memory_maps(grouped=False)

    for nt in maps:
        # Entries whose path starts with '[' are skipped.
        if not nt.path.startswith('['):
            assert os.path.isabs(nt.path), nt.path
            if POSIX:
                try:
                    assert os.path.exists(nt.path) or \
                        os.path.islink(nt.path), nt.path
                except AssertionError:
                    if not LINUX:
                        raise
                    else:
                        # https://github.com/giampaolo/psutil/issues/759
                        # On Linux, tolerate the failure only when smaps
                        # lists the path as "(deleted)".
                        with open('/proc/self/smaps') as f:
                            data = f.read()
                        if "%s (deleted)" % nt.path not in data:
                            raise
            else:
                # XXX - On Windows we have this strange behavior with
                # 64 bit dlls: they are visible via explorer but cannot
                # be accessed via os.stat() (wtf?).
                if '64' not in os.path.basename(nt.path):
                    assert os.path.exists(nt.path), nt.path
    for nt in ext_maps:
        for fname in nt._fields:
            value = getattr(nt, fname)
            if fname == 'path':
                continue
            elif fname in ('addr', 'perms'):
                # These two only need to be non-empty.
                assert value, value
            else:
                # Every remaining field must be a non-negative integer.
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
def test_pid_0(self):
    """Exercise every Process attribute getter against PID 0.

    When PID 0 is not listed by psutil.pids(), only checks that
    Process(0) raises NoSuchProcess. Otherwise calls each name in
    psutil._as_dict_attrnames (except 'pid'), tolerating AccessDenied,
    and checks a few expected values.
    """
    # Process(0) is supposed to work on all platforms except Linux
    if 0 not in psutil.pids():
        self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
        return

    # test all methods
    proc0 = psutil.Process(0)
    for name in psutil._as_dict_attrnames:
        if name == 'pid':
            continue
        getter = getattr(proc0, name)
        try:
            ret = getter()
        except psutil.AccessDenied:
            continue

        if name in ("uids", "gids"):
            self.assertEqual(ret.real, 0)
        elif name == "username":
            if POSIX:
                self.assertEqual(proc0.username(), 'root')
            elif WINDOWS:
                self.assertEqual(proc0.username(), 'NT AUTHORITY\\SYSTEM')
        elif name == "name":
            # NOTE(review): this asserts the attribute *name* string,
            # which is always truthy, not the returned value - confirm
            # whether `ret` was intended.
            assert name, name

    if hasattr(proc0, 'rlimit'):
        try:
            proc0.rlimit(psutil.RLIMIT_FSIZE)
        except psutil.AccessDenied:
            pass

    proc0.as_dict()

    if not OPENBSD:
        self.assertIn(0, psutil.pids())
        self.assertTrue(psutil.pid_exists(0))
def setUp(self):
    """Cache the system's uids, user names and gids (POSIX only).

    Stores them on the instance as the sets all_uids, all_usernames and
    all_gids. Does nothing on non-POSIX platforms.
    """
    if not POSIX:
        return
    import grp
    import pwd
    passwd_entries = pwd.getpwall()
    group_entries = grp.getgrall()
    self.all_uids = {entry.pw_uid for entry in passwd_entries}
    self.all_usernames = {entry.pw_name for entry in passwd_entries}
    self.all_gids = {entry.gr_gid for entry in group_entries}
def exe(self, ret, proc):
    """Validate the value returned by Process.exe().

    A falsy value must be exactly ''. Otherwise the path must be
    absolute and, on POSIX when it is a regular file, executable.
    """
    if not ret:
        self.assertEqual(ret, '')
        return

    assert os.path.isabs(ret), ret
    # Note: os.stat() may return False even if the file is there
    # hence we skip the test, see:
    # http://stackoverflow.com/questions/3112546/os-path-exists-lies
    if not (POSIX and os.path.isfile(ret)):
        return
    if hasattr(os, 'access') and hasattr(os, "X_OK"):
        # XXX may fail on OSX
        self.assertTrue(os.access(ret, os.X_OK))
def memory_info(self, ret, proc):
    """Validate the namedtuple returned by Process.memory_info().

    Every field must be >= 0. On POSIX (when vms is non-zero) vms must
    be strictly greater than every other field; on Windows each peak_*
    counter must be >= its current counterpart.
    """
    field_names = ret._fields
    for field in field_names:
        self.assertGreaterEqual(getattr(ret, field), 0)

    if POSIX and ret.vms != 0:
        # VMS is always supposed to be the highest
        for field in field_names:
            if field == 'vms':
                continue
            value = getattr(ret, field)
            assert ret.vms > value, ret
    elif WINDOWS:
        peak_and_current = (
            ('peak_wset', 'wset'),
            ('peak_paged_pool', 'paged_pool'),
            ('peak_nonpaged_pool', 'nonpaged_pool'),
            ('peak_pagefile', 'pagefile'),
        )
        for peak, current in peak_and_current:
            assert getattr(ret, peak) >= getattr(ret, current), ret
def nice(self, ret, proc):
    """Validate the value returned by Process.nice().

    On POSIX it must lie in [-20, 20]; elsewhere it must be one of the
    psutil constants whose name ends with '_PRIORITY_CLASS'.
    """
    if POSIX:
        assert -20 <= ret <= 20, ret
        return

    priorities = []
    for attr in dir(psutil):
        if attr.endswith('_PRIORITY_CLASS'):
            priorities.append(getattr(psutil, attr))
    self.assertIn(ret, priorities)
def create_exe(outpath, c_code=None):
    """Create an executable file at `outpath`.

    When gcc is available, `c_code` (or a default program that just
    calls pause()) is written to a temporary .c file and compiled to
    `outpath`; the temporary source is always removed afterwards. When
    gcc is not available, the running Python executable is copied to
    `outpath` instead and, on POSIX, marked executable.

    Args:
        outpath: destination path; must not exist yet.
        c_code: optional C source to compile.

    Raises:
        ValueError: if `c_code` is given but gcc is not installed.
        subprocess.CalledProcessError: if the gcc invocation fails.
    """
    assert not os.path.exists(outpath), outpath
    if which("gcc"):
        if c_code is None:
            c_code = textwrap.dedent(
                """
                #include <unistd.h>
                int main() {
                    pause();
                    return 1;
                }
                """)
        with tempfile.NamedTemporaryFile(
                suffix='.c', delete=False, mode='wt') as f:
            f.write(c_code)
        # The file is closed (and thus flushed) before gcc reads it.
        try:
            subprocess.check_call(["gcc", f.name, "-o", outpath])
        finally:
            safe_rmpath(f.name)
    else:
        # fallback - use python's executable
        if c_code is not None:
            # Refuse rather than silently produce an executable that
            # has nothing to do with the requested source.
            raise ValueError(
                "can't specify c_code arg as gcc is not installed")
        shutil.copyfile(sys.executable, outpath)
        if POSIX:
            st = os.stat(outpath)
            os.chmod(outpath, st.st_mode | stat.S_IEXEC)


# ===================================================================
# --- testing
# ===================================================================
def test_kill(self):
    """Kill a test subprocess and verify it no longer exists.

    On POSIX the value returned by wait() is additionally compared
    with -signal.SIGKILL.
    """
    subp = get_test_subprocess()
    pid = subp.pid
    target = psutil.Process(pid)
    target.kill()
    status = target.wait()
    self.assertFalse(psutil.pid_exists(pid))
    if not POSIX:
        return
    self.assertEqual(status, -signal.SIGKILL)
def test_terminate(self):
    """Terminate a test subprocess and verify it no longer exists.

    On POSIX the value returned by wait() is additionally compared
    with -signal.SIGTERM.
    """
    subp = get_test_subprocess()
    pid = subp.pid
    target = psutil.Process(pid)
    target.terminate()
    status = target.wait()
    self.assertFalse(psutil.pid_exists(pid))
    if not POSIX:
        return
    self.assertEqual(status, -signal.SIGTERM)
def test_send_signal(self):
    """Exercise Process.send_signal() and its error translation.

    Covers: a real signal delivery (SIGKILL on POSIX, SIGTERM
    elsewhere; on POSIX wait() must return the negated signal),
    os.kill failing with ESRCH (expects NoSuchProcess), os.kill failing
    with EPERM (expects AccessDenied) and signalling PID 0 (expects
    ValueError).
    """
    if POSIX:
        sig = signal.SIGKILL
    else:
        sig = signal.SIGTERM

    subp = get_test_subprocess()
    target = psutil.Process(subp.pid)
    target.send_signal(sig)
    status = target.wait()
    self.assertFalse(psutil.pid_exists(target.pid))
    if POSIX:
        self.assertEqual(status, -sig)

    # os.kill reporting "no such process"
    subp = get_test_subprocess()
    target = psutil.Process(subp.pid)
    target.send_signal(sig)
    esrch_kill = mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, ""))
    with esrch_kill, self.assertRaises(psutil.NoSuchProcess):
        target.send_signal(sig)

    # os.kill reporting "operation not permitted"
    subp = get_test_subprocess()
    target = psutil.Process(subp.pid)
    target.send_signal(sig)
    eperm_kill = mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, ""))
    with eperm_kill, self.assertRaises(psutil.AccessDenied):
        psutil.Process().send_signal(sig)

    # Sending a signal to process with PID 0 is not allowed as
    # it would affect every process in the process group of
    # the calling process (os.getpid()) instead of PID 0").
    if 0 in psutil.pids():
        target = psutil.Process(0)
        self.assertRaises(ValueError, target.send_signal, signal.SIGTERM)
def create_exe(outpath, c_code=None):
    """Creates an executable file in the given location.

    With gcc available, compiles `c_code` (or a default program that
    calls pause()) into `outpath`. Without gcc, copies the running
    Python executable there, and raises ValueError if `c_code` was
    supplied.
    """
    assert not os.path.exists(outpath), outpath

    if not which("gcc"):
        # fallback - use python's executable
        if c_code is not None:
            raise ValueError(
                "can't specify c_code arg as gcc is not installed")
        shutil.copyfile(sys.executable, outpath)
        if POSIX:
            current_mode = os.stat(outpath).st_mode
            os.chmod(outpath, current_mode | stat.S_IEXEC)
        return

    if c_code is None:
        c_code = textwrap.dedent(
            """
            #include <unistd.h>
            int main() {
                pause();
                return 1;
            }
            """)
    with tempfile.NamedTemporaryFile(
            suffix='.c', delete=False, mode='wt') as src:
        src.write(c_code)
    src_path = src.name
    try:
        subprocess.check_call(["gcc", src_path, "-o", outpath])
    finally:
        # The temporary source is removed whether or not gcc succeeded.
        safe_rmpath(src_path)


# ===================================================================
# --- testing
# ===================================================================
def print_(a, b):
    """Print `a` and `b` as a left-aligned label/value pair.

    When stdout is a terminal and the platform is POSIX, the label is
    padded to 13 columns and wrapped in ANSI bold-green escape codes;
    otherwise it is padded to 11 columns with no colouring.
    """
    use_color = sys.stdout.isatty() and psutil.POSIX
    if use_color:
        template = '\x1b[1;32m%-13s\x1b[0m %s'
    else:
        template = '%-11s %s'
    print(template % (a, b))
def test_wait(self):
    """Exercise Process.wait(): exit codes, repeated calls, timeouts.

    On POSIX the codes after kill()/terminate() are compared with
    signal.SIGKILL / signal.SIGTERM; on other platforms with 0.
    """
    # check exit code signal
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.kill()
    exit_code = proc.wait()
    self.assertEqual(exit_code, signal.SIGKILL if POSIX else 0)
    self.assertFalse(proc.is_running())

    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.terminate()
    exit_code = proc.wait()
    self.assertEqual(exit_code, signal.SIGTERM if POSIX else 0)
    self.assertFalse(proc.is_running())

    # check sys.exit() code
    script = "import time, sys; time.sleep(0.01); sys.exit(5);"
    child = get_test_subprocess([PYTHON, "-c", script])
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.wait(), 5)
    self.assertFalse(proc.is_running())

    # Test wait() issued twice.
    # It is not supposed to raise NSP when the process is gone.
    # On UNIX this should return None, on Windows it should keep
    # returning the exit code.
    child = get_test_subprocess([PYTHON, "-c", script])
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.wait(), 5)
    self.assertIn(proc.wait(), (5, None))

    # test timeout
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.name()
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0.01)

    # timeout < 0 not allowed
    self.assertRaises(ValueError, proc.wait, -1)

# XXX why is this skipped on Windows?
def test_disk_partitions(self):
    """Sanity-check psutil.disk_partitions() with all=False and all=True.

    Checks that both listings are non-empty, that devices and mount
    points exist where that can be asserted, that fstype/opts have the
    expected types, and that the mount point containing this file is
    among the reported ones and is accepted by psutil.disk_usage().
    """
    # all = False
    ls = psutil.disk_partitions(all=False)
    # on travis we get:
    #     self.assertEqual(p.cpu_affinity(), [n])
    # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
    self.assertTrue(ls, msg=ls)
    for disk in ls:
        if WINDOWS and 'cdrom' in disk.opts:
            continue
        if not POSIX:
            assert os.path.exists(disk.device), disk
        else:
            # we cannot make any assumption about this, see:
            # http://goo.gl/p9c43
            # (the attribute is merely accessed, nothing is asserted)
            disk.device
        if SUNOS:
            # on solaris apparently mount points can also be files
            assert os.path.exists(disk.mountpoint), disk
        else:
            assert os.path.isdir(disk.mountpoint), disk
        assert disk.fstype, disk
        self.assertIsInstance(disk.opts, str)

    # all = True
    ls = psutil.disk_partitions(all=True)
    self.assertTrue(ls, msg=ls)
    for disk in psutil.disk_partitions(all=True):
        if not WINDOWS:
            try:
                os.stat(disk.mountpoint)
            except OSError as err:
                if TRAVIS and OSX and err.errno == errno.EIO:
                    continue
                # http://mail.python.org/pipermail/python-dev/
                #     2012-June/120787.html
                # Permission errors are tolerated; anything else is
                # re-raised.
                if err.errno not in (errno.EPERM, errno.EACCES):
                    raise
            else:
                if SUNOS:
                    # on solaris apparently mount points can also be files
                    assert os.path.exists(disk.mountpoint), disk
                else:
                    assert os.path.isdir(disk.mountpoint), disk
        self.assertIsInstance(disk.fstype, str)
        self.assertIsInstance(disk.opts, str)

    def find_mount_point(path):
        # Walk up from `path` until a mount point is reached and return
        # it lower-cased.
        path = os.path.abspath(path)
        while not os.path.ismount(path):
            path = os.path.dirname(path)
        return path.lower()

    mount = find_mount_point(__file__)
    mounts = [x.mountpoint.lower() for
              x in psutil.disk_partitions(all=True)]
    self.assertIn(mount, mounts)
    psutil.disk_usage(mount)
def test_wait(self):
    """Exercise Process.wait(): exit codes, repeated calls, timeouts.

    On POSIX the codes after kill()/terminate() are compared with
    -signal.SIGKILL / -signal.SIGTERM; on other platforms with 0.
    """
    # check exit code signal
    subp = get_test_subprocess()
    target = psutil.Process(subp.pid)
    target.kill()
    status = target.wait()
    self.assertEqual(status, -signal.SIGKILL if POSIX else 0)
    self.assertFalse(target.is_running())

    subp = get_test_subprocess()
    target = psutil.Process(subp.pid)
    target.terminate()
    status = target.wait()
    self.assertEqual(status, -signal.SIGTERM if POSIX else 0)
    self.assertFalse(target.is_running())

    # check sys.exit() code
    exit5_script = "import time, sys; time.sleep(0.01); sys.exit(5);"
    subp = get_test_subprocess([PYTHON, "-c", exit5_script])
    target = psutil.Process(subp.pid)
    self.assertEqual(target.wait(), 5)
    self.assertFalse(target.is_running())

    # Test wait() issued twice.
    # It is not supposed to raise NSP when the process is gone.
    # On UNIX this should return None, on Windows it should keep
    # returning the exit code.
    subp = get_test_subprocess([PYTHON, "-c", exit5_script])
    target = psutil.Process(subp.pid)
    self.assertEqual(target.wait(), 5)
    self.assertIn(target.wait(), (5, None))

    # test timeout
    subp = get_test_subprocess()
    target = psutil.Process(subp.pid)
    target.name()
    self.assertRaises(psutil.TimeoutExpired, target.wait, 0.01)

    # timeout < 0 not allowed
    self.assertRaises(ValueError, target.wait, -1)

# XXX why is this skipped on Windows?
def test_disk_partitions(self):
    """Sanity-check psutil.disk_partitions() with all=False and all=True.

    Checks field types, that both listings are non-empty, that devices
    and mount points exist where that can be asserted, and that the
    mount point containing this file is among the reported ones and is
    accepted by psutil.disk_usage().
    """
    # all = False
    ls = psutil.disk_partitions(all=False)
    # on travis we get:
    #     self.assertEqual(p.cpu_affinity(), [n])
    # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
    self.assertTrue(ls, msg=ls)
    for disk in ls:
        # Every field must be a byte or unicode string.
        self.assertIsInstance(disk.device, (str, unicode))
        self.assertIsInstance(disk.mountpoint, (str, unicode))
        self.assertIsInstance(disk.fstype, (str, unicode))
        self.assertIsInstance(disk.opts, (str, unicode))
        if WINDOWS and 'cdrom' in disk.opts:
            continue
        if not POSIX:
            assert os.path.exists(disk.device), disk
        else:
            # we cannot make any assumption about this, see:
            # http://goo.gl/p9c43
            # (the attribute is merely accessed, nothing is asserted)
            disk.device
        if SUNOS:
            # on solaris apparently mount points can also be files
            assert os.path.exists(disk.mountpoint), disk
        else:
            assert os.path.isdir(disk.mountpoint), disk
        assert disk.fstype, disk

    # all = True
    ls = psutil.disk_partitions(all=True)
    self.assertTrue(ls, msg=ls)
    for disk in psutil.disk_partitions(all=True):
        if not WINDOWS:
            try:
                os.stat(disk.mountpoint)
            except OSError as err:
                if TRAVIS and OSX and err.errno == errno.EIO:
                    continue
                # http://mail.python.org/pipermail/python-dev/
                #     2012-June/120787.html
                # Permission errors are tolerated; anything else is
                # re-raised.
                if err.errno not in (errno.EPERM, errno.EACCES):
                    raise
            else:
                if SUNOS:
                    # on solaris apparently mount points can also be files
                    assert os.path.exists(disk.mountpoint), disk
                else:
                    assert os.path.isdir(disk.mountpoint), disk
        self.assertIsInstance(disk.fstype, str)
        self.assertIsInstance(disk.opts, str)

    def find_mount_point(path):
        # Walk up from `path` until a mount point is reached and return
        # it lower-cased.
        path = os.path.abspath(path)
        while not os.path.ismount(path):
            path = os.path.dirname(path)
        return path.lower()

    mount = find_mount_point(__file__)
    mounts = [x.mountpoint.lower() for
              x in psutil.disk_partitions(all=True)]
    self.assertIn(mount, mounts)
    psutil.disk_usage(mount)
def test_wait_procs(self):
    """Exercise psutil.wait_procs() with three test subprocesses.

    Checks argument validation, the short-timeout case where nothing
    has exited, then terminates the children (one first, then the other
    two) and verifies the gone/alive partition, the callback
    invocations and the presence of the `returncode` attribute.
    """
    reported = []

    def callback(p):
        reported.append(p.pid)

    sproc1 = get_test_subprocess()
    sproc2 = get_test_subprocess()
    sproc3 = get_test_subprocess()
    procs = [psutil.Process(sp.pid) for sp in (sproc1, sproc2, sproc3)]
    self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
    self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)

    started = time.time()
    gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
    self.assertLess(time.time() - started, 0.5)
    self.assertEqual(gone, [])
    self.assertEqual(len(alive), 3)
    self.assertEqual(reported, [])
    for p in alive:
        self.assertFalse(hasattr(p, 'returncode'))

    @retry_before_failing(30)
    def test(procs, callback):
        gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                        callback=callback)
        self.assertEqual(len(gone), 1)
        self.assertEqual(len(alive), 2)
        return gone, alive

    sproc3.terminate()
    gone, alive = test(procs, callback)
    self.assertIn(sproc3.pid, [p.pid for p in gone])
    returncode = gone.pop().returncode
    self.assertEqual(returncode, signal.SIGTERM if POSIX else 1)
    self.assertEqual(reported, [sproc3.pid])
    for p in alive:
        self.assertFalse(hasattr(p, 'returncode'))

    @retry_before_failing(30)
    def test(procs, callback):
        gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                        callback=callback)
        self.assertEqual(len(gone), 3)
        self.assertEqual(len(alive), 0)
        return gone, alive

    sproc1.terminate()
    sproc2.terminate()
    gone, alive = test(procs, callback)
    self.assertEqual(set(reported), {sproc1.pid, sproc2.pid, sproc3.pid})
    for p in gone:
        self.assertTrue(hasattr(p, 'returncode'))
def test_wait_procs(self):
    """Exercise psutil.wait_procs() with three test subprocesses.

    Checks argument validation, the short-timeout case where nothing
    has exited, then terminates the children (one first, then the other
    two) and verifies the gone/alive partition, the callback
    invocations and the presence of the `returncode` attribute. On
    POSIX the first return code is compared with -signal.SIGTERM.
    """
    notified = []

    def callback(p):
        notified.append(p.pid)

    sproc1 = get_test_subprocess()
    sproc2 = get_test_subprocess()
    sproc3 = get_test_subprocess()
    procs = [psutil.Process(sp.pid) for sp in (sproc1, sproc2, sproc3)]
    self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
    self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)

    t0 = time.time()
    gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
    self.assertLess(time.time() - t0, 0.5)
    self.assertEqual(gone, [])
    self.assertEqual(len(alive), 3)
    self.assertEqual(notified, [])
    for p in alive:
        self.assertFalse(hasattr(p, 'returncode'))

    @retry_before_failing(30)
    def test(procs, callback):
        gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                        callback=callback)
        self.assertEqual(len(gone), 1)
        self.assertEqual(len(alive), 2)
        return gone, alive

    sproc3.terminate()
    gone, alive = test(procs, callback)
    self.assertIn(sproc3.pid, [p.pid for p in gone])
    returncode = gone.pop().returncode
    self.assertEqual(returncode, -signal.SIGTERM if POSIX else 1)
    self.assertEqual(notified, [sproc3.pid])
    for p in alive:
        self.assertFalse(hasattr(p, 'returncode'))

    @retry_before_failing(30)
    def test(procs, callback):
        gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                        callback=callback)
        self.assertEqual(len(gone), 3)
        self.assertEqual(len(alive), 0)
        return gone, alive

    sproc1.terminate()
    sproc2.terminate()
    gone, alive = test(procs, callback)
    self.assertEqual(set(notified), {sproc1.pid, sproc2.pid, sproc3.pid})
    for p in gone:
        self.assertTrue(hasattr(p, 'returncode'))
def test_wait_procs(self):
    """Exercise psutil.wait_procs() with three test subprocesses.

    Checks argument validation, the short-timeout case where nothing
    has exited, then terminates the children (one first, then the other
    two) and verifies the gone/alive partition, the pids passed to the
    callback and the presence of the `returncode` attribute. On POSIX
    the first return code is compared with -signal.SIGTERM.
    """
    seen_pids = []

    def callback(p):
        seen_pids.append(p.pid)

    children = [get_test_subprocess() for _ in range(3)]
    sproc1, sproc2, sproc3 = children
    procs = [psutil.Process(child.pid) for child in children]
    self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
    self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)

    begin = time.time()
    gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
    self.assertLess(time.time() - begin, 0.5)
    self.assertEqual(gone, [])
    self.assertEqual(len(alive), 3)
    self.assertEqual(seen_pids, [])
    for p in alive:
        self.assertFalse(hasattr(p, 'returncode'))

    @retry_before_failing(30)
    def test(procs, callback):
        gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                        callback=callback)
        self.assertEqual(len(gone), 1)
        self.assertEqual(len(alive), 2)
        return gone, alive

    sproc3.terminate()
    gone, alive = test(procs, callback)
    gone_pids = [p.pid for p in gone]
    self.assertIn(sproc3.pid, gone_pids)
    first_gone = gone.pop()
    if POSIX:
        expected_code = -signal.SIGTERM
    else:
        expected_code = 1
    self.assertEqual(first_gone.returncode, expected_code)
    self.assertEqual(seen_pids, [sproc3.pid])
    for p in alive:
        self.assertFalse(hasattr(p, 'returncode'))

    @retry_before_failing(30)
    def test(procs, callback):
        gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                        callback=callback)
        self.assertEqual(len(gone), 3)
        self.assertEqual(len(alive), 0)
        return gone, alive

    sproc1.terminate()
    sproc2.terminate()
    gone, alive = test(procs, callback)
    self.assertEqual(set(seen_pids),
                     {sproc1.pid, sproc2.pid, sproc3.pid})
    for p in gone:
        self.assertTrue(hasattr(p, 'returncode'))