我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 psutil.process_iter()。
def get_process(proc_name):
    """Return the first process whose command line contains ``proc_name``.

    Each command-line argument is split on single spaces and ``proc_name``
    must equal one of the resulting tokens.

    :param proc_name: token to look for in process command lines
    :returns: the matching ``psutil.Process``, or ``None`` when nothing
        matches or the scan fails unexpectedly
    """
    LOG = log.getLogger(__name__)
    try:
        for pr in psutil.process_iter():
            try:
                cmdline = pr.cmdline()
            except psutil.Error:
                # The process exited or is not readable by us; a single
                # such process must not abort the whole scan.
                continue
            if any(proc_name in arg.split(" ") for arg in cmdline):
                return pr
    except Exception:
        # Exception (not BaseException) so KeyboardInterrupt/SystemExit
        # still propagate.
        LOG.error("Error fetching {%s} process..." % proc_name)
    return None
def _get_listening_ipaddrs(proc_name):
    """
    Search for proc_name running on the node and return a list of unique
    IPs on which proc_name listens. Otherwise, return [].

    :param proc_name: exact process name to look for
    :returns: list of distinct local addresses of inet connections whose
        status is ``LISTEN`` (order is not defined)
    """
    listening_ips = set()
    for proc in psutil.process_iter():
        # Use as_dict() to avoid API changes across versions of psutil.
        if proc.as_dict(attrs=['name'])['name'] != proc_name:
            continue
        # The connections API was renamed across psutil versions: try the
        # older spelling first and fall back only when it does not exist,
        # instead of swallowing every possible exception.
        try:
            conns = proc.get_connections(kind="inet")
        except AttributeError:
            conns = proc.connections(kind="inet")
        listening_ips.update(
            con.laddr[0] for con in conns if con.status == "LISTEN")
    return list(listening_ips)
def find_test_instances(prog):
    """Collect running Python processes that were started from ``prog``.

    A process qualifies when its name is one of the interpreter spellings
    listed below, its command line has more than five entries and its
    second entry ends with ``prog``.

    :param prog: suffix that ``cmdline[1]`` must end with
    :returns: dict mapping pid to a dict holding the values that follow
        the ``-i`` (``iss``), ``-t`` (``tag``) and ``-p`` (``port``) flags
        plus the process start time (``since``) as ``YYYY-mm-dd HH:MM:SS``
    :raises ValueError: when a qualifying command line lacks one of the flags
    """
    interpreter_names = ['python', 'python3', 'Python', 'Python3']
    instances = {}
    for proc in psutil.process_iter():
        if proc.name() not in interpreter_names:
            continue
        try:
            argv = proc.cmdline()
        except psutil.AccessDenied:
            continue
        if len(argv) <= 5 or not argv[1].endswith(prog):
            continue

        def value_after(flag):
            # list.index raises ValueError when the flag is absent.
            return argv[argv.index(flag) + 1]

        details = {'iss': value_after('-i')}
        details['tag'] = value_after('-t')
        details['port'] = value_after('-p')
        started = datetime.datetime.fromtimestamp(proc.create_time())
        details['since'] = started.strftime("%Y-%m-%d %H:%M:%S")
        instances[proc.pid] = details
    return instances
def find_server_processes(cls, port=None):
    """Return the running ``adb`` fork-server processes.

    A process is selected when its name is ``adb`` and its command line
    contains both ``fork-server`` and ``server``.

    :param port: when given, only processes whose command line also
        contains ``str(port)`` as an argument are returned
    :returns: list of ``psutil.Process`` objects
    """
    result = []
    for p in psutil.process_iter():
        try:
            # Some psutil releases expose name/cmdline as properties,
            # others as methods; support both.
            if isinstance(type(p).cmdline, property):
                pname, cmdline = p.name, p.cmdline
            else:
                pname, cmdline = p.name(), p.cmdline()
        except psutil.NoSuchProcess:
            continue
        if pname != 'adb':
            continue
        if 'fork-server' not in cmdline or 'server' not in cmdline:
            continue
        # Identity test: "no port filter" is signalled by None only.
        if port is not None and str(port) not in cmdline:
            continue
        result.append(p)
    return result
def get_cmdlines():
    """Return the space-joined command line of every other running process.

    The current process and its parent are left out of the result, and
    processes that disappear while being inspected are skipped.
    """
    # Exclude ourselves and whoever launched us.
    skipped_pids = [os.getpid(), os.getppid()]
    cmdlines = []
    for proc in psutil.process_iter():
        if proc.pid in skipped_pids:
            continue
        try:
            cmdlines.append(' '.join(proc.cmdline()))
        except psutil.NoSuchProcess:
            continue
    return cmdlines
