我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用psutil.net_if_stats()。
def test_procfs_path(self):
    """Check that psutil APIs fail when PROCFS_PATH is an empty directory.

    ``psutil.PROCFS_PATH`` is pointed at a freshly created temporary
    directory; each listed API is then expected to raise ``IOError`` and
    ``psutil.Process`` is expected to raise ``psutil.NoSuchProcess``.
    Whatever happens, ``PROCFS_PATH`` is reset to "/proc" and the
    temporary directory is removed.
    """
    empty_dir = tempfile.mkdtemp()
    # (psutil attribute name, keyword arguments), in the order checked.
    # ``psutil.pids`` is intentionally left out of the list.
    io_error_calls = (
        ('virtual_memory', {}),
        ('cpu_times', {}),
        ('cpu_times', {'percpu': True}),
        ('boot_time', {}),
        ('net_connections', {}),
        ('net_io_counters', {}),
        ('net_if_stats', {}),
        ('disk_io_counters', {}),
        ('disk_partitions', {}),
    )
    try:
        psutil.PROCFS_PATH = empty_dir
        for func_name, kwargs in io_error_calls:
            self.assertRaises(IOError, getattr(psutil, func_name), **kwargs)
        self.assertRaises(psutil.NoSuchProcess, psutil.Process)
    finally:
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(empty_dir)
def get_active_nic_name():
    """Return the ``psutil.net_if_stats()`` mapping minus ``docker0`` and ``lo``.

    Despite the function name, the return value is the whole mapping
    (interface name -> stats object), not a single name, and no check of
    the interfaces' up/down state is performed here.

    Returns:
        The mapping returned by ``psutil.net_if_stats()`` with the
        ``'docker0'`` and ``'lo'`` entries removed when present.
    """
    all_nic = psutil.net_if_stats()
    # pop() with a default ignores missing keys without resorting to a
    # bare ``except:`` that would also hide unrelated errors.
    for excluded in ('docker0', 'lo'):
        all_nic.pop(excluded, None)
    return all_nic
def collect_system_information(self):
    """Collect a snapshot of host resources into a dict.

    The returned dict has the keys ``memory_total``, ``resources_limit``,
    ``cpu_name``, ``cpu``, ``nets``, ``disks``, ``gpus`` and
    ``boot_time``.  GPU, disk and network probing are best-effort: any
    exception raised while probing them is swallowed.

    Side effect: the name of every interface that is up and does not
    start with ``'lo'`` is appended to ``self.nets``.
    """
    info = {}
    info['memory_total'] = psutil.virtual_memory().total

    import cpuinfo
    cpu_info = cpuinfo.get_cpu_info()

    info['resources_limit'] = self.resources_limit
    info['cpu_name'] = cpu_info['brand']
    info['cpu'] = [cpu_info['hz_advertised_raw'][0], cpu_info['count']]
    info['nets'] = {}
    info['disks'] = {}
    info['gpus'] = {}
    info['boot_time'] = psutil.boot_time()

    try:
        devices = aetros.cuda_gpu.get_ordered_devices()
        for index, device in enumerate(devices):
            device['available'] = index in self.enabled_gpus
            info['gpus'][index] = device
    except Exception:
        pass

    for partition in psutil.disk_partitions():
        try:
            disk_name = self.get_disk_name(partition[1])
            info['disks'][disk_name] = psutil.disk_usage(partition[1]).total
        except Exception:
            # suppress Operation not permitted
            pass

    try:
        for nic_name, nic in psutil.net_if_stats().items():
            if nic_name.startswith('lo'):
                continue
            if not nic.isup:
                continue
            self.nets.append(nic_name)
            # A falsy reported speed is replaced by 1000.
            info['nets'][nic_name] = nic.speed or 1000
    except Exception:
        # suppress Operation not permitted
        pass

    return info
def test_net_if_stats(self):
    """Run ``psutil.net_if_stats`` through this test case's ``execute``."""
    target = psutil.net_if_stats
    self.execute(target)

# --- others
def test_net_if_stats(self):
    """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

    For every interface psutil reports, the ``isup`` flag must match the
    presence of ``RUNNING`` in the ifconfig output, and ``mtu`` must match
    the first ``MTU:<n>`` value found there.  Interfaces for which the
    ``sh`` helper raises ``RuntimeError`` are skipped.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string: "\d" in a plain literal is an invalid escape
        # sequence and triggers a warning on modern Python.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'MTU:(\d+)', out)[0]))
def test_net_if_stats(self):
    """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

    For every interface psutil reports, the ``isup`` flag must match the
    presence of ``RUNNING`` in the ifconfig output, and ``mtu`` must match
    the first ``mtu <n>`` value found there.  Interfaces for which the
    ``sh`` helper raises ``RuntimeError`` are skipped.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string: "\d" in a plain literal is an invalid escape
        # sequence and triggers a warning on modern Python.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'mtu (\d+)', out)[0]))
