我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.AccessDenied()。
def pids_active(pids_computer):
    """Collect usage statistics for every PID that can still be inspected.

    Args:
        pids_computer: iterable of process IDs to probe.

    Returns:
        dict mapping process name to a dict with the keys ``pid``,
        ``status``, ``percent_cpu_used`` and ``percent_memory_used``.
        PIDs that are zombies, no longer exist, or cannot be accessed are
        left out.  The result is keyed by name, so processes sharing a name
        collapse into one entry (the last one probed wins).
    """
    pid_valid = {}
    for pid in pids_computer:
        try:
            process = psutil.Process(pid)
            data = {
                "pid": process.pid,
                "status": process.status(),
                "percent_cpu_used": process.cpu_percent(interval=0.0),
                "percent_memory_used": process.memory_percent(),
            }
            # name() is queried inside the try as well: it can fail with the
            # same psutil errors as the accessors above if the process
            # disappears in the meantime.
            name = process.name()
        except (psutil.ZombieProcess, psutil.AccessDenied,
                psutil.NoSuchProcess):
            continue
        pid_valid[name] = data
    return pid_valid
def on_ask_user_allow_or_deny(self, evt):
    """Report a suspected crypto-ransom process and ask the user what to do.

    Logs the PID, executable, command line and touched file at critical
    level, prompts on stdin, then fires ``EventUserAllowProcess`` or
    ``EventUserDenyProcess`` for ``evt.process``.

    Args:
        evt: event exposing ``process``, ``pid`` and ``path``.
    """
    # Default to None so the report and the prompt below still work when the
    # process details cannot be read; previously these names stayed unbound
    # after the except branch and the first use raised NameError.
    exe = None
    cmdline = None
    try:
        exe = evt.process.exe()
        cmdline = evt.process.cmdline()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        logger.warn('Ransomware process is caught, but the process does '
                    'not exist (PID: %d)' % evt.pid)

    logger.critical('\033[91m')
    logger.critical('*** [Crypto ransom detected] ***')
    logger.critical('[PID]: %d' % evt.process.pid)
    logger.critical('[EXE]: %r' % exe)
    logger.critical('[Command]: %r' % cmdline)
    logger.critical('[File]: %s' % evt.path)
    logger.critical('********************************\033[0m')
    flush_stdin()
    yes_no = raw_input('> Block it? (Y/n) ')

    # Any answer containing "n" means "do not block", i.e. allow the process.
    allow = 'n' in yes_no.lower()
    if allow:
        event.EventUserAllowProcess(evt.process).fire()
    else:
        event.EventUserDenyProcess(evt.process).fire()
def find_test_instances(prog):
    """Find running Python processes that were started from script *prog*.

    A process qualifies when its name is one of the known Python interpreter
    names, its command line has more than five entries, and the second entry
    ends with *prog*.

    Args:
        prog: suffix the script argument (``cmdline()[1]``) must end with.

    Returns:
        dict mapping PID to a dict with the values following the ``-i``
        (``iss``), ``-t`` (``tag``) and ``-p`` (``port``) flags plus
        ``since``, the process creation time as ``YYYY-MM-DD HH:MM:SS``.
        Processes that vanish, deny access, or lack one of the flags are
        skipped instead of aborting the scan.
    """
    interpreter_names = ('python', 'python3', 'Python', 'Python3')
    pid = {}
    for proc in psutil.process_iter():
        try:
            if proc.name() not in interpreter_names:
                continue
            cmd = proc.cmdline()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # The process is gone or unreadable; nothing to report for it.
            continue

        if len(cmd) <= 5 or not cmd[1].endswith(prog):
            continue

        try:
            iss = cmd[cmd.index('-i') + 1]
            tag = cmd[cmd.index('-t') + 1]
            port = cmd[cmd.index('-p') + 1]
            created = proc.create_time()
        except (ValueError, IndexError):
            # A flag is missing (ValueError) or has no value (IndexError).
            continue
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            continue

        since = datetime.datetime.fromtimestamp(created).strftime(
            "%Y-%m-%d %H:%M:%S")
        pid[proc.pid] = {'iss': iss, 'tag': tag, 'port': port,
                         'since': since}
    return pid