def get_active_connections_count(self):
    """Count inet connections on ``self.port`` that are in a tracked state.

    A connection is counted when its status is one of the values of
    ``self.connection_states``, its local port equals ``self.port``, its
    remote address is not listed in ``self.exclude_ips`` and its local
    address matches ``self.ips`` (directly, through the per-family
    match-all address, or through the IPv4-mapped IPv6 match-all entry).

    :returns: number of matching connections across all processes
    """
    count = 0
    for proc in psutil.process_iter():
        for conn in proc.get_connections(kind='inet'):
            if conn.status not in self.connection_states.values():
                continue
            local_ip, local_port = conn.local_address
            if local_port != self.port:
                continue
            remote_ip, _remote_port = conn.remote_address
            if (conn.family, remote_ip) in self.exclude_ips:
                continue
            family = conn.family
            # All three tests are evaluated up front, in this order.
            exact_match = (family, local_ip) in self.ips
            wildcard_match = (family, self.match_all_ips[family]) in self.ips
            mapped_match = (
                local_ip.startswith(self.ipv4_mapped_ipv6_address['prefix'])
                and (family, self.ipv4_mapped_ipv6_address['match_all']) in self.ips
            )
            if exact_match or wildcard_match or mapped_match:
                count += 1
    return count

# ===========================================
# Subclass: Linux
def _maybe_get_running_openvpn():
    """
    Look for an openvpn instance that is already running.

    The first process with a command-line argument containing
    ``LEAPOPENVPN`` is returned; processes whose command line cannot be
    read are skipped.

    :rtype: psutil Process, or None when no such process is found
    """
    for proc in psutil.process_iter():
        try:
            # This needs more work, see #3268. For the moment arguments of
            # the form --openvpn-foo must not count as a match, hence the
            # search for this specific marker.
            if any("LEAPOPENVPN" in arg for arg in proc.cmdline()):
                return proc
        except psutil.AccessDenied:
            continue
    return None
def __check_for_existing_ppp_sessions(self):
    """Return the pids of running processes whose name contains ``pppd``.

    :returns: list of pids, empty when no PPP session is found
    :raises PPPError: when process information cannot be read
    """
    existing_ppp_pids = []
    self.logger.info('Checking for existing PPP sessions')
    for proc in psutil.process_iter():
        try:
            pinfo = proc.as_dict(attrs=['pid', 'name'])
        except psutil.NoSuchProcess:
            # The process exited between enumeration and inspection; that
            # is not a failure of the check itself.
            continue
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt
            # and SystemExit are not turned into PPPError.
            raise PPPError('Failed to check for existing PPP sessions')
        # as_dict() can report None for a name it was not allowed to read.
        if 'pppd' in (pinfo['name'] or ''):
            self.logger.info('Found existing PPP session on pid: %s', pinfo['pid'])
            existing_ppp_pids.append(pinfo['pid'])
    return existing_ppp_pids
def find_qemu_pid(vm_name):
    """
    Find the PID of the QEMU process that belongs to a virtual machine.

    The libvirt pid file is read first. When it cannot be opened, running
    processes are scanned for a ``qemu-system-x86_64`` whose ``-name``
    argument value contains ``vm_name``.

    :param str vm_name: libvirt domain name
    :rtype: int
    :raises QEMUNotFoundError: when neither lookup finds the process
    """
    logging.info('Finding QEMU pid for domain %s', vm_name)
    pid_file_path = '/var/run/libvirt/qemu/{}.pid'.format(vm_name)
    try:
        with open(pid_file_path, 'r') as pid_file:
            return int(pid_file.read())
    except IOError:
        for proc in psutil.process_iter():
            # Drop the executable itself; only its arguments matter.
            args = proc.cmdline()[1:]
            if proc.name() != "qemu-system-x86_64":
                continue
            # Walk (flag, value) pairs of adjacent arguments.
            named_for_vm = any(
                flag == "-name" and vm_name in value
                for flag, value in zip(args, args[1:]))
            if named_for_vm:
                return proc.pid
        logging.critical('Cannot find QEMU')
        raise QEMUNotFoundError('Cannot find QEMU')
def process(service, action):
    """Start, stop or query an external service described in ``Services``.

    :param service: key into the module-level ``Services`` mapping
    :param action: ``'on'``, ``'off'`` or ``'status'``
    :returns: for ``'status'``, whether the service's ``pfun1`` name is
        among the running process names; ``None`` for ``'on'``/``'off'``
    :raises ValueError: for any other action
    """
    if action == 'off' or action == 'on':
        switching_on = action == 'on'
        # State 99 marks "waiting for feedback from the service"; the real
        # state is not changed immediately.
        Services[service]['state'] = 99
        Services[service]['newstate'] = 1 if switching_on else 0
        sse_parm['LED_%s' % Services[service]['id']] = Services[service]['lpro']
        sse_parm['BUT_%s' % Services[service]['id']] = Services[service]['bpro']
        # 'pfun3' is the command run for 'on', 'pfun4' the one for 'off'.
        command_key = 'pfun3' if switching_on else 'pfun4'
        Popen(Services[service][command_key], shell=True)
        return None
    if action == 'status':
        if version_info[0] < 4:
            running_names = [p.name for p in process_iter()]
        else:
            running_names = [p.name() for p in process_iter()]
        return Services[service]['pfun1'] in running_names
    raise ValueError('Unknown action "%s"' % action)
