我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.WINDOWS。
def get_default_session():
    """Locate the nearest ancestor process that is a shell.

    On POSIX a shell is any process whose name ends with "sh"; on Windows
    it is "cmd.exe" or "powershell.exe" (compared case-insensitively).
    On any other platform no lookup is attempted.

    Returns:
        The pid of the first matching ancestor, or None if no ancestor
        matches or the platform is unsupported.
    """
    if psutil.POSIX:
        def predicate(name):
            return name.endswith("sh")
    elif psutil.WINDOWS:
        def predicate(name):
            # Windows file names are case-insensitive.
            return name.lower() in ("cmd.exe", "powershell.exe")
    else:
        return None

    proc = psutil.Process().parent()
    # parent() yields None when no parent is known; a falsy pid (0) ends
    # the walk exactly as before.
    while proc is not None and proc.pid:
        if predicate(proc.name()):
            return proc.pid
        proc = proc.parent()
    return None
def test_parse_environ_block(self):
    """Run parse_environ_block() over a table of raw blocks."""
    from psutil._common import parse_environ_block

    def k(s):
        # Expected keys are upper-cased when WINDOWS is set.
        if WINDOWS:
            return s.upper()
        return s

    one = {k("a"): "1"}
    two = {k("a"): "1", k("b"): "2"}
    cases = [
        ("a=1\0", one),
        ("a=1\0b=2\0\0", two),
        ("a=1\0b=\0\0", {k("a"): "1", k("b"): ""}),
        # ignore everything after \0\0
        ("a=1\0b=2\0\0c=3\0", two),
        # ignore everything that is not an assignment
        ("xxx\0a=1\0", one),
        ("a=1\0=b=2\0", one),
        # do not fail if the block is incomplete
        ("a=1\0b=2", one),
    ]
    for block, expected in cases:
        self.assertEqual(parse_environ_block(block), expected)
def test_username(self):
    """Check Process.username() for a freshly spawned test subprocess.

    POSIX: the name must match the pwd entry of the current uid, and when
    the uid cannot be resolved (getpwuid raising KeyError) the value must
    equal the real uid rendered as a string.
    Windows (only when USERNAME is in the environment): the value must be
    "<USERDOMAIN>\\<USERNAME>".
    Otherwise the call only has to succeed.
    """
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    if POSIX:
        import pwd
        self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name)
        with mock.patch("psutil.pwd.getpwuid",
                        side_effect=KeyError) as fun:
            # This used to be a bare `==` expression whose result was
            # discarded, so the fallback value was never verified.
            self.assertEqual(p.username(), str(p.uids().real))
            assert fun.called
    elif WINDOWS and 'USERNAME' in os.environ:
        expected_username = os.environ['USERNAME']
        expected_domain = os.environ['USERDOMAIN']
        domain, username = p.username().split('\\')
        self.assertEqual(domain, expected_domain)
        self.assertEqual(username, expected_username)
    else:
        p.username()
def test_open_files_2(self):
    """Check the path/fd fields reported by Process.open_files().

    Opens TESTFN for writing, locates the matching entry among the current
    process' open files, validates its fields and tuple positions, then
    verifies the entry disappears once the file has been closed.
    """
    # test fd and path fields
    with open(TESTFN, 'w') as fileobj:
        p = psutil.Process()
        for entry in p.open_files():
            if entry.path == fileobj.name or entry.fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.open_files()))
        self.assertEqual(entry.path, fileobj.name)
        if WINDOWS:
            self.assertEqual(entry.fd, -1)
        else:
            self.assertEqual(entry.fd, fileobj.fileno())
        # test positions
        ntuple = p.open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
    # test file is gone: compare against the reported paths (a bare
    # string is never a member of a list of namedtuples) and only after
    # the `with` block has closed the file.
    self.assertNotIn(fileobj.name, [f.path for f in p.open_files()])
