我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用psutil.net_connections()。
def test_procfs_path(self): tdir = tempfile.mkdtemp() try: psutil.PROCFS_PATH = tdir self.assertRaises(IOError, psutil.virtual_memory) self.assertRaises(IOError, psutil.cpu_times) self.assertRaises(IOError, psutil.cpu_times, percpu=True) self.assertRaises(IOError, psutil.boot_time) # self.assertRaises(IOError, psutil.pids) self.assertRaises(IOError, psutil.net_connections) self.assertRaises(IOError, psutil.net_io_counters) self.assertRaises(IOError, psutil.net_if_stats) self.assertRaises(IOError, psutil.disk_io_counters) self.assertRaises(IOError, psutil.disk_partitions) self.assertRaises(psutil.NoSuchProcess, psutil.Process) finally: psutil.PROCFS_PATH = "/proc" os.rmdir(tdir)
def test_net_connections_mocked(self): def open_mock(name, *args, **kwargs): if name == '/proc/net/unix': return io.StringIO(textwrap.dedent(u"""\ 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O 000000000000000000000000000000000000000000000000000000 """)) else: return orig_open(name, *args, **kwargs) orig_open = open patch_point = 'builtins.open' if PY3 else '__builtin__.open' with mock.patch(patch_point, side_effect=open_mock) as m: psutil.net_connections(kind='unix') assert m.called # ===================================================================== # system disk # =====================================================================
def test_net_connections_mocked(self): def open_mock(name, *args, **kwargs): if name == '/proc/net/unix': return io.StringIO(textwrap.dedent(u"""\ 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O 000000000000000000000000000000000000000000000000000000 """)) else: return orig_open(name, *args, **kwargs) orig_open = open patch_point = 'builtins.open' if PY3 else '__builtin__.open' with mock.patch(patch_point, side_effect=open_mock) as m: psutil.net_connections(kind='unix') assert m.called # ===================================================================== # --- system disk # =====================================================================
def __init__(self, host="localhost", port=6379): conn = psutil.net_connections() ports = [c.laddr[1] for c in conn] port = max(ports) + 1 self.__port = port self.__redis_server = Popen(["redis-server", "--port", str(port)]) sleep(1) self.__worker = [ Process(target=work, args=(["--host", "localhost", "--port", str(port)],), daemon=False) for _ in range(2) ] for p in self.__worker: p.start() super().__init__(host, port)
def get_tcp_info(self): returnData = {} try: conns = psutil.net_connections() sumConn = {} for con in conns: if con.status == 'NONE': continue if con.status not in sumConn: sumConn[con.status] = 0 sumConn[con.status] = sumConn[con.status] + 1 for element in sumConn: if 'tcpsum' not in returnData: returnData['tcpsum'] = {} returnData['tcpsum'][element] = sumConn[element] except Exception: pybixlib.error(self.logHead + traceback.format_exc()) self.errorInfoDone(traceback.format_exc()) return returnData
def main(): templ = "%-5s %-30s %-30s %-13s %-6s %s" print(templ % ( "Proto", "Local address", "Remote address", "Status", "PID", "Program name")) proc_names = {} for p in psutil.process_iter(): try: proc_names[p.pid] = p.name() except psutil.Error: pass for c in psutil.net_connections(kind='inet'): laddr = "%s:%s" % (c.laddr) raddr = "" if c.raddr: raddr = "%s:%s" % (c.raddr) print(templ % ( proto_map[(c.family, c.type)], laddr, raddr or AD, c.status, c.pid or AD, proc_names.get(c.pid, '?')[:15], ))
def get_tcp_all_conns_count_top(top=10): netstat = psutil.net_connections(kind="tcp") cared_data = list() for sconn in netstat: cared_data.append((sconn.laddr[0], sconn.laddr[1])) cared_data_with_counter = Counter(cared_data) if len(cared_data) < top: top = len(cared_data) tcp_conns_top_x = sorted(cared_data_with_counter.iteritems(), key=lambda x: x[1], reverse=True) return tcp_conns_top_x[0:top]
def check_ports(ports): connections = psutil.net_connections() all_ports = set( c.laddr[1] for c in connections if isinstance(c.laddr, tuple)) ports_inuse = set(ports) & all_ports if ports_inuse: raise TaskException( 'ports {} are in use'.format(', '.join(map(str, ports_inuse))))
def get_free_port(address, initial_port): """Find an unused TCP port in a specified range. This should not be used in misson-critical applications - a race condition may occur if someone grabs the port before caller of this function has chance to use it. Parameters: address (string): an ip address of interface to use initial_port (int) : port to start iteration with Return: iterator that will return next unused port on a specified interface """ try: # On OSX this function requires root privileges psutil.net_connections() except psutil.AccessDenied: return count(initial_port) def _unused_ports(): for port in count(initial_port): # check if the port is being used connect_using_port = ( conn for conn in psutil.net_connections() if hasattr(conn, 'laddr') and conn.laddr[0] == address and conn.laddr[1] == port ) # only generate unused ports if not any(connect_using_port): yield port return _unused_ports()
def _get_ports_in_use(cls): """ Returns a set of ports currently used on localhost. """ try: return set([x.laddr[1] for x in psutil.net_connections(kind='inet4')]) except psutil.AccessDenied: # On some platforms (such as OS X), root privilege is required to get used ports. # In that case we avoid port confliction to the best of our knowledge. _logger.info('ports in use cannot be obtained on this platform; ports will be assigned sequentially') return set()
def test_net_connections(self): self.execute(psutil.net_connections)
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop): # see: https://github.com/giampaolo/psutil/issues/623 try: s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) self.addCleanup(s.close) s.bind(("::1", 0)) except socket.error: pass psutil.net_connections(kind='inet6')
def compare_proc_sys_cons(self, pid, proc_cons): from psutil._common import pconn sys_cons = [c[:-1] for c in psutil.net_connections(kind='all') if c.pid == pid] if FREEBSD: # on FreeBSD all fds are set to -1 proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons] self.assertEqual(sorted(proc_cons), sorted(sys_cons))
def pidof(target): """pidof(target) -> int list Get PID(s) of `target`. The returned PID(s) depends on the type of `target`: - :class:`str`: PIDs of all processes with a name matching `target`. - :class:`pwnlib.tubes.process.process`: singleton list of the PID of `target`. - :class:`pwnlib.tubes.sock.sock`: singleton list of the PID at the remote end of `target` if it is running on the host. Otherwise an empty list. Arguments: target(object): The target whose PID(s) to find. Returns: A list of found PIDs. """ if isinstance(target, tubes.sock.sock): local = target.sock.getsockname() remote = target.sock.getpeername() def match(p): return (c.raddr, c.laddr, c.status) == (local, remote, 'ESTABLISHED') return [c.pid for c in psutil.net_connections() if match(c)] elif isinstance(target, tubes.process.process): return [target.proc.pid] else: return pid_by_name(target)
def get_procinfo_by_address(ip_src, ip_dst, port_src=None, port_dst=None): """ Gets Infos about the Prozess associated with the given address information Both port_src and port_dst must be not None or None at the same time. return -- [pid, "/path/to/command", "user", "hash"] or [] """ result = [] if port_src is not None: pids = [c.pid for c in net_connections() if len(c.raddr) != 0 and c.pid is not None and (c.laddr[0], c.raddr[0], c.laddr[1], c.raddr[1]) == (ip_src, ip_dst, port_src, port_dst)] else: pids = [c.pid for c in net_connections() if len(c.raddr) != 0 and c.pid is not None and (c.laddr[0], c.raddr[0]) == (ip_src, ip_dst)] try: if len(pids) > 1: logger.warning("more than 1 matching process: %r", pids) proc = Process(pids[0]) cmd = [pids[0], proc.cmdline(), proc.username()] hash_input = "%d%s%s" % (cmd[0], cmd[1], cmd[2]) procinfo_hash = hashlib.sha256(hash_input.encode("UTF-8")) cmd.append(procinfo_hash) logger.debug("process info: %r", cmd) return cmd except IndexError: pass return []
def get_listen_ports(self): """ ?????? :return: ?????? """ listen_ports = [] connections = psutil.net_connections() for connection in connections: if connection.status == "LISTEN": listen_ports.append((connection.laddr[1], connection.pid)) return listen_ports
def print_all_conns(): system_conns = psutil.net_connections() for conn in system_conns: print conn
def print_conns_by_proc_pid(pid): system_conns = psutil.net_connections() for conn in system_conns: if conn.pid == pid: print conn
def print_conns_by_remote_addr(addr): system_conns = psutil.net_connections() for conn in system_conns: if addr in conn.raddr: print conn
def kill_dnsmasq(config): # There can be more than one bound to it, especially if libvirt was used.. # Easy way: started from (previous) run of manifest_api. if config['PXE_INTERFACE'] is None: return try: with open(config['DNSMASQ_PIDFILE'], 'r') as f: pid = int(f.read()) kill_pid(pid, 'dnsmasq') except Exception: # a valiant effort in vain pass # Hard way: track down dnsmasq(s) attached to the configured interface. pxe_interface = config['PXE_INTERFACE'] if pxe_interface not in NIF.interfaces(): return tmp = NIF.ifaddresses(pxe_interface).get(NIF.AF_INET, None) if tmp is None: # ASS-U-MES "torms" address bound here return pxe_addr = tmp[0]['addr'] # libvirt bridge will always have DNS (53). A truly simple net definition # won't have TFTP (69) but others might. Obviously one started from here # will have TFTP. And /etc/init.d/dnsmasq starts a *.* DNS listener # which picks up PXE_INTERFACE when explicitly bound processes are killed. # net_connections returns an object with a laddr field that's a tuple # of (listenaddress, port). Filter on that. openconns = [(c.laddr[1], c.pid) # port, pid for c in psutil.net_connections(kind='inet4') if c.laddr[1] in (53, 69) and ( c.laddr[0] == pxe_addr or c.laddr[0] == '0.0.0.0' )] pids = set(c[1] for c in openconns) if pids: mainapp.logger.info('Killing %d copies of dnsmasq' % len(pids)) while len(pids): pid = pids.pop() kill_pid(pid, 'dnsmasq') # Until someone starts using bind9...
def get_pid_by_connection(src_addr, src_p, dst_addr, dst_p, proto='tcp'): # We always take the first element as we assume it contains only one # It should not be possible to keep two connections which are the same. for conn in psutil.net_connections(kind=proto): if proto == 'tcp': if conn.laddr != (src_addr, int(src_p)): continue if conn.raddr != (dst_addr, int(dst_p)): continue # UDP gives us a very limited dataset to work with elif proto == 'udp': if conn.laddr[1] != int(src_p): continue return conn.pid logging.warning("Could not find process for %s connection %s:%s -> %s:%s", proto, src_addr, src_p, dst_addr, dst_p) return None
def already_listening(port, renewer=False): """Check if a process is already listening on the port. If so, also tell the user via a display notification. .. warning:: On some operating systems, this function can only usefully be run as root. :param int port: The TCP port in question. :returns: True or False. """ try: net_connections = psutil.net_connections() except psutil.AccessDenied as error: logger.info("Access denied when trying to list network " "connections: %s. Are you root?", error) # this function is just a pre-check that often causes false # positives and problems in testing (c.f. #680 on Mac, #255 # generally); we will fail later in bind() anyway return False listeners = [conn.pid for conn in net_connections if conn.status == 'LISTEN' and conn.type == socket.SOCK_STREAM and conn.laddr[1] == port] try: if listeners and listeners[0] is not None: # conn.pid may be None if the current process doesn't have # permission to identify the listening process! Additionally, # listeners may have more than one element if separate # sockets have bound the same port on separate interfaces. # We currently only have UI to notify the user about one # of them at a time. pid = listeners[0] name = psutil.Process(pid).name() display = zope.component.getUtility(interfaces.IDisplay) extra = "" if renewer: extra = ( " For automated renewal, you may want to use a script that stops" " and starts your webserver. You can find an example at" " https://letsencrypt.org/howitworks/#writing-your-own-renewal-script" ". Alternatively you can use the webroot plugin to renew without" " needing to stop and start your webserver.") display.notification( "The program {0} (process ID {1}) is already listening " "on TCP port {2}. This will prevent us from binding to " "that port. Please stop the {0} program temporarily " "and then try again.{3}".format(name, pid, port, extra), height=13) return True except (psutil.NoSuchProcess, psutil.AccessDenied): # Perhaps the result of a race where the process could have # exited or relinquished the port (NoSuchProcess), or the result # of an OS policy where we're not allowed to look up the process # name (AccessDenied). pass return False
def _getDebugData(): try: connections = psutil.net_connections() # You need to be root for OSX except psutil.AccessDenied: connections = None try: addrs = ["* {}: {}".format(key, val) for key, val in psutil.net_if_addrs().items()] except UnicodeDecodeError: addrs = ["INVALID ADDR WITH UNICODE CHARACTERS"] data = """Version: {version} OS: {os} Python: {python} CPU: {cpu} Memory: {memory} Networks: {addrs} Open connections: {connections} Processus: """.format( version=__version__, os=platform.platform(), python=platform.python_version(), memory=psutil.virtual_memory(), cpu=psutil.cpu_times(), connections=connections, addrs="\n".join(addrs) ) for proc in psutil.process_iter(): try: psinfo = proc.as_dict(attrs=["name", "exe"]) data += "* {} {}\n".format(psinfo["name"], psinfo["exe"]) except psutil.NoSuchProcess: pass data += "\n\nProjects" for project in Controller.instance().projects.values(): data += "\n\nProject name: {}\nProject ID: {}\n".format(project.name, project.id) for link in project.links.values(): data += "Link {}: {}".format(link.id, link.debug_link_data) return data
def get_connections(self): listening = [] connections = psutil.net_connections() for conn in connections: if not conn.pid: continue if conn.status == psutil.CONN_LISTEN: ip, port = conn.laddr # instance = '%s:%s' % (ip, port) if port not in listening: listening.append(port) res = [] clients = {} for conn in connections: if not conn.pid: continue if not conn.status == psutil.CONN_ESTABLISHED: continue ip, port = conn.laddr # instance = '%s:%s' % (ip, port) if port in listening: continue try: proc = psutil.Process(conn.pid) except psutil.NoSuchProcess: continue current = time.time() create_time = proc.create_time() if current - create_time < 60 * 5: continue rip, rport = conn.raddr record_id = '%s-%s:%s' % (conn.pid, rip, rport) client = clients.setdefault(record_id, {}) if not client: client['client_pid'] = conn.pid client['client_process_create_time'] = int(create_time) client['client_pname'] = proc.name() client['client_cwd'] = proc.cwd() client['client_ip'] = ip # client['client_ports'] = '%s' % port client['client_port_num'] = 1 client['server_ip'] = rip client['server_port'] = rport client['server_pid'] = '' client['server_pname'] = '' else: # client['client_ports'] += ', %s' % port client['client_port_num'] += 1 res = clients.values() res.sort(key=lambda x: (x['client_pid'], x['server_port'])) return res