def test_bash_operator_kill(self):
    """A BashOperator that times out must not leave its subprocess running."""
    import subprocess
    import psutil

    # The sleep duration embeds our pid, which gives the child a command
    # line that can be searched for below.
    sleep_time = "100%d" % os.getpid()
    operator = BashOperator(
        task_id='test_bash_operator_kill',
        execution_timeout=timedelta(seconds=1),
        bash_command="/bin/bash -c 'sleep %s'" % sleep_time,
        dag=self.dag)
    self.assertRaises(
        exceptions.AirflowTaskTimeout,
        operator.run,
        start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
    sleep(2)

    expected_cmdline = ['sleep', sleep_time]
    leftover_pid = -1
    for proc in psutil.process_iter():
        if proc.cmdline() == expected_cmdline:
            leftover_pid = proc.pid
    if leftover_pid == -1:
        return
    # Clean up the stray process before reporting the failure.
    os.kill(leftover_pid, signal.SIGTERM)
    self.fail("BashOperator's subprocess still running after stopping on timeout!")
def pid(self, process_name):
    """
    Return the pid of the first process named exactly ``process_name``.

    process_name: System Process Name
    return: the matching pid (integer), or None when no process matches
    """
    for proc in psutil.process_iter():
        try:
            info = proc.as_dict(attrs=['pid', 'name'])
        except psutil.NoSuchProcess:
            continue
        if process_name == info['name']:
            return info['pid']
    return None
def find_current_steam_game_pid():
    """
    Return the pid of the game currently played through Steam.

    The pid is read from the value following ``-pid`` on the command line
    of the first ``GameOverlayUI.exe`` process. Returns -1 when no such
    process or argument is found.
    """
    for candidate in psutil.process_iter():
        try:
            if candidate.name() != 'GameOverlayUI.exe':
                continue
            argv = candidate.cmdline()
            if '-pid' in argv:
                return int(argv[argv.index('-pid') + 1])
            # Only the first overlay process is ever inspected.
            return -1
        except psutil.AccessDenied:
            print("Permission error or access denied on process")
    return -1
def test_children_duplicates(self):
    """children(recursive=True) must not list the same process twice."""
    # Count how many processes report each pid as their parent.
    child_counts = collections.defaultdict(int)
    for proc in psutil.process_iter():
        try:
            child_counts[proc.ppid()] += 1
        except psutil.Error:
            continue
    # The last entry after an ascending sort has the most children.
    ranked = sorted(child_counts.items(), key=lambda x: x[1])
    busiest_pid = ranked[-1][0]
    parent = psutil.Process(busiest_pid)
    try:
        descendants = parent.children(recursive=True)
    except psutil.AccessDenied:  # windows
        return
    self.assertEqual(len(descendants), len(set(descendants)))
def stop(self):
    """Stop the search process and switch the log view to its stopped state."""
    old_pid = self.main_process.pid
    self.main_process.terminate()
    # Follow up with a kill for any process still listed under that pid.
    for survivor in (p for p in psutil.process_iter() if p.pid == old_pid):
        survivor.kill()

    # Build a fresh process object so the search can be started again.
    search_args = [self.login_info, self.log_output_queue,
                   self.wg_ad_links, self.offline_ad_links]
    self.main_process = multiprocessing.Process(
        target=wg_gesucht.start_searching, args=search_args)

    # Replace the stop button with the restart and back buttons.
    self.log_stop_button.grid_forget()
    self.log_restart_button.grid(row=0, column=0, sticky=tk.W)
    self.log_back_button.grid(row=0, column=1, sticky=tk.W)

    # The log widget is only writable while it is in the NORMAL state.
    log_widget = self.log_text
    log_widget.configure(state=tk.NORMAL)
    log_widget.insert(tk.END, '\n')
    log_widget.configure(state=tk.DISABLED)
    log_widget.see(tk.END)
def search_in_processes_by_name(process_name):
    """Return the running processes whose name matches ``process_name``.

    ``process_name`` is used as a regular expression and searched for,
    ignoring case, in each process name.

    Args:
        process_name (str): Pattern to search for in process names

    Returns:
        list: psutil.Process objects of the matching processes
    """
    return [
        proc for proc in psutil.process_iter()
        if re.search(process_name, proc.name(), re.IGNORECASE)
    ]

#:tag:Processes
def TOR_PROC_CHECK():
    """Return the running ``tor`` process, or exit when there is none.

    Prints a coloured status line in both cases.

    :returns: ``psutil.Process`` of the first process named ``tor``
    """
    tor_pid = None
    for proc in psutil.process_iter():
        try:
            pinfo = proc.as_dict(attrs=['pid', 'name'])
        except psutil.NoSuchProcess:
            continue
        if pinfo['name'] == "tor":
            tor_pid = pinfo['pid']
            break

    if tor_pid is None:
        print ("[" + Fore.RED + Style.BRIGHT + "-" + Style.RESET_ALL + "]" + Fore.RED + Style.BRIGHT + " Tor is not running." + Style.RESET_ALL)
        exit()

    print ("[" + Fore.GREEN + Style.BRIGHT + "+" + Style.RESET_ALL + "]" + Fore.GREEN + Style.BRIGHT + " Tor is running." + Style.RESET_ALL)
    return psutil.Process(int(tor_pid))
def pid_by_name(name):
    """pid_by_name(name) -> int list

    Arguments:
        name (str): Name of program.

    Returns:
        List of PIDs matching `name` sorted by lifetime, youngest to oldest.
        A process matches when its name or its executable path equals
        `name`.

    Example:
        >>> os.getpid() in pid_by_name(name(os.getpid()))
        True
    """
    matches = []
    for p in psutil.process_iter():
        try:
            if p.name() != name:
                try:
                    exe = p.exe()
                except Exception:
                    # The executable path is often unreadable (permissions,
                    # kernel threads); such a process cannot match by path.
                    continue
                if exe != name:
                    continue
            matches.append((p.create_time(), p.pid))
        except psutil.NoSuchProcess:
            # The process exited while we were looking at it.
            continue
    # Largest creation time first, i.e. youngest to oldest as documented.
    matches.sort(reverse=True)
    return [pid for _created, pid in matches]
def _wait_pid(self):
    """Resolve the names in ``self.pid_list`` to pids and pass them to libdivert.

    String entries of ``self.pid_list`` are treated as process names and
    integer entries as pids. Running processes are polled every 0.2
    seconds, collecting the pid of each process whose lower-cased name
    contains one of the names, until the number of collected pids equals
    ``len(self.pid_list)`` or ``self.quit_loop`` is set.
    """
    # first wait until all processes are started
    wanted_names = [entry for entry in self.pid_list
                    if isinstance(entry, str) or isinstance(entry, unicode)]
    found_pids = [entry for entry in self.pid_list if isinstance(entry, int)]

    self.is_waiting = True
    while not self.quit_loop:
        if len(found_pids) == len(self.pid_list):
            break
        for proc in psutil.process_iter():
            lowered_name = proc.name().lower()
            for wanted in wanted_names:
                if wanted.lower() in lowered_name:
                    found_pids.append(proc.pid)
        print('Waiting for process: %s' % ', '.join(wanted_names))
        time.sleep(0.2)
    self.is_waiting = False

    if self.quit_loop:
        return

    print('Found PID: %s' % ', '.join(map(str, found_pids)))
    # Hand the pids over as a C array of int32 together with its length.
    pid_count = len(found_pids)
    pid_array = (c_int32 * pid_count)(*found_pids)
    self.libdivert_ref.emulator_set_pid_list(self.config, pid_array, pid_count)
def update(self):
    """
    Update the list of BrewPi processes by receiving them from the system with psutil.

    A process is considered when its name contains ``python`` and one of
    its command-line arguments contains ``brewpi.py``. Each such process
    is passed to ``self.parseProcess`` and kept when the result is truthy.

    Returns: list of BrewPiProcess objects
    """
    bpList = []
    for p in psutil.process_iter():
        try:
            if 'python' not in p.name():
                continue
            if not any('brewpi.py' in s for s in p.cmdline()):
                continue
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # some OS's (OS X) do not allow processes to read info from
            # other processes. Skip just that process (NoSuchProcess also
            # covers zombies) rather than discarding every match.
            continue
        bp = self.parseProcess(p)
        if bp:
            bpList.append(bp)
    self.list = bpList
    return self.list
def get_steam_path():
    """Locate the Steam installation directory.

    The directory of a running ``steam``/``steam.exe`` executable is
    preferred; when ``winreg`` is available the ``SteamPath`` registry
    value is tried next.

    Returns:
        str: The path to Steam. If the path could not be found, the
             current directory is returned instead (os.curdir)
    """
    if psutil:
        for proc in psutil.process_iter():
            try:
                if proc.name().lower() in ('steam.exe', 'steam'):
                    return os.path.dirname(proc.exe())
            except psutil.Error:
                logger.exception("Could not get Steam path from its process.")

    if winreg:
        try:
            steam_key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r'Software\Valve\Steam')
            registry_value = winreg.QueryValueEx(steam_key, r'SteamPath')
            return os.path.normpath(registry_value[0])
        except WindowsError:
            logger.exception("Could not query registry for Steam path")

    return os.curdir
