The following 40 code examples, extracted from open-source Python projects, illustrate how to use psutil.disk_io_counters().
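Before the project excerpts, here is a minimal standalone sketch of the call itself (not taken from any of the projects below): called with no arguments it returns a named tuple of cumulative totals since boot, while perdisk=True returns a dict keyed by device name; the printed fields are only an illustrative selection.

import psutil

# System-wide totals since boot: a named tuple with read_count, write_count,
# read_bytes, write_bytes (plus read_time/write_time on most platforms).
io = psutil.disk_io_counters()
print(io.read_bytes, io.write_bytes)

# Per-device totals: a dict mapping device names (e.g. 'sda') to the same tuple.
for device, counters in psutil.disk_io_counters(perdisk=True).items():
    print(device, counters.read_count, counters.write_count)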
def test_procfs_path(self):
    tdir = tempfile.mkdtemp()
    try:
        psutil.PROCFS_PATH = tdir
        self.assertRaises(IOError, psutil.virtual_memory)
        self.assertRaises(IOError, psutil.cpu_times)
        self.assertRaises(IOError, psutil.cpu_times, percpu=True)
        self.assertRaises(IOError, psutil.boot_time)
        # self.assertRaises(IOError, psutil.pids)
        self.assertRaises(IOError, psutil.net_connections)
        self.assertRaises(IOError, psutil.net_io_counters)
        self.assertRaises(IOError, psutil.net_if_stats)
        self.assertRaises(IOError, psutil.disk_io_counters)
        self.assertRaises(IOError, psutil.disk_partitions)
        self.assertRaises(psutil.NoSuchProcess, psutil.Process)
    finally:
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(tdir)
def monitor(frist_invoke=2):
    """
    Return (inbytes, outbytes, in_num, out_num, ioms) of disk.
    """
    sdiskio = psutil.disk_io_counters()

    # sleep some time
    value_dic = {
        'iostats': {
            'io.disks_read': sdiskio.read_bytes / (1024 * 1024),
            'io.disks_write': sdiskio.write_bytes / (1024 * 1024),
            'io.disks_read_count': sdiskio.read_count / (1024 * 1024),
            'io.disks_write_count': sdiskio.write_count / (1024 * 1024),
            'io.disks_read_time': sdiskio.read_time / 1000,
            'io.disks_write_time': sdiskio.write_time / 1000,
            'io.disks_busy_time': sdiskio.write_time / 1000,
        }
    }
    return value_dic
def test_serialization(self):
    def check(ret):
        if json is not None:
            json.loads(json.dumps(ret))
        a = pickle.dumps(ret)
        b = pickle.loads(a)
        self.assertEqual(ret, b)

    check(psutil.Process().as_dict())
    check(psutil.virtual_memory())
    check(psutil.swap_memory())
    check(psutil.cpu_times())
    check(psutil.cpu_times_percent(interval=0))
    check(psutil.net_io_counters())
    if LINUX and not os.path.exists('/proc/diskstats'):
        pass
    else:
        if not APPVEYOR:
            check(psutil.disk_io_counters())
    check(psutil.disk_partitions())
    check(psutil.disk_usage(os.getcwd()))
    check(psutil.users())
def get_disk_io_info(self):
    returnData = {'readiokps': {}, 'writeiokps': {}}
    try:
        old_info = psutil.disk_io_counters(perdisk=True)
        time.sleep(1)
        new_info = psutil.disk_io_counters(perdisk=True)
        for (diskname, rwinfo) in old_info.items():
            oldr, oldw = rwinfo.read_bytes, rwinfo.write_bytes
            newr, neww = new_info[diskname].read_bytes, new_info[diskname].write_bytes
            riok = (newr - oldr) / 1024.0
            wiok = (neww - oldw) / 1024.0
            returnData['readiokps'][diskname] = riok
            returnData['writeiokps'][diskname] = wiok
    except Exception:
        pybixlib.error(self.logHead + traceback.format_exc())
        self.errorInfoDone(traceback.format_exc())
    return returnData
def get_metrics(self):
    self.initial_io_stats = psutil.disk_io_counters(perdisk=True)
    curr_host_name = socket.gethostbyname(
        self.CONFIG['peer_name']
    )
    time.sleep(self.STAT_INTERVAL_FOR_PER_SEC_COUNTER)
    self.current_io_stats = psutil.disk_io_counters(perdisk=True)
    threads = []
    for volume in self.CLUSTER_TOPOLOGY.get('volumes', []):
        for sub_volume_index, sub_volume_bricks in volume.get(
            'bricks', []
        ).iteritems():
            for brick in sub_volume_bricks:
                brick_hostname = brick['hostname']
                if (
                    brick_hostname == curr_host_name or
                    brick_hostname == self.CONFIG['peer_name']
                ):
                    thread = threading.Thread(
                        target=self.populate_disk_details,
                        args=(
                            volume['name'],
                            brick['hostname'],
                            brick['path'],
                        )
                    )
                    thread.start()
                    threads.append(thread)
    for thread in threads:
        thread.join(1)
    for thread in threads:
        del thread
    return self.brick_details
def disk_read():
    return bytes2human(psutil.disk_io_counters().read_bytes)
def disk_written():
    return bytes2human(psutil.disk_io_counters().write_bytes)
def get_stats(self):
    cpct = psutil.cpu_percent(interval=0)
    ctimes = psutil.cpu_times_percent()
    self.cpu_stats = CpuStats(cpct, ctimes.user, ctimes.system, ctimes.idle)
    self.vmem_stats = psutil.virtual_memory()
    self.disk_stats = psutil.disk_io_counters()
    self.net_stats = psutil.net_io_counters()
    # must create new stats list each time stats are updated
    # because named tuples are immutable
    self.statslist = [self.cpu_stats, self.vmem_stats, self.disk_stats,
                      self.net_stats]
def test_disk_io_counters(self):
    self.execute(psutil.disk_io_counters)

# --- net
def test_disk_io_counters_kernel_2_4_mocked(self):
    # Tests /proc/diskstats parsing format for 2.4 kernels, see:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor  #blocks  name

                   8        0  488386584 hda
                """))
        elif name == '/proc/diskstats':
            return io.StringIO(
                u("   3     0     1 hda 2 3 4 5 6 7 8 9 10 11 12"))
        else:
            return orig_open(name, *args, **kwargs)

    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_merged_count, 2)
        self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
        self.assertEqual(ret.read_time, 4)
        self.assertEqual(ret.write_count, 5)
        self.assertEqual(ret.write_merged_count, 6)
        self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
        self.assertEqual(ret.write_time, 8)
        self.assertEqual(ret.busy_time, 10)
def test_disk_io_counters_kernel_2_6_full_mocked(self):
    # Tests /proc/diskstats parsing format for 2.6 kernels,
    # lines reporting all metrics:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor  #blocks  name

                   8        0  488386584 hda
                """))
        elif name == '/proc/diskstats':
            return io.StringIO(
                u("   3    0   hda 1 2 3 4 5 6 7 8 9 10 11"))
        else:
            return orig_open(name, *args, **kwargs)

    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_merged_count, 2)
        self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
        self.assertEqual(ret.read_time, 4)
        self.assertEqual(ret.write_count, 5)
        self.assertEqual(ret.write_merged_count, 6)
        self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
        self.assertEqual(ret.write_time, 8)
        self.assertEqual(ret.busy_time, 10)
def test_disk_io_counters_kernel_2_6_limited_mocked(self):
    # Tests /proc/diskstats parsing format for 2.6 kernels,
    # where one line of /proc/partitions return a limited
    # amount of metrics when it bumps into a partition
    # (instead of a disk). See:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor  #blocks  name

                   8        0  488386584 hda
                """))
        elif name == '/proc/diskstats':
            return io.StringIO(
                u("   3    1   hda 1 2 3 4"))
        else:
            return orig_open(name, *args, **kwargs)

    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
        self.assertEqual(ret.write_count, 3)
        self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
        self.assertEqual(ret.read_merged_count, 0)
        self.assertEqual(ret.read_time, 0)
        self.assertEqual(ret.write_merged_count, 0)
        self.assertEqual(ret.write_time, 0)
        self.assertEqual(ret.busy_time, 0)

# =====================================================================
# misc
# =====================================================================
def test_disk_io_counters(self):
    def check_ntuple(nt):
        self.assertEqual(nt[0], nt.read_count)
        self.assertEqual(nt[1], nt.write_count)
        self.assertEqual(nt[2], nt.read_bytes)
        self.assertEqual(nt[3], nt.write_bytes)
        if not (OPENBSD or NETBSD):
            self.assertEqual(nt[4], nt.read_time)
            self.assertEqual(nt[5], nt.write_time)
            if LINUX:
                self.assertEqual(nt[6], nt.read_merged_count)
                self.assertEqual(nt[7], nt.write_merged_count)
                self.assertEqual(nt[8], nt.busy_time)
            elif FREEBSD:
                self.assertEqual(nt[6], nt.busy_time)
        for name in nt._fields:
            assert getattr(nt, name) >= 0, nt

    ret = psutil.disk_io_counters(perdisk=False)
    check_ntuple(ret)
    ret = psutil.disk_io_counters(perdisk=True)
    # make sure there are no duplicates
    self.assertEqual(len(ret), len(set(ret)))
    for key in ret:
        assert key, key
        check_ntuple(ret[key])
        if LINUX and key[-1].isdigit():
            # if 'sda1' is listed 'sda' shouldn't, see:
            # https://github.com/giampaolo/psutil/issues/338
            while key[-1].isdigit():
                key = key[:-1]
            self.assertNotIn(key, ret.keys())
def host_disk_usage_io_performance_report(self):
    data = list()
    disk_io_counters = psutil.disk_io_counters(perdisk=True)
    for mountpoint, disk in self.disks.items():
        dev = os.path.basename(disk['real_device'])
        disk_usage_io = list()
        if dev in self.last_host_disk_io:
            disk_usage_io = {
                'node_id': self.node_id,
                'mountpoint': mountpoint,
                'used': psutil.disk_usage(mountpoint).used,
                'rd_req': (disk_io_counters[dev].read_count -
                           self.last_host_disk_io[dev].read_count) / self.interval,
                'rd_bytes': (disk_io_counters[dev].read_bytes -
                             self.last_host_disk_io[dev].read_bytes) / self.interval,
                'wr_req': (disk_io_counters[dev].write_count -
                           self.last_host_disk_io[dev].write_count) / self.interval,
                'wr_bytes': (disk_io_counters[dev].write_bytes -
                             self.last_host_disk_io[dev].write_bytes) / self.interval
            }
        elif not isinstance(self.last_host_disk_io, dict):
            self.last_host_disk_io = dict()

        self.last_host_disk_io[dev] = disk_io_counters[dev]

        if disk_usage_io.__len__() > 0:
            data.append(disk_usage_io)

    if data.__len__() > 0:
        host_collection_performance_emit.disk_usage_io(data=data)
def update(self):
    a = 0
    tot = 0
    self.load.append(cpu_percent())
    for i in range(len(self.load) - 1, len(self.load)):
        a = a + self.load[i]
        tot = tot + 1.0
    a = a / tot
    self.load[len(self.load) - 1] = a
    self.load.pop(0)
    try:
        # user reported bug, This is a problem with the underlying function.
        w_temp = disk_io_counters()[3] / 1000
    except:
        w_temp = 0
    w_delta = w_temp - self.wait_last
    self.wait_last = w_temp
    self.wait.append(int(w_delta))
    # print(w_delta)
    self.wait.pop(0)
    self.color.append([255, 0, 0])
    self.color.pop(0)
    self.repaint()
def run(self, *unused):
    if(os.path.isfile("/proc/diskstats")):
        return diskstats_parse()
    else:
        results = {}
        try:
            diskdata = psutil.disk_io_counters(perdisk=True)
            for device, values in diskdata.items():
                device_stats = {}
                for key_value in values._fields:
                    device_stats[key_value] = getattr(values, key_value)
                results[device] = device_stats
        except Exception as e:
            results = e.message
        return results
def getDiskReadWrite():
    diskIO = psutil.disk_io_counters()
    return([diskIO.read_time, diskIO.write_time])
def getMethods(self):
    methods = [
        "disk_usage",
        "disk_io_counters"
    ]
    return ServiceBase.getMethods() + methods
def disk_io_counters(self, perfdisk):
    return psutil.disk_io_counters(perfdisk)
def check(self):
    # if platform_util.is_linux():
    #     data_per_disk, count = self.get_linux_iostat()
    # else:
    data_per_disk, count = self.get_other_iostat()
    if count:
        # average the per-disk counters collected via disk_io_counters
        data = {k: v / count for k, v in data_per_disk.iteritems() if k != 'io.util'}
        data['io.util'] = data_per_disk['io.util']
    else:
        data = data_per_disk
    return data
def get_other_iostat(self):
    curr_stat = psutil.disk_io_counters(True)
    curr_cpu_time = self.sum_cpu_time(psutil.cpu_times()) / self.cpu_count
    if self.last_cpu_time == 0:  # first sample, nothing to diff against yet
        self.last_stat = curr_stat
        self.last_cpu_time = curr_cpu_time
        return {}, 0
    data_per_disk = {k: 0 for k in self.metric_define}
    count = 0
    ts = curr_cpu_time - self.last_cpu_time
    for disk, nval in curr_stat.iteritems():
        oval = self.last_stat.get(disk)  # previous sample for this disk
        if not oval:
            continue
        total_time = nval.write_time - oval.write_time + nval.read_time - oval.read_time
        total_count = nval.write_count - oval.write_count + nval.read_count - oval.read_count
        if not total_count:
            # no I/O in this interval, skip the disk
            continue
        data_per_disk['io.w_s'] += (nval.write_count - oval.write_count) / ts
        data_per_disk['io.wkbyte_s'] += (nval.write_bytes - oval.write_bytes) / 1024 / ts
        data_per_disk['io.r_s'] += (nval.read_count - oval.read_count) / ts
        data_per_disk['io.rkbyte_s'] += (nval.read_bytes - oval.read_bytes) / 1024 / ts
        data_per_disk['io.await'] += total_time / total_count if total_count else 0.0
        if hasattr(oval, 'busy_time'):  # on Linux, psutil 4.0.0 exposes busy_time
            data_per_disk['io.svctm'] += (nval.busy_time - oval.busy_time) / total_count if total_count else 0.0
            io_util = (nval.busy_time - oval.busy_time) * 100.0 / (ts * 1000)
            if io_util > data_per_disk['io.util']:  # keep the maximum across disks
                data_per_disk['io.util'] = io_util if io_util < 100 else 100
        data_per_disk['io.queue_time_percent'] = (data_per_disk['io.await'] - data_per_disk['io.svctm']) * 100 / data_per_disk['io.await'] if data_per_disk['io.await'] else 0
        count += 1
    self.last_stat = curr_stat
    self.last_cpu_time = curr_cpu_time
    return data_per_disk, count
def test_disk_io_counters(self):
    self.execute(psutil.disk_io_counters)

# --- proc
def test_disk_io_counters(self):
    def check_ntuple(nt):
        self.assertEqual(nt[0], nt.read_count)
        self.assertEqual(nt[1], nt.write_count)
        self.assertEqual(nt[2], nt.read_bytes)
        self.assertEqual(nt[3], nt.write_bytes)
        if not (OPENBSD or NETBSD):
            self.assertEqual(nt[4], nt.read_time)
            self.assertEqual(nt[5], nt.write_time)
            if LINUX:
                self.assertEqual(nt[6], nt.read_merged_count)
                self.assertEqual(nt[7], nt.write_merged_count)
                self.assertEqual(nt[8], nt.busy_time)
            elif FREEBSD:
                self.assertEqual(nt[6], nt.busy_time)
        for name in nt._fields:
            assert getattr(nt, name) >= 0, nt

    ret = psutil.disk_io_counters(perdisk=False)
    check_ntuple(ret)
    ret = psutil.disk_io_counters(perdisk=True)
    # make sure there are no duplicates
    self.assertEqual(len(ret), len(set(ret)))
    for key in ret:
        assert key, key
        check_ntuple(ret[key])
        if LINUX and key[-1].isdigit():
            # if 'sda1' is listed 'sda' shouldn't, see:
            # https://github.com/giampaolo/psutil/issues/338
            while key[-1].isdigit():
                key = key[:-1]
            self.assertNotIn(key, ret.keys())

# can't find users on APPVEYOR or TRAVIS
def test_disk_io_counters_kernel_2_6_limited_mocked(self):
    # Tests /proc/diskstats parsing format for 2.6 kernels,
    # where one line of /proc/partitions return a limited
    # amount of metrics when it bumps into a partition
    # (instead of a disk). See:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor  #blocks  name

                   8        0  488386584 hda
                """))
        elif name == '/proc/diskstats':
            return io.StringIO(
                u("   3    1   hda 1 2 3 4"))
        else:
            return orig_open(name, *args, **kwargs)

    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
        self.assertEqual(ret.write_count, 3)
        self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
        self.assertEqual(ret.read_merged_count, 0)
        self.assertEqual(ret.read_time, 0)
        self.assertEqual(ret.write_merged_count, 0)
        self.assertEqual(ret.write_time, 0)
        self.assertEqual(ret.busy_time, 0)

# =====================================================================
# --- misc
# =====================================================================
def GetSystemState(self):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(('8.8.8.8', 0))
    self.address = s.getsockname()[0]
    self.cpu_count = psutil.cpu_count()
    self.cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
    self.mem = psutil.virtual_memory()
    self.disk_usage = psutil.disk_usage('/')
    self.disk_io = psutil.disk_io_counters()
    self.net_io = psutil.net_io_counters()
    self.hostname = socket.gethostname()
def f_print_linux_status(save_as):
    ### collect data ##########################################################
    # scputimes(user, nice, system, idle, iowait, irq, softirq, steal, guest, guest_nice)
    cpu_times = psutil.cpu_times()
    # scpustats(ctx_switches, interrupts, soft_interrupts, syscalls)
    # cpu_stats = psutil.cpu_stats()
    # svmem(total, available, percent, used, free, active, inactive, buffers, cached, shared)
    mem = psutil.virtual_memory()
    # sswap(total, used, free, percent, sin, sout)
    swap = psutil.swap_memory()
    # sdiskusage(total, used, free, percent)
    # disk_usage = psutil.disk_usage('/')
    # sdiskio(read_count, write_count, read_bytes, write_bytes, read_time, write_time)
    # disk_io_counters = psutil.disk_io_counters()
    # snetio(bytes_sent, bytes_recv, packets_sent, packets_recv, errin, errout, dropin, dropout)
    # net = psutil.net_io_counters()
    # load
    try:
        load = os.getloadavg()
    except (OSError, AttributeError):
        stats = {}
    else:
        stats = {'min1': load[0], 'min5': load[1], 'min15': load[2]}
    # Uptime = datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S")

    ### print #################################################################
    style1 = {1: ' ,6,l', 2: ' ,10,r', 3: ' ,6,l', 4: ' ,10,r', 5: ' ,6,l', 6: ' ,6,r',
              7: ' ,8,l', 8: ' ,6,r', 9: ' ,6,l', 10: ' ,6,r', 11: ' ,6,l', 12: ' ,5,r'}
    style = {1: ' ,l', 2: ' ,r', 3: ' ,l', 4: ' ,r', 5: ' ,l', 6: ' ,r',
             7: ' ,l', 8: ' ,r', 9: ' ,l', 10: ' ,r', 11: ' ,l', 12: ' ,r'}
    rows = [
        ["CPU", str(psutil.cpu_percent(interval=1)) + '%', "nice", cpu_times.nice,
         "MEM", str(mem.percent) + '%', "active", str(mem.active/1024/1024) + 'M',
         "SWAP", str(swap.percent) + '%', "LOAD", str(psutil.cpu_count()) + 'core'],
        ["user", cpu_times.user, "irq", cpu_times.irq,
         "total", str(mem.total/1024/1024) + 'M', "inactive", str(mem.inactive/1024/1024) + 'M',
         "total", str(swap.total/1024/1024) + 'M', "1 min", stats["min1"]],
        ["system", cpu_times.system, "iowait", cpu_times.iowait,
         "used", str(mem.used/1024/1024) + 'M', "buffers", str(mem.buffers/1024/1024) + 'M',
         "used", str(swap.used/1024/1024) + 'M', "5 min", stats["min5"]],
        ["idle", cpu_times.idle, "steal", cpu_times.steal,
         "free", str(mem.free/1024/1024) + 'M', "cached", str(mem.cached/1024/1024) + 'M',
         "free", str(swap.free/1024/1024) + 'M', "15 min", stats["min15"]]
    ]

    title = "Linux Overview"
    if save_as == "txt":
        f_print_title(title)
        f_print_table_body(rows, style1, ' ')
    elif save_as == "html":
        f_print_table_html(rows, title, style)
def check_disk(i_warning, i_critical, s_partition):
    """
    Checks for disk stats

    Gets:
        i_warning: Warning Threshold
        i_critical: Critical Threshold
        s_partition: partition that should be used to trigger WARNING/CRITICAL

    Returns:
        check output including perfdata
    """
    test_int(i_warning, i_critical)
    test_string(s_partition)

    s_perfdata = ''
    s_output = ''
    l_partitions = psutil.disk_partitions()
    d_io_counters = psutil.disk_io_counters(perdisk=True)
    f_monitored_partition_usage = 0.0

    for nt_partition in l_partitions:
        # get usage for every partition
        d_disk_usage = psutil.disk_usage(nt_partition.mountpoint)._asdict()
        # add all usage data to perfdata
        for key, value in d_disk_usage.items():
            s_perfdata = add_perfdata(s_perfdata, nt_partition.mountpoint, key, value)
        # check monitored partition and add status to output
        if nt_partition.mountpoint == s_partition:
            s_output = check_status(i_warning, i_critical, d_disk_usage['percent'])
            f_monitored_partition_usage = d_disk_usage['percent']

    # add message if status is not OK
    if not 'OK' in s_output:
        s_output += ' {} has a usage of {} percent.'.format(s_partition, f_monitored_partition_usage)

    # add all the mountpoints and other info to output
    for nt_partition in l_partitions:
        d_partition = nt_partition._asdict()
        for key, value in d_partition.items():
            if not key == 'device':
                s_output += '\n{}.{}={}'.format(d_partition['device'], key, value)

    for s_device, nt_partition in d_io_counters.items():
        d_partition = nt_partition._asdict()
        # add all io_counters to perfdata
        for key, value in d_partition.items():
            s_perfdata = add_perfdata(s_perfdata, s_device, key, value)

    # put it all together
    s_output += ' | {}'.format(s_perfdata)
    return s_output
def poll(interval):
    """Calculate IO usage by comparing IO statistics before and after the
    interval.
    Return a tuple including all currently running processes sorted by IO
    activity and total disks I/O activity.
    """
    # first get a list of all processes and disk io counters
    procs = [p for p in psutil.process_iter()]
    for p in procs[:]:
        try:
            p._before = p.io_counters()
        except psutil.Error:
            procs.remove(p)
            continue
    disks_before = psutil.disk_io_counters()

    # sleep some time
    time.sleep(interval)

    # then retrieve the same info again
    for p in procs[:]:
        try:
            p._after = p.io_counters()
            p._cmdline = ' '.join(p.cmdline())
            if not p._cmdline:
                p._cmdline = p.name()
            p._username = p.username()
        except (psutil.NoSuchProcess, psutil.ZombieProcess):
            procs.remove(p)
    disks_after = psutil.disk_io_counters()

    # finally calculate results by comparing data before and
    # after the interval
    for p in procs:
        p._read_per_sec = p._after.read_bytes - p._before.read_bytes
        p._write_per_sec = p._after.write_bytes - p._before.write_bytes
        p._total = p._read_per_sec + p._write_per_sec

    disks_read_per_sec = disks_after.read_bytes - disks_before.read_bytes
    disks_write_per_sec = disks_after.write_bytes - disks_before.write_bytes

    # sort processes by total disk IO so that the more intensive
    # ones get listed first
    processes = sorted(procs, key=lambda p: p._total, reverse=True)

    return (processes, disks_read_per_sec, disks_write_per_sec)