我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.pids()。
def pids_active(pids_computer): """ This function find pids of computer and return the valid. """ pid_valid = {} for pid in pids_computer: data = None try: process = psutil.Process(pid) data = {"pid": process.pid, "status": process.status(), "percent_cpu_used": process.cpu_percent(interval=0.0), "percent_memory_used": process.memory_percent()} except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess): data = None if data is not None: pid_valid[process.name()] = data return pid_valid
def test_procfs_path(self): tdir = tempfile.mkdtemp() try: psutil.PROCFS_PATH = tdir self.assertRaises(IOError, psutil.virtual_memory) self.assertRaises(IOError, psutil.cpu_times) self.assertRaises(IOError, psutil.cpu_times, percpu=True) self.assertRaises(IOError, psutil.boot_time) # self.assertRaises(IOError, psutil.pids) self.assertRaises(IOError, psutil.net_connections) self.assertRaises(IOError, psutil.net_io_counters) self.assertRaises(IOError, psutil.net_if_stats) self.assertRaises(IOError, psutil.disk_io_counters) self.assertRaises(IOError, psutil.disk_partitions) self.assertRaises(psutil.NoSuchProcess, psutil.Process) finally: psutil.PROCFS_PATH = "/proc" os.rmdir(tdir)
def getSidToken(token_sid): # trying to get system privileges if token_sid == "S-1-5-18": sids = ListSids() for sid in sids: if "winlogon" in sid[1].lower(): hToken = gethTokenFromPid(sid[0]) if hToken: print "\t[+] Using PID: " + str(sid[0]) return hToken else: return None # trying to impersonate a token else: pids = [int(x) for x in psutil.pids() if int(x)>4] for pid in pids: hToken = gethTokenFromPid(pid) if hToken: if GetTokenSid( hToken ) == token_sid: print "\t[+] Using PID: " + str(pid) return hToken
def run_module(self): # To access user provided attributes, use self.options dictionary Thread(target=self.monitoring).start() print "\n[*] STARTING PROGRAMS EXECUTION IN 5 SECONDS...\n" time.sleep(5) for b in self.binaries(): self.print_info(" [+] Executing %s ..." % b) previous_pid = psutil.pids() self.execute(b) time.sleep(int(self.args["sleep_time"])) new_pid = psutil.pids() print " [-] Killing the process" self.kill(b.split('.')[0], previous_pid, new_pid) print "\n[*] PLEASE CLOSE PROCMON PROCESS\n" self.parsing_results() print "\n[*] RESULTS PARSED TO XML\n"
def ReStart(groupname): '''??mysqlrouter''' restart_stat = None pids = psutil.pids() for pid in pids: p = psutil.Process(pid) cmdline = p.cmdline() if groupname+'.conf' in cmdline: try: os.kill(pid, 9) os.popen('cd /etc/mysqlrouter;nohup mysqlrouter -c mysqlrouter.conf -a %s.conf &' % groupname) return True except Exception,e: logging.error(traceback.format_exc()) return False restart_stat = True break if restart_stat is None: os.popen('cd /etc/mysqlrouter;nohup mysqlrouter -c mysqlrouter.conf -a %s.conf &' % groupname)
def test_exe_mocked(self): with mock.patch('psutil._pslinux.os.readlink', side_effect=OSError(errno.ENOENT, "")) as m: # No such file error; might be raised also if /proc/pid/exe # path actually exists for system processes with low pids # (about 0-20). In this case psutil is supposed to return # an empty string. ret = psutil.Process().exe() assert m.called self.assertEqual(ret, "") # ...but if /proc/pid no longer exist we're supposed to treat # it as an alias for zombie process with mock.patch('psutil._pslinux.os.path.lexists', return_value=False): self.assertRaises(psutil.ZombieProcess, psutil.Process().exe)
def POST(self): cpuused = psutil.cpu_percent(interval=None, percpu=False) memused = psutil.virtual_memory()[2] diskused = psutil.disk_usage('/')[3] pidcount = len(psutil.pids()) uptimeshell = subprocess.Popen(['uptime'], stderr=subprocess.PIPE,stdout=subprocess.PIPE) uptimeshell.wait() uptime = uptimeshell.communicate() return json.dumps( { "code": 0, "current": { "cpuused": cpuused, "memused": memused, "diskused": diskused, "pidcount": pidcount, "uptime": uptime } } )
def test_issue_687(self): # In case of thread ID: # - pid_exists() is supposed to return False # - Process(tid) is supposed to work # - pids() should not return the TID # See: https://github.com/giampaolo/psutil/issues/687 t = ThreadTask() t.start() try: p = psutil.Process() tid = p.threads()[1].id assert not psutil.pid_exists(tid), tid pt = psutil.Process(tid) pt.as_dict() self.assertNotIn(tid, psutil.pids()) finally: t.stop()
def test_issue_687(self): # In case of thread ID: # - pid_exists() is supposed to return False # - Process(tid) is supposed to work # - pids() should not return the TID # See: https://github.com/giampaolo/psutil/issues/687 t = ThreadTask() t.start() try: p = psutil.Process() tid = p.threads()[1].id assert not psutil.pid_exists(tid), tid pt = psutil.Process(tid) pt.as_dict() self.assertNotIn(tid, psutil.pids()) finally: t.stop() # ===================================================================== # --- sensors # =====================================================================
def kill_pid(pid, procname='', daemon=True): '''Find a PID with optional qualifications and kill it.''' if pid not in psutil.pids(): return False for p in psutil.process_iter(): if p.pid != pid: continue if procname and p.name() != procname: continue if daemon and p.ppid() != 1: continue _kill_pid_object(p) return True return False ########################################################################### # Some packages (nscd, nslcd) start their daemons after installation. # If I'm in an ARM chroot, they live after the chroot exits. Kill them. # In a cascade situation, sometimes the /proc/<PID>/root link is changed # to '/some/path (deleted)'. This routine still works.
def last_process_created(self, prev_pids, new_pids): pids = [] for p in new_pids: if p not in prev_pids: pids.append(p) return pids
def is_cmd_open(self, prev_pids): created_pids = [] for pid in psutil.pids(): if pid not in prev_pids: created_pids.append(pid) try: return 'cmd.exe' in [psutil.Process(p).name() for p in created_pids] except: pass
def handle_dll_local(self, subpath, binary): path = subpath + "\\x86_microsoft.windows.common-controls_6595b64144ccf1df_6.0.15063.0_none_583b8639f462029f\\" try: try: subprocess.check_call( ["powershell", "-C", "rm", "-r", "-Force", subpath, "-erroraction", "'silentlycontinue'"]) except: pass print "[+] Creating: " + path subprocess.check_call( ["powershell", "-C", "mkdir", path, ">", "$null"]) print "[+] Copying the malicious dll to the path" subprocess.check_call( ["powershell", "-C", "cp", self.args["malicious_dll"], path]) prev_pids = psutil.pids() print "[*] Executing the binary" subprocess.check_call(["powershell", "-C", binary]) time.sleep(1) if self.is_cmd_open(prev_pids): print colored("[*] THIS BINARY IS VULNERABLE TO DLL HIJACKING UAC BYPASS!", 'cyan', attrs=['bold']) if binary not in self._results["vulnerables"]: self._results["vulnerables"].append(binary) else: if binary not in self._results['sospechosos']: self._results['sospechosos'].append(binary) new_pids = psutil.pids() self.kill(binary, prev_pids, new_pids) print "[-] Deleting the path and cleaning up\n" subprocess.check_call( ["powershell", "-C", "rm", "-r", "-Force", subpath]) except subprocess.CalledProcessError as error: print "ERROR: COPYING THE FILE"
def handle_dll_local(self, subpath, binary, clean): path = subpath + "\\x86_microsoft.windows.common-controls_6595b64144ccf1df_6.0.15063.0_none_583b8639f462029f\\" try: print "[+] Creating: " + path subprocess.check_call( ["powershell", "-C", "mkdir", path, ">", "$null"]) print "[+] Copying the malicious dll to the path" subprocess.check_call( ["powershell", "-C", "cp", self.args["malicious_dll"], path]) prev_pids = psutil.pids() print "[*] Executing the binary" subprocess.check_call(["powershell", "-C", binary]) except subprocess.CalledProcessError as error: self.print_ko(str(error) + "\n")
def clean_path(self, subpath, binary): print "[-] Deleting the path and cleaning up\n" for pid in psutil.pids(): if binary in psutil.Process(pid).name(): try: print "Killing the process..." subprocess.check_call(["taskkill", "/t", "/f", "/pid", str(pid)]) except subprocess.CalledProcessError: self.print_ko("[!!] The process %s can't be killed" % binary) subprocess.check_call( ["powershell", "-C", "rm", "-r", "-Force", subpath])
def find(prefix, suffixes=None, local_bin=False): current_pid = os.getpid() for pid in psutil.pids(): if pid == current_pid or not psutil.pid_exists(pid): continue process = psutil.Process(pid) if process.name() == prefix: cmd = process.cmdline() if local_bin: check = False for word in cmd: if '/usr/local/bin' in word: check = True if not check: continue if suffixes is not None: check = False for word in cmd: if word in suffixes: check = True if not check: continue log.warning('Already existing') # : %s' % cmd) return True return False
def process_send_data(socket, context): """ Send all memory, cpu, disk, network data of computer to server(master node) """ while True: try: cpu_percent = psutil.cpu_percent(interval=1, percpu=True) memory_info = psutil.virtual_memory() disk_info = psutil.disk_usage('/') pids_computer = psutil.pids() info_to_send = json.dumps({ "computer_utc_clock": str(datetime.datetime.utcnow()), "computer_clock": str(datetime.datetime.now()), "hostname": platform.node(), "mac_address": mac_address(), "ipv4_interfaces": internet_addresses(), "cpu": { "percent_used": cpu_percent }, "memory": { "total_bytes": memory_info.total, "total_bytes_used": memory_info.used, "percent_used": memory_info.percent }, "disk": { "total_bytes": disk_info.total, "total_bytes_used": disk_info.used, "total_bytes_free": disk_info.free, "percent_used": disk_info.percent }, "process": pids_active(pids_computer) }).encode() #send json data in the channel 'status', although is not necessary to send. socket.send_multipart([b"status", info_to_send]) #time.sleep(0.500) except (KeyboardInterrupt, SystemExit): socket.close() context.term()
def lock_or_exit(self, lock_filename, message="process %s is already running"): pid = psutil.Process().pid if os.path.exists(lock_filename): with open(lock_filename) as lockfile: old_pid = int(lockfile.read()) if old_pid in psutil.pids(): logging.info(message % old_pid) sys.exit(1) else: os.unlink(lock_filename)
def running_emacs_count(self): """Count how many instances of Emacs are currently open""" try: import psutil return [psutil.Process(p).name() for p in psutil.pids()].count('emacs') except ImportError: Logger.warning('Cannot determine how many instances of Emacs are running. This may cause sync issues') return 0
def test_wait_for_pid(self): wait_for_pid(os.getpid()) nopid = max(psutil.pids()) + 99999 with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
def test_pids(self): # Note: this test might fail if the OS is starting/killing # other processes in the meantime w = wmi.WMI().Win32_Process() wmi_pids = set([x.ProcessId for x in w]) psutil_pids = set(psutil.pids()) self.assertEqual(wmi_pids, psutil_pids)
def test_send_signal(self): sig = signal.SIGKILL if POSIX else signal.SIGTERM sproc = get_test_subprocess() p = psutil.Process(sproc.pid) p.send_signal(sig) exit_sig = p.wait() self.assertFalse(psutil.pid_exists(p.pid)) if POSIX: self.assertEqual(exit_sig, sig) # sproc = get_test_subprocess() p = psutil.Process(sproc.pid) p.send_signal(sig) with mock.patch('psutil.os.kill', side_effect=OSError(errno.ESRCH, "")): with self.assertRaises(psutil.NoSuchProcess): p.send_signal(sig) # sproc = get_test_subprocess() p = psutil.Process(sproc.pid) p.send_signal(sig) with mock.patch('psutil.os.kill', side_effect=OSError(errno.EPERM, "")): with self.assertRaises(psutil.AccessDenied): psutil.Process().send_signal(sig) # Sending a signal to process with PID 0 is not allowed as # it would affect every process in the process group of # the calling process (os.getpid()) instead of PID 0"). if 0 in psutil.pids(): p = psutil.Process(0) self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
def test_pid_0(self): # Process(0) is supposed to work on all platforms except Linux if 0 not in psutil.pids(): self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0) return # test all methods p = psutil.Process(0) for name in psutil._as_dict_attrnames: if name == 'pid': continue meth = getattr(p, name) try: ret = meth() except psutil.AccessDenied: pass else: if name in ("uids", "gids"): self.assertEqual(ret.real, 0) elif name == "username": if POSIX: self.assertEqual(p.username(), 'root') elif WINDOWS: self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM') elif name == "name": assert name, name if hasattr(p, 'rlimit'): try: p.rlimit(psutil.RLIMIT_FSIZE) except psutil.AccessDenied: pass p.as_dict() if not OPENBSD: self.assertIn(0, psutil.pids()) self.assertTrue(psutil.pid_exists(0))
def test_pid_exists(self): sproc = get_test_subprocess() self.assertTrue(psutil.pid_exists(sproc.pid)) p = psutil.Process(sproc.pid) p.kill() p.wait() self.assertFalse(psutil.pid_exists(sproc.pid)) self.assertFalse(psutil.pid_exists(-1)) self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids()) # pid 0 psutil.pid_exists(0) == 0 in psutil.pids()
def test_pid_exists_2(self): reap_children() pids = psutil.pids() for pid in pids: try: assert psutil.pid_exists(pid) except AssertionError: # in case the process disappeared in meantime fail only # if it is no longer in psutil.pids() time.sleep(.1) if pid in psutil.pids(): self.fail(pid) pids = range(max(pids) + 5000, max(pids) + 6000) for pid in pids: self.assertFalse(psutil.pid_exists(pid), msg=pid)
def test_pids(self): plist = [x.pid for x in psutil.process_iter()] pidlist = psutil.pids() self.assertEqual(plist.sort(), pidlist.sort()) # make sure every pid is unique self.assertEqual(len(pidlist), len(set(pidlist)))
def test_pids(self): # Note: this test might fail if the OS is starting/killing # other processes in the meantime if SUNOS: cmd = ["ps", "-A", "-o", "pid"] else: cmd = ["ps", "ax", "-o", "pid"] p = get_test_subprocess(cmd, stdout=subprocess.PIPE) output = p.communicate()[0].strip() assert p.poll() == 0 if PY3: output = str(output, sys.stdout.encoding) pids_ps = [] for line in output.split('\n')[1:]: if line: pid = int(line.split()[0].strip()) pids_ps.append(pid) # remove ps subprocess pid which is supposed to be dead in meantime pids_ps.remove(p.pid) pids_psutil = psutil.pids() pids_ps.sort() pids_psutil.sort() # on OSX ps doesn't show pid 0 if OSX and 0 not in pids_ps: pids_ps.insert(0, 0) if pids_ps != pids_psutil: difference = [x for x in pids_psutil if x not in pids_ps] + \ [x for x in pids_ps if x not in pids_psutil] self.fail("difference: " + str(difference)) # for some reason ifconfig -a does not report all interfaces # returned by psutil
def pids(): return psutil.pids()
def find_pids_by_name(p_name): arr = [] for pid in psutil.pids(): if psutil.Process(id).name() == p_name: arr.append(pid) return arr
def kill_by_pids(pids): for pid in pids: psutil.Process(pid).kill()
def ancestors(pid): """ancestors(pid) -> int list Arguments: pid (int): PID of the process. Returns: List of PIDs of whose parent process is `pid` or an ancestor of `pid`. """ pids = [] while pid != 0: pids.append(pid) pid = parent(pid) return pids
def check_process(): print 'Checking WoW is running' wow_process_names = ["World of Warcraft"] running = False for pid in psutil.pids(): p = psutil.Process(pid) if any(p.name() in s for s in wow_process_names): print(p.name()) running = True if not running and not dev: print 'WoW is not running' exit() print 'WoW is running' # raw_input('Pleas e put your fishing-rod on key 1, zoom-in to max, move camera on fishing-float and press any key')
def process_count(self): p_count = 0 pids = psutil.pids() for pid in pids: try: p = psutil.Process(pid) exe = p.exe() cmd = p.cmdline() cmd = " ".join(cmd) if "jd.py" in cmd: # print pid, cmd[:30] p_count += 1 except: pass return p_count
def cleanScratch(): """ Clean scratch directory. It checks whether any other gcMapExplorer process is running. In case, when only one process (i.e. current) is running, all files with "gcx" prefix will be deleted from default scratch directory. """ config = getConfig() count = 0 for pid in psutil.pids(): p = None try: p = psutil.Process(pid) except psutil.NoSuchProcess: pass if p is not None: if 'gcMapExplorer' in p.name(): count += 1 # If only one gcMapExplorer is running, it is the current one if count == 1: for f in os.listdir(config['Dirs']['WorkingDirectory']): if not os.path.isfile(f): continue basename = os.path.basename(f) base = os.path.splitext(basename)[0] if "gcx" in base: try: print(' Removing File: {0}'.format(f)) os.remove(f) except IOError: pass print(' ... Finished Cleaning')
def get_system_status(memory_total=False, memory_total_actual=False, memory_total_usage=False, memory_total_free=False, all_pids=False, swap_memory=False, pid=False): """ Parameters ---------- threads: bool return dict {id: (user_time, system_time)} memory_maps: bool return dict {path: rss} Note ---- All memory is returned in `MiB` To calculate memory_percent: get_system_status(memory_usage=True) / get_system_status(memory_total=True) * 100 """ import psutil # ====== general system query ====== # if memory_total: return psutil.virtual_memory().total / float(2 ** 20) if memory_total_actual: return psutil.virtual_memory().available / float(2 ** 20) if memory_total_usage: return psutil.virtual_memory().used / float(2 ** 20) if memory_total_free: return psutil.virtual_memory().free / float(2 ** 20) if swap_memory: tmp = psutil.swap_memory() tmp.total /= float(2 ** 20) tmp.used /= float(2 ** 20) tmp.free /= float(2 ** 20) tmp.sin /= float(2**20) tmp.sout /= float(2**20) return tmp if all_pids: return psutil.pids() if pid: return os.getpid()
def quit(): mypid = os.getpid() for pid in sorted(psutil.pids(), reverse=True): if pid == mypid: continue try: p = psutil.Process(pid) if p.exe() == sys.executable: p.send_signal(signal.CTRL_C_EVENT) except: pass
def _get_process_by_path(self, path): """????????""" lower_path = str(path).lower() for pid in psutil.pids(): try: process = psutil.Process(pid) tmp = process.exe() if str(tmp).lower() == lower_path: return process except: continue return None
def test_pids(self): self.execute(psutil.pids) # --- net
def test_send_signal(self): sig = signal.SIGKILL if POSIX else signal.SIGTERM sproc = get_test_subprocess() p = psutil.Process(sproc.pid) p.send_signal(sig) exit_sig = p.wait() self.assertFalse(psutil.pid_exists(p.pid)) if POSIX: self.assertEqual(exit_sig, -sig) # sproc = get_test_subprocess() p = psutil.Process(sproc.pid) p.send_signal(sig) with mock.patch('psutil.os.kill', side_effect=OSError(errno.ESRCH, "")): with self.assertRaises(psutil.NoSuchProcess): p.send_signal(sig) # sproc = get_test_subprocess() p = psutil.Process(sproc.pid) p.send_signal(sig) with mock.patch('psutil.os.kill', side_effect=OSError(errno.EPERM, "")): with self.assertRaises(psutil.AccessDenied): psutil.Process().send_signal(sig) # Sending a signal to process with PID 0 is not allowed as # it would affect every process in the process group of # the calling process (os.getpid()) instead of PID 0"). if 0 in psutil.pids(): p = psutil.Process(0) self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)