def check_process_exists_and_amqp_connected(name):
    """Report whether process ``name`` runs and holds a connection to AMQP.

    Reports ``ok`` as soon as one matching process has an inet connection
    whose remote port equals ``AMQP_PORT``; otherwise reports ``critical``.
    NOTE(review): ``critical``/``ok`` presumably terminate the check
    (Nagios-style helpers) -- confirm against their definitions.

    :param name: process name handed to ``check_process_name``
    """
    # Materialise the matches: a lazy filter object is always truthy on
    # Python 3, which would silently skip the "not running" report.
    processes = [p for p in psutil.process_iter()
                 if check_process_name(name, p)]
    if not processes:
        critical("%s is not running" % name)
    for p in processes:
        try:
            connections = p.get_connections(kind='inet')
        except psutil.NoSuchProcess:
            continue
        # remote_address has at most one element when there is no peer.
        found_amqp = any(
            len(c.remote_address) > 1 and c.remote_address[1] == AMQP_PORT
            for c in connections)
        if found_amqp:
            ok("%s is working." % name)
    critical("%s is not connected to AMQP" % name)
def run(self):
    """Check whether sshd (Remote Login) is enabled.

    Returns a ``(success, message)`` tuple; success is True when sshd is
    considered disabled.
    """
    launchctl_query = ['launchctl', 'print', 'system/com.openssh.sshd']
    with open(os.devnull, 'w') as sink:
        try:
            # If the service is disabled in Preferences the query returns
            # a non-zero error. This query should be used better in future.
            status = subprocess.check_call(launchctl_query,
                                           stdout=sink, stderr=sink)
        except subprocess.CalledProcessError:
            # Reached only when sshd isn't enabled through launchd as
            # checked above: look for a manually started daemon.
            sshd_pids = []
            for proc in psutil.process_iter():
                try:
                    if proc.name() == 'sshd':
                        sshd_pids.append(proc.pid)
                except psutil.NoSuchProcess:
                    continue
            if sshd_pids:
                listing = ', '.join(str(found) for found in sshd_pids)
                return (False, "enabled manually - see pids: {} ".format(listing))
            return (True, "disabled")
        if status:
            return (True, "disabled")
        return (False, "enabled in Sharing Prefs: Remote Login")