def _maybe_get_running_openvpn():
    """
    Return the first running process whose command line mentions
    LEAPOPENVPN, or None when there is no such process.

    :rtype: psutil Process
    """
    for candidate in psutil.process_iter():
        try:
            # This needs more work, see #3268, but for the moment
            # we need to be able to filter out arguments in the form
            # --openvpn-foo, since otherwise we are shooting ourselves
            # in the feet.
            arguments = candidate.cmdline()
            if any("LEAPOPENVPN" in arg for arg in arguments):
                return candidate
        except psutil.AccessDenied:
            # Not allowed to inspect this process; move on to the next one.
            continue
    return None
def on_crypto_ransom(self, evt):
    """Suspend a process suspected of crypto-ransom and ask the user about it.

    Events for processes that are already suspended are ignored.  A process
    whose command line is whitelisted is only logged; any other process is
    suspended, remembered in ``self.suspended``, and an
    ``EventAskUserAllowOrDeny`` is fired for it.

    Args:
        evt: event exposing ``pid`` and ``path``.
    """
    logger.debug('Whitelist: %s' % json.dumps(self.whitelist, indent=4))

    # A suspended process may have died or become unreadable since it was
    # recorded; a debug dump must not take the whole handler down with it.
    suspended_info = []
    for proc in self.suspended:
        try:
            exe = proc.exe()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            exe = None
        suspended_info.append({'pid': proc.pid, 'exe': exe})
    logger.debug('Suspended: %s' % json.dumps(suspended_info, indent=4))

    if any(suspended.pid == evt.pid for suspended in self.suspended):
        return  # ignore captured ransom events

    try:
        p = psutil.Process(evt.pid)
        cmdline = p.cmdline()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        logger.warn('Suspicious process %d exited before being caught'
                    % evt.pid)
        return

    if cmdline in self.whitelist:
        logger.info('Allowed white-listed process: %d' % evt.pid)
        return

    try:
        p.suspend()
    except psutil.NoSuchProcess:
        # The process can exit between the cmdline() lookup and suspend().
        logger.warn('Suspicious process %d exited before being caught'
                    % evt.pid)
        return
    self.suspended.append(p)
    event.EventAskUserAllowOrDeny(p, evt.path).fire()
def get_absolute_path(event_raw):
    '''
    Resolve the path carried by a raw event to an absolute, real path.

    Keeps a cache of processes' cwds (``pid_cwd``), in case that their
    events might come after they're terminated.

    Args:
        event_raw: mapping read through ``.get('pid')`` and ``.get('path')``.

    Returns:
        The resolved path, or None when the working directory of the
        process is unknown or the event carries no path.
    '''
    pid = event_raw.get('pid')
    path = event_raw.get('path')
    if path and path[0] == '/':
        return os.path.realpath(path)

    cwd = None
    logger.debug('%r' % pid_cwd)
    try:
        # NOTE(review): psutil.Process(None) refers to the current process,
        # so an event without a pid would resolve against our own cwd --
        # confirm that events always carry a pid.
        process = psutil.Process(pid)
        cwd = process.cwd()
        pid_cwd[pid] = cwd  # cache every pid's cwd
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        cwd = pid_cwd.get(pid)

    # A missing path used to reach os.path.join(cwd, None) and raise; it is
    # now reported as "unresolvable", the same way as an unknown cwd.
    if not cwd or path is None:
        return None
    return os.path.realpath(os.path.join(cwd, path))
def find_current_steam_game_pid():
    """
    Look up the PID of the game currently being played.

    The PID is read from the value following the ``-pid`` argument of the
    first ``GameOverlayUI.exe`` process found.  Returns -1 when no such
    process or argument exists.
    """
    game_pid = -1
    for candidate in psutil.process_iter():
        try:
            if candidate.name() != 'GameOverlayUI.exe':
                continue
            arguments = candidate.cmdline()
            for position, token in enumerate(arguments):
                if token == '-pid':
                    game_pid = int(arguments[position + 1])
                    break
            # Only the first overlay process is examined.
            break
        except psutil.AccessDenied:
            print("Permission error or access denied on process")
    return game_pid
def test_ad_on_process_creation(self):
    # Instantiating Process must keep working when create_time() reports
    # access denied or a zombie process; any other error has to propagate.
    tolerated_effects = (psutil.AccessDenied, psutil.ZombieProcess(1))
    for effect in tolerated_effects:
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=effect) as patched:
            psutil.Process()
            assert patched.called

    with mock.patch.object(psutil.Process, 'create_time',
                           side_effect=ValueError) as patched:
        with self.assertRaises(ValueError):
            psutil.Process()
        assert patched.called
