Python psutil 模块,cpu_times() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 psutil.cpu_times()。

项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Check that the optional fields of psutil.cpu_times() ('steal',
        # 'guest', 'guest_nice') are present only when the running kernel
        # version is at least the one they are paired with below.
        fields = psutil.cpu_times()._fields
        # Raw string: '\d' inside a plain literal is an invalid escape
        # sequence and triggers a warning on recent Python versions.
        kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
        kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
        # (field name, minimum kernel version at which it is expected)
        expectations = (
            ('steal', (2, 6, 11)),
            ('guest', (2, 6, 24)),
            ('guest_nice', (3, 2, 0)),
        )
        for name, min_version in expectations:
            if kernel_ver_info >= min_version:
                self.assertIn(name, fields)
            else:
                self.assertNotIn(name, fields)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_serialization(self):
        # Each return value must survive a JSON round trip (when the json
        # module is available) and a pickle round trip that yields an
        # object equal to the original.
        def roundtrip(value):
            if json is not None:
                json.loads(json.dumps(value))
            restored = pickle.loads(pickle.dumps(value))
            self.assertEqual(value, restored)

        roundtrip(psutil.Process().as_dict())
        roundtrip(psutil.virtual_memory())
        roundtrip(psutil.swap_memory())
        roundtrip(psutil.cpu_times())
        roundtrip(psutil.cpu_times_percent(interval=0))
        roundtrip(psutil.net_io_counters())
        # Disk I/O counters are not checked on Linux when /proc/diskstats
        # is missing, nor when APPVEYOR is set.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            roundtrip(psutil.disk_io_counters())
        roundtrip(psutil.disk_partitions())
        roundtrip(psutil.disk_usage(os.getcwd()))
        roundtrip(psutil.users())
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_oneshot_twice(self):
        # Test the case where the ctx manager is __enter__ed twice.
        # The second __enter__ is supposed to result in a NOOP.
        cpu_times_target = "psutil._psplatform.Process.cpu_times"
        enter_target = "psutil._psplatform.Process.oneshot_enter"
        with mock.patch(cpu_times_target) as times_mock, \
                mock.patch(enter_target) as enter_mock:
            p = psutil.Process()
            with p.oneshot():
                p.cpu_times()
                p.cpu_times()
                with p.oneshot():
                    p.cpu_times()
                    p.cpu_times()
            self.assertEqual(times_mock.call_count, 1)
            self.assertEqual(enter_mock.call_count, 1)

        # Outside of oneshot() each of the two calls is counted by the mock.
        with mock.patch(cpu_times_target) as plain_mock:
            p.cpu_times()
            p.cpu_times()
        self.assertEqual(plain_mock.call_count, 2)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_Popen(self):
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get properly freed.
        # Not sure what to do though.
        sleeper_cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
        proc = psutil.Popen(
            sleeper_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            # Exercise a few methods/attributes on the Popen wrapper.
            proc.name()
            proc.cpu_times()
            proc.stdin
            self.assertTrue(dir(proc))
            # An unknown attribute must raise AttributeError.
            self.assertRaises(AttributeError, getattr, proc, 'foo')
        finally:
            # Always terminate and reap the 60 second sleeper.
            proc.kill()
            proc.wait()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Check that the optional fields of psutil.cpu_times() ('steal',
        # 'guest', 'guest_nice') are present only when the running kernel
        # version is at least the one they are paired with below.
        fields = psutil.cpu_times()._fields
        # Raw string: '\d' inside a plain literal is an invalid escape
        # sequence and triggers a warning on recent Python versions.
        kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
        kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
        # (field name, minimum kernel version at which it is expected)
        expectations = (
            ('steal', (2, 6, 11)),
            ('guest', (2, 6, 24)),
            ('guest_nice', (3, 2, 0)),
        )
        for name, min_version in expectations:
            if kernel_ver_info >= min_version:
                self.assertIn(name, fields)
            else:
                self.assertNotIn(name, fields)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_serialization(self):
        # Each return value must survive a JSON round trip (when the json
        # module is available) and a pickle round trip that yields an
        # object equal to the original.
        def roundtrip(value):
            if json is not None:
                json.loads(json.dumps(value))
            restored = pickle.loads(pickle.dumps(value))
            self.assertEqual(value, restored)

        roundtrip(psutil.Process().as_dict())
        roundtrip(psutil.virtual_memory())
        roundtrip(psutil.swap_memory())
        roundtrip(psutil.cpu_times())
        roundtrip(psutil.cpu_times_percent(interval=0))
        roundtrip(psutil.net_io_counters())
        # Disk I/O counters are not checked on Linux when /proc/diskstats
        # is missing, nor when APPVEYOR is set.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            roundtrip(psutil.disk_io_counters())
        roundtrip(psutil.disk_partitions())
        roundtrip(psutil.disk_usage(os.getcwd()))
        roundtrip(psutil.users())
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_oneshot_twice(self):
        # Test the case where the ctx manager is __enter__ed twice.
        # The second __enter__ is supposed to result in a NOOP.
        cpu_times_target = "psutil._psplatform.Process.cpu_times"
        enter_target = "psutil._psplatform.Process.oneshot_enter"
        with mock.patch(cpu_times_target) as times_mock, \
                mock.patch(enter_target) as enter_mock:
            p = psutil.Process()
            with p.oneshot():
                p.cpu_times()
                p.cpu_times()
                with p.oneshot():
                    p.cpu_times()
                    p.cpu_times()
            self.assertEqual(times_mock.call_count, 1)
            self.assertEqual(enter_mock.call_count, 1)

        # Outside of oneshot() each of the two calls is counted by the mock.
        with mock.patch(cpu_times_target) as plain_mock:
            p.cpu_times()
            p.cpu_times()
        self.assertEqual(plain_mock.call_count, 2)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_Popen(self):
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get properly freed.
        # Not sure what to do though.
        sleeper_cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
        proc = psutil.Popen(
            sleeper_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            # Exercise a few methods/attributes on the Popen wrapper.
            proc.name()
            proc.cpu_times()
            proc.stdin
            self.assertTrue(dir(proc))
            # An unknown attribute must raise AttributeError.
            self.assertRaises(AttributeError, getattr, proc, 'foo')
        finally:
            # Always terminate and reap the 60 second sleeper.
            proc.kill()
            proc.wait()
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Check that the optional fields of psutil.cpu_times() ('steal',
        # 'guest', 'guest_nice') are present only when the running kernel
        # version is at least the one they are paired with below.
        fields = psutil.cpu_times()._fields
        # Raw string: '\d' inside a plain literal is an invalid escape
        # sequence and triggers a warning on recent Python versions.
        kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
        kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
        # (field name, minimum kernel version at which it is expected)
        expectations = (
            ('steal', (2, 6, 11)),
            ('guest', (2, 6, 24)),
            ('guest_nice', (3, 2, 0)),
        )
        for name, min_version in expectations:
            if kernel_ver_info >= min_version:
                self.assertIn(name, fields)
            else:
                self.assertNotIn(name, fields)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_serialization(self):
        # Each return value must survive a JSON round trip (when the json
        # module is available) and a pickle round trip that yields an
        # object equal to the original.
        def roundtrip(value):
            if json is not None:
                json.loads(json.dumps(value))
            restored = pickle.loads(pickle.dumps(value))
            self.assertEqual(value, restored)

        roundtrip(psutil.Process().as_dict())
        roundtrip(psutil.virtual_memory())
        roundtrip(psutil.swap_memory())
        roundtrip(psutil.cpu_times())
        roundtrip(psutil.cpu_times_percent(interval=0))
        roundtrip(psutil.net_io_counters())
        # Disk I/O counters are not checked on Linux when /proc/diskstats
        # is missing, nor when APPVEYOR is set.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            roundtrip(psutil.disk_io_counters())
        roundtrip(psutil.disk_partitions())
        roundtrip(psutil.disk_usage(os.getcwd()))
        roundtrip(psutil.users())
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_oneshot_twice(self):
        # Test the case where the ctx manager is __enter__ed twice.
        # The second __enter__ is supposed to result in a NOOP.
        cpu_times_target = "psutil._psplatform.Process.cpu_times"
        enter_target = "psutil._psplatform.Process.oneshot_enter"
        with mock.patch(cpu_times_target) as times_mock, \
                mock.patch(enter_target) as enter_mock:
            p = psutil.Process()
            with p.oneshot():
                p.cpu_times()
                p.cpu_times()
                with p.oneshot():
                    p.cpu_times()
                    p.cpu_times()
            self.assertEqual(times_mock.call_count, 1)
            self.assertEqual(enter_mock.call_count, 1)

        # Outside of oneshot() each of the two calls is counted by the mock.
        with mock.patch(cpu_times_target) as plain_mock:
            p.cpu_times()
            p.cpu_times()
        self.assertEqual(plain_mock.call_count, 2)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_Popen(self):
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get properly freed.
        # Not sure what to do though.
        sleeper_cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
        proc = psutil.Popen(
            sleeper_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            # Exercise a few methods/attributes on the Popen wrapper.
            proc.name()
            proc.cpu_times()
            proc.stdin
            self.assertTrue(dir(proc))
            # An unknown attribute must raise AttributeError.
            self.assertRaises(AttributeError, getattr, proc, 'foo')
        finally:
            # Always terminate and reap the 60 second sleeper.
            proc.kill()
            proc.wait()
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def getLoadAverage():
    """Return a load indicator for the current host.

    When ``linux`` is set, return a list of three floats: the first three
    fields of ``/proc/loadavg`` (or, if that file is missing, the last three
    tokens printed by ``uptime``), each divided by
    ``multiprocessing.cpu_count()``.

    When ``mswindows`` is set, return ``psutil.cpu_percent()`` formatted as
    a string such as ``"12.50%"``.

    Return ``None`` when neither flag is set.
    """
    if linux:
        import multiprocessing
        # Scale factor turning a raw load value into a per-CPU value.
        k = 1.0 / multiprocessing.cpu_count()
        if os.path.exists('/proc/loadavg'):
            # Read the file once and close it deterministically instead of
            # re-opening it (and leaking the handle) for every field.
            with open('/proc/loadavg') as loadavg_file:
                fields = loadavg_file.read().split()
            return [float(fields[x]) * k for x in range(3)]
        else:
            output = subprocess.check_output(['uptime'])
            # check_output() returns bytes on Python 3; decode so that the
            # str-based strip(',') below keeps working.
            if not isinstance(output, str):
                output = output.decode()
            tokens = output.split()
            return [float(x.strip(',')) * k for x in tokens[-3:]]
    if mswindows:
        # TODO(Guodong Ding) get this field data like on Linux for Windows
        # print psutil.cpu_percent()
        # print psutil.cpu_times_percent()
        # print psutil.cpu_times()
        # print psutil.cpu_stats()
        return "%.2f%%" % psutil.cpu_percent()
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Run self.proc.cpu_times through the self.execute() harness.
        self.execute(self.proc.cpu_times)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Run the system-wide psutil.cpu_times through self.execute().
        self.execute(psutil.cpu_times)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_per_cpu_times(self):
        # Same as the system-wide variant, but percpu=True is forwarded
        # to psutil.cpu_times by self.execute().
        self.execute(psutil.cpu_times, percpu=True)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Sanity-check the CPU times reported for the current process.
        cpu = psutil.Process().cpu_times()
        assert cpu.user > 0.0 or cpu.system > 0.0, cpu
        assert cpu.children_user >= 0.0, cpu
        assert cpu.children_system >= 0.0, cpu
        # make sure returned values can be pretty printed with strftime
        for field in cpu._fields:
            seconds = getattr(cpu, field)
            time.strftime("%H:%M:%S", time.localtime(seconds))

    # Test Process.cpu_times() against os.times()
    # os.times() is broken on Python 2.6
    # http://bugs.python.org/issue1040026
    # XXX fails on OSX: not sure if it's for os.times(). We should
    # try this with Python 2.7 and re-enable the test.
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_cpu_times_2(self):
        # Compare the first two values of Process.cpu_times() with the
        # first two values of os.times(), which act as the reference.
        proc_user, proc_kernel = psutil.Process().cpu_times()[:2]
        os_user, os_kernel = os.times()[:2]

        # Tolerance is +/- 0.1 seconds: fail as soon as a pair of values
        # is further apart than that.
        if abs(proc_user - os_user) > 0.1:
            self.fail("expected: %s, found: %s" % (os_user, proc_user))

        if abs(proc_kernel - os_kernel) > 0.1:
            self.fail("expected: %s, found: %s" % (os_kernel, proc_kernel))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_threads_2(self):
        # The per-thread user/system times must add up (within 0.1) to
        # the totals reported by Process.cpu_times().
        child = get_test_subprocess()
        p = psutil.Process(child.pid)
        if OPENBSD:
            try:
                p.threads()
            except psutil.AccessDenied:
                raise unittest.SkipTest(
                    "on OpenBSD this requires root access")
        user_total = p.cpu_times().user
        user_by_thread = sum(t.user_time for t in p.threads())
        self.assertAlmostEqual(user_total, user_by_thread, delta=0.1)
        system_total = p.cpu_times().system
        system_by_thread = sum(t.system_time for t in p.threads())
        self.assertAlmostEqual(system_total, system_by_thread, delta=0.1)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_cpu_affinity(self):
        # Exercise Process.cpu_affinity() as getter and setter on the
        # current process, cross-checking against os.sched_getaffinity()
        # whenever the os module provides it.
        p = psutil.Process()
        initial = p.cpu_affinity()
        if hasattr(os, "sched_getaffinity"):
            self.assertEqual(initial, list(os.sched_getaffinity(p.pid)))
        # the returned list must not contain duplicates
        self.assertEqual(len(initial), len(set(initial)))
        # candidate CPU numbers: 0 .. N-1, N being the number of entries
        # returned by cpu_percent(percpu=True)
        all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
        # setting on travis doesn't seem to work (always return all
        # CPUs on get):
        # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, ... != [0]
        # pin the process to each single CPU in turn and read it back
        for n in all_cpus:
            p.cpu_affinity([n])
            self.assertEqual(p.cpu_affinity(), [n])
            if hasattr(os, "sched_getaffinity"):
                self.assertEqual(p.cpu_affinity(),
                                 list(os.sched_getaffinity(p.pid)))
        #
        # then allow every CPU at once and read it back
        p.cpu_affinity(all_cpus)
        self.assertEqual(p.cpu_affinity(), all_cpus)
        if hasattr(os, "sched_getaffinity"):
            self.assertEqual(p.cpu_affinity(),
                             list(os.sched_getaffinity(p.pid)))
        #
        # a bare int (non-iterable) must be rejected
        self.assertRaises(TypeError, p.cpu_affinity, 1)
        # put back the affinity recorded at the start of the test
        p.cpu_affinity(initial)
        # it should work with all iterables, not only lists
        p.cpu_affinity(set(all_cpus))
        p.cpu_affinity(tuple(all_cpus))
        # CPU numbers beyond the existing ones, non-int and negative
        # entries must all be rejected
        invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
        self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
        self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000))
        self.assertRaises(TypeError, p.cpu_affinity, [0, "1"])
        self.assertRaises(ValueError, p.cpu_affinity, [0, -1])

    # TODO: #595
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def cpu_times(self, ret, proc):
        # Validate a cpu_times() result: the user and system values must
        # be non-negative.  The proc argument is not used here.
        self.assertTrue(ret.user >= 0)
        self.assertTrue(ret.system >= 0)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_cpu_count(self):
        # The logical CPU count must equal the number of per-CPU entries
        # returned by cpu_times() and be >= the physical count.
        n_logical = psutil.cpu_count()
        self.assertEqual(n_logical, len(psutil.cpu_times(percpu=True)))
        self.assertGreaterEqual(n_logical, 1)
        # The physical count is only checked when /proc/cpuinfo, if it
        # exists, mentions "physical id"; otherwise the test is skipped.
        cpuinfo_path = "/proc/cpuinfo"
        if os.path.exists(cpuinfo_path):
            with open(cpuinfo_path) as fd:
                has_physical_id = "physical id" in fd.read()
            if not has_physical_id:
                raise unittest.SkipTest("cpuinfo doesn't include physical id")
        n_physical = psutil.cpu_count(logical=False)
        self.assertGreaterEqual(n_physical, 1)
        self.assertGreaterEqual(n_logical, n_physical)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Check type, value >= 0, str().
        cpu = psutil.cpu_times()
        # the tuple must be summable as a whole
        sum(cpu)
        running_total = 0
        for value in cpu:
            self.assertIsInstance(value, float)
            self.assertGreaterEqual(value, 0.0)
            running_total = running_total + value
        self.assertEqual(running_total, sum(cpu))
        str(cpu)
        # CPU times are always supposed to increase over time
        # or at least remain the same and that's because time
        # cannot go backwards.
        # Surprisingly sometimes this might not be the case (at
        # least on Windows and Linux), see:
        # https://github.com/giampaolo/psutil/issues/392
        # https://github.com/giampaolo/psutil/issues/645
        # if not WINDOWS:
        #     last = psutil.cpu_times()
        #     for x in range(100):
        #         new = psutil.cpu_times()
        #         for field in new._fields:
        #             new_t = getattr(new, field)
        #             last_t = getattr(last, field)
        #             self.assertGreaterEqual(new_t, last_t,
        #                                     msg="%s %s" % (new_t, last_t))
        #         last = new
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_per_cpu_times(self):
        # Check type, value >= 0, str().
        for cpu in psutil.cpu_times(percpu=True):
            # each per-CPU tuple must be summable as a whole
            sum(cpu)
            running_total = 0
            for value in cpu:
                self.assertIsInstance(value, float)
                self.assertGreaterEqual(value, 0.0)
                running_total = running_total + value
            self.assertEqual(running_total, sum(cpu))
            str(cpu)
        # a per-CPU entry has as many fields as the system-wide tuple
        first_cpu = psutil.cpu_times(percpu=True)[0]
        system_wide = psutil.cpu_times(percpu=False)
        self.assertEqual(len(first_cpu), len(system_wide))

        # Note: in theory CPU times are always supposed to increase over
        # time or remain the same but never go backwards. In practice
        # sometimes this is not the case.
        # This issue seemed to afflict only Windows:
        # https://github.com/giampaolo/psutil/issues/392
        # ...but it turns out also Linux (rarely) behaves the same.
        # last = psutil.cpu_times(percpu=True)
        # for x in range(100):
        #     new = psutil.cpu_times(percpu=True)
        #     for index in range(len(new)):
        #         newcpu = new[index]
        #         lastcpu = last[index]
        #         for field in newcpu._fields:
        #             new_t = getattr(newcpu, field)
        #             last_t = getattr(lastcpu, field)
        #             self.assertGreaterEqual(
        #                 new_t, last_t, msg="%s %s" % (lastcpu, newcpu))
        #     last = new
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_per_cpu_times_2(self):
        # Simulate some work load then make sure time have increased
        # between calls.
        before = psutil.cpu_times(percpu=True)
        deadline = time.time() + 0.1
        # busy-wait until the deadline has passed
        while time.time() < deadline:
            pass
        after = psutil.cpu_times(percpu=True)
        # Pass as soon as one CPU shows a summed increase of >= 0.05.
        deltas = (sum(new) - sum(old) for old, new in zip(before, after))
        if not any(delta >= 0.05 for delta in deltas):
            self.fail()
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_cpu_times_comparison(self):
        # Make sure the sum of all per cpu times is almost equal to
        # base "one cpu" times.
        overall = psutil.cpu_times()
        per_cpu = psutil.cpu_times(percpu=True)
        # add up each field across all CPUs
        column_sums = [sum(column) for column in zip(*per_cpu)]
        summed = overall._make(column_sums)
        for name in overall._fields:
            expected = getattr(overall, name)
            actual = getattr(summed, name)
            self.assertAlmostEqual(expected, actual, delta=1)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_per_cpu_times_percent_negative(self):
        # see: https://github.com/giampaolo/psutil/issues/645
        psutil.cpu_times_percent(percpu=True)
        # Build one all-zero tuple per CPU, of the same type as the real
        # entries, and have psutil.cpu_times return those.
        zero_times = []
        for cpu in psutil.cpu_times(percpu=True):
            zero_times.append(cpu._make([0] * len(cpu._fields)))
        with mock.patch('psutil.cpu_times', return_value=zero_times):
            for cpu in psutil.cpu_times_percent(percpu=True):
                for percent in cpu:
                    self._test_cpu_percent(percent, None, None)
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def cpu_times(pid):
            """Return psutil's cpu_times() for the process with the given pid."""
            return psutil.Process(pid).cpu_times()
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def times():
                """Return the system-wide result of psutil.cpu_times()."""
                return psutil.cpu_times()
项目:Cayenne-Agent    作者:myDevicesIoT    | 项目源码 | 文件源码
def get_cpu_usage(self):
        """Return dict with overall CPU usage.

        The dict contains whichever of the names listed in ``fields`` are
        reported by psutil.cpu_times(), plus a 'total' entry holding their
        sum rounded to two decimals.  If collecting the values fails, the
        error is reported through exception() and the dict gathered so far
        (possibly empty) is returned.
        """
        usage = {}
        try:
            fields = ('user', 'system', 'idle', 'nice', 'iowait', 'irq', 'softirq', 'steal')
            usage = {key: value for key, value in psutil.cpu_times()._asdict().items() if key in fields}
            usage['total'] = round(sum(usage.values()), 2)
        except Exception:
            # Still best effort, but a bare "except:" would also swallow
            # KeyboardInterrupt and SystemExit.
            exception('Error getting CPU usage info')
        return usage
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def _get_cpu_stats(self):
        """Return a dict with the CPU count, CPU times and per-CPU percent.

        Keys: 'processors' (ps.cpu_count()), 'times' (ps.cpu_times()) and
        'load' (ps.cpu_percent(percpu=True)).
        """
        stats = {}
        stats['processors'] = ps.cpu_count()
        stats['times'] = ps.cpu_times()
        stats['load'] = ps.cpu_percent(percpu=True)
        return stats
项目:dino    作者:thenetcircle    | 项目源码 | 文件源码
def cpu_times():
    """Send system-wide CPU times to statsd in an endless loop.

    On every iteration a fresh psutil.cpu_times() snapshot is taken, each
    field listed below is sent as a gauge named
    'system_wide.times.<field>', and the loop sleeps for GRANULARITY.
    This function never returns.
    """
    client = statsd.StatsClient(STATSD_HOST, 8125, prefix=PREFIX + 'system.cpu')
    reported_fields = (
        'user', 'nice', 'system', 'idle', 'iowait',
        'irq', 'softirq', 'steal', 'guest', 'guest_nice',
    )
    while True:
        snapshot = psutil.cpu_times()
        for field in reported_fields:
            client.gauge('system_wide.times.' + field, getattr(snapshot, field))
        time.sleep(GRANULARITY)
项目:nova-lxd    作者:openstack    | 项目源码 | 文件源码
def get_host_cpu_stats(self):
        """Return host CPU statistics as a dict.

        Keys 'kernel', 'idle', 'user' and 'iowait' hold integer-truncated
        entries 2, 3, 0 and 4 of psutil.cpu_times(); 'frequency' holds the
        'cpu mhz' entry of _get_cpu_info(), or 0 when it is absent.
        """
        # Take a single snapshot so that all four values describe the same
        # instant, instead of querying psutil once per key.
        # NOTE(review): the indices assume psutil's Linux field order
        # (user, nice, system, idle, iowait, ...) -- confirm if this ever
        # runs on another platform.
        times = psutil.cpu_times()
        return {
            'kernel': int(times[2]),
            'idle': int(times[3]),
            'user': int(times[0]),
            'iowait': int(times[4]),
            'frequency': _get_cpu_info().get('cpu mhz', 0)
        }
项目:maestro    作者:InWorldz    | 项目源码 | 文件源码
def getMethods(self):
        """Return ServiceBase's methods followed by this service's own.

        The names added here are "cpu_percent" and "cpu_times".
        """
        own_methods = ["cpu_percent", "cpu_times"]
        inherited = ServiceBase.getMethods()
        return inherited + own_methods
项目:maestro    作者:InWorldz    | 项目源码 | 文件源码
def cpu_times(self, percpu=False):
        """Return psutil.cpu_times(percpu); percpu is passed through as-is."""
        return psutil.cpu_times(percpu)
项目:mdcs    作者:CtrlC-Root    | 项目源码 | 文件源码
def read_cpu_usage(self):
        """Return a list with one dict per CPU.

        Each dict carries the 'user', 'system' and 'idle' values of the
        corresponding psutil.cpu_times(percpu=True) entry.
        """
        return [
            {'user': cpu.user, 'system': cpu.system, 'idle': cpu.idle}
            for cpu in psutil.cpu_times(percpu=True)
        ]
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def check(self):
        """Collect load and CPU usage metrics and return them as a dict.

        Keys produced: 'load.1' / 'load.5' / 'load.15' (not on Windows),
        'cpu.used_total', one 'cpu.cpu<N>_used' per CPU, and
        'cpu.used_sy' / 'cpu.used_us' / 'cpu.used_wa' computed by
        self._get_cpu_time() over the interval since the previous
        snapshot.  self.last_cpu_times is updated to the newest snapshot.
        """
        data = {}
        # os.getloadavg() is only queried when not running on Windows
        if platform.system() != 'Windows':
            load_1, load_5, load_15 = os.getloadavg()
            data['load.1'] = load_1
            data['load.5'] = load_5
            data['load.15'] = load_15
        data["cpu.used_total"] = int(psutil.cpu_percent())
        # per-CPU usage, one entry per CPU index
        for index, percent in enumerate(psutil.cpu_percent(percpu=True)):
            data['cpu.cpu{0}_used'.format(index)] = int(percent)

        # CPU time breakdown relative to the previous snapshot
        new_cpu_times = psutil.cpu_times()
        if self.last_cpu_times is None:
            # First run: there is no previous snapshot yet, so record one
            # now, wait briefly and sample again to get a usable interval.
            self.last_cpu_times = new_cpu_times
            gevent.sleep(0.1)
            new_cpu_times = psutil.cpu_times()
        last_total_time = reduce(lambda acc, item: acc + item, self.last_cpu_times)
        now_total_time = reduce(lambda acc, item: acc + item, new_cpu_times)
        total_time = now_total_time - last_total_time
        for metric, field in (('cpu.used_sy', 'system'),
                              ('cpu.used_us', 'user'),
                              ('cpu.used_wa', 'iowait')):
            data[metric] = self._get_cpu_time(field, total_time, new_cpu_times)
        # data['cpu.used_id'] = self._get_cpu_time('idle', total_time, new_cpu_times)
        # data['cpu.used_ni'] = self._get_cpu_time('nice', total_time, new_cpu_times)
        # data['cpu.used_hi'] = self._get_cpu_time('irq', total_time, new_cpu_times)
        # data['cpu.used_si'] = self._get_cpu_time('softirq', total_time, new_cpu_times)
        # data['cpu.used_st'] = self._get_cpu_time('steal', total_time, new_cpu_times)

        self.last_cpu_times = new_cpu_times
        return data
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def get_other_iostat(self):
        """Compute disk I/O rates from psutil counters since the last call.

        Returns a tuple ``(data_per_disk, count)``: ``data_per_disk`` maps
        every key of ``self.metric_define`` to a value accumulated over the
        disks that saw I/O since the previous call, and ``count`` is the
        number of such disks.  The first call only records a baseline and
        returns ``({}, 0)``.
        """
        curr_stat = psutil.disk_io_counters(True)
        # summed CPU time divided by the CPU count is used as the clock for
        # the sampling interval
        # NOTE(review): presumably this approximates elapsed wall-clock
        # seconds -- confirm against sum_cpu_time()
        curr_cpu_time = self.sum_cpu_time(psutil.cpu_times()) / self.cpu_count
        if self.last_cpu_time == 0: # no baseline recorded yet: store one and report nothing
            self.last_stat = curr_stat
            self.last_cpu_time = curr_cpu_time
            return {}, 0
        data_per_disk = {k: 0 for k in self.metric_define}
        count = 0
        # interval length between the previous and the current sample
        # NOTE(review): nothing guards against ts == 0, which would make
        # the divisions below raise ZeroDivisionError -- confirm
        ts = curr_cpu_time - self.last_cpu_time
        for disk, nval in curr_stat.iteritems():
            oval = self.last_stat.get(disk)# previous sample for this disk, if any
            if not oval:
                continue
            total_time = nval.write_time - oval.write_time + nval.read_time - oval.read_time
            total_count = nval.write_count - oval.write_count + nval.read_count - oval.read_count
            if not total_count: # no reads or writes counted on this disk since the last sample
                continue
            data_per_disk['io.w_s'] += (nval.write_count - oval.write_count) / ts
            data_per_disk['io.wkbyte_s'] += (nval.write_bytes - oval.write_bytes) / 1024 / ts
            data_per_disk['io.r_s'] += (nval.read_count - oval.read_count) / ts
            data_per_disk['io.rkbyte_s'] += (nval.read_bytes - oval.read_bytes) / 1024 / ts
            data_per_disk['io.await'] += total_time / total_count if total_count else 0.0
            if hasattr(oval, 'busy_time'):# busy_time is not always available (the original note mentions Linux and psutil==4.0.0)
                data_per_disk['io.svctm'] += (nval.busy_time - oval.busy_time) / total_count if total_count else 0.0
                io_util = (nval.busy_time - oval.busy_time) * 100.0 / (ts*1000)
                if io_util > data_per_disk['io.util']:# keep the largest value seen so far, capped at 100
                    data_per_disk['io.util'] = io_util if io_util < 100 else 100
                data_per_disk['io.queue_time_percent'] = (data_per_disk['io.await'] - data_per_disk['io.svctm']) * 100 / data_per_disk['io.await'] if data_per_disk['io.await'] else 0
            count += 1

        # the current sample becomes the baseline for the next call
        self.last_stat = curr_stat
        self.last_cpu_time = curr_cpu_time
        return data_per_disk, count
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Run self.proc.cpu_times through the self.execute() harness.
        self.execute(self.proc.cpu_times)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Run the system-wide psutil.cpu_times through self.execute().
        self.execute(psutil.cpu_times)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_per_cpu_times(self):
        # Same as the system-wide variant, but percpu=True is forwarded
        # to psutil.cpu_times by self.execute().
        self.execute(psutil.cpu_times, percpu=True)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Sanity-check the CPU times reported for the current process.
        cpu = psutil.Process().cpu_times()
        assert cpu.user > 0.0 or cpu.system > 0.0, cpu
        assert cpu.children_user >= 0.0, cpu
        assert cpu.children_system >= 0.0, cpu
        # make sure returned values can be pretty printed with strftime
        for field in cpu._fields:
            seconds = getattr(cpu, field)
            time.strftime("%H:%M:%S", time.localtime(seconds))

    # Test Process.cpu_times() against os.times()
    # os.times() is broken on Python 2.6
    # http://bugs.python.org/issue1040026
    # XXX fails on OSX: not sure if it's for os.times(). We should
    # try this with Python 2.7 and re-enable the test.
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cpu_times_2(self):
        # Compare the first two values of Process.cpu_times() with the
        # first two values of os.times(), which act as the reference.
        proc_user, proc_kernel = psutil.Process().cpu_times()[:2]
        os_user, os_kernel = os.times()[:2]

        # Tolerance is +/- 0.1 seconds: fail as soon as a pair of values
        # is further apart than that.
        if abs(proc_user - os_user) > 0.1:
            self.fail("expected: %s, found: %s" % (os_user, proc_user))

        if abs(proc_kernel - os_kernel) > 0.1:
            self.fail("expected: %s, found: %s" % (os_kernel, proc_kernel))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_threads_2(self):
        # The per-thread user/system times must add up (within 0.1) to
        # the totals reported by Process.cpu_times().
        child = get_test_subprocess()
        p = psutil.Process(child.pid)
        if OPENBSD:
            try:
                p.threads()
            except psutil.AccessDenied:
                raise unittest.SkipTest(
                    "on OpenBSD this requires root access")
        user_total = p.cpu_times().user
        user_by_thread = sum(t.user_time for t in p.threads())
        self.assertAlmostEqual(user_total, user_by_thread, delta=0.1)
        system_total = p.cpu_times().system
        system_by_thread = sum(t.system_time for t in p.threads())
        self.assertAlmostEqual(system_total, system_by_thread, delta=0.1)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def cpu_times(self, ret, proc):
        # Validate a cpu_times() result: the user and system values must
        # be non-negative.  The proc argument is not used here.
        self.assertTrue(ret.user >= 0)
        self.assertTrue(ret.system >= 0)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cpu_count(self):
        # The logical CPU count must equal the number of per-CPU entries
        # returned by cpu_times() and be >= the physical count.
        n_logical = psutil.cpu_count()
        self.assertEqual(n_logical, len(psutil.cpu_times(percpu=True)))
        self.assertGreaterEqual(n_logical, 1)
        # The physical count is only checked when /proc/cpuinfo, if it
        # exists, mentions "physical id"; otherwise the test is skipped.
        cpuinfo_path = "/proc/cpuinfo"
        if os.path.exists(cpuinfo_path):
            with open(cpuinfo_path) as fd:
                has_physical_id = "physical id" in fd.read()
            if not has_physical_id:
                raise unittest.SkipTest("cpuinfo doesn't include physical id")
        n_physical = psutil.cpu_count(logical=False)
        self.assertGreaterEqual(n_physical, 1)
        self.assertGreaterEqual(n_logical, n_physical)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cpu_times(self):
        # Check type, value >= 0, str().
        cpu = psutil.cpu_times()
        # the tuple must be summable as a whole
        sum(cpu)
        running_total = 0
        for value in cpu:
            self.assertIsInstance(value, float)
            self.assertGreaterEqual(value, 0.0)
            running_total = running_total + value
        self.assertEqual(running_total, sum(cpu))
        str(cpu)
        # CPU times are always supposed to increase over time
        # or at least remain the same and that's because time
        # cannot go backwards.
        # Surprisingly sometimes this might not be the case (at
        # least on Windows and Linux), see:
        # https://github.com/giampaolo/psutil/issues/392
        # https://github.com/giampaolo/psutil/issues/645
        # if not WINDOWS:
        #     last = psutil.cpu_times()
        #     for x in range(100):
        #         new = psutil.cpu_times()
        #         for field in new._fields:
        #             new_t = getattr(new, field)
        #             last_t = getattr(last, field)
        #             self.assertGreaterEqual(new_t, last_t,
        #                                     msg="%s %s" % (new_t, last_t))
        #         last = new
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_per_cpu_times(self):
        # Check type, value >= 0, str().
        for cpu in psutil.cpu_times(percpu=True):
            # each per-CPU tuple must be summable as a whole
            sum(cpu)
            running_total = 0
            for value in cpu:
                self.assertIsInstance(value, float)
                self.assertGreaterEqual(value, 0.0)
                running_total = running_total + value
            self.assertEqual(running_total, sum(cpu))
            str(cpu)
        # a per-CPU entry has as many fields as the system-wide tuple
        first_cpu = psutil.cpu_times(percpu=True)[0]
        system_wide = psutil.cpu_times(percpu=False)
        self.assertEqual(len(first_cpu), len(system_wide))

        # Note: in theory CPU times are always supposed to increase over
        # time or remain the same but never go backwards. In practice
        # sometimes this is not the case.
        # This issue seemed to afflict only Windows:
        # https://github.com/giampaolo/psutil/issues/392
        # ...but it turns out also Linux (rarely) behaves the same.
        # last = psutil.cpu_times(percpu=True)
        # for x in range(100):
        #     new = psutil.cpu_times(percpu=True)
        #     for index in range(len(new)):
        #         newcpu = new[index]
        #         lastcpu = last[index]
        #         for field in newcpu._fields:
        #             new_t = getattr(newcpu, field)
        #             last_t = getattr(lastcpu, field)
        #             self.assertGreaterEqual(
        #                 new_t, last_t, msg="%s %s" % (lastcpu, newcpu))
        #     last = new
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_per_cpu_times_2(self):
        # Simulate some work load then make sure time have increased
        # between calls.
        before = psutil.cpu_times(percpu=True)
        deadline = time.time() + 0.1
        # busy-wait until the deadline has passed
        while time.time() < deadline:
            pass
        after = psutil.cpu_times(percpu=True)
        # Pass as soon as one CPU shows a summed increase of >= 0.05.
        deltas = (sum(new) - sum(old) for old, new in zip(before, after))
        if not any(delta >= 0.05 for delta in deltas):
            self.fail()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cpu_times_comparison(self):
        # Make sure the sum of all per cpu times is almost equal to
        # base "one cpu" times.
        overall = psutil.cpu_times()
        per_cpu = psutil.cpu_times(percpu=True)
        # add up each field across all CPUs
        column_sums = [sum(column) for column in zip(*per_cpu)]
        summed = overall._make(column_sums)
        for name in overall._fields:
            expected = getattr(overall, name)
            actual = getattr(summed, name)
            self.assertAlmostEqual(expected, actual, delta=1)