我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.net_io_counters()。
def test_procfs_path(self): tdir = tempfile.mkdtemp() try: psutil.PROCFS_PATH = tdir self.assertRaises(IOError, psutil.virtual_memory) self.assertRaises(IOError, psutil.cpu_times) self.assertRaises(IOError, psutil.cpu_times, percpu=True) self.assertRaises(IOError, psutil.boot_time) # self.assertRaises(IOError, psutil.pids) self.assertRaises(IOError, psutil.net_connections) self.assertRaises(IOError, psutil.net_io_counters) self.assertRaises(IOError, psutil.net_if_stats) self.assertRaises(IOError, psutil.disk_io_counters) self.assertRaises(IOError, psutil.disk_partitions) self.assertRaises(psutil.NoSuchProcess, psutil.Process) finally: psutil.PROCFS_PATH = "/proc" os.rmdir(tdir)
def get_network(): network = psutil.net_io_counters(pernic=True) ifaces = psutil.net_if_addrs() networks = list() for k, v in ifaces.items(): ip = v[0].address data = network[k] ifnet = dict() ifnet['ip'] = ip ifnet['iface'] = k ifnet['sent'] = '%.2fMB' % (data.bytes_sent/1024/1024) ifnet['recv'] = '%.2fMB' % (data.bytes_recv/1024/1024) ifnet['packets_sent'] = data.packets_sent ifnet['packets_recv'] = data.packets_recv ifnet['errin'] = data.errin ifnet['errout'] = data.errout ifnet['dropin'] = data.dropin ifnet['dropout'] = data.dropout networks.append(ifnet) return networks
def stream_host_stats(): while True: net = psutil.net_io_counters(pernic=True) time.sleep(1) net1 = psutil.net_io_counters(pernic=True) net_stat_download = {} net_stat_upload = {} for k, v in net.items(): for k1, v1 in net1.items(): if k1 == k: net_stat_download[k] = (v1.bytes_recv - v.bytes_recv) / 1000. net_stat_upload[k] = (v1.bytes_sent - v.bytes_sent) / 1000. ds = statvfs('/') disk_str = {"Used": ((ds.f_blocks - ds.f_bfree) * ds.f_frsize) / 10 ** 9, "Unused": (ds.f_bavail * ds.f_frsize) / 10 ** 9} yield '[{"cpu":"%s","memory":"%s","memTotal":"%s","net_stats_down":"%s","net_stats_up":"%s","disk":"%s"}],' \ % (psutil.cpu_percent(interval=1), psutil.virtual_memory().used, psutil.virtual_memory().free, \ net_stat_download, net_stat_upload, disk_str)
def get_network(self): usage = 0 current_network = psutil.net_io_counters()[0] if self._previous_network == 0: # Check, wait a second, then check again to get a base value. self._previous_network = current_network time.sleep(2) current_network = psutil.net_io_counters()[0] time_since = time.time() - self._last_check usage = (current_network - self._previous_network) self._previous_network = current_network self._last_check = time.time() return usage
def test_serialization(self): def check(ret): if json is not None: json.loads(json.dumps(ret)) a = pickle.dumps(ret) b = pickle.loads(a) self.assertEqual(ret, b) check(psutil.Process().as_dict()) check(psutil.virtual_memory()) check(psutil.swap_memory()) check(psutil.cpu_times()) check(psutil.cpu_times_percent(interval=0)) check(psutil.net_io_counters()) if LINUX and not os.path.exists('/proc/diskstats'): pass else: if not APPVEYOR: check(psutil.disk_io_counters()) check(psutil.disk_partitions()) check(psutil.disk_usage(os.getcwd())) check(psutil.users())
def update(self): """Function to update the entire class information.""" self.cpu["percentage"] = psutil.cpu_percent(interval=0.7) self.boot = datetime.datetime.fromtimestamp(psutil.boot_time()).strftime( "%Y-%m-%d %H:%M:%S") virtual_memory = psutil.virtual_memory() self.memory["used"] = virtual_memory.used self.memory["free"] = virtual_memory.free self.memory["cached"] = virtual_memory.cached net_io_counters = psutil.net_io_counters() self.network["packet_sent"] = net_io_counters.packets_sent self.network["packet_recv"] = net_io_counters.packets_recv disk_usage = psutil.disk_usage('/') self.disk["total"] = int(disk_usage.total/1024) self.disk["used"] = int(disk_usage.used/1024) self.disk["free"] = int(disk_usage.free/1024) self.timestamp = time.time()
def test_nic_names(self): p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE) output = p.communicate()[0].strip() if p.returncode != 0: raise unittest.SkipTest('ifconfig returned no output') if PY3: output = str(output, sys.stdout.encoding) for nic in psutil.net_io_counters(pernic=True).keys(): for line in output.split(): if line.startswith(nic): break else: self.fail( "couldn't find %s nic in 'ifconfig -a' output\n%s" % ( nic, output)) # can't find users on APPVEYOR or TRAVIS
def collect_net_stats(self): raw_stats = psutil.net_io_counters(pernic=True) for key in raw_stats.keys(): if re.match('[\d]+-[\d]+',key) is not None: if key not in self.net_stats.keys(): self.net_stats[key] = {} self.net_stats[key]['bytes_sent'] = 0 self.net_stats[key]['bytes_recv'] = 0 self.net_stats[key]['bytes_recv_per_sec'] = round((int(raw_stats[key].bytes_sent) - self.net_stats[key]['bytes_recv']) / self.interval) self.net_stats[key]['bytes_sent_per_sec'] = round((int(raw_stats[key].bytes_recv) - self.net_stats[key]['bytes_sent']) / self.interval) self.net_stats[key]['bytes_recv'] = int(raw_stats[key].bytes_sent) self.net_stats[key]['bytes_sent'] = int(raw_stats[key].bytes_recv) self.net_stats[key]['packets_recv'] = int(raw_stats[key].packets_sent) self.net_stats[key]['packets_sent'] = int(raw_stats[key].packets_recv) self.net_stats[key]['errin'] = int(raw_stats[key].errout) self.net_stats[key]['errout'] = int(raw_stats[key].errin) self.net_stats[key]['dropin'] = int(raw_stats[key].dropout) self.net_stats[key]['dropout'] = int(raw_stats[key].dropin) else: if key not in gateways_stats.keys(): gateways_stats[key] = {} gateways_stats[key]['bytes_recv'] = int(raw_stats[key].bytes_sent) gateways_stats[key]['bytes_sent'] = int(raw_stats[key].bytes_recv) gateways_stats[key]['bytes_total'] = gateways_stats[key]['bytes_recv'] + gateways_stats[key]['bytes_sent'] #logger.info(self.net_stats) # the main function to collect monitoring data of a container
def check_network(i_warning, i_critical): test_int(i_warning, i_critical) s_perfdata = '' s_output = '' i_max = 0 s_maxdesc = '' d_io_counters = psutil.net_io_counters(pernic=True) for s_device, nt_counters in d_io_counters.items(): d_counters = nt_counters._asdict() # add all io_counters to perfdata for key, value in d_counters.items(): if 'err' in key or 'drop' in key: if value > i_max: i_max = value s_maxdesc = '{} has {} {} packets.'.format(s_device, value, key) s_perfdata = add_perfdata(s_perfdata, s_device, key, value) s_output = check_status(i_warning, i_critical, i_max) if not 'OK' in s_output: s_output += s_maxdesc s_output += ' | {}'.format(s_perfdata) return s_output
def _get_bytes(interface): try: io_counters = psutil.net_io_counters(pernic=True) except AttributeError: io_counters = psutil.network_io_counters(pernic=True) if_io = io_counters.get(interface) if not if_io: return None return if_io.bytes_recv, if_io.bytes_sent
def _get_interfaces(): try: io_counters = psutil.net_io_counters(pernic=True) except AttributeError: io_counters = psutil.network_io_counters(pernic=True) for interface, data in io_counters.items(): if data: yield interface, data.bytes_recv, data.bytes_sent
def get_networks(self): return psutil.net_io_counters(pernic=True)
def get_data(self): """Gets system utilization stats.""" # Get system utilization stats. cpu_percent = psutil.cpu_percent(interval=1) memory_percent = psutil.virtual_memory().percent disk_percent = psutil.disk_usage('/').percent network_io = psutil.net_io_counters() # Compute deltas for sent and received bytes. Note this is system-wide # network usage and not necessarily GPRS-related. # TODO(matt): query on a specific interface..which one, I'm not sure. if self.last_bytes_sent == 0: bytes_sent_delta = 0 else: bytes_sent_delta = network_io.bytes_sent - self.last_bytes_sent self.last_bytes_sent = network_io.bytes_sent if self.last_bytes_received == 0: bytes_received_delta = 0 else: bytes_received_delta = ( network_io.bytes_recv - self.last_bytes_received) self.last_bytes_received = network_io.bytes_recv return { 'cpu_percent': cpu_percent, 'memory_percent': memory_percent, 'disk_percent': disk_percent, 'bytes_sent_delta': bytes_sent_delta, 'bytes_received_delta': bytes_received_delta, }
def network_recv(): return bytes2human(psutil.net_io_counters().bytes_recv)
def network_sent(): return bytes2human(psutil.net_io_counters().bytes_sent)
def _update_widgets(self, widgets): interfaces = [i for i in netifaces.interfaces() if not i.startswith(self._exclude)] del widgets[:] counters = psutil.net_io_counters(pernic=True) for interface in interfaces: if not interface: interface = "lo" state = "down" if len(self.get_addresses(interface)) > 0: state = "up" if len(self._states["exclude"]) > 0 and state in self._states["exclude"]: continue if len(self._states["include"]) > 0 and state not in self._states["include"]: continue data = { "rx": counters[interface].bytes_recv, "tx": counters[interface].bytes_sent, } name = "traffic-{}".format(interface) if self._showname: self.create_widget(widgets, name, interface) for direction in ["rx", "tx"]: name = "traffic.{}-{}".format(direction, interface) widget = self.create_widget(widgets, name, attributes={"theme.minwidth": "1000.00MB"}) prev = self._prev.get(name, 0) speed = bumblebee.util.bytefmt(int(data[direction]) - int(prev)) widget.full_text(speed) self._prev[name] = data[direction] # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
def network(self, all_interface=False): """ all_interface: if true, shows all interface network statistics Return: dict """ if all_interface: stats = psutil.net_io_counters(pernic=True) else: stats = psutil.net_io_counters() if all_interface: n = {} for k, v in stats.items(): n[k] = { "bytes_send": self.hr(v.bytes_sent), "bytes_recv": self.hr(v.bytes_recv), "packets_sent": self.hr(v.packets_sent), "packets_recv": self.hr(v.packets_recv), "errin": v.errin, "errorout": v.errout, "dropin": v.dropin, "dropout": v.dropout } else: n = { "bytes_sent": self.hr(stats.bytes_sent), "bytes_recv": self.hr(stats.bytes_recv), "packets_sent": self.hr(stats.packets_sent), "packets_recv": self.hr(stats.packets_recv), "errin": stats.errin, "errorout": stats.errout, "dropin": stats.dropin, "dropout": stats.dropout } return n
def get_stats(self): cpct = psutil.cpu_percent(interval=0) ctimes = psutil.cpu_times_percent() self.cpu_stats = CpuStats(cpct, ctimes.user, ctimes.system, ctimes.idle) self.vmem_stats = psutil.virtual_memory() self.disk_stats = psutil.disk_io_counters() self.net_stats = psutil.net_io_counters() # must create new stats list each time stats are updated # because named tuples are immutable self.statslist = [self.cpu_stats, self.vmem_stats, self.disk_stats, self.net_stats]
def test_start(self): process = psutil.Process() self.run_data['start'] = True self.run_data['test_status'] = 'running' self.run_data['stats'] = {'net:start': json.dumps(psutil.net_io_counters()), 'cpu:start': json.dumps(process.cpu_times()), 'mem:start': json.dumps(process.memory_info()), 'time:start': json.dumps(time.time())} return ('ok', None)
def get_stats(self): process = psutil.Process() self.run_data['stats']['msg_cnt'] = self.msg_cnt self.run_data['stats']['net:end'] = json.dumps(psutil.net_io_counters()) self.run_data['stats']['cpu:end'] = json.dumps(process.cpu_times()) self.run_data['stats']['mem:end'] = json.dumps(process.memory_info()) self.run_data['stats']['reconnect_cnt'] = self.reconnect_cnt self.run_data['stats']['rate'] = self.run_data['stats']['msg_cnt'] / ( self.run_data['last_msg_time_r'] - self.run_data['first_msg_time_r']) return ('ok', self.run_data['stats'])
def reset_stats(self): l.info("RESETTING SUB STATS") process = psutil.Process() self.run_data = {'stats': {}} self.run_data['stats'] = {'msg_cnt': 0, 'first_msg_time': 0, 'last_msg_time': 0} self.run_data['stats']['net:start'] = json.dumps(psutil.net_io_counters()) self.run_data['stats']['cpu:start'] = json.dumps(process.cpu_times()) self.run_data['stats']['mem:start'] = json.dumps(process.memory_info()) self.run_data['first_msg_time_r'] = 0 self.run_data['last_msg_time_r'] = 1 self.msg_cnt = 0 self.reconnect_cnt = 0 return ('ok', 'stats reset')
def get_stats(self): process = psutil.Process() self.run_data['stats']['net']['end'] = psutil.net_io_counters() self.run_data['stats']['cpu']['end'] = process.cpu_times() self.run_data['stats']['mem']['end'] = process.memory_info() duration = self.run_data['last_msg_time'] - self.run_data['first_msg_time'] if duration == 0: self.run_data['rate'] = 0 else: self.run_data['rate'] = self.run_data['msg_cnt'] / duration return ('ok', self.run_data)
def reset_stats(self): l.info("RESETTING SUB STATS") process = psutil.Process() self.run_data = {'msg_cnt': 0, 'first_msg_time': 0, 'last_msg_time': 0, 'stats': {}} self.run_data['stats']['net'] = {'start': psutil.net_io_counters()} self.run_data['stats']['cpu'] = {'start': process.cpu_times()} self.run_data['stats']['mem'] = {'start': process.memory_info()} self.msg_cnt = 0 return ('ok', 'stats reset')
def get_stats(self): process = psutil.Process() self.run_data['stats']['msg_cnt'] = self.msg_cnt self.run_data['stats']['net:end'] = json.dumps(psutil.net_io_counters()) self.run_data['stats']['cpu:end'] = json.dumps(process.cpu_times()) self.run_data['stats']['mem:end'] = json.dumps(process.memory_info()) self.run_data['stats']['rate'] = self.run_data['stats']['msg_cnt'] / ( self.run_data['last_msg_time_r'] - self.run_data['first_msg_time_r']) return ('ok', self.run_data['stats'])
def reset_stats(self): l.info("RESETTING SUB STATS") process = psutil.Process() self.run_data = {'msg_cnt': 0, 'first_msg_time': 0, 'last_msg_time': 0, 'stats': {}} self.run_data['stats']['net'] = {'start': psutil.net_io_counters()} self.run_data['stats']['cpu'] = {'start': process.cpu_times()} self.run_data['stats']['mem'] = {'start': process.memory_info()} self.run_data['first_msg_time_r'] = 0 self.run_data['last_msg_time_r'] = 1 self.msg_cnt = 0 return ('ok', 'stats reset')
def test_net_io_counters(self): self.execute(psutil.net_io_counters)
def test_net_io_counters(self): def ifconfig(nic): ret = {} out = sh("ifconfig %s" % name) ret['packets_recv'] = int(re.findall('RX packets:(\d+)', out)[0]) ret['packets_sent'] = int(re.findall('TX packets:(\d+)', out)[0]) ret['errin'] = int(re.findall('errors:(\d+)', out)[0]) ret['errout'] = int(re.findall('errors:(\d+)', out)[1]) ret['dropin'] = int(re.findall('dropped:(\d+)', out)[0]) ret['dropout'] = int(re.findall('dropped:(\d+)', out)[1]) ret['bytes_recv'] = int(re.findall('RX bytes:(\d+)', out)[0]) ret['bytes_sent'] = int(re.findall('TX bytes:(\d+)', out)[0]) return ret for name, stats in psutil.net_io_counters(pernic=True).items(): try: ifconfig_ret = ifconfig(name) except RuntimeError: continue self.assertAlmostEqual( stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5) self.assertAlmostEqual( stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5) self.assertAlmostEqual( stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024) self.assertAlmostEqual( stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024) self.assertAlmostEqual( stats.errin, ifconfig_ret['errin'], delta=10) self.assertAlmostEqual( stats.errout, ifconfig_ret['errout'], delta=10) self.assertAlmostEqual( stats.dropin, ifconfig_ret['dropin'], delta=10) self.assertAlmostEqual( stats.dropout, ifconfig_ret['dropout'], delta=10)
def test_nic_names(self): p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE) out = p.communicate()[0] if PY3: out = str(out, sys.stdout.encoding or sys.getfilesystemencoding()) nics = psutil.net_io_counters(pernic=True).keys() for nic in nics: if "pseudo-interface" in nic.replace(' ', '-').lower(): continue if nic not in out: self.fail( "%r nic wasn't found in 'ipconfig /all' output" % nic)
def test_nic_names(self): p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE) output = p.communicate()[0].strip() if p.returncode != 0: raise unittest.SkipTest('ifconfig returned no output') if PY3: output = str(output, sys.stdout.encoding) for nic in psutil.net_io_counters(pernic=True).keys(): for line in output.split(): if line.startswith(nic): break else: self.fail( "couldn't find %s nic in 'ifconfig -a' output\n%s" % ( nic, output))
def host_traffic_performance_report(self): data = list() net_io = psutil.net_io_counters(pernic=True) for nic_name in self.interfaces.keys(): nic = net_io.get(nic_name, None) if nic is None: continue traffic = list() if nic_name in self.last_host_traffic: traffic = { 'node_id': self.node_id, 'name': nic_name, 'rx_bytes': (nic.bytes_recv - self.last_host_traffic[nic_name].bytes_recv) / self.interval, 'rx_packets': (nic.packets_recv - self.last_host_traffic[nic_name].packets_recv) / self.interval, 'rx_errs': (nic.errin - self.last_host_traffic[nic_name].errin), 'rx_drop': (nic.dropin - self.last_host_traffic[nic_name].dropin), 'tx_bytes': (nic.bytes_sent - self.last_host_traffic[nic_name].bytes_sent) / self.interval, 'tx_packets': (nic.packets_sent - self.last_host_traffic[nic_name].packets_sent) / self.interval, 'tx_errs': (nic.errout - self.last_host_traffic[nic_name].errout), 'tx_drop': (nic.dropout - self.last_host_traffic[nic_name].dropout) } elif not isinstance(self.last_host_disk_io, dict): self.last_host_traffic = dict() self.last_host_traffic[nic_name] = nic if traffic.__len__() > 0: data.append(traffic) if data.__len__() > 0: host_collection_performance_emit.traffic(data=data)
def interface_size(): return len(psutil.net_io_counters(pernic=True))
def io_counter_detail(): return psutil.net_io_counters(pernic=True)
def interfaces(): arr = [] for iface in psutil.net_io_counters(pernic=True): arr.append(str(iface)) return arr
def mac_addresses(): mac = get_mac() ':'.join(("%012X" % mac)[i:i + 2] for i in range(0, 12, 2)) arr = [] for iface in psutil.net_io_counters(pernic=True): try: addr_list = psutil.net_if_addrs() mac = addr_list[str(iface)][2][1] if re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", mac.lower()) and str( mac) != '00:00:00:00:00:00': arr.append(mac.lower()) except Exception as e: pass return arr
def ip_addresses(): arr = [] for iface in psutil.net_io_counters(pernic=True): ip = psutil.net_if_addrs()[str(iface)][0][1] if re.match(r'^((\d{1,2}|1\d{2}|2[0-4]\d|25[0-5])\.){3}(\d{1,2}|1\d{2}|2[0-4]\d|25[0-5])$', ip) and str( ip) != 'localhost' and str(ip) != '127.0.0.1': arr.append(ip) return arr
def find_default_interface(self): """Look through the list of interfaces for the non-loopback interface""" import psutil try: if self.interfaces is None: self.interfaces = {} # Look to see which interfaces are up stats = psutil.net_if_stats() for interface in stats: if interface != 'lo' and interface[:3] != 'ifb' and stats[interface].isup: self.interfaces[interface] = {'packets': 0} if len(self.interfaces) > 1: # See which interfaces have received data cnt = psutil.net_io_counters(True) for interface in cnt: if interface in self.interfaces: self.interfaces[interface]['packets'] = \ cnt[interface].packets_sent + cnt[interface].packets_recv remove = [] for interface in self.interfaces: if self.interfaces[interface]['packets'] == 0: remove.append(interface) if len(remove): for interface in remove: del self.interfaces[interface] if len(self.interfaces) > 1: # Eliminate any with the loopback address remove = [] addresses = psutil.net_if_addrs() for interface in addresses: if interface in self.interfaces: for address in addresses[interface]: if address.address == '127.0.0.1': remove.append(interface) break if len(remove): for interface in remove: del self.interfaces[interface] except Exception: pass
def get_net_bytes(self): """Get the bytes received, ignoring the loopback interface""" import psutil bytes_in = 0 net = psutil.net_io_counters(True) for interface in net: if self.interfaces is not None: if interface in self.interfaces: bytes_in += net[interface].bytes_recv elif interface != 'lo' and interface[:3] != 'ifb': bytes_in += net[interface].bytes_recv return bytes_in
def run(self, *unused): return psutil.net_io_counters(pernic=True)
def net_util_value(): first_time_stamp = psutil.net_io_counters( pernic=False).bytes_sent + psutil.net_io_counters(pernic=False).bytes_recv time.sleep(5) second_time_stamp = psutil.net_io_counters( pernic=False).bytes_sent + psutil.net_io_counters(pernic=False).bytes_recv return second_time_stamp - first_time_stamp
def network(iface): stat = psutil.net_io_counters(pernic=True)[iface] return "%s: Tx%s, Rx%s" % \ (iface, bytes2human(stat.bytes_sent), bytes2human(stat.bytes_recv))
def network_metrics(): """Get the current upload, in bytes, since last boot.""" psinfo = psutil.net_io_counters(pernic=True) return { "{}_{}".format(iface, way): ifinfo[n] for iface, ifinfo in psinfo.items() for way, n in {"up": 0, "down": 1}.items() }
def network(): c = statsd.StatsClient(STATSD_HOST, 8125, prefix=PREFIX + 'system.network') t0 = time.time() counters = psutil.net_io_counters(pernic=True) last_totals = dict() totals = dict() interfaces = set([key for key in counters.keys() if key != 'lo']) for interface in interfaces: totals[interface] = (counters[interface].bytes_sent, counters[interface].bytes_recv) last_totals[interface] = (counters[interface].bytes_sent, counters[interface].bytes_recv) while True: for interface in interfaces: counter = psutil.net_io_counters(pernic=True)[interface] t1 = time.time() totals[interface] = (counter.bytes_sent, counter.bytes_recv) ul, dl = [(now - last) / (t1 - t0) / 1000.0 for now, last in zip(totals[interface], last_totals[interface])] t0 = time.time() c.gauge('%s.upload.kbps' % interface, ul) c.gauge('%s.download.kbps' % interface, dl) last_totals[interface] = totals[interface] time.sleep(GRANULARITY)
def functionDataSensor(): netdata = psutil.net_io_counters() data = netdata.packets_sent + netdata.packets_recv return data
def poll(interval): """ Retrieve raw stats within an interval window. """ tot_before = psutil.net_io_counters() pnic_before = psutil.net_io_counters(pernic=True) # sleep some time time.sleep(interval) tot_after = psutil.net_io_counters() pnic_after = psutil.net_io_counters(pernic=True) return (tot_before, tot_after, pnic_before, pnic_after)