def open_files(self, ret, proc):
    """Sanity-check every entry of an open_files() result."""
    for entry in ret:
        if WINDOWS:
            assert entry.fd == -1, entry
        else:
            self.assertIsInstance(entry.fd, int)
        if LINUX:
            position = entry.position
            self.assertIsInstance(position, int)
            self.assertGreaterEqual(position, 0)
            self.assertIn(entry.mode, ('r', 'w', 'a', 'r+', 'a+'))
            self.assertGreater(entry.flags, 0)
        # On BSD an entry may come back with an empty path; those skip the
        # path checks.
        # XXX see: https://github.com/giampaolo/psutil/issues/595
        if not BSD or entry.path:
            assert os.path.isabs(entry.path), entry
            assert os.path.isfile(entry.path), entry
def _get_mem():
    """Return the memory figure of `thisproc` used for comparisons.

    USS is used when LINUX, WINDOWS or OSX is set, since it seems less
    likely to produce false positives; otherwise RSS is returned.
    """
    uss_platform = LINUX or WINDOWS or OSX
    if not uss_platform:
        return thisproc.memory_info().rss
    return thisproc.memory_full_info().uss
def test_ionice_set(self):
    """Exercise setting the I/O priority through self.execute()."""
    if not WINDOWS:
        self.execute(self.proc.ionice, psutil.IOPRIO_CLASS_NONE)
        # Also drive the low-level setter with an invalid class value and
        # expect OSError.
        bad_call = functools.partial(
            cext.proc_ioprio_set, os.getpid(), -1, 0)
        self.execute_w_exc(OSError, bad_call)
        return
    # Windows branch: re-apply the current value.
    current = thisproc.ionice()
    self.execute(self.proc.ionice, current)
def test_net_if_addrs(self):
    """Run psutil.net_if_addrs through self.execute()."""
    # Note: verified that on Windows this was a false positive, hence the
    # explicit tolerance there.
    if WINDOWS:
        tolerance = 80 * 1024
    else:
        tolerance = None
    self.execute(psutil.net_if_addrs, tolerance_=tolerance)
def test_memory_info(self):
    """Check that memory_info()/memory_percent() react to an allocation."""
    proc = psutil.Process()

    # Baseline figures taken before allocating anything.
    rss_before, vms_before = proc.memory_info()[:2]
    pct_before = proc.memory_percent()
    self.assertGreater(rss_before, 0)
    self.assertGreater(vms_before, 0)

    # Allocate a big list and sample again while it is still alive.
    memarr = [None] * 1500000
    rss_after, vms_after = proc.memory_info()[:2]
    pct_after = proc.memory_percent()

    # Usage must have gone up (vms is allowed to stay the same).
    self.assertGreater(rss_after, rss_before)
    self.assertGreaterEqual(vms_after, vms_before)
    self.assertGreater(pct_after, pct_before)
    del memarr

    if WINDOWS:
        info = proc.memory_info()
        self.assertEqual(info.rss, info.wset)
        self.assertEqual(info.vms, info.pagefile)

    info = proc.memory_info()
    for field in info._fields:
        self.assertGreaterEqual(getattr(info, field), 0)
def test_memory_percent(self):
    """Check Process.memory_percent() for the default, 'vms' and 'uss' types.

    Every returned value must lie within [0, 100]; an unknown memtype must
    raise ValueError. 'uss' is only queried when LINUX, OSX or WINDOWS is
    set.
    """
    p = psutil.Process()
    ret = p.memory_percent()
    assert 0 <= ret <= 100, ret
    ret = p.memory_percent(memtype='vms')
    assert 0 <= ret <= 100, ret
    self.assertRaises(ValueError, p.memory_percent, memtype="?!?")
    if LINUX or OSX or WINDOWS:
        ret = p.memory_percent(memtype='uss')
        assert 0 <= ret <= 100, ret