def run(self):
    """
    Run the check.

    All check scripts must implement this method. It must return a tuple of:
    (<success>, <message>)

    The check succeeds with (True, "disabled") when no process named
    ``AppleFileServer`` or ``smbd`` is running. Otherwise it fails with
    (False, <message listing the pids of those processes>).
    """
    sharing_names = ('AppleFileServer', 'smbd')
    sharing_pids = []
    for proc in psutil.process_iter():
        try:
            if proc.name() in sharing_names:
                sharing_pids.append(proc.pid)
        except psutil.NoSuchProcess:
            continue

    if not sharing_pids:
        return (True, "disabled")
    listing = ', '.join(map(str, sharing_pids))
    return (False, "found SMB or AFP file sharing processes with pids: {} - Disable Sharing Prefs: File Sharing".format(listing))
def closeDispyScheduler():
    ''' Close the Dispy Scheduler

    When this module started the scheduler itself (module-level ``popen``
    is set), that child is terminated and waited for. Otherwise every
    process with ``dispyscheduler.py`` in one of its command-line
    arguments is asked to terminate.
    '''
    global popen
    if popen is not None:
        popen.terminate()
        popen.wait()
        popen = None
        return

    for proc in psutil.process_iter():
        try:
            cmdline = proc.cmdline()
            # Plain substring test: the original pattern's unescaped "."
            # matched any character. Signal each process only once, even
            # when several arguments mention the script.
            if any('dispyscheduler.py' in arg for arg in cmdline):
                # terminate() sends SIGTERM on POSIX.
                proc.terminate()
        except (PermissionError, psutil.AccessDenied, psutil.NoSuchProcess):
            continue
def kill_all_auto_processes():
    """
    Kill all running autoAwsume processes.

    A process is considered an autoAwsume process when any of its
    command-line arguments contains ``autoAwsume``.
    """
    log.info('Killing all autoAwsume processes')
    for proc in psutil.process_iter():
        try:
            # any() stops at the first matching argument, so a process is
            # logged and killed once however many arguments match.
            if any('autoAwsume' in arg for arg in proc.cmdline()):
                log.debug('Found an autoAwsume process, killing it')
                proc.kill()
        except Exception:
            # Deliberate best effort: processes we cannot inspect or kill
            # (already gone, access denied, ...) are skipped.
            pass
def _KillWebServers():
    """Signal every lighttpd/webpagereplay process with escalating signals.

    For each of SIGTERM, SIGINT, SIGQUIT and SIGKILL in turn, every process
    whose joined command line contains ``lighttpd`` or ``webpagereplay`` is
    sent the signal and then waited on for up to one second. Failures are
    logged as warnings and otherwise ignored.
    """
    def _cmdline(proc):
        # Older psutil releases expose cmdline as a plain list attribute,
        # newer ones as a method. Joining the bound method itself raises
        # TypeError, which would prevent any process from being signalled.
        value = proc.cmdline
        return value() if callable(value) else value

    for s in [signal.SIGTERM, signal.SIGINT, signal.SIGQUIT, signal.SIGKILL]:
        signalled = []
        for server in ['lighttpd', 'webpagereplay']:
            for p in psutil.process_iter():
                try:
                    if server not in ' '.join(_cmdline(p)):
                        continue
                    logging.info('Killing %s %s %s', s, server, p.pid)
                    p.send_signal(s)
                    signalled.append(p)
                except Exception as e:
                    logging.warning('Failed killing %s %s %s', server, p.pid, e)
        for p in signalled:
            try:
                p.wait(1)
            except Exception as e:
                logging.warning('Failed waiting for %s to die. %s', p.pid, e)