def test_net_if_stats(self):
    """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

    For every interface psutil reports, the ``isup`` flag must match the
    presence of ``RUNNING`` in the ifconfig output, and ``mtu`` must match
    the first ``mtu <n>`` value found there.  Interfaces for which the
    ``sh`` helper raises ``RuntimeError`` are skipped.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string: "\d" in a plain literal is an invalid escape
        # sequence and triggers a warning on modern Python.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'mtu (\d+)', out)[0]))

# =====================================================================
# --- FreeBSD
# =====================================================================
def find_default_interface(self):
    """Populate ``self.interfaces`` with the candidate default interface(s).

    Runs only when ``self.interfaces`` is ``None``.  The candidates are
    narrowed in up to three passes:

    1. keep every interface that is up, is not named ``'lo'`` and whose
       name does not start with ``'ifb'``;
    2. if more than one remains, drop those whose sent + received packet
       count is zero;
    3. if more than one still remains, drop those that carry the address
       ``127.0.0.1``.

    ``self.interfaces`` maps interface name -> ``{'packets': <count>}``.
    Any exception raised along the way is silently swallowed, so the
    mapping may be left partially filtered.

    NOTE(review): the nesting below was reconstructed from a flattened
    listing; all three passes are assumed to sit inside the
    ``self.interfaces is None`` branch -- confirm against the original.
    """
    import psutil
    try:
        if self.interfaces is None:
            self.interfaces = {}
            # Look to see which interfaces are up
            stats = psutil.net_if_stats()
            for interface in stats:
                if interface != 'lo' and interface[:3] != 'ifb' and stats[interface].isup:
                    self.interfaces[interface] = {'packets': 0}
            if len(self.interfaces) > 1:
                # See which interfaces have received data
                cnt = psutil.net_io_counters(True)
                for interface in cnt:
                    if interface in self.interfaces:
                        self.interfaces[interface]['packets'] = \
                            cnt[interface].packets_sent + cnt[interface].packets_recv
                # Collect names first: the dict cannot be mutated while
                # it is being iterated.
                remove = []
                for interface in self.interfaces:
                    if self.interfaces[interface]['packets'] == 0:
                        remove.append(interface)
                if len(remove):
                    for interface in remove:
                        del self.interfaces[interface]
            if len(self.interfaces) > 1:
                # Eliminate any with the loopback address
                remove = []
                addresses = psutil.net_if_addrs()
                for interface in addresses:
                    if interface in self.interfaces:
                        for address in addresses[interface]:
                            if address.address == '127.0.0.1':
                                remove.append(interface)
                                # One match is enough for this interface.
                                break
                if len(remove):
                    for interface in remove:
                        del self.interfaces[interface]
    except Exception:
        # Best-effort detection: failures are deliberately ignored.
        pass
def test_net_if_stats(self):
    """Run ``psutil.net_if_stats`` through this test case's ``execute``."""
    target = psutil.net_if_stats
    self.execute(target)