def test_exe(self):
    """Check that Process.exe() of a test subprocess points at PYTHON.

    Exact equality is tried first, then platform-specific leniency is
    applied; finally the reported executable is run to prove it works.
    """
    sproc = get_test_subprocess()
    exe_path = psutil.Process(sproc.pid).exe()
    try:
        self.assertEqual(exe_path, PYTHON)
    except AssertionError:
        if WINDOWS and len(exe_path) == len(PYTHON):
            # on Windows we don't care about case sensitivity
            self.assertEqual(os.path.normcase(exe_path),
                             os.path.normcase(PYTHON))
        else:
            # Certain platforms such as BSD are more accurate, returning
            # "/usr/local/bin/python2.7" instead of
            # "/usr/local/bin/python". That difference in accuracy is not
            # treated as an error, so the "X.Y" version suffix is dropped
            # from both sides before comparing.
            version = "%s.%s" % sys.version_info[:2]
            stripped_exe = exe_path.replace(version, '')
            stripped_python = PYTHON.replace(version, '')
            try:
                self.assertEqual(stripped_exe, stripped_python)
            except AssertionError:
                # Typically OSX. Really not sure what to do here.
                pass

    child = subprocess.Popen(
        [exe_path, '-c', 'import os; print("hey")'],
        stdout=subprocess.PIPE)
    stdout_data, _ = child.communicate()
    self.assertEqual(stdout_data.strip(), b'hey')
def test_nice(self):
    """Check getting and setting the priority of the current process.

    A non-numeric argument must raise TypeError. On Windows the priority
    class is switched to HIGH and back to NORMAL. Elsewhere the nice value
    is raised to 1 and (except on OSX) lowered back to 0; AccessDenied is
    tolerated while changing or restoring the value.
    """
    p = psutil.Process()
    self.assertRaises(TypeError, p.nice, "str")
    if WINDOWS:
        try:
            init = p.nice()
            if sys.version_info > (3, 4):
                self.assertIsInstance(init, enum.IntEnum)
            else:
                self.assertIsInstance(init, int)
            self.assertEqual(init, psutil.NORMAL_PRIORITY_CLASS)
            p.nice(psutil.HIGH_PRIORITY_CLASS)
            self.assertEqual(p.nice(), psutil.HIGH_PRIORITY_CLASS)
            p.nice(psutil.NORMAL_PRIORITY_CLASS)
            self.assertEqual(p.nice(), psutil.NORMAL_PRIORITY_CLASS)
        finally:
            p.nice(psutil.NORMAL_PRIORITY_CLASS)
    else:
        # Read the initial value before entering the try block: if this
        # call failed inside it, the finally clause would reference an
        # unbound `first_nice` and mask the real error.
        first_nice = p.nice()
        try:
            p.nice(1)
            self.assertEqual(p.nice(), 1)
            # going back to previous nice value raises
            # AccessDenied on OSX
            if not OSX:
                p.nice(0)
                self.assertEqual(p.nice(), 0)
        except psutil.AccessDenied:
            pass
        finally:
            try:
                p.nice(first_nice)
            except psutil.AccessDenied:
                pass
def test_pid_0(self):
    """Exercise every Process attribute getter against PID 0.

    When PID 0 is not listed by psutil.pids(), constructing Process(0)
    must raise NoSuchProcess and nothing else is checked. Otherwise each
    name in psutil._as_dict_attrnames is called (AccessDenied tolerated)
    and a few results are validated.
    """
    # Process(0) is supposed to work on all platforms except Linux
    if 0 not in psutil.pids():
        self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
        return

    # test all methods
    p = psutil.Process(0)
    for name in psutil._as_dict_attrnames:
        if name == 'pid':
            continue
        meth = getattr(p, name)
        try:
            ret = meth()
        except psutil.AccessDenied:
            pass
        else:
            if name in ("uids", "gids"):
                self.assertEqual(ret.real, 0)
            elif name == "username":
                # Use the value already fetched instead of calling
                # username() again outside the AccessDenied guard.
                if POSIX:
                    self.assertEqual(ret, 'root')
                elif WINDOWS:
                    self.assertEqual(ret, 'NT AUTHORITY\\SYSTEM')
            elif name == "name":
                # Check the returned process name; asserting on `name`
                # (the attribute-name string) could never fail.
                assert ret, name

    if hasattr(p, 'rlimit'):
        try:
            p.rlimit(psutil.RLIMIT_FSIZE)
        except psutil.AccessDenied:
            pass

    p.as_dict()

    if not OPENBSD:
        self.assertIn(0, psutil.pids())
        self.assertTrue(psutil.pid_exists(0))
