Python psutil 模块,disk_io_counters() 实例源码

我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用psutil.disk_io_counters()

项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check that psutil APIs fail when PROCFS_PATH is an empty directory.

        PROCFS_PATH is pointed at a fresh temporary directory; each listed
        function must then raise IOError and psutil.Process() must raise
        NoSuchProcess.  PROCFS_PATH is set back to "/proc" and the temporary
        directory is removed whatever the outcome.
        """
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            # (callable, keyword arguments) pairs expected to raise IOError.
            # psutil.pids is left out: its check was already disabled here.
            io_error_calls = (
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {'percpu': True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            )
            for func, kwargs in io_error_calls:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
项目:skynet    作者:skynetera    | 项目源码 | 文件源码
def monitor(frist_invoke=2):
    """
    Return a snapshot of the system-wide disk I/O counters.

    The result is a dict of the form ``{'iostats': {...}}`` whose inner
    keys are ``io.disks_read``, ``io.disks_write``, ``io.disks_read_count``,
    ``io.disks_write_count``, ``io.disks_read_time``, ``io.disks_write_time``
    and ``io.disks_busy_time``.

    ``frist_invoke`` is accepted for backward compatibility; it is not used.
    """
    sdiskio = psutil.disk_io_counters()

    mib = 1024 * 1024
    # busy_time is not exposed by psutil on every platform; report 0 there
    # instead of silently repeating write_time under the busy_time key.
    busy_time = getattr(sdiskio, 'busy_time', 0)

    # NOTE(review): the operation counts are scaled by 1024*1024 just like
    # the byte totals. Kept unchanged here -- confirm this is intended.
    value_dic = {
        'iostats': {
            'io.disks_read': sdiskio.read_bytes/mib,
            'io.disks_write': sdiskio.write_bytes/mib,
            'io.disks_read_count': sdiskio.read_count/mib,
            'io.disks_write_count': sdiskio.write_count/mib,
            'io.disks_read_time': sdiskio.read_time/1000,
            'io.disks_write_time': sdiskio.write_time/1000,
            'io.disks_busy_time': busy_time/1000,
        }
    }

    return value_dic
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check that psutil APIs fail when PROCFS_PATH is an empty directory.

        PROCFS_PATH is pointed at a fresh temporary directory; each listed
        function must then raise IOError and psutil.Process() must raise
        NoSuchProcess.  PROCFS_PATH is set back to "/proc" and the temporary
        directory is removed whatever the outcome.
        """
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            # (callable, keyword arguments) pairs expected to raise IOError.
            # psutil.pids is left out: its check was already disabled here.
            io_error_calls = (
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {'percpu': True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            )
            for func, kwargs in io_error_calls:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_serialization(self):
        """Verify psutil return values survive JSON and pickle serialization.

        Each value is dumped to JSON and loaded back (when the json module
        is available), then pickled and unpickled; the unpickled copy must
        compare equal to the original.
        """
        def roundtrip(value):
            if json is not None:
                json.loads(json.dumps(value))
            restored = pickle.loads(pickle.dumps(value))
            self.assertEqual(value, restored)

        roundtrip(psutil.Process().as_dict())
        roundtrip(psutil.virtual_memory())
        roundtrip(psutil.swap_memory())
        roundtrip(psutil.cpu_times())
        roundtrip(psutil.cpu_times_percent(interval=0))
        roundtrip(psutil.net_io_counters())
        # Disk I/O counters are skipped on Linux without /proc/diskstats
        # and whenever the APPVEYOR flag is set.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            roundtrip(psutil.disk_io_counters())
        roundtrip(psutil.disk_partitions())
        roundtrip(psutil.disk_usage(os.getcwd()))
        roundtrip(psutil.users())
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check that psutil APIs fail when PROCFS_PATH is an empty directory.

        PROCFS_PATH is pointed at a fresh temporary directory; each listed
        function must then raise IOError and psutil.Process() must raise
        NoSuchProcess.  PROCFS_PATH is set back to "/proc" and the temporary
        directory is removed whatever the outcome.
        """
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            # (callable, keyword arguments) pairs expected to raise IOError.
            # psutil.pids is left out: its check was already disabled here.
            io_error_calls = (
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {'percpu': True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            )
            for func, kwargs in io_error_calls:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_serialization(self):
        """Verify psutil return values survive JSON and pickle serialization.

        Each value is dumped to JSON and loaded back (when the json module
        is available), then pickled and unpickled; the unpickled copy must
        compare equal to the original.
        """
        def roundtrip(value):
            if json is not None:
                json.loads(json.dumps(value))
            restored = pickle.loads(pickle.dumps(value))
            self.assertEqual(value, restored)

        roundtrip(psutil.Process().as_dict())
        roundtrip(psutil.virtual_memory())
        roundtrip(psutil.swap_memory())
        roundtrip(psutil.cpu_times())
        roundtrip(psutil.cpu_times_percent(interval=0))
        roundtrip(psutil.net_io_counters())
        # Disk I/O counters are skipped on Linux without /proc/diskstats
        # and whenever the APPVEYOR flag is set.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            roundtrip(psutil.disk_io_counters())
        roundtrip(psutil.disk_partitions())
        roundtrip(psutil.disk_usage(os.getcwd()))
        roundtrip(psutil.users())
项目:pybix    作者:lioncui    | 项目源码 | 文件源码
def get_disk_io_info(self):
        """Sample per-disk read/write throughput over a one second window.

        Returns a dict with two sub-dicts, 'readiokps' and 'writeiokps',
        mapping each disk name to the bytes read/written during the window
        divided by 1024.0.  Any exception is logged and reported through
        errorInfoDone(); whatever was collected so far is still returned.
        """
        returnData = {'readiokps': {}, 'writeiokps': {}}
        try:
            before = psutil.disk_io_counters(perdisk=True)
            time.sleep(1)
            after = psutil.disk_io_counters(perdisk=True)
            for diskname in before:
                first = before[diskname]
                read_start, write_start = first.read_bytes, first.write_bytes
                second = after[diskname]
                read_end, write_end = second.read_bytes, second.write_bytes
                # Compute both rates before storing either of them.
                read_kib = (read_end - read_start) / 1024.0
                write_kib = (write_end - write_start) / 1024.0
                returnData['readiokps'][diskname] = read_kib
                returnData['writeiokps'][diskname] = write_kib
        except Exception:
            pybixlib.error(self.logHead + traceback.format_exc())
            self.errorInfoDone(traceback.format_exc())
        return returnData
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def get_metrics(self):
        """Collect disk details for every brick hosted on this node.

        Two per-disk I/O snapshots are stored on the instance
        (``initial_io_stats`` and ``current_io_stats``), taken
        ``STAT_INTERVAL_FOR_PER_SEC_COUNTER`` seconds apart.  One thread
        running ``populate_disk_details`` is then started for each brick
        whose hostname matches this node's peer name or its resolved
        address.  Each thread is waited on for at most one second.

        Returns ``self.brick_details``.
        """
        self.initial_io_stats = psutil.disk_io_counters(perdisk=True)
        curr_host_name = socket.gethostbyname(
            self.CONFIG['peer_name']
        )
        time.sleep(self.STAT_INTERVAL_FOR_PER_SEC_COUNTER)
        self.current_io_stats = psutil.disk_io_counters(perdisk=True)

        local_names = (curr_host_name, self.CONFIG['peer_name'])
        threads = []
        for volume in self.CLUSTER_TOPOLOGY.get('volumes', []):
            # 'bricks' is a mapping of sub-volume -> list of bricks; default
            # to an empty mapping so a volume without bricks is skipped
            # instead of failing on a list that has no mapping methods.
            for sub_volume_bricks in volume.get('bricks', {}).values():
                for brick in sub_volume_bricks:
                    if brick['hostname'] not in local_names:
                        continue
                    thread = threading.Thread(
                        target=self.populate_disk_details,
                        args=(
                            volume['name'],
                            brick['hostname'],
                            brick['path'],
                        )
                    )
                    thread.start()
                    threads.append(thread)
        # Bounded wait: a slow worker must not block metric collection.
        for thread in threads:
            thread.join(1)
        return self.brick_details
项目:simon    作者:half0wl    | 项目源码 | 文件源码
def disk_read():
    """Return the system-wide read_bytes counter formatted by bytes2human."""
    counters = psutil.disk_io_counters()
    return bytes2human(counters.read_bytes)
项目:simon    作者:half0wl    | 项目源码 | 文件源码
def disk_written():
    """Return the system-wide write_bytes counter formatted by bytes2human."""
    counters = psutil.disk_io_counters()
    return bytes2human(counters.write_bytes)
项目:sawtooth-validator    作者:hyperledger-archives    | 项目源码 | 文件源码
def get_stats(self):
        """Refresh the CPU, memory, disk and network statistics.

        Stores the fresh readings in ``cpu_stats``, ``vmem_stats``,
        ``disk_stats`` and ``net_stats`` and rebuilds ``statslist`` from
        them, in that order.
        """
        percent = psutil.cpu_percent(interval=0)
        times = psutil.cpu_times_percent()
        self.cpu_stats = CpuStats(
            percent, times.user, times.system, times.idle)

        self.vmem_stats = psutil.virtual_memory()
        self.disk_stats = psutil.disk_io_counters()
        self.net_stats = psutil.net_io_counters()

        # Named tuples are immutable, so every refresh yields new objects
        # and the list holding them has to be rebuilt as well.
        self.statslist = [
            self.cpu_stats,
            self.vmem_stats,
            self.disk_stats,
            self.net_stats,
        ]
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_io_counters(self):
        """Run psutil.disk_io_counters through this suite's execute() helper."""
        target = psutil.disk_io_counters
        self.execute(target)

    # --- net
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_io_counters_kernel_2_4_mocked(self):
        """Check /proc/diskstats parsing for the 2.4 kernel line format.

        See https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Serve canned /proc files; anything else goes to the real open().
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3     0   1 hda 2 3 4 5 6 7 8 9 10 11 12"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            expected = (
                ('read_count', 1),
                ('read_merged_count', 2),
                ('read_bytes', 3 * SECTOR_SIZE),
                ('read_time', 4),
                ('write_count', 5),
                ('write_merged_count', 6),
                ('write_bytes', 7 * SECTOR_SIZE),
                ('write_time', 8),
                ('busy_time', 10),
            )
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_io_counters_kernel_2_6_full_mocked(self):
        """Check /proc/diskstats parsing for 2.6 kernel lines with all metrics.

        See https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Serve canned /proc files; anything else goes to the real open().
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3    0   hda 1 2 3 4 5 6 7 8 9 10 11"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            expected = (
                ('read_count', 1),
                ('read_merged_count', 2),
                ('read_bytes', 3 * SECTOR_SIZE),
                ('read_time', 4),
                ('write_count', 5),
                ('write_merged_count', 6),
                ('write_bytes', 7 * SECTOR_SIZE),
                ('write_time', 8),
                ('busy_time', 10),
            )
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_io_counters_kernel_2_6_limited_mocked(self):
        """Check /proc/diskstats parsing for 2.6 kernel partition lines.

        On 2.6 kernels a line describing a partition (instead of a disk)
        carries only a limited set of metrics; the remaining fields are
        expected to be reported as 0.  See
        https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Serve canned /proc files; anything else goes to the real open().
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3    1   hda 1 2 3 4"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            expected = (
                # metrics present on the limited line
                ('read_count', 1),
                ('read_bytes', 2 * SECTOR_SIZE),
                ('write_count', 3),
                ('write_bytes', 4 * SECTOR_SIZE),
                # metrics absent from the limited line
                ('read_merged_count', 0),
                ('read_time', 0),
                ('write_merged_count', 0),
                ('write_time', 0),
                ('busy_time', 0),
            )
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)


# =====================================================================
# misc
# =====================================================================
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_io_counters(self):
        """Validate the structure of psutil.disk_io_counters() results.

        Checks that positional and named access agree for every field
        expected on the current platform, that no counter is negative,
        and that the per-disk mapping has non-empty, unique keys.
        """
        def verify_fields(nt):
            # Field names in the positional order expected on this platform.
            ordered = ['read_count', 'write_count',
                       'read_bytes', 'write_bytes']
            if not (OPENBSD or NETBSD):
                ordered += ['read_time', 'write_time']
                if LINUX:
                    ordered += ['read_merged_count', 'write_merged_count',
                                'busy_time']
                elif FREEBSD:
                    ordered.append('busy_time')
            for position, field in enumerate(ordered):
                self.assertEqual(nt[position], getattr(nt, field))
            for field in nt._fields:
                assert getattr(nt, field) >= 0, nt

        verify_fields(psutil.disk_io_counters(perdisk=False))
        per_disk = psutil.disk_io_counters(perdisk=True)
        # make sure there are no duplicates
        self.assertEqual(len(per_disk), len(set(per_disk)))
        for disk_name in per_disk:
            assert disk_name, disk_name
            verify_fields(per_disk[disk_name])
            if LINUX and disk_name[-1].isdigit():
                # if 'sda1' is listed 'sda' shouldn't, see:
                # https://github.com/giampaolo/psutil/issues/338
                parent = disk_name
                while parent[-1].isdigit():
                    parent = parent[:-1]
                self.assertNotIn(parent, per_disk.keys())
项目:JimV-N    作者:jamesiter    | 项目源码 | 文件源码
def host_disk_usage_io_performance_report(self):
        """Emit per-mountpoint disk usage and I/O rates for this host.

        For every entry in ``self.disks`` the current psutil counters of its
        backing device are compared with the counters remembered from the
        previous call (``self.last_host_disk_io``), and the differences are
        divided by ``self.interval``.  Devices seen for the first time only
        have their counters remembered.  Nothing is emitted when no sample
        could be built.
        """
        # Normalise the cache before using it, so the lookups below are
        # safe even if it has not been set up as a dict yet.
        if not isinstance(self.last_host_disk_io, dict):
            self.last_host_disk_io = dict()

        data = list()
        disk_io_counters = psutil.disk_io_counters(perdisk=True)

        for mountpoint, disk in self.disks.items():
            dev = os.path.basename(disk['real_device'])

            current = disk_io_counters.get(dev)
            if current is None:
                # psutil has no counters for this device; skip it rather
                # than abort the whole report with a KeyError.
                continue

            previous = self.last_host_disk_io.get(dev)
            if previous is not None:
                data.append({
                    'node_id': self.node_id,
                    'mountpoint': mountpoint,
                    'used': psutil.disk_usage(mountpoint).used,
                    'rd_req':
                        (current.read_count - previous.read_count) / self.interval,
                    'rd_bytes':
                        (current.read_bytes - previous.read_bytes) / self.interval,
                    'wr_req':
                        (current.write_count - previous.write_count) / self.interval,
                    'wr_bytes':
                        (current.write_bytes - previous.write_bytes) / self.interval
                })

            self.last_host_disk_io[dev] = current

        if data:
            host_collection_performance_emit.disk_usage_io(data=data)
项目:gpvdm    作者:roderickmackenzie    | 项目源码 | 文件源码
def update(self):
        """Push one new sample into the load/wait/color histories and repaint.

        Each history list gets one value appended and its oldest value
        dropped, so the lists keep their length.
        """
        self.load.append(cpu_percent())
        # The original averaging loop only ever covered the newest sample,
        # so it reduces to a float conversion of that sample.
        self.load[-1] = self.load[-1] / 1.0
        self.load.pop(0)

        # A user reported failures coming from the underlying function, so
        # fall back to 0 on error.  Only Exception is caught so that
        # KeyboardInterrupt/SystemExit still propagate.
        try:
            w_temp=disk_io_counters()[3]/1000
        except Exception:
            w_temp=0

        # Store the change since the previous call, not the running total.
        w_delta=w_temp-self.wait_last
        self.wait_last=w_temp

        self.wait.append(int(w_delta))
        self.wait.pop(0)

        self.color.append([255,0,0])
        self.color.pop(0)

        self.repaint()
项目:nixstatsagent    作者:NIXStats    | 项目源码 | 文件源码
def run(self, *unused):
        """Return per-device disk I/O statistics.

        When /proc/diskstats exists the result of diskstats_parse() is
        returned.  Otherwise psutil's per-disk counters are converted to a
        ``{device: {field: value}}`` dict.  If psutil fails, the error
        message string is returned instead of a dict.
        """
        if os.path.isfile("/proc/diskstats"):
            return diskstats_parse()

        try:
            diskdata = psutil.disk_io_counters(perdisk=True)
            results = {
                device: {
                    field: getattr(values, field)
                    for field in values._fields
                }
                for device, values in diskdata.items()
            }
        except Exception as e:
            # Exceptions have no ``.message`` attribute on Python 3;
            # str(e) works on both Python 2 and 3.
            results = str(e)
        return results
项目:zorro    作者:C-CINA    | 项目源码 | 文件源码
def getDiskReadWrite():
    """Return [read_time, write_time] from the system-wide disk counters."""
    counters = psutil.disk_io_counters()
    read_time = counters.read_time
    write_time = counters.write_time
    return [read_time, write_time]
项目:zorro    作者:C-CINA    | 项目源码 | 文件源码
def getDiskReadWrite():
    """Return [read_time, write_time] from the system-wide disk counters."""
    counters = psutil.disk_io_counters()
    read_time = counters.read_time
    write_time = counters.write_time
    return [read_time, write_time]
项目:maestro    作者:InWorldz    | 项目源码 | 文件源码
def getMethods(self):
        """Return ServiceBase's method names followed by the disk methods."""
        disk_methods = ["disk_usage", "disk_io_counters"]
        inherited = ServiceBase.getMethods()
        return inherited + disk_methods