def KillAllAdb():
    """Send SIGTERM, SIGQUIT and SIGKILL in turn to every adb process.

    A process counts as adb when its name contains ``adb``. Any adb
    process still present after the last signal is logged as an error.
    """
    def _read(proc, attr_name):
        # Older psutil releases expose name/cmdline as plain attributes,
        # newer ones as methods; using the bound method as a value raises
        # TypeError, so call it when it is callable.
        value = getattr(proc, attr_name)
        return value() if callable(value) else value

    def GetAllAdb():
        for p in psutil.process_iter():
            try:
                if 'adb' in _read(p, 'name'):
                    yield p
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

    for sig in [signal.SIGTERM, signal.SIGQUIT, signal.SIGKILL]:
        for p in GetAllAdb():
            try:
                logging.info('kill %d %d (%s [%s])', sig, p.pid,
                             _read(p, 'name'), ' '.join(_read(p, 'cmdline')))
                p.send_signal(sig)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
    for p in GetAllAdb():
        try:
            logging.error('Unable to kill %d (%s [%s])', p.pid,
                          _read(p, 'name'), ' '.join(_read(p, 'cmdline')))
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
def test_parent_ppid(self):
    """A spawned subprocess reports us as its parent, and afterwards no
    other process does."""
    own_pid = os.getpid()
    child = get_test_subprocess()
    child_proc = psutil.Process(child.pid)
    self.assertEqual(child_proc.ppid(), own_pid)
    self.assertEqual(child_proc.parent().pid, own_pid)
    # no other process is supposed to have us as parent
    reap_children(recursive=True)
    if APPVEYOR:
        # Occasional failures, see:
        # https://ci.appveyor.com/project/giampaolo/psutil/build/
        # job/0hs623nenj7w4m33
        return
    others = (proc for proc in psutil.process_iter() if proc.pid != child.pid)
    for other in others:
        # XXX: sometimes this fails on Windows; not sure why.
        self.assertNotEqual(other.ppid(), own_pid, msg=other)
def test_ppid(self):
    """ppid() agrees with os.getppid() and with the parent of a spawned
    subprocess; afterwards no other process reports us as parent."""
    if hasattr(os, 'getppid'):
        self.assertEqual(psutil.Process().ppid(), os.getppid())
    own_pid = os.getpid()
    child = get_test_subprocess()
    child_proc = psutil.Process(child.pid)
    self.assertEqual(child_proc.ppid(), own_pid)
    self.assertEqual(child_proc.parent().pid, own_pid)
    # no other process is supposed to have us as parent
    reap_children(recursive=True)
    if APPVEYOR:
        # Occasional failures, see:
        # https://ci.appveyor.com/project/giampaolo/psutil/build/
        # job/0hs623nenj7w4m33
        return
    others = (proc for proc in psutil.process_iter() if proc.pid != child.pid)
    for other in others:
        # XXX: sometimes this fails on Windows; not sure why.
        self.assertNotEqual(other.ppid(), own_pid, msg=other)
def get_used_files():
    """Get files used by processes with name scanpy.

    :returns: set of paths of the files opened by those processes
    """
    import psutil

    filenames = set()
    for proc in psutil.process_iter():
        try:
            # The name lookup sits inside the try as well: it can hit the
            # same race as open_files().
            if proc.name() != 'scanpy':
                continue
            filenames.update(nt.path for nt in proc.open_files())
        # This catches a race condition where a process ends
        # before we can examine it
        except psutil.NoSuchProcess:
            continue
    return filenames
def kill_proc(procname):
    """Terminate the first running process named ``procname``.

    The process is given three seconds to exit. On a timeout the function
    calls itself again, counting attempts in the module-level
    ``proz['killcount']``, and gives up once that counter has reached 2.
    """
    terminated = False
    for listed in psutil.process_iter():
        target = psutil.Process(listed.pid)
        if target.name() != procname:
            continue
        try:
            target.terminate()
            target.wait(timeout=3)
            terminated = True
        except psutil.AccessDenied:
            print("Error: Access Denied to terminate %s" % procname)
        except psutil.NoSuchProcess:
            pass
        except psutil.TimeoutExpired:
            if proz['killcount'] == 2:
                print("Error: Terminating of %s failed! (tried 3 times)" % procname)
            else:
                print("Error: Terminating of %s took to long.. retrying" % procname)
                proz['killcount'] += 1
                kill_proc(procname)
        # Only the first process with a matching name is handled.
        break
    if terminated:
        print("%s terminated!" % procname)
def get_pid(name):
    """Return the pid of the node/process called ``name``.

    Three lookups are tried in order: a running process whose name
    contains ``name``, the ROS node API reached through the master, and
    finally the ``pidof -s`` command.

    :returns: the pid, or ``None`` (after logging an error) when every
        lookup fails
    """
    for p in psutil.process_iter():
        try:
            # name is a method on current psutil and a plain attribute on
            # old releases; str() of the bound method would match against
            # its repr instead of the actual process name.
            pname = p.name() if callable(p.name) else p.name
        except psutil.Error:
            continue
        if name in str(pname):
            return p.pid

    try:
        node_id = '/NODEINFO'
        node_api = rosnode.get_api_uri(rospy.get_master(), name)
        code, msg, pid = xmlrpclib.ServerProxy(node_api[2]).getPid(node_id)
    except IOError:
        pass
    else:
        return pid

    try:
        return int(check_output(["pidof", "-s", name]))
    except CalledProcessError:
        pass

    rospy.logerr("Node '" + name + "' is not running!")
    return None