# --- sensors
def test_net_if_stats(self):
    """Cross-check the MTU from ``psutil.net_if_stats()`` against ifconfig.

    For every interface psutil reports, ``mtu`` must match the first
    ``MTU:<n>`` value in the ``ifconfig <nic>`` output.  Interfaces for
    which the ``sh`` helper raises ``RuntimeError`` are skipped.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        # Not always reliable.
        # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string: "\d" in a plain literal is an invalid escape
        # sequence and triggers a warning on modern Python.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'MTU:(\d+)', out)[0]))
def main():
    """Print stats, I/O counters and addresses for every network interface.

    Interfaces are taken from ``psutil.net_if_addrs()``; the stats and
    I/O sections are printed only for interfaces that also appear in
    ``psutil.net_if_stats()`` / ``psutil.net_io_counters(pernic=True)``.
    """
    nic_stats = psutil.net_if_stats()
    nic_io = psutil.net_io_counters(pernic=True)

    # (label, counter attribute names) for each traffic direction.
    directions = (
        (" incoming : ", ("bytes_recv", "packets_recv", "errin", "dropin")),
        (" outgoing : ", ("bytes_sent", "packets_sent", "errout", "dropout")),
    )
    # (line template, address attribute) printed only when truthy.
    optional_lines = (
        (" broadcast : %s", "broadcast"),
        (" netmask : %s", "netmask"),
        (" p2p : %s", "ptp"),
    )

    for nic, addrs in psutil.net_if_addrs().items():
        print("%s:" % (nic))

        if nic in nic_stats:
            st = nic_stats[nic]
            print(" stats : ", end='')
            stat_fields = (
                st.speed,
                duplex_map[st.duplex],
                st.mtu,
                "yes" if st.isup else "no",
            )
            print("speed=%sMB, duplex=%s, mtu=%s, up=%s" % stat_fields)

        if nic in nic_io:
            counters = nic_io[nic]
            for label, attr_names in directions:
                print(label, end='')
                print("bytes=%s, pkts=%s, errs=%s, drops=%s" % tuple(
                    getattr(counters, attr) for attr in attr_names))

        for addr in addrs:
            # Fall back to the raw family value when it has no known label.
            print(" %-4s" % af_map.get(addr.family, addr.family), end="")
            print(" address : %s" % addr.address)
            for template, attr in optional_lines:
                value = getattr(addr, attr)
                if value:
                    print(template % value)
        print("")
def _get_cali_veth_stat(self):
    """Report up/down/total counts of ``cali*`` and ``tmp*`` interfaces.

    Every interface from ``psutil.net_if_stats()`` whose name starts with
    ``'cali'`` or ``'tmp'`` is counted by prefix and by its ``isup``
    state.  Six ``GraphiteData`` points are then appended to
    ``self._result`` in this order: cali up, cali down, cali total,
    tmp up, tmp down, tmp total.
    """
    prefixes = ('cali', 'tmp')
    states = ('up', 'down', 'total')
    counts = dict((prefix, dict.fromkeys(states, 0)) for prefix in prefixes)

    # .items() works on both Python 2 and 3; .iteritems() is Python 2 only.
    for name, stat in psutil.net_if_stats().items():
        for prefix in prefixes:
            if name.startswith(prefix):
                bucket = counts[prefix]
                bucket['total'] += 1
                bucket['up' if stat.isup else 'down'] += 1
                # Only the first matching prefix counts an interface.
                break

    for prefix in prefixes:
        for state in states:
            self._result.append(
                GraphiteData("lain.cluster.calico.veth.%s.%s" % (prefix, state),
                             self._endpoint, counts[prefix][state],
                             self._step, "val"))
def get_eth_info(self):
    """Return a name-sorted list of dicts describing network interfaces.

    Each dict has the keys ``name``, ``mac``, ``ip``, ``mask``,
    ``broadcast``, ``status`` and ``speed``.  An interface with several
    IPv4 addresses yields one dict per address.  Non-IPv4, non-link
    address families are ignored.

    An entry whose name contains ``':'`` (e.g. ``eth0:1``) and whose MAC
    is still the ``00:00:00:00:00:00`` placeholder inherits the MAC of the
    entry named by the part before the last ``':'``.
    """
    data = []
    addrs = psutil.net_if_addrs()
    stats = psutil.net_if_stats()
    # .items() works on both Python 2 and 3; .iteritems() is Python 2 only.
    for name, entries in addrs.items():
        eth = {
            'name': name,
            'mac': '00:00:00:00:00:00',
            'ip': '',
            'mask': '',
            'broadcast': '',
            'status': 'Unknown',
            'speed': 0
        }
        try:
            # Byte-string names are decoded as GBK (the original comment
            # mentions Windows).
            eth['name'] = name.decode('gbk')
        except (AttributeError, UnicodeError):
            # Text names have no usable decode(); keep them unchanged.
            eth['name'] = name
        if name in stats:
            eth['status'] = 'Active' if int(stats[name].isup) else 'Inactive'
            eth['speed'] = stats[name].speed
        for entry in entries:
            if entry.family == psutil.AF_LINK:
                eth['mac'] = entry.address.upper()
            elif entry.family == socket.AF_INET:
                if eth['ip']:
                    # A further IPv4 address: store the record built so
                    # far before overwriting its address fields.
                    data.append(copy.deepcopy(eth))
                eth['ip'] = entry.address
                eth['mask'] = entry.netmask
                eth['broadcast'] = entry.broadcast
            # Other families (e.g. IPv6) are ignored.
        data.append(eth)

    # Alias entries that still have the placeholder MAC inherit the MAC
    # of their parent interface.
    for item in data:
        if ':' not in item['name']:
            continue
        if item['mac'] != '00:00:00:00:00:00':
            continue
        parent_name = item['name'].rsplit(':', 1)[0]
        for candidate in data:
            if parent_name == candidate['name']:
                item['mac'] = candidate['mac']
                break

    data.sort(key=lambda x: x['name'])
    return data
def test_net_if_addrs(self):
    """Validate the structure and content of ``psutil.net_if_addrs()``.

    Checks the type of each NIC name, that a NIC has no duplicate address
    entries, the type of each address field, and that the family is one
    of AF_INET / AF_INET6 / AF_LINK.  For interfaces that are up, IPv4 and
    IPv6 addresses must be bindable.  Finally ``psutil.AF_LINK`` is
    compared with the platform's expected constant.
    """
    nics = psutil.net_if_addrs()
    assert nics, nics

    nic_stats = psutil.net_if_stats()

    # Not reliable on all platforms (net_if_addrs() reports more
    # interfaces).
    # self.assertEqual(sorted(nics.keys()),
    #                  sorted(psutil.net_io_counters(pernic=True).keys()))

    families = {socket.AF_INET, AF_INET6, psutil.AF_LINK}
    for nic, addrs in nics.items():
        self.assertIsInstance(nic, (str, unicode))
        self.assertEqual(len(set(addrs)), len(addrs))
        for addr in addrs:
            self.assertIsInstance(addr.family, int)
            self.assertIsInstance(addr.address, str)
            self.assertIsInstance(addr.netmask, (str, type(None)))
            self.assertIsInstance(addr.broadcast, (str, type(None)))
            self.assertIn(addr.family, families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(addr.family, enum.IntEnum)

            if nic_stats[nic].isup:
                # Do not test binding to addresses of interfaces
                # that are down
                if addr.family == socket.AF_INET:
                    sock = socket.socket(addr.family)
                    with contextlib.closing(sock):
                        sock.bind((addr.address, 0))
                elif addr.family == socket.AF_INET6:
                    af, socktype, proto, canonname, sa = socket.getaddrinfo(
                        addr.address, 0, socket.AF_INET6,
                        socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    sock = socket.socket(af, socktype, proto)
                    with contextlib.closing(sock):
                        sock.bind(sa)

            candidates = (addr.address, addr.netmask, addr.broadcast,
                          addr.ptp)
            for ip in candidates:
                # TODO: skip AF_INET6 for now because I get:
                # AddressValueError: Only hex digits permitted in
                # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                if ip is not None and addr.family != AF_INET6:
                    check_net_address(ip, addr.family)

            # broadcast and ptp addresses are mutually exclusive
            if addr.broadcast:
                self.assertIsNone(addr.ptp)
            elif addr.ptp:
                self.assertIsNone(addr.broadcast)

    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)
def test_net_if_addrs(self):
    """Validate the structure and content of ``psutil.net_if_addrs()``.

    Checks that a NIC has no duplicate address entries, the type of each
    address field, and that the family is one of AF_INET / AF_INET6 /
    AF_LINK.  For interfaces that are up, IPv4 and IPv6 addresses must be
    bindable.  Finally ``psutil.AF_LINK`` is compared with the platform's
    expected constant.
    """
    nics = psutil.net_if_addrs()
    assert nics, nics

    nic_stats = psutil.net_if_stats()

    # Not reliable on all platforms (net_if_addrs() reports more
    # interfaces).
    # self.assertEqual(sorted(nics.keys()),
    #                  sorted(psutil.net_io_counters(pernic=True).keys()))

    families = {socket.AF_INET, AF_INET6, psutil.AF_LINK}
    for nic, addrs in nics.items():
        self.assertEqual(len(set(addrs)), len(addrs))
        for addr in addrs:
            self.assertIsInstance(addr.family, int)
            self.assertIsInstance(addr.address, str)
            self.assertIsInstance(addr.netmask, (str, type(None)))
            self.assertIsInstance(addr.broadcast, (str, type(None)))
            self.assertIn(addr.family, families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(addr.family, enum.IntEnum)

            if nic_stats[nic].isup:
                # Do not test binding to addresses of interfaces
                # that are down
                if addr.family == socket.AF_INET:
                    sock = socket.socket(addr.family)
                    with contextlib.closing(sock):
                        sock.bind((addr.address, 0))
                elif addr.family == socket.AF_INET6:
                    af, socktype, proto, canonname, sa = socket.getaddrinfo(
                        addr.address, 0, socket.AF_INET6,
                        socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    sock = socket.socket(af, socktype, proto)
                    with contextlib.closing(sock):
                        sock.bind(sa)

            candidates = (addr.address, addr.netmask, addr.broadcast,
                          addr.ptp)
            for ip in candidates:
                # TODO: skip AF_INET6 for now because I get:
                # AddressValueError: Only hex digits permitted in
                # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                if ip is not None and addr.family != AF_INET6:
                    check_net_address(ip, addr.family)

            # broadcast and ptp addresses are mutually exclusive
            if addr.broadcast:
                self.assertIsNone(addr.ptp)
            elif addr.ptp:
                self.assertIsNone(addr.broadcast)

    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)