项目:maestro    作者:InWorldz    | 项目源码 | 文件源码
def disk_io_counters(self, perfdisk):
        """Return psutil.disk_io_counters(), passing ``perfdisk`` through."""
        counters = psutil.disk_io_counters(perfdisk)
        return counters
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def check(self):
        """Return disk I/O metrics averaged over the disks that had activity.

        ``get_other_iostat()`` supplies the metrics summed over ``count``
        disks.  Every metric except 'io.util' is divided by ``count``;
        'io.util' is passed through unchanged.  When ``count`` is 0 the
        metrics dict is returned as-is.
        """
        # A Linux-specific get_linux_iostat() path was disabled in the
        # original code; the psutil-based path is used on every platform.
        data_per_disk, count = self.get_other_iostat()
        if not count:
            # Nothing to average (no sample yet or no disk with activity).
            return data_per_disk

        # .items() works on both Python 2 and 3 (iteritems is 2-only).
        data = {
            k: v/count
            for k, v in data_per_disk.items()
            if k != 'io.util'
        }
        data['io.util'] = data_per_disk['io.util']
        return data
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def get_other_iostat(self):
        """Compute disk I/O rates from two consecutive psutil snapshots.

        Returns ``(data_per_disk, count)``: ``data_per_disk`` maps every key
        of ``self.metric_define`` to a value accumulated over all disks that
        had I/O since the previous call ('io.util' holds the maximum,
        capped at 100), and ``count`` is the number of such disks.
        ``({}, 0)`` is returned on the first call and whenever no time has
        elapsed since the previous snapshot.

        The elapsed time is the per-CPU average of the summed CPU times,
        as computed by ``self.sum_cpu_time`` and ``self.cpu_count``.
        """
        curr_stat = psutil.disk_io_counters(True)
        curr_cpu_time = self.sum_cpu_time(psutil.cpu_times()) / self.cpu_count
        if self.last_cpu_time == 0:
            # First call: nothing to diff against, just record the baseline.
            self.last_stat = curr_stat
            self.last_cpu_time = curr_cpu_time
            return {}, 0

        ts = curr_cpu_time - self.last_cpu_time
        if ts <= 0:
            # No usable interval: every rate below would divide by zero or
            # come out negative.  Reset the baseline and report no data.
            self.last_stat = curr_stat
            self.last_cpu_time = curr_cpu_time
            return {}, 0

        data_per_disk = {k: 0 for k in self.metric_define}
        count = 0
        # .items() works on both Python 2 and 3 (iteritems is 2-only).
        for disk, nval in curr_stat.items():
            oval = self.last_stat.get(disk)
            if not oval:
                # Disk was not in the previous snapshot; nothing to compare.
                continue
            total_time = nval.write_time - oval.write_time + nval.read_time - oval.read_time
            total_count = nval.write_count - oval.write_count + nval.read_count - oval.read_count
            if not total_count:
                # Disk had no I/O in this interval; leave it out.
                continue
            data_per_disk['io.w_s'] += (nval.write_count - oval.write_count) / ts
            data_per_disk['io.wkbyte_s'] += (nval.write_bytes - oval.write_bytes) / 1024 / ts
            data_per_disk['io.r_s'] += (nval.read_count - oval.read_count) / ts
            data_per_disk['io.rkbyte_s'] += (nval.read_bytes - oval.read_bytes) / 1024 / ts
            # total_count is non-zero here, so the divisions are safe.
            data_per_disk['io.await'] += total_time / total_count
            if hasattr(oval, 'busy_time'):
                # busy_time is only present on some platforms/psutil versions.
                busy_delta = nval.busy_time - oval.busy_time
                data_per_disk['io.svctm'] += busy_delta / total_count
                io_util = busy_delta * 100.0 / (ts*1000)
                if io_util > data_per_disk['io.util']:
                    # Keep the busiest disk's utilisation, capped at 100.
                    data_per_disk['io.util'] = io_util if io_util < 100 else 100
                data_per_disk['io.queue_time_percent'] = (data_per_disk['io.await'] - data_per_disk['io.svctm']) * 100 / data_per_disk['io.await'] if data_per_disk['io.await'] else 0
            count += 1

        self.last_stat = curr_stat
        self.last_cpu_time = curr_cpu_time
        return data_per_disk, count
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_io_counters(self):
        """Run psutil.disk_io_counters through this suite's execute() helper."""
        target = psutil.disk_io_counters
        self.execute(target)

    # --- proc
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_io_counters_kernel_2_4_mocked(self):
        """Check /proc/diskstats parsing for the 2.4 kernel line format.

        See https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Serve canned /proc files; anything else goes to the real open().
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3     0   1 hda 2 3 4 5 6 7 8 9 10 11 12"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            expected = (
                ('read_count', 1),
                ('read_merged_count', 2),
                ('read_bytes', 3 * SECTOR_SIZE),
                ('read_time', 4),
                ('write_count', 5),
                ('write_merged_count', 6),
                ('write_bytes', 7 * SECTOR_SIZE),
                ('write_time', 8),
                ('busy_time', 10),
            )
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_io_counters_kernel_2_6_full_mocked(self):
        """Check /proc/diskstats parsing for 2.6 kernel lines with all metrics.

        See https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Serve canned /proc files; anything else goes to the real open().
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3    0   hda 1 2 3 4 5 6 7 8 9 10 11"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            expected = (
                ('read_count', 1),
                ('read_merged_count', 2),
                ('read_bytes', 3 * SECTOR_SIZE),
                ('read_time', 4),
                ('write_count', 5),
                ('write_merged_count', 6),
                ('write_bytes', 7 * SECTOR_SIZE),
                ('write_time', 8),
                ('busy_time', 10),
            )
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_io_counters_kernel_2_6_limited_mocked(self):
        """Check /proc/diskstats parsing for 2.6 kernel partition lines.

        On 2.6 kernels a line describing a partition (instead of a disk)
        carries only a limited set of metrics; the remaining fields are
        expected to be reported as 0.  See
        https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Serve canned /proc files; anything else goes to the real open().
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3    1   hda 1 2 3 4"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            expected = (
                # metrics present on the limited line
                ('read_count', 1),
                ('read_bytes', 2 * SECTOR_SIZE),
                ('write_count', 3),
                ('write_bytes', 4 * SECTOR_SIZE),
                # metrics absent from the limited line
                ('read_merged_count', 0),
                ('read_time', 0),
                ('write_merged_count', 0),
                ('write_time', 0),
                ('busy_time', 0),
            )
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)


# =====================================================================
# misc
# =====================================================================
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_io_counters(self):
        """Validate the structure of psutil.disk_io_counters() results.

        Checks that positional and named access agree for every field
        expected on the current platform, that no counter is negative,
        and that the per-disk mapping has non-empty, unique keys.
        """
        def verify_fields(nt):
            # Field names in the positional order expected on this platform.
            ordered = ['read_count', 'write_count',
                       'read_bytes', 'write_bytes']
            if not (OPENBSD or NETBSD):
                ordered += ['read_time', 'write_time']
                if LINUX:
                    ordered += ['read_merged_count', 'write_merged_count',
                                'busy_time']
                elif FREEBSD:
                    ordered.append('busy_time')
            for position, field in enumerate(ordered):
                self.assertEqual(nt[position], getattr(nt, field))
            for field in nt._fields:
                assert getattr(nt, field) >= 0, nt

        verify_fields(psutil.disk_io_counters(perdisk=False))
        per_disk = psutil.disk_io_counters(perdisk=True)
        # make sure there are no duplicates
        self.assertEqual(len(per_disk), len(set(per_disk)))
        for disk_name in per_disk:
            assert disk_name, disk_name
            verify_fields(per_disk[disk_name])
            if LINUX and disk_name[-1].isdigit():
                # if 'sda1' is listed 'sda' shouldn't, see:
                # https://github.com/giampaolo/psutil/issues/338
                parent = disk_name
                while parent[-1].isdigit():
                    parent = parent[:-1]
                self.assertNotIn(parent, per_disk.keys())

    # can't find users on APPVEYOR or TRAVIS
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_io_counters(self):
        """Run psutil.disk_io_counters through this suite's execute() helper."""
        target = psutil.disk_io_counters
        self.execute(target)

    # --- proc
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_io_counters_kernel_2_4_mocked(self):
        """Check /proc/diskstats parsing for the 2.4 kernel line format.

        See https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Serve canned /proc files; anything else goes to the real open().
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3     0   1 hda 2 3 4 5 6 7 8 9 10 11 12"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            expected = (
                ('read_count', 1),
                ('read_merged_count', 2),
                ('read_bytes', 3 * SECTOR_SIZE),
                ('read_time', 4),
                ('write_count', 5),
                ('write_merged_count', 6),
                ('write_bytes', 7 * SECTOR_SIZE),
                ('write_time', 8),
                ('busy_time', 10),
            )
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_io_counters_kernel_2_6_full_mocked(self):
        """Check /proc/diskstats parsing for 2.6 kernel lines with all metrics.

        See https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Serve canned /proc files; anything else goes to the real open().
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3    0   hda 1 2 3 4 5 6 7 8 9 10 11"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            expected = (
                ('read_count', 1),
                ('read_merged_count', 2),
                ('read_bytes', 3 * SECTOR_SIZE),
                ('read_time', 4),
                ('write_count', 5),
                ('write_merged_count', 6),
                ('write_bytes', 7 * SECTOR_SIZE),
                ('write_time', 8),
                ('busy_time', 10),
            )
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_io_counters_kernel_2_6_limited_mocked(self):
        """Check /proc/diskstats parsing for 2.6 kernel partition lines.

        On 2.6 kernels a line describing a partition (instead of a disk)
        carries only a limited set of metrics; the remaining fields are
        expected to be reported as 0.  See
        https://github.com/giampaolo/psutil/issues/767
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Serve canned /proc files; anything else goes to the real open().
            if name == '/proc/partitions':
                return io.StringIO(textwrap.dedent(u"""\
                    major minor  #blocks  name

                       8        0  488386584 hda
                    """))
            if name == '/proc/diskstats':
                return io.StringIO(
                    u("   3    1   hda 1 2 3 4"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            counters = psutil.disk_io_counters()
            assert patched.called
            expected = (
                # metrics present on the limited line
                ('read_count', 1),
                ('read_bytes', 2 * SECTOR_SIZE),
                ('write_count', 3),
                ('write_bytes', 4 * SECTOR_SIZE),
                # metrics absent from the limited line
                ('read_merged_count', 0),
                ('read_time', 0),
                ('write_merged_count', 0),
                ('write_time', 0),
                ('busy_time', 0),
            )
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)


# =====================================================================
# --- misc
# =====================================================================
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_io_counters(self):
        # Sanity-checks the namedtuples returned by
        # psutil.disk_io_counters(), both system-wide and per-disk.

        def check_ntuple(nt):
            # Positional layout expected on the current platform.
            layout = ['read_count', 'write_count',
                      'read_bytes', 'write_bytes']
            if not (OPENBSD or NETBSD):
                layout += ['read_time', 'write_time']
                if LINUX:
                    layout += ['read_merged_count',
                               'write_merged_count',
                               'busy_time']
                elif FREEBSD:
                    layout.append('busy_time')
            for position, field in enumerate(layout):
                self.assertEqual(nt[position], getattr(nt, field))
            # No counter may ever be negative.
            for field in nt._fields:
                assert getattr(nt, field) >= 0, nt

        check_ntuple(psutil.disk_io_counters(perdisk=False))

        per_disk = psutil.disk_io_counters(perdisk=True)
        # make sure there are no duplicates
        self.assertEqual(len(per_disk), len(set(per_disk)))
        for disk_name in per_disk:
            assert disk_name, disk_name
            check_ntuple(per_disk[disk_name])
            if LINUX and disk_name[-1].isdigit():
                # if 'sda1' is listed 'sda' shouldn't, see:
                # https://github.com/giampaolo/psutil/issues/338
                parent = disk_name
                while parent[-1].isdigit():
                    parent = parent[:-1]
                self.assertNotIn(parent, per_disk)

    # can't find users on APPVEYOR or TRAVIS
项目:rune    作者:hoonkim    | 项目源码 | 文件源码
def GetSystemState(self):
        """Refresh the host/system snapshot stored on this instance.

        Sets ``address``, ``cpu_count``, ``cpu_percent``, ``mem``,
        ``disk_usage``, ``disk_io``, ``net_io`` and ``hostname``.

        Note: ``psutil.cpu_percent(interval=1, ...)`` is called with a
        one second sampling interval, so this method blocks for roughly
        that long.

        Raises:
            socket.error / OSError: if the UDP socket cannot be
                connected (for example when there is no route to
                8.8.8.8).
        """
        # Connecting a datagram socket lets us read back the local
        # address the OS selected for reaching 8.8.8.8.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect(('8.8.8.8',0))
            self.address = s.getsockname()[0]
        finally:
            # Always release the descriptor; previously it leaked on
            # every call.
            s.close()
        self.cpu_count = psutil.cpu_count()
        self.cpu_percent = psutil.cpu_percent(interval=1,percpu=True)
        self.mem = psutil.virtual_memory()
        self.disk_usage = psutil.disk_usage('/')
        self.disk_io = psutil.disk_io_counters()
        self.net_io = psutil.net_io_counters()
        self.hostname = socket.gethostname()
项目:rune    作者:hoonkim    | 项目源码 | 文件源码
def GetSystemState(self):
        """Refresh the host/system snapshot stored on this instance.

        Sets ``address``, ``cpu_count``, ``cpu_percent``, ``mem``,
        ``disk_usage``, ``disk_io``, ``net_io`` and ``hostname``.

        Note: ``psutil.cpu_percent(interval=1, ...)`` is called with a
        one second sampling interval, so this method blocks for roughly
        that long.

        Raises:
            socket.error / OSError: if the UDP socket cannot be
                connected (for example when there is no route to
                8.8.8.8).
        """
        # Connecting a datagram socket lets us read back the local
        # address the OS selected for reaching 8.8.8.8.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect(('8.8.8.8',0))
            self.address = s.getsockname()[0]
        finally:
            # Always release the descriptor; previously it leaked on
            # every call.
            s.close()
        self.cpu_count = psutil.cpu_count()
        self.cpu_percent = psutil.cpu_percent(interval=1,percpu=True)
        self.mem = psutil.virtual_memory()
        self.disk_usage = psutil.disk_usage('/')
        self.disk_io = psutil.disk_io_counters()
        self.net_io = psutil.net_io_counters()
        self.hostname = socket.gethostname()
项目:MySQL_Watcher    作者:kinghows    | 项目源码 | 文件源码
def f_print_linux_status(save_as):
    """Print a compact overview table of CPU, memory, swap and load.

    Args:
        save_as: Output format. ``"txt"`` renders through
            ``f_print_title`` / ``f_print_table_body``; ``"html"``
            renders through ``f_print_table_html``. Any other value
            produces no output.

    Note: ``psutil.cpu_percent(interval=1)`` is used for the CPU cell,
    so the call blocks for about one second.
    """
    def _mb(n_bytes):
        # Byte count -> "<value>M" string, as shown in the table cells.
        return str(n_bytes/1024/1024) + 'M'

    # --- collect data ---------------------------------------------------
    # scputimes(user, nice, system, idle, iowait, irq, softirq, steal,
    #           guest, guest_nice)
    cpu_times = psutil.cpu_times()
    # svmem(total, available, percent, used, free, active, inactive,
    #       buffers, cached, shared)
    mem = psutil.virtual_memory()
    # sswap(total, used, free, percent, sin, sout)
    swap = psutil.swap_memory()

    # Load average. When it cannot be obtained, fall back to placeholders
    # so the table can still be rendered (an empty dict here used to make
    # the row construction below fail with KeyError).
    try:
        load = os.getloadavg()
    except (OSError, AttributeError):
        stats = {'min1': 'N/A', 'min5': 'N/A', 'min15': 'N/A'}
    else:
        stats = {'min1': load[0], 'min5': load[1], 'min15': load[2]}

    # --- build table ----------------------------------------------------
    # Per-column format specs consumed by the table printers: style1 is
    # passed to the text printer, style to the HTML printer.
    style1 = {
        1: '&nbsp;,6,l', 2: '&nbsp;,10,r',
        3: '&nbsp;,6,l', 4: '&nbsp;,10,r',
        5: '&nbsp;,6,l', 6: '&nbsp;,6,r',
        7: '&nbsp;,8,l', 8: '&nbsp;,6,r',
        9: '&nbsp;,6,l', 10: '&nbsp;,6,r',
        11: '&nbsp;,6,l', 12: '&nbsp;,5,r',
    }
    style = {
        1: '&nbsp;,l', 2: '&nbsp;,r',
        3: '&nbsp;,l', 4: '&nbsp;,r',
        5: '&nbsp;,l', 6: '&nbsp;,r',
        7: '&nbsp;,l', 8: '&nbsp;,r',
        9: '&nbsp;,l', 10: '&nbsp;,r',
        11: '&nbsp;,l', 12: '&nbsp;,r',
    }
    rows = [
        ["CPU", str(psutil.cpu_percent(interval=1)) + '%',
         "nice", cpu_times.nice,
         "MEM", str(mem.percent) + '%',
         "active", _mb(mem.active),
         "SWAP", str(swap.percent) + '%',
         "LOAD", str(psutil.cpu_count()) + 'core'],
        ["user", cpu_times.user,
         "irq", cpu_times.irq,
         "total", _mb(mem.total),
         "inactive", _mb(mem.inactive),
         "total", _mb(swap.total),
         "1 min", stats["min1"]],
        ["system", cpu_times.system,
         "iowait", cpu_times.iowait,
         "used", _mb(mem.used),
         "buffers", _mb(mem.buffers),
         "used", _mb(swap.used),
         "5 min", stats["min5"]],
        ["idle", cpu_times.idle,
         "steal", cpu_times.steal,
         "free", _mb(mem.free),
         "cached", _mb(mem.cached),
         "free", _mb(swap.free),
         "15 min", stats["min15"]],
    ]

    title = "Linux Overview"
    if save_as == "txt":
        f_print_title(title)
        f_print_table_body(rows, style1,' ')
    elif save_as == "html":
        f_print_table_html(rows, title, style)
项目:icinga2checks    作者:c-store    | 项目源码 | 文件源码
def check_disk(i_warning, i_critical, s_partition):
    """
    Build the check result for disk usage and disk I/O.

    Gets:
        i_warning: usage threshold handed to check_status for WARNING
        i_critical: usage threshold handed to check_status for CRITICAL
        s_partition: mountpoint whose usage decides the check status

    Returns:
        status text, one "device.key=value" line per partition
        attribute, then ' | ' followed by the collected perfdata
    """
    test_int(i_warning, i_critical)
    test_string(s_partition)

    s_perfdata = ''
    s_output = ''
    f_monitored_partition_usage = 0.0
    l_partitions = psutil.disk_partitions()
    d_io_counters = psutil.disk_io_counters(perdisk=True)

    # usage of every partition goes into perfdata; only the monitored
    # mountpoint determines the status text
    for nt_partition in l_partitions:
        s_mountpoint = nt_partition.mountpoint
        d_disk_usage = psutil.disk_usage(s_mountpoint)._asdict()
        for key, value in d_disk_usage.items():
            s_perfdata = add_perfdata(s_perfdata, s_mountpoint, key, value)

        if s_mountpoint != s_partition:
            continue
        s_output = check_status(i_warning, i_critical, d_disk_usage['percent'])
        f_monitored_partition_usage = d_disk_usage['percent']

    # explain the status whenever it is anything other than OK
    if 'OK' not in s_output:
        s_output += ' {} has a usage of {} percent.'.format(s_partition, f_monitored_partition_usage)

    # list every partition attribute except the device name itself
    for nt_partition in l_partitions:
        d_partition = nt_partition._asdict()
        s_device_name = d_partition['device']
        s_output += ''.join(
            '\n{}.{}={}'.format(s_device_name, key, value)
            for key, value in d_partition.items()
            if key != 'device'
        )

    # per-device I/O counters go into perfdata as well
    for s_device, nt_counters in d_io_counters.items():
        for key, value in nt_counters._asdict().items():
            s_perfdata = add_perfdata(s_perfdata, s_device, key, value)

    # status/info text and perfdata are separated by a pipe
    return s_output + ' | {}'.format(s_perfdata)
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def poll(interval):
    """Measure I/O activity over ``interval`` seconds.

    I/O counters are sampled for every process and for the disks as a
    whole, the function sleeps for ``interval`` seconds, and the
    counters are sampled again.

    Args:
        interval: Number of seconds to sleep between the two samples.

    Returns:
        A tuple ``(processes, disks_read_per_sec, disks_write_per_sec)``.
        ``processes`` is the list of sampled ``psutil.Process`` objects,
        sorted by total I/O (highest first). Each carries the extra
        attributes ``_before``, ``_after``, ``_cmdline``, ``_username``,
        ``_read_per_sec``, ``_write_per_sec`` and ``_total``.

        Despite the ``*_per_sec`` names, all deltas are raw byte
        differences between the two samples. They are not divided by
        ``interval``, so they are true per-second rates only when
        ``interval`` is 1.
    """
    # First sample: keep only processes whose I/O counters are readable.
    procs = []
    for p in psutil.process_iter():
        try:
            p._before = p.io_counters()
        except psutil.Error:
            continue
        procs.append(p)
    disks_before = psutil.disk_io_counters()

    # sleep some time
    time.sleep(interval)

    # Second sample. Any psutil error (process gone, zombie, access
    # denied, ...) drops that process, mirroring the first sample, so
    # one unreadable process does not abort the whole poll.
    sampled = []
    for p in procs:
        try:
            p._after = p.io_counters()
            p._cmdline = ' '.join(p.cmdline())
            if not p._cmdline:
                p._cmdline = p.name()
            p._username = p.username()
        except psutil.Error:
            continue
        sampled.append(p)
    disks_after = psutil.disk_io_counters()

    # Deltas between the two samples.
    for p in sampled:
        p._read_per_sec = p._after.read_bytes - p._before.read_bytes
        p._write_per_sec = p._after.write_bytes - p._before.write_bytes
        p._total = p._read_per_sec + p._write_per_sec

    disks_read_per_sec = disks_after.read_bytes - disks_before.read_bytes
    disks_write_per_sec = disks_after.write_bytes - disks_before.write_bytes

    # sort processes by total disk IO so that the more intensive
    # ones get listed first
    processes = sorted(sampled, key=lambda p: p._total, reverse=True)

    return (processes, disks_read_per_sec, disks_write_per_sec)