def all_running_processes_for_case(self, _filter):
    """Return the running processes whose working directory contains ``_filter``.

    Each entry is a dict with ``run`` (working directory and process name
    joined by a space) and ``cmd`` (the command line). Processes that are
    inaccessible or have exited are skipped.
    """
    matches = []
    for proc in psutil.process_iter():
        try:
            if not psutil.pid_exists(proc.pid):
                continue
            if _filter not in proc.cwd():
                continue
            matches.append({"run": proc.cwd() + " " + proc.name(),
                            "cmd": proc.cmdline()})
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            continue
    return matches

##
# Function to kill all running processes for one case
# @param _filter search filter <string>
# @retval list of all killed processes <list>
def kill_all_running_processes_for_case(self, _filter):
    """Kill every running process whose working directory contains ``_filter``.

    Each match is recorded as a dict with ``run`` (working directory and
    process name joined by a space) and then passed by pid to
    ``self.kill_proc_tree``. Processes that are inaccessible or have
    exited are skipped.
    """
    killed = []
    for proc in psutil.process_iter():
        try:
            if not psutil.pid_exists(proc.pid):
                continue
            if _filter not in proc.cwd():
                continue
            killed.append({"run": proc.cwd() + " " + proc.name()})
            self.kill_proc_tree(proc.pid)
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            continue
    return killed

##
# Function to retrieve all processes for one case (running and completed)
# @param _case case <string>
# @retval list of all processes <list>
def process_for_run(self, _filter):
    """Return the first process whose working directory contains ``_filter``.

    :param _filter: substring to look for in each working directory
    :returns: the matching ``psutil.Process``, or ``None`` when none matches
    """
    for p in psutil.process_iter():
        try:
            if _filter in p.cwd():
                return p
        except Exception:
            # Best effort: skip processes whose working directory cannot
            # be read. Exception instead of a bare except, so that
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            continue
    return None

##
# Function to start process
# @param _case case <string>
# @param _run_id run id <string>
# @param _command command <list>
# @param create_console create a console <boolean>
# @param async program does not wait for process completion if set to True <boolean>
def _check_virtualbox():
    """
    Check if VirtualBox is running.

    VirtualBox conflicts with S2E's requirement for KVM, so VirtualBox must
    *not* be running together with S2E.

    :raises CommandError: when a process named ``VBoxHeadless`` is found
    """
    # Adapted from https://github.com/giampaolo/psutil/issues/132#issuecomment-44017679
    # to avoid race conditions
    for proc in psutil.process_iter():
        try:
            # name is a method on current psutil; comparing the bound
            # method itself to a string is always False, which would make
            # this check a no-op.
            name = proc.name() if callable(proc.name) else proc.name
        except NoSuchProcess:
            continue
        if name == 'VBoxHeadless':
            raise CommandError('S2E uses KVM to build images. VirtualBox '
                               'is currently running, which is not '
                               'compatible with KVM. Please close all '
                               'VirtualBox VMs and try again')
def __running():
    """
    Check if SafeEyes is already running.

    Returns True as soon as a second process is found whose first
    command-line entry contains ``python`` and whose arguments mention
    ``safeeyes``; False otherwise.
    """
    seen = 0
    for proc in psutil.process_iter():
        if not proc.cmdline:
            continue
        try:
            # Latest psutil has a cmdline function; in older versions
            # cmdline was a list object.
            argv = proc.cmdline() if callable(proc.cmdline) else proc.cmdline
            launched_by_python = 'python3' in argv[0] or 'python' in argv[0]
            if not launched_by_python:
                continue
            # Check if safeeyes is in the process arguments.
            if 'safeeyes' in argv[1] or 'safeeyes' in argv:
                seen += 1
                if seen > 1:
                    return True
        except (IndexError, psutil.NoSuchProcess):
            # Ignore if the process does not exist or does not have
            # command line args.
            continue
    return False
def kill_windows_cassandra_procs():
    """Kill leftover cassandra ``java.exe`` processes (Windows only).

    Does nothing on other platforms. When psutil is not installed a
    warning is emitted through ``debug`` instead.
    """
    # On Windows, forcefully terminate any leftover previously running
    # cassandra processes. This is a temporary workaround until we can
    # determine the cause of intermittent hung-open tests and file-handles.
    if not is_win():
        return
    try:
        import psutil
        for proc in psutil.process_iter():
            try:
                pinfo = proc.as_dict(attrs=['pid', 'name', 'cmdline'])
            except psutil.NoSuchProcess:
                continue
            is_cassandra = (pinfo['name'] == 'java.exe'
                            and '-Dcassandra' in pinfo['cmdline'])
            if is_cassandra:
                print('Found running cassandra process with pid: ' + str(pinfo['pid']) + '. Killing.')
                psutil.Process(pinfo['pid']).kill()
    except ImportError:
        debug("WARN: psutil not installed. Cannot detect and kill "
              "running cassandra processes - you may see cascading dtest failures.")
def getRunningFuzzers():
    """Return the running processes that belong to the fuzzing setup.

    A process is selected when any of its command-line arguments contains
    one of: the module-level ``fuzzer`` name, ``crash-watch``,
    ``ROVING_CLIENT``, ``ROVING_SERVER``, ``FUZZDIR + "/target"``, the
    module-level ``targ`` or ``afl-fuzz``.

    :returns: list of ``psutil.Process`` objects, each listed once
    """
    patterns = (
        fuzzer,
        "crash-watch",
        ROVING_CLIENT,
        ROVING_SERVER,
        FUZZDIR + "/target",
        targ,
        # hard coded, in the future someone might use another fuzzer!
        "afl-fuzz",
    )
    proc_list = []
    for proc in psutil.process_iter():
        try:
            # Read the command line once instead of once per pattern.
            cmdline = proc.cmdline()
        except psutil.Error:
            # Exited or unreadable processes cannot be identified; skip.
            continue
        if any(pattern in arg for arg in cmdline for pattern in patterns):
            proc_list.append(proc)
    return proc_list
def pidof(pgname):
    """Return the pids (as strings) of the processes called ``pgname``.

    A process matches when its name equals ``pgname`` or, failing that,
    when the first entry of its command line equals ``pgname``.
    """
    matches = []
    for proc in psutil.process_iter():
        # First try the process name ...
        try:
            name_hit = proc.name() == pgname
        except psutil.Error:
            name_hit = False
        if name_hit:
            matches.append(str(proc.pid))
            continue
        # ... then fall back to the executable given on the command line.
        try:
            argv = proc.cmdline()
        except psutil.Error:
            continue
        if argv and argv[0] == pgname:
            matches.append(str(proc.pid))
    return matches
def main():
    """Print a netstat-like table of the system's inet connections."""
    row_format = "%-5s %-30s %-30s %-13s %-6s %s"
    header = ("Proto", "Local address", "Remote address", "Status", "PID",
              "Program name")
    print(row_format % header)

    # Map pid -> process name for the processes we are able to read.
    names_by_pid = {}
    for proc in psutil.process_iter():
        try:
            names_by_pid[proc.pid] = proc.name()
        except psutil.Error:
            continue

    for conn in psutil.net_connections(kind='inet'):
        local = "%s:%s" % (conn.laddr)
        remote = "%s:%s" % (conn.raddr) if conn.raddr else ""
        row = (
            proto_map[(conn.family, conn.type)],
            local,
            remote or AD,
            conn.status,
            conn.pid or AD,
            # The program name column is cut to 15 characters.
            names_by_pid.get(conn.pid, '?')[:15],
        )
        print(row_format % row)
def poll(interval):
    """Sleep for ``interval`` seconds, then snapshot all processes.

    Each surviving process gets a ``dict`` attribute holding the sampled
    fields.

    :returns: tuple ``(processes, procs_status)``: the processes sorted by
        descending CPU percent, and a dict counting processes per status
    """
    # sleep some time
    time.sleep(interval)
    sampled_fields = ['username', 'nice', 'memory_info', 'memory_percent',
                      'cpu_percent', 'cpu_times', 'name', 'status']
    alive = []
    status_counts = {}
    for proc in psutil.process_iter():
        try:
            proc.dict = proc.as_dict(sampled_fields)
        except psutil.NoSuchProcess:
            continue
        state = proc.dict['status']
        status_counts[state] = status_counts.get(state, 0) + 1
        alive.append(proc)

    # return processes sorted by CPU percent usage
    ranked = sorted(alive, key=lambda p: p.dict['cpu_percent'], reverse=True)
    return (ranked, status_counts)
def kill_pid(pid, procname='', daemon=True):
    '''Find a PID with optional qualifications and kill it.

    pid:      pid of the process to kill
    procname: when non-empty, the process name must equal it
    daemon:   when true, the process's parent pid must be 1
    Returns True when a qualifying process was handed to
    _kill_pid_object, False otherwise.
    '''
    if pid not in psutil.pids():
        return False
    for candidate in psutil.process_iter():
        qualifies = (
            candidate.pid == pid
            and (not procname or candidate.name() == procname)
            and (not daemon or candidate.ppid() == 1)
        )
        if qualifies:
            _kill_pid_object(candidate)
            return True
    return False

###########################################################################
# Some packages (nscd, nslcd) start their daemons after installation.
# If I'm in an ARM chroot, they live after the chroot exits.  Kill them.
# In a cascade situation, sometimes the /proc/<PID>/root link is changed
# to '/some/path (deleted)'.  This routine still works.
def ownhandle():
    """Return the pid of a process holding an open file that matches argv[1].

    ``sys.argv[1]`` is matched literally and case-insensitively against
    the string form of every open-file entry of every process. Pids listed
    in the module-level ``safepids`` are never reported.

    :returns: the first matching pid, or ``None`` when there is none
    """
    # search() already matches anywhere in the text, so the escaped needle
    # needs no surrounding ".*".
    regex = re.compile(re.escape(sys.argv[1]), re.IGNORECASE)
    for proc in psutil.process_iter():
        pid = proc.pid
        try:
            open_files = proc.open_files()
        except (psutil.Error, OSError):
            # Exited or inaccessible process: nothing to inspect.
            continue
        for entry in open_files:
            if regex.search(str(entry)) is not None and pid not in safepids:
                return pid
    return None