我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用psutil.ZombieProcess()。
def pids_active(pids_computer): """ This function find pids of computer and return the valid. """ pid_valid = {} for pid in pids_computer: data = None try: process = psutil.Process(pid) data = {"pid": process.pid, "status": process.status(), "percent_cpu_used": process.cpu_percent(interval=0.0), "percent_memory_used": process.memory_percent()} except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess): data = None if data is not None: pid_valid[process.name()] = data return pid_valid
def test_exe_mocked(self): with mock.patch('psutil._pslinux.os.readlink', side_effect=OSError(errno.ENOENT, "")) as m: # No such file error; might be raised also if /proc/pid/exe # path actually exists for system processes with low pids # (about 0-20). In this case psutil is supposed to return # an empty string. ret = psutil.Process().exe() assert m.called self.assertEqual(ret, "") # ...but if /proc/pid no longer exist we're supposed to treat # it as an alias for zombie process with mock.patch('psutil._pslinux.os.path.lexists', return_value=False): self.assertRaises(psutil.ZombieProcess, psutil.Process().exe)
def test_zombie_process__repr__(self, func=repr): self.assertEqual( repr(psutil.ZombieProcess(321)), "psutil.ZombieProcess process still exists but it's a zombie " "(pid=321)") self.assertEqual( repr(psutil.ZombieProcess(321, name='foo')), "psutil.ZombieProcess process still exists but it's a zombie " "(pid=321, name='foo')") self.assertEqual( repr(psutil.ZombieProcess(321, name='foo', ppid=1)), "psutil.ZombieProcess process still exists but it's a zombie " "(pid=321, name='foo', ppid=1)") self.assertEqual( repr(psutil.ZombieProcess(321, msg='foo')), "psutil.ZombieProcess foo")
def test_ad_on_process_creation(self): # We are supposed to be able to instantiate Process also in case # of zombie processes or access denied. with mock.patch.object(psutil.Process, 'create_time', side_effect=psutil.AccessDenied) as meth: psutil.Process() assert meth.called with mock.patch.object(psutil.Process, 'create_time', side_effect=psutil.ZombieProcess(1)) as meth: psutil.Process() assert meth.called with mock.patch.object(psutil.Process, 'create_time', side_effect=ValueError) as meth: with self.assertRaises(ValueError): psutil.Process() assert meth.called
def update(self): """ Update the list of BrewPi processes by receiving them from the system with psutil. Returns: list of BrewPiProcess objects """ bpList = [] matching = [] # some OS's (OS X) do not allow processes to read info from other processes. try: matching = [p for p in psutil.process_iter() if any('python' in p.name() and 'brewpi.py'in s for s in p.cmdline())] except psutil.AccessDenied: pass except psutil.ZombieProcess: pass for p in matching: bp = self.parseProcess(p) if bp: bpList.append(bp) self.list = bpList return self.list
def get_chrome_procs(self): def is_chrome(proc): try: return proc.name() == 'chrome' except psutil.ZombieProcess: return False return [p for p in psutil.process_iter() if is_chrome(p)]
def test_process__repr__(self, func=repr): p = psutil.Process() r = func(p) self.assertIn("psutil.Process", r) self.assertIn("pid=%s" % p.pid, r) self.assertIn("name=", r) self.assertIn(p.name(), r) with mock.patch.object(psutil.Process, "name", side_effect=psutil.ZombieProcess(os.getpid())): p = psutil.Process() r = func(p) self.assertIn("pid=%s" % p.pid, r) self.assertIn("zombie", r) self.assertNotIn("name=", r) with mock.patch.object(psutil.Process, "name", side_effect=psutil.NoSuchProcess(os.getpid())): p = psutil.Process() r = func(p) self.assertIn("pid=%s" % p.pid, r) self.assertIn("terminated", r) self.assertNotIn("name=", r) with mock.patch.object(psutil.Process, "name", side_effect=psutil.AccessDenied(os.getpid())): p = psutil.Process() r = func(p) self.assertIn("pid=%s" % p.pid, r) self.assertNotIn("name=", r)
def findConflicts(self, process): """ Finds out if the process given as argument will conflict with other running instances of BrewPi Always returns a conflict if a firmware update is running Params: process: a BrewPiProcess object that will be compared with other running instances Returns: bool: True means there are conflicts, False means no conflict """ # some OS's (OS X) do not allow processes to read info from other processes. matching = [] try: matching = [p for p in psutil.process_iter() if any('python' in p.name() and 'flashDfu.py'in s for s in p.cmdline())] except psutil.AccessDenied: pass except psutil.ZombieProcess: pass if len(matching) > 0: return 1 try: matching = [p for p in psutil.process_iter() if any('python' in p.name() and 'updateFirmware.py'in s for s in p.cmdline())] except psutil.AccessDenied: pass except psutil.ZombieProcess: pass if len(matching) > 0: return 1 for p in self.list: if process.pid == p.pid: # skip the process itself continue elif process.conflict(p): return 1 return 0
def print_stats(self, verbose=False): """ Prints out stats for each matched process and a sum of the RSS of the parent process and the USS of its children. :param verbose: Set true to see the full command-line. This is useful when deciding on a parent filter. """ def wrapped_path_filter(x): try: return self.path_filter(x.exe()) except (psutil.AccessDenied, psutil.ZombieProcess): return False parent_rss = 0 children_uss = 0 for p in filter(wrapped_path_filter, psutil.process_iter()): info = p.memory_full_info() rss = info.rss uss = info.uss cmdline = self.get_cmdline(p) exe = cmdline if verbose else p.exe() if self.parent_filter(cmdline): print "[%d] - %s\n * RSS - %d\n USS - %d" % (p.pid, exe, rss, uss) parent_rss += rss else: print "[%d] - %s\n RSS - %d\n * USS - %d" % (p.pid, exe, rss, uss) children_uss += uss if not parent_rss: if not children_uss: raise ProcessNotFoundException( "No processes matched the path filter") else: raise ProcessNotFoundException( "No process matched the parent filter") print "\nTotal: {:,} bytes\n".format(parent_rss + children_uss)
def get_process_from_pid(pid): process = None if is_int_str(pid): try: process = psutil.Process(pid=int(pid)) except (psutil.NoSuchProcess, psutil.ZombieProcess): log.debug('PROCESS HAS BEEN TERMINATED {}'.format(int(pid))) pass return process
def get_is_process_running(pid): process = None if is_int_str(pid): try: process = psutil.Process(pid=int(pid)) exitCode = process.wait(0) log.debug('waited for process.. exitCode: {}'.format(exitCode)) except (psutil.NoSuchProcess, psutil.ZombieProcess, psutil.TimeoutExpired): pass return process
def main(): # construct a dict where 'values' are all the processes # having 'key' as their parent tree = collections.defaultdict(list) for p in psutil.process_iter(): try: tree[p.ppid()].append(p.pid) except (psutil.NoSuchProcess, psutil.ZombieProcess): pass # on systems supporting PID 0, PID 0's parent is usually 0 if 0 in tree and 0 in tree[0]: tree[0].remove(0) print_tree(min(tree), tree)
def info(ctx, message): """ Displays Rero information :param ctx: :param message: :return: """ # TODO: this command needs to be fixed later servs = len(ctx.servers) ch = list(ctx.get_all_channels()) channels = len(ch) mem = list(ctx.get_all_members()) members = len(mem) current = datetime.datetime.now() diff = current - ctx.startup_timestamp days = diff.days seconds = diff.seconds m, s = divmod(seconds, 60) h, m = divmod(m, 60) # total_c = ctx.usage_track_admin['admin'] + ctx.usage_track['user'] # total_m = ctx.usage_track_admin['total_msg'] # r_min = (days * 24 * 60) + (h * 60) + m # msg_rate = float(total_c / r_min) # total_msg_rate = float(total_m / r_min) try: proc = psutil.Process(pid=os.getpid()) rss = float(proc.memory_info().rss) / 1000000 rss_per = float(proc.memory_percent()) cpu_per = float(proc.cpu_percent(interval=0.2)) sys_str = "RAM: {0:.2f} MB | CPU Usage: {1:.2f}% (*calculated for `2 ms` interval*)" \ .format(rss, rss_per, cpu_per) except (psutil.NoSuchProcess, psutil.ZombieProcess, psutil.AccessDenied): sys_str = "" # ret_str = "`Connected to '{0}' Servers with '{1}' Channels and '{2}' Members`" \ # "\nCurrent uptime is {3} Days, {4} Hours, {5} Minutes, {6} Seconds" \ # "\n**{7}** commands were __used__ till now (**~{8:.2f}** per min)" \ # "\n**{9}** messages were __sent__ till now (**~{10:.2f}** per min)" \ # "\n**{11}** messages were __seen__ till now (**~{12:.2f}** per min)" \ # "\n{13}" \ # .format(str(servs), str(channels), str(members), str(days), str(h), # str(m), str(s), str(total_c), msg_rate, ctx.rero_sent_message, # float(ctx.rero_sent_message / r_min), str(total_m), total_msg_rate, sys_str) ret_str = "`Connected to '{0}' Servers with '{1}' Channels and '{2}' Members`" \ "\nCurrent uptime is {3} Days, {4} Hours, {5} Minutes, {6} Seconds" \ "\n{7}\n**SHARD: {8} / 2**" \ .format(str(servs), str(channels), str(members), str(days), str(h), str(m), str(s), sys_str, ctx.current_shard) await ctx.send_message(message.channel, ret_str)
def info(ctx, message): """ Displays Lapzbot information :param ctx: :param message: :return: """ # TODO: this command needs to be fixed later servs = len(ctx.servers) ch = list(ctx.get_all_channels()) channels = len(ch) mem = list(ctx.get_all_members()) members = len(mem) current = datetime.datetime.now() diff = current - ctx.startup_timestamp days = diff.days seconds = diff.seconds m, s = divmod(seconds, 60) h, m = divmod(m, 60) # total_c = ctx.usage_track_admin['admin'] + ctx.usage_track['user'] # total_m = ctx.usage_track_admin['total_msg'] # r_min = (days * 24 * 60) + (h * 60) + m # msg_rate = float(total_c / r_min) # total_msg_rate = float(total_m / r_min) try: proc = psutil.Process(pid=os.getpid()) rss = float(proc.memory_info().rss) / 1000000 rss_per = float(proc.memory_percent()) cpu_per = float(proc.cpu_percent(interval=0.2)) sys_str = "RAM: {0:.2f} MB | CPU Usage: {1:.2f}% (*calculated for `2 ms` interval*)" \ .format(rss, rss_per, cpu_per) except (psutil.NoSuchProcess, psutil.ZombieProcess, psutil.AccessDenied): sys_str = "" # ret_str = "`Connected to '{0}' Servers with '{1}' Channels and '{2}' Members`" \ # "\nCurrent uptime is {3} Days, {4} Hours, {5} Minutes, {6} Seconds" \ # "\n**{7}** commands were __used__ till now (**~{8:.2f}** per min)" \ # "\n**{9}** messages were __sent__ till now (**~{10:.2f}** per min)" \ # "\n**{11}** messages were __seen__ till now (**~{12:.2f}** per min)" \ # "\n{13}" \ # .format(str(servs), str(channels), str(members), str(days), str(h), # str(m), str(s), str(total_c), msg_rate, ctx.lapzbot_sent_message, # float(ctx.lapzbot_sent_message / r_min), str(total_m), total_msg_rate, sys_str) ret_str = "`Connected to '{0}' Servers with '{1}' Channels and '{2}' Members`" \ "\nCurrent uptime is {3} Days, {4} Hours, {5} Minutes, {6} Seconds" \ "\n{7}\n**SHARD: {8} / 2**" \ .format(str(servs), str(channels), str(members), str(days), str(h), str(m), str(s), sys_str, ctx.current_shard) await ctx.send_message(message.channel, ret_str)
def test_as_dict(self): p = psutil.Process() d = p.as_dict(attrs=['exe', 'name']) self.assertEqual(sorted(d.keys()), ['exe', 'name']) p = psutil.Process(min(psutil.pids())) d = p.as_dict(attrs=['connections'], ad_value='foo') if not isinstance(d['connections'], list): self.assertEqual(d['connections'], 'foo') # Test ad_value is set on AccessDenied. with mock.patch('psutil.Process.nice', create=True, side_effect=psutil.AccessDenied): self.assertEqual( p.as_dict(attrs=["nice"], ad_value=1), {"nice": 1}) # Test that NoSuchProcess bubbles up. with mock.patch('psutil.Process.nice', create=True, side_effect=psutil.NoSuchProcess(p.pid, "name")): self.assertRaises( psutil.NoSuchProcess, p.as_dict, attrs=["nice"]) # Test that ZombieProcess is swallowed. with mock.patch('psutil.Process.nice', create=True, side_effect=psutil.ZombieProcess(p.pid, "name")): self.assertEqual( p.as_dict(attrs=["nice"], ad_value="foo"), {"nice": "foo"}) # By default APIs raising NotImplementedError are # supposed to be skipped. with mock.patch('psutil.Process.nice', create=True, side_effect=NotImplementedError): d = p.as_dict() self.assertNotIn('nice', list(d.keys())) # ...unless the user explicitly asked for some attr. with self.assertRaises(NotImplementedError): p.as_dict(attrs=["nice"]) # errors with self.assertRaises(TypeError): p.as_dict('name') with self.assertRaises(ValueError): p.as_dict(['foo']) with self.assertRaises(ValueError): p.as_dict(['foo', 'bar'])
def poll(interval): """Calculate IO usage by comparing IO statics before and after the interval. Return a tuple including all currently running processes sorted by IO activity and total disks I/O activity. """ # first get a list of all processes and disk io counters procs = [p for p in psutil.process_iter()] for p in procs[:]: try: p._before = p.io_counters() except psutil.Error: procs.remove(p) continue disks_before = psutil.disk_io_counters() # sleep some time time.sleep(interval) # then retrieve the same info again for p in procs[:]: try: p._after = p.io_counters() p._cmdline = ' '.join(p.cmdline()) if not p._cmdline: p._cmdline = p.name() p._username = p.username() except (psutil.NoSuchProcess, psutil.ZombieProcess): procs.remove(p) disks_after = psutil.disk_io_counters() # finally calculate results by comparing data before and # after the interval for p in procs: p._read_per_sec = p._after.read_bytes - p._before.read_bytes p._write_per_sec = p._after.write_bytes - p._before.write_bytes p._total = p._read_per_sec + p._write_per_sec disks_read_per_sec = disks_after.read_bytes - disks_before.read_bytes disks_write_per_sec = disks_after.write_bytes - disks_before.write_bytes # sort processes by total disk IO so that the more intensive # ones get listed first processes = sorted(procs, key=lambda p: p._total, reverse=True) return (processes, disks_read_per_sec, disks_write_per_sec)