def test_special_pid(self):
    system_proc = psutil.Process(4)
    self.assertEqual(system_proc.name(), 'System')
    # str() touches all the common Process properties; this makes sure
    # nothing strange happens while doing so
    str(system_proc)
    system_proc.username()
    self.assertTrue(system_proc.create_time() >= 0.0)
    try:
        rss, vms = system_proc.memory_info()[:2]
    except psutil.AccessDenied:
        # expected on Windows Vista and Windows 7
        node = platform.uname()[1]
        if node not in ('vista', 'win-7', 'win7'):
            raise
    else:
        self.assertTrue(rss > 0)
def test_num_threads(self):
    # The exact thread count is platform dependent (Linux always has one
    # thread per process, OSX and Windows do not), so only the delta
    # caused by starting one extra thread is checked.
    proc = psutil.Process()
    try:
        before = proc.num_threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")

    worker = ThreadTask()
    worker.start()
    try:
        after = proc.num_threads()
        self.assertEqual(after, before + 1)
        worker.stop()
    finally:
        # make sure the helper thread is stopped even if an assertion failed
        if worker._running:
            worker.stop()
def test_children_duplicates(self):
    # count the children of every parent pid
    child_counts = collections.defaultdict(int)
    for proc in psutil.process_iter():
        try:
            child_counts[proc.ppid()] += 1
        except psutil.Error:
            continue
    # pick the parent with the most children and make sure its recursive
    # children listing contains no duplicates
    ranked = sorted(child_counts.items(), key=lambda entry: entry[1])
    busiest_pid = ranked[-1][0]
    parent = psutil.Process(busiest_pid)
    try:
        descendants = parent.children(recursive=True)
    except psutil.AccessDenied:  # windows
        return
    self.assertEqual(len(descendants), len(set(descendants)))