def memory_info(self, ret, proc):
    """Sanity-check a memory_info() result tuple."""
    for field in ret._fields:
        self.assertGreaterEqual(getattr(ret, field), 0)

    if POSIX and ret.vms != 0:
        # VMS is always supposed to be the highest
        for field in ret._fields:
            if field == 'vms':
                continue
            assert ret.vms > getattr(ret, field), ret
    elif WINDOWS:
        # Each peak counter must be at least its current counterpart.
        assert ret.wset <= ret.peak_wset, ret
        assert ret.paged_pool <= ret.peak_paged_pool, ret
        assert ret.nonpaged_pool <= ret.peak_nonpaged_pool, ret
        assert ret.pagefile <= ret.peak_pagefile, ret
def num_handles(self, ret, proc):
    """Assert that a num_handles() result is non-negative.

    Args:
        ret: the value to validate.
        proc: unused here; kept so the signature is unchanged.
    """
    # The former WINDOWS / non-WINDOWS branches were identical, so the
    # platform check was dropped.
    self.assertGreaterEqual(ret, 0)
def test_proc_name(self):
    """Check the process name reported for a subprocess started from self.uexe."""
    subp = get_test_subprocess(cmd=[self.uexe])
    if not WINDOWS:
        name = psutil.Process(subp.pid).name()
    else:
        # XXX: why is this like this?
        from psutil._pswindows import py2_strencode
        raw_name = psutil._psplatform.cext.proc_name(subp.pid)
        name = py2_strencode(raw_name)
    # NOTE(review): this evaluates as `(not OSX) and TRAVIS`, so the
    # comparison only runs when TRAVIS is set; possibly
    # `not (OSX and TRAVIS)` was intended -- confirm.
    if (not OSX) and TRAVIS:
        self.assertEqual(name, os.path.basename(self.uexe))
def test_cpu_times(self):
    """Check psutil.cpu_times(): field types, values >= 0, sum() and str()."""
    times = psutil.cpu_times()
    sum(times)
    running_total = 0
    for value in times:
        self.assertIsInstance(value, float)
        self.assertGreaterEqual(value, 0.0)
        running_total += value
    self.assertEqual(running_total, sum(times))
    str(times)
    # CPU times are supposed to increase over time, or at least stay the
    # same, because time cannot go backwards. Surprisingly that is not
    # always the case (at least on Windows and Linux), so no
    # "never decreases" check is performed here. See:
    # https://github.com/giampaolo/psutil/issues/392
    # https://github.com/giampaolo/psutil/issues/645
def test_os_constants(self):
    """Check that the psutil platform constants agree with os/sys."""
    names = ["POSIX", "WINDOWS", "LINUX", "OSX", "FREEBSD", "OPENBSD",
             "NETBSD", "BSD", "SUNOS"]
    for name in names:
        self.assertIsInstance(getattr(psutil, name), bool, msg=name)

    # Constants expected to be True for this platform are removed from
    # `names`; whatever remains is checked to be False at the end.
    if os.name != 'posix':
        assert psutil.WINDOWS
        assert not psutil.POSIX
        names.remove("WINDOWS")
    else:
        assert psutil.POSIX
        assert not psutil.WINDOWS
        names.remove("POSIX")
        platform = sys.platform.lower()
        if "linux" in platform:
            assert psutil.LINUX
            names.remove("LINUX")
        elif "bsd" in platform:
            assert psutil.BSD
            flavours = [psutil.FREEBSD, psutil.OPENBSD, psutil.NETBSD]
            self.assertEqual(flavours.count(True), 1)
            for bsd_name in ("BSD", "FREEBSD", "OPENBSD", "NETBSD"):
                names.remove(bsd_name)
        elif "sunos" in platform or "solaris" in platform:
            assert psutil.SUNOS
            names.remove("SUNOS")
        elif "darwin" in platform:
            assert psutil.OSX
            names.remove("OSX")

    # assert all other constants are set to False
    for name in names:
        self.assertIs(getattr(psutil, name), False, msg=name)
def wait_for_pid(pid):
    """Look up `pid` via psutil.Process and return once that succeeds.

    Used in the test suite to give the sub process time to initialize.
    NOTE(review): nothing here retries the lookup; any waiting is
    presumably provided by a decorator not shown with this snippet --
    confirm.
    """
    psutil.Process(pid)
    if not WINDOWS:
        return
    # give it some more time to allow better initialization
    time.sleep(0.01)
def cwd(self, ret, proc):
    """Sanity-check a cwd() result: an absolute path to a directory."""
    if ret is None:  # BSD may return None
        return
    assert os.path.isabs(ret), ret
    try:
        st = os.stat(ret)
    except OSError as err:
        access_denied = (
            WINDOWS and
            err.errno in psutil._psplatform.ACCESS_DENIED_SET)
        # ENOENT means the directory has been removed in the meantime;
        # that, and access-denied errors on Windows, are tolerated.
        if not access_denied and err.errno != errno.ENOENT:
            raise
    else:
        self.assertTrue(stat.S_ISDIR(st.st_mode))
def test_proc_environ(self):
    """Check that an env var set from self.uexe is reported by environ()."""
    child_env = os.environ.copy()
    child_env['FUNNY_ARG'] = self.uexe
    sproc = get_test_subprocess(env=child_env)
    p = psutil.Process(sproc.pid)

    expected = self.uexe
    if WINDOWS and not PY3:
        expected = expected.decode(sys.getfilesystemencoding())

    # NOTE(review): this evaluates as `(not OSX) and TRAVIS`, so the value
    # is only compared when TRAVIS is set; possibly
    # `not (OSX and TRAVIS)` was intended -- confirm.
    if (not OSX) and TRAVIS:
        self.assertEqual(p.environ()['FUNNY_ARG'], expected)
    else:
        # Otherwise environ() only has to be callable.
        p.environ()
def test_nice(self):
    """Check getting/setting the priority of the current process.

    On non-Windows platforms the value is cross-checked against
    os.getpriority() when that function exists.
    """
    p = psutil.Process()
    self.assertRaises(TypeError, p.nice, "str")

    if not WINDOWS:
        def check_against_os():
            # Compare with the OS view only when os.getpriority exists.
            if hasattr(os, "getpriority"):
                self.assertEqual(
                    os.getpriority(os.PRIO_PROCESS, os.getpid()), p.nice())

        first_nice = p.nice()
        try:
            check_against_os()
            p.nice(1)
            self.assertEqual(p.nice(), 1)
            check_against_os()
            # XXX - going back to previous nice value raises
            # AccessDenied on OSX
            if not OSX:
                p.nice(0)
                self.assertEqual(p.nice(), 0)
        except psutil.AccessDenied:
            pass
        finally:
            # Best-effort restore of the original value.
            try:
                p.nice(first_nice)
            except psutil.AccessDenied:
                pass
        return

    normal = psutil.NORMAL_PRIORITY_CLASS
    high = psutil.HIGH_PRIORITY_CLASS
    try:
        init = p.nice()
        if sys.version_info > (3, 4):
            expected_type = enum.IntEnum
        else:
            expected_type = int
        self.assertIsInstance(init, expected_type)
        self.assertEqual(init, normal)
        p.nice(high)
        self.assertEqual(p.nice(), high)
        p.nice(normal)
        self.assertEqual(p.nice(), normal)
    finally:
        p.nice(psutil.NORMAL_PRIORITY_CLASS)