def skip_on_access_denied(only_if=None):
    """Decorator factory turning AccessDenied into a skipped test.

    When the wrapped callable raises ``psutil.AccessDenied`` the exception is
    converted into ``unittest.SkipTest``.  If *only_if* is given and falsy,
    the original exception is re-raised instead.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                skipping_disabled = only_if is not None and not only_if
                if skipping_disabled:
                    raise
                raise unittest.SkipTest(
                    "%r was skipped because it raised AccessDenied"
                    % fun.__name__)
        return wrapper
    return decorator
def update(self):
    """
    Update the list of BrewPi processes by receiving them from the system with psutil.

    A process is a candidate when its name contains 'python' and one of its
    command-line arguments contains 'brewpi.py'.  Each candidate is passed to
    ``self.parseProcess`` and kept when that returns a truthy value.

    Returns: list of BrewPiProcess objects
    """
    bpList = []
    matching = []
    for p in psutil.process_iter():
        # some OS's (OS X) do not allow processes to read info from other
        # processes.  The check is guarded per process so that a single
        # inaccessible, zombie or vanished process no longer discards every
        # match found so far.
        try:
            if 'python' in p.name() and any(
                    'brewpi.py' in s for s in p.cmdline()):
                matching.append(p)
        except (psutil.AccessDenied, psutil.ZombieProcess,
                psutil.NoSuchProcess):
            continue

    for p in matching:
        bp = self.parseProcess(p)
        if bp:
            bpList.append(bp)
    self.list = bpList
    return self.list
def set_highest_priority():
    """Try to raise the current process to realtime priority.

    Returns True when the priority was changed and None when psutil is not
    installed, the process object has no ``nice`` attribute, or access was
    denied.
    """
    try:
        import psutil
    except ImportError:
        return None

    current = psutil.Process()
    if not hasattr(current, 'nice'):
        return None

    # Want to set realtime on Windows.
    # Fail hard for anything else right now, so it is obvious what to fix
    # when adding other OS support.
    try:
        current.nice(psutil.REALTIME_PRIORITY_CLASS)
    except psutil.AccessDenied:
        return None
    return True
def closeDispyScheduler():
    '''
    Close the Dispy Scheduler.

    When the module-level ``popen`` handle is set, that process is terminated,
    waited for, and the handle is reset to None.  Otherwise every process
    whose command line matches 'dispyscheduler.py' is sent SIGTERM.
    '''
    global popen
    if popen is not None:
        popen.terminate()
        popen.wait()
        popen = None
        return

    for proc in psutil.process_iter():
        try:
            cmdline = proc.cmdline()
        except (PermissionError, psutil.AccessDenied, psutil.NoSuchProcess):
            continue
        # Signal each matching process once, even if several of its
        # arguments match the pattern.
        if any(re.search('dispyscheduler.py', arg) for arg in cmdline):
            try:
                proc.send_signal(psutil.signal.SIGTERM)
            except psutil.NoSuchProcess:
                # The process exited between the lookup and the signal.
                continue
def KillAllAdb():
    """Send SIGTERM, SIGQUIT and SIGKILL in turn to every adb process.

    After the three rounds, any adb process still listed is logged as an
    error.  Processes that vanish or deny access are skipped silently.

    NOTE(review): ``name`` and ``cmdline`` are read as attributes, not
    called -- that matches the pre-2.0 psutil API; confirm the psutil
    version this file targets.
    """
    ignorable = (psutil.NoSuchProcess, psutil.AccessDenied)

    def GetAllAdb():
        for candidate in psutil.process_iter():
            try:
                is_adb = 'adb' in candidate.name
            except ignorable:
                continue
            if is_adb:
                yield candidate

    for sig in (signal.SIGTERM, signal.SIGQUIT, signal.SIGKILL):
        for adb in GetAllAdb():
            try:
                logging.info('kill %d %d (%s [%s])', sig, adb.pid, adb.name,
                             ' '.join(adb.cmdline))
                adb.send_signal(sig)
            except ignorable:
                continue

    for survivor in GetAllAdb():
        try:
            logging.error('Unable to kill %d (%s [%s])', survivor.pid,
                          survivor.name, ' '.join(survivor.cmdline))
        except ignorable:
            continue
def test_num_threads(self):
    # The exact thread count is platform dependent (Linux always has one
    # thread per process, OSX and Windows do not), so only the delta
    # caused by starting one extra thread is checked.
    proc = psutil.Process()
    try:
        before = proc.num_threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")

    worker = ThreadTask()
    worker.start()
    try:
        after = proc.num_threads()
        self.assertEqual(after, before + 1)
    finally:
        worker.stop()
def test_threads(self):
    proc = psutil.Process()
    try:
        before = proc.threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")

    worker = ThreadTask()
    worker.start()
    try:
        after = proc.threads()
        self.assertEqual(len(after), len(before) + 1)
        # on Linux, first thread id is supposed to be this process
        if LINUX:
            self.assertEqual(after[0].id, os.getpid())
        first = after[0]
        # test named tuple: fields must be reachable by name and by index
        fields = ('id', 'user_time', 'system_time')
        for position, field in enumerate(fields):
            self.assertEqual(getattr(first, field), first[position])
    finally:
        worker.stop()
def test_process_iter(self):
    def listed_pids():
        # snapshot of the pids currently yielded by process_iter()
        return [x.pid for x in psutil.process_iter()]

    self.assertIn(os.getpid(), listed_pids())
    child = get_test_subprocess()
    self.assertIn(child.pid, listed_pids())

    victim = psutil.Process(child.pid)
    victim.kill()
    victim.wait()
    self.assertNotIn(child.pid, listed_pids())

    # NoSuchProcess while building Process objects is swallowed...
    gone = psutil.NoSuchProcess(os.getpid())
    with mock.patch('psutil.Process', side_effect=gone):
        self.assertEqual(list(psutil.process_iter()), [])
    # ...whereas AccessDenied propagates to the caller.
    denied = psutil.AccessDenied(os.getpid())
    with mock.patch('psutil.Process', side_effect=denied):
        with self.assertRaises(psutil.AccessDenied):
            list(psutil.process_iter())
def kill_proc(procname):
    """Terminate running processes named ``procname`` and report the outcome.

    Each matching process is sent ``terminate()`` and given 3 seconds to
    exit.  When the wait times out, the whole function is retried
    recursively until ``proz['killcount']`` reaches 2.  ``proz`` is not
    defined in this block; presumably a module-level dict -- verify against
    the rest of the file.

    NOTE(review): this block was recovered from source whose indentation was
    lost.  The placement of ``break`` (directly after the recursive retry)
    is a reconstruction -- confirm it against the upstream file.
    """
    done = False
    for proc in psutil.process_iter():
        process = psutil.Process(proc.pid)
        if process.name() == procname:
            try:
                process.terminate()
                process.wait(timeout=3)
                done = True
            except psutil.AccessDenied:
                print "Error: Access Denied to terminate %s" % procname
            except psutil.NoSuchProcess:
                # already gone, nothing left to terminate
                pass
            except psutil.TimeoutExpired:
                if proz['killcount'] == 2:
                    print "Error: Terminating of %s failed! (tried 3 times)" % procname
                else:
                    print "Error: Terminating of %s took to long.. retrying" % procname
                    proz['killcount'] += 1
                    # retry from scratch; the recursive call rescans the
                    # process table, so this scan stops afterwards
                    kill_proc(procname)
                    break
    if done:
        print "%s terminated!" % procname
def all_running_processes_for_case(self, _filter):
    """List the running processes whose working directory contains _filter.

    Returns a list of dicts with the keys "run" (cwd and process name joined
    by a space) and "cmd" (the command line).  Processes that deny access or
    disappear during the scan are skipped.
    """
    procs = []
    for p in psutil.process_iter():
        try:
            if not psutil.pid_exists(p.pid):
                continue
            if _filter not in p.cwd():
                continue
            procs.append({"run": p.cwd() + " " + p.name(),
                          "cmd": p.cmdline()})
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            pass
    return procs

##
# Function to kill all running processes for one case
# @param _filter search filter <string>
# @retval list of all killed processes <list>
def kill_all_running_processes_for_case(self, _filter):
    """Kill the running processes whose working directory contains _filter.

    Each match is recorded as {"run": "<cwd> <name>"} and then handed to
    ``self.kill_proc_tree`` by PID.  Processes that deny access or disappear
    during the scan are skipped.  Returns the list of recorded entries.
    """
    procs = []
    for p in psutil.process_iter():
        try:
            if not psutil.pid_exists(p.pid):
                continue
            if _filter not in p.cwd():
                continue
            # record first, then kill
            procs.append({"run": p.cwd() + " " + p.name()})
            self.kill_proc_tree(p.pid)
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            pass
    return procs

##
# Function to retrieve all processes for one case (running and completed)
# @param _case case <string>
# @retval list of all processes <list>
def find_test_instance(iss, tag):
    """Return the first process started with ``-i <iss>`` and ``-t <tag>``.

    Processes whose command line cannot be read or has fewer than five
    entries are skipped.  Returns None when nothing matches.
    """
    wanted = {"-i": iss, "-t": tag}
    for proc in psutil.process_iter():
        try:
            arguments = proc.cmdline()
        except psutil.AccessDenied:
            continue
        if len(arguments) < 5:
            continue

        for option, expected in wanted.items():
            if option not in arguments:
                break
            if arguments[arguments.index(option) + 1] != expected:
                break
        else:
            # every option was present with the expected value
            return proc
    return None
def get_free_port(address, initial_port):
    """Find an unused TCP port in a specified range.

    This should not be used in misson-critical applications - a race
    condition may occur if someone grabs the port before caller of this
    function has chance to use it.

    Parameters:
        address (string): an ip address of interface to use
        initial_port (int) : port to start iteration with

    Return:
        iterator that will return next unused port on a specified
        interface
    """
    try:
        # On OSX this function requires root privileges
        psutil.net_connections()
    except psutil.AccessDenied:
        # Connections cannot be inspected: hand out ports sequentially.
        return count(initial_port)

    def _unused_ports():
        for candidate in count(initial_port):
            # check if the port is being used
            in_use = False
            for conn in psutil.net_connections():
                if (hasattr(conn, 'laddr')
                        and conn.laddr[0] == address
                        and conn.laddr[1] == candidate):
                    in_use = True
                    break
            # only generate unused ports
            if not in_use:
                yield candidate

    return _unused_ports()
def _get_ports_in_use(cls):
    """
    Returns a set of ports currently used on localhost.

    An empty set is returned when the connection table cannot be read.
    """
    try:
        connections = psutil.net_connections(kind='inet4')
        return {connection.laddr[1] for connection in connections}
    except psutil.AccessDenied:
        # On some platforms (such as OS X), root privilege is required to get used ports.
        # In that case we avoid port confliction to the best of our knowledge.
        _logger.info('ports in use cannot be obtained on this platform; ports will be assigned sequentially')
        return set()
def on_user_deny_process(self, evt):
    """Kill the suspended process the user chose to block.

    The entry of ``self.suspended`` whose pid equals ``evt.pid`` is killed
    and removed from the list; ``evt.cmdline`` is then removed from
    ``self.whitelist`` if present.

    NOTE(review): this block was recovered from source whose indentation was
    lost.  The nesting of the try/whitelist/return tail inside the ``if`` is
    a reconstruction -- confirm it against the upstream file.
    """
    for p in self.suspended:
        if p.pid == evt.pid:
            logger.info('Killing PID %d (%s)' % (p.pid, evt.cmdline))
            p.kill()
            # removal during iteration is followed by a return on every
            # path below, so the loop does not continue over the mutated list
            self.suspended.remove(p)
            try:
                cmdline = evt.cmdline
            except psutil.AccessDenied:
                return
            if cmdline in self.whitelist:
                self.whitelist.remove(cmdline)
            return
def to_absolute(pid, fd, path):
    """Resolve *path* against the working directory of process *pid*.

    An empty path yields None and a path starting with '/' is returned
    unchanged.  Otherwise the cwd of the process (or its last cached value in
    ``pid_cwd`` when the process cannot be queried) is used as the base; None
    is returned when no cwd is known.  *fd* is not used.
    """
    if not path:
        return None
    if path[0] == '/':
        return path

    try:
        cwd = psutil.Process(pid).cwd()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        cwd = pid_cwd.get(pid)
    else:
        pid_cwd[pid] = cwd  # cache every pid's cwd

    if cwd:
        return os.path.realpath(os.path.join(cwd, path))
    return None
def find_launch_game_pid(process_name):
    """
    Return the PID of the first process whose name equals process_name,
    or -1 when no such process is found.
    """
    for candidate in psutil.process_iter():
        try:
            name_matches = candidate.name() == process_name
        except psutil.AccessDenied:
            print("Permission error or access denied on process")
            continue
        if name_matches:
            return candidate.pid
    return -1
def test_threads_mocked(self):
    # Test the case where os.listdir() returns a file (thread) which no
    # longer exists by the time we open() it (race condition).  threads()
    # is supposed to ignore that instead of raising NSP.
    real_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'

    def failing_open(error_code):
        # Build an open() stand-in that fails with error_code for this
        # process' task files and defers to the real open() otherwise.
        def fake_open(name, *args, **kwargs):
            if name.startswith('/proc/%s/task' % os.getpid()):
                raise IOError(error_code, "")
            return real_open(name, *args, **kwargs)
        return fake_open

    with mock.patch(patch_point,
                    side_effect=failing_open(errno.ENOENT)) as m:
        ret = psutil.Process().threads()
        assert m.called
        self.assertEqual(ret, [])

    # ...but if it bumps into something != ENOENT we want an exception.
    with mock.patch(patch_point, side_effect=failing_open(errno.EPERM)):
        self.assertRaises(psutil.AccessDenied, psutil.Process().threads)

# not sure why (doesn't fail locally)
# https://travis-ci.org/giampaolo/psutil/jobs/108629915
def test_process__repr__(self, func=repr):
    proc = psutil.Process()
    text = func(proc)
    self.assertIn("psutil.Process", text)
    self.assertIn("pid=%s" % proc.pid, text)
    self.assertIn("name=", text)
    self.assertIn(proc.name(), text)

    # When name() fails, the representation must still carry the pid, drop
    # the name, and mention the process state where one applies.
    scenarios = (
        (psutil.ZombieProcess, "zombie"),
        (psutil.NoSuchProcess, "terminated"),
        (psutil.AccessDenied, None),
    )
    for exc_type, marker in scenarios:
        with mock.patch.object(psutil.Process, "name",
                               side_effect=exc_type(os.getpid())):
            proc = psutil.Process()
            text = func(proc)
            self.assertIn("pid=%s" % proc.pid, text)
            if marker is not None:
                self.assertIn(marker, text)
            self.assertNotIn("name=", text)
def test_access_denied__repr__(self, func=repr):
    # pairs of (exception instance, expected repr() output)
    expectations = (
        (psutil.AccessDenied(321),
         "psutil.AccessDenied (pid=321)"),
        (psutil.AccessDenied(321, name='foo'),
         "psutil.AccessDenied (pid=321, name='foo')"),
        (psutil.AccessDenied(321, msg='foo'),
         "psutil.AccessDenied foo"),
    )
    for exc, expected in expectations:
        self.assertEqual(repr(exc), expected)
def assert_stdout(self, exe, args=None):
    """Run a script from SCRIPTS_DIR and return its stripped output.

    The script is run with the current interpreter through ``sh``.  A
    RuntimeError mentioning 'AccessDenied' is returned as text; any other
    RuntimeError propagates.  Empty output fails the assertion.
    """
    script = '"%s"' % os.path.join(SCRIPTS_DIR, exe)
    command = script + ' ' + args if args else script
    try:
        out = sh(sys.executable + ' ' + command).strip()
    except RuntimeError as err:
        if 'AccessDenied' not in str(err):
            raise
        return str(err)
    assert out, out
    return out
def test_issue_24(self):
    # kill() on PID 0 is expected to be refused with AccessDenied
    pid_zero = psutil.Process(0)
    with self.assertRaises(psutil.AccessDenied):
        pid_zero.kill()
def test_handles_leak(self):
    # Call all Process methods and make sure no handles are left open.
    # This is here mainly to make sure functions using OpenProcess()
    # always call CloseHandle().
    excluded = ('terminate', 'kill', 'suspend', 'resume', 'nice',
                'send_signal', 'wait', 'children', 'as_dict')

    def call(proc, attr_name):
        # invoke the attribute when it is callable; otherwise do nothing
        attr = getattr(proc, attr_name, None)
        if attr is not None and callable(attr):
            attr()

    p = psutil.Process(self.pid)
    failures = []
    for name in dir(psutil.Process):
        if name.startswith('_') or name in excluded:
            continue
        try:
            call(p, name)
            handles_before = p.num_handles()
            call(p, name)
            handles_after = p.num_handles()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        if handles_after > handles_before:
            failures.append(
                "failure while processing Process.%s method "
                "(before=%s, after=%s)"
                % (name, handles_before, handles_after))
    if failures:
        self.fail('\n' + '\n'.join(failures))
def test_name_always_available(self):
    # On Windows name() is never supposed to raise AccessDenied,
    # see https://github.com/giampaolo/psutil/issues/627
    # Only a process disappearing mid-scan is tolerated here.
    for proc in psutil.process_iter():
        try:
            proc.name()
        except psutil.NoSuchProcess:
            continue
def test_compare_name_exe(self):
    # the base name of exe() must equal name() for every readable process
    for proc in psutil.process_iter():
        try:
            exe_basename = os.path.basename(proc.exe())
            reported_name = proc.name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        self.assertEqual(exe_basename, reported_name)
def test_name(self):
    # name() must return the same value even when the underlying
    # proc_exe() call is denied
    expected = psutil.Process(self.pid).name()
    denied = psutil.AccessDenied(os.getpid())
    with mock.patch("psutil._psplatform.cext.proc_exe",
                    side_effect=denied) as fun:
        self.assertEqual(psutil.Process(self.pid).name(), expected)
        assert fun.called
def test_win_service_get(self):
    name = next(psutil.win_service_iter()).name()

    # an unknown service name is reported as NoSuchProcess carrying it
    bogus = name + '???'
    with self.assertRaises(psutil.NoSuchProcess) as cm:
        psutil.win_service_get(bogus)
    self.assertEqual(cm.exception.name, bogus)

    service = psutil.win_service_get(name)
    cext = psutil._psplatform.cext
    # each Windows error code must surface as the matching psutil exception
    cases = (
        ('ERROR_SERVICE_DOES_NOT_EXIST', psutil.NoSuchProcess),
        ('ERROR_ACCESS_DENIED', psutil.AccessDenied),
    )
    for code_name, expected in cases:
        exc = WindowsError(getattr(cext, code_name), "")
        with mock.patch("psutil._psplatform.cext.winservice_query_status",
                        side_effect=exc):
            self.assertRaises(expected, service.status)
        with mock.patch("psutil._psplatform.cext.winservice_query_config",
                        side_effect=exc):
            self.assertRaises(expected, service.username)

    # test __str__ and __repr__
    for render in (str, repr):
        self.assertIn(service.name(), render(service))
        self.assertIn(service.display_name(), render(service))