我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 psutil.cpu_times()。
def test_cpu_times(self):
    """Check that the cpu_times() namedtuple exposes 'steal', 'guest'
    and 'guest_nice' exactly when the running kernel version is recent
    enough for this test to expect them.
    """
    fields = psutil.cpu_times()._fields
    # Raw string: '\d' in a plain literal is an invalid escape sequence
    # (DeprecationWarning since 3.6, SyntaxWarning on 3.12+).
    kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
    kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
    # 'steal' expected from kernel 2.6.11 onwards.
    if kernel_ver_info >= (2, 6, 11):
        self.assertIn('steal', fields)
    else:
        self.assertNotIn('steal', fields)
    # 'guest' expected from kernel 2.6.24 onwards.
    if kernel_ver_info >= (2, 6, 24):
        self.assertIn('guest', fields)
    else:
        self.assertNotIn('guest', fields)
    # 'guest_nice' expected from kernel 3.2.0 onwards.
    if kernel_ver_info >= (3, 2, 0):
        self.assertIn('guest_nice', fields)
    else:
        self.assertNotIn('guest_nice', fields)
def test_serialization(self):
    """Verify that the values returned by the main psutil APIs survive
    a round trip through json and pickle unchanged."""
    def check(ret):
        if json is not None:
            json.loads(json.dumps(ret))
        a = pickle.dumps(ret)
        b = pickle.loads(a)
        self.assertEqual(ret, b)

    check(psutil.Process().as_dict())
    check(psutil.virtual_memory())
    check(psutil.swap_memory())
    check(psutil.cpu_times())
    check(psutil.cpu_times_percent(interval=0))
    check(psutil.net_io_counters())
    # Original used an inverted "if ...: pass / else:" ladder; flattened.
    # disk_io_counters() is checked only when /proc/diskstats exists on
    # Linux, and never on AppVeyor.
    if (not LINUX or os.path.exists('/proc/diskstats')) and not APPVEYOR:
        check(psutil.disk_io_counters())
    check(psutil.disk_partitions())
    check(psutil.disk_usage(os.getcwd()))
    check(psutil.users())
def test_oneshot_twice(self):
    # Test the case where the ctx manager is __enter__ed twice.
    # The second __enter__ is supposed to result in a NOOP: the inner
    # oneshot() must not reset the cache, so all four cpu_times() calls
    # hit the platform implementation exactly once.
    with mock.patch("psutil._psplatform.Process.cpu_times") as m1:
        with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2:
            p = psutil.Process()
            with p.oneshot():
                p.cpu_times()
                p.cpu_times()
                with p.oneshot():
                    p.cpu_times()
                    p.cpu_times()
            self.assertEqual(m1.call_count, 1)
            self.assertEqual(m2.call_count, 1)

    # Outside of any oneshot() context caching is off again: two calls
    # mean two hits on the platform implementation.
    with mock.patch("psutil._psplatform.Process.cpu_times") as m:
        p.cpu_times()
        p.cpu_times()
        self.assertEqual(m.call_count, 2)
def test_Popen(self):
    # XXX this test causes a ResourceWarning on Python 3 because
    # psutil.__subproc instance doesn't get properly freed.
    # Not sure what to do though.
    # Spawn a long-sleeping child, then check that the psutil.Popen
    # wrapper exposes both Process methods (name, cpu_times) and
    # subprocess.Popen attributes (stdin), and raises AttributeError
    # for unknown names.
    cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
    proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        proc.name()
        proc.cpu_times()
        proc.stdin
        self.assertTrue(dir(proc))
        self.assertRaises(AttributeError, getattr, proc, 'foo')
    finally:
        # Always reap the child so the test doesn't leak a process.
        proc.kill()
        proc.wait()
def getLoadAverage():
    """Return the system load normalized by CPU count.

    On Linux: a list of three floats (1/5/15-minute load average divided
    by the number of CPUs). On Windows: overall CPU usage formatted as a
    percentage string like ``"12.34%"``. On any other platform: None
    (the original fell through implicitly; made explicit here).
    """
    if linux:
        import multiprocessing
        k = 1.0 / multiprocessing.cpu_count()
        if os.path.exists('/proc/loadavg'):
            # Bug fix: the original opened /proc/loadavg once per
            # comprehension iteration (three times) and never closed it.
            # Read it a single time inside a context manager.
            with open('/proc/loadavg') as f:
                fields = f.read().split()
            return [float(fields[x]) * k for x in range(3)]
        else:
            # Fall back to parsing `uptime`: the last three tokens are
            # the 1/5/15-minute load averages (possibly comma-suffixed).
            # NOTE(review): on Python 3 check_output() returns bytes, so
            # x.strip(',') would raise TypeError -- this path looks
            # Python 2 only; confirm before porting.
            tokens = subprocess.check_output(['uptime']).split()
            return [float(x.strip(',')) * k for x in tokens[-3:]]
    if mswindows:
        # TODO(Guodong Ding) get this field data like on Linux for Windows
        # print psutil.cpu_percent()
        # print psutil.cpu_times_percent()
        # print psutil.cpu_times()
        # print psutil.cpu_stats()
        return "%.2f%%" % psutil.cpu_percent()
    return None
def test_cpu_times(self):
    """Run Process.cpu_times() through the shared execute() harness."""
    target = self.proc.cpu_times
    self.execute(target)
def test_cpu_times(self):
    """Run psutil.cpu_times() through the shared execute() harness."""
    target = psutil.cpu_times
    self.execute(target)
def test_per_cpu_times(self):
    """Run psutil.cpu_times(percpu=True) through the execute() harness."""
    opts = dict(percpu=True)
    self.execute(psutil.cpu_times, **opts)
def test_cpu_times(self):
    """Sanity-check Process.cpu_times(): some user or system time must
    have accumulated, children counters are non-negative, and every
    field can be rendered through time.strftime()."""
    times = psutil.Process().cpu_times()
    assert times.user > 0.0 or times.system > 0.0, times
    assert times.children_user >= 0.0, times
    assert times.children_system >= 0.0, times
    # make sure returned values can be pretty printed with strftime
    for field in times._fields:
        value = getattr(times, field)
        time.strftime("%H:%M:%S", time.localtime(value))
    # Test Process.cpu_times() against os.times()
    # os.times() is broken on Python 2.6
    # http://bugs.python.org/issue1040026
    # XXX fails on OSX: not sure if it's for os.times(). We should
    # try this with Python 2.7 and re-enable the test.
def test_cpu_times_2(self):
    """Compare Process.cpu_times() against os.times() allowing a
    +/- 0.1 second tolerance on both the user and kernel components."""
    user_time, kernel_time = psutil.Process().cpu_times()[:2]
    utime, ktime = os.times()[:2]
    # abs(a - b) is the same quantity as max(a, b) - min(a, b).
    if abs(user_time - utime) > 0.1:
        self.fail("expected: %s, found: %s" % (utime, user_time))
    if abs(kernel_time - ktime) > 0.1:
        self.fail("expected: %s, found: %s" % (ktime, kernel_time))
def test_threads_2(self):
    """The per-thread user/system times of a test subprocess must add
    up to the process-wide totals (within a 0.1s delta)."""
    sproc = get_test_subprocess()
    proc = psutil.Process(sproc.pid)
    if OPENBSD:
        try:
            proc.threads()
        except psutil.AccessDenied:
            raise unittest.SkipTest(
                "on OpenBSD this requires root access")
    self.assertAlmostEqual(
        proc.cpu_times().user,
        sum(t.user_time for t in proc.threads()),
        delta=0.1)
    self.assertAlmostEqual(
        proc.cpu_times().system,
        sum(t.system_time for t in proc.threads()),
        delta=0.1)
def test_cpu_affinity(self):
    # End-to-end exercise of Process.cpu_affinity(): read the initial
    # mask, pin to each CPU in turn, restore, accept any iterable, and
    # reject invalid CPU numbers/types. Cross-checked against
    # os.sched_getaffinity() where the platform provides it.
    p = psutil.Process()
    initial = p.cpu_affinity()
    if hasattr(os, "sched_getaffinity"):
        self.assertEqual(initial, list(os.sched_getaffinity(p.pid)))
    # no duplicates in the returned mask
    self.assertEqual(len(initial), len(set(initial)))
    all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
    # setting on travis doesn't seem to work (always return all
    # CPUs on get):
    # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, ... != [0]
    for n in all_cpus:
        p.cpu_affinity([n])
        self.assertEqual(p.cpu_affinity(), [n])
        if hasattr(os, "sched_getaffinity"):
            self.assertEqual(p.cpu_affinity(),
                             list(os.sched_getaffinity(p.pid)))
    #
    p.cpu_affinity(all_cpus)
    self.assertEqual(p.cpu_affinity(), all_cpus)
    if hasattr(os, "sched_getaffinity"):
        self.assertEqual(p.cpu_affinity(),
                         list(os.sched_getaffinity(p.pid)))
    #
    self.assertRaises(TypeError, p.cpu_affinity, 1)
    p.cpu_affinity(initial)
    # it should work with all iterables, not only lists
    p.cpu_affinity(set(all_cpus))
    p.cpu_affinity(tuple(all_cpus))
    # a CPU number beyond the highest valid index must be rejected
    invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
    self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
    self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000))
    self.assertRaises(TypeError, p.cpu_affinity, [0, "1"])
    self.assertRaises(ValueError, p.cpu_affinity, [0, -1])
    # TODO: #595
def cpu_times(self, ret, proc):
    """Validator for a Process.cpu_times() result: both the user and
    system components must be non-negative.

    Uses assertGreaterEqual instead of assertTrue(x >= 0) so a failure
    reports the offending value instead of just "False is not true".
    """
    self.assertGreaterEqual(ret.user, 0)
    self.assertGreaterEqual(ret.system, 0)
def test_cpu_count(self):
    """Logical CPU count must match the number of per-CPU time entries
    and be >= the physical count derived from /proc/cpuinfo."""
    logical = psutil.cpu_count()
    self.assertEqual(logical, len(psutil.cpu_times(percpu=True)))
    self.assertGreaterEqual(logical, 1)
    #
    if os.path.exists("/proc/cpuinfo"):
        with open("/proc/cpuinfo") as fh:
            content = fh.read()
        if "physical id" not in content:
            raise unittest.SkipTest("cpuinfo doesn't include physical id")
    physical = psutil.cpu_count(logical=False)
    self.assertGreaterEqual(physical, 1)
    self.assertGreaterEqual(logical, physical)
def test_cpu_times(self):
    # Check type, value >= 0, str().
    times = psutil.cpu_times()
    sum(times)
    running_total = 0
    for value in times:
        self.assertIsInstance(value, float)
        self.assertGreaterEqual(value, 0.0)
        running_total += value
    self.assertEqual(running_total, sum(times))
    str(times)
    # CPU times are always supposed to increase over time
    # or at least remain the same and that's because time
    # cannot go backwards.
    # Surprisingly sometimes this might not be the case (at
    # least on Windows and Linux), see:
    # https://github.com/giampaolo/psutil/issues/392
    # https://github.com/giampaolo/psutil/issues/645
    # if not WINDOWS:
    #     last = psutil.cpu_times()
    #     for x in range(100):
    #         new = psutil.cpu_times()
    #         for field in new._fields:
    #             new_t = getattr(new, field)
    #             last_t = getattr(last, field)
    #             self.assertGreaterEqual(new_t, last_t,
    #                                     msg="%s %s" % (new_t, last_t))
    #         last = new
def test_per_cpu_times(self):
    # Check type, value >= 0, str().
    for times in psutil.cpu_times(percpu=True):
        running_total = 0
        sum(times)
        for value in times:
            self.assertIsInstance(value, float)
            self.assertGreaterEqual(value, 0.0)
            running_total += value
        self.assertEqual(running_total, sum(times))
        str(times)
    # Per-CPU tuples must have the same fields as the system-wide one.
    self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
                     len(psutil.cpu_times(percpu=False)))
    # Note: in theory CPU times are always supposed to increase over
    # time or remain the same but never go backwards. In practice
    # sometimes this is not the case.
    # This issue seemd to be afflict Windows:
    # https://github.com/giampaolo/psutil/issues/392
    # ...but it turns out also Linux (rarely) behaves the same.
    # last = psutil.cpu_times(percpu=True)
    # for x in range(100):
    #     new = psutil.cpu_times(percpu=True)
    #     for index in range(len(new)):
    #         newcpu = new[index]
    #         lastcpu = last[index]
    #         for field in newcpu._fields:
    #             new_t = getattr(newcpu, field)
    #             last_t = getattr(lastcpu, field)
    #             self.assertGreaterEqual(
    #                 new_t, last_t, msg="%s %s" % (lastcpu, newcpu))
    #     last = new
def test_per_cpu_times_2(self):
    # Simulate some work load then make sure time have increased
    # between calls.
    tot1 = psutil.cpu_times(percpu=True)
    stop_at = time.time() + 0.1
    # Busy-wait for ~0.1s to burn CPU time.
    while time.time() < stop_at:
        pass
    tot2 = psutil.cpu_times(percpu=True)
    # Pass as soon as any single CPU shows a measurable increase.
    for before, after in zip(tot1, tot2):
        if sum(after) - sum(before) >= 0.05:
            return
    self.fail()
def test_cpu_times_comparison(self):
    # Make sure the sum of all per cpu times is almost equal to
    # base "one cpu" times.
    base = psutil.cpu_times()
    per_cpu = psutil.cpu_times(percpu=True)
    # Column-wise sum across CPUs, rebuilt into the same namedtuple type.
    summed = base._make(sum(column) for column in zip(*per_cpu))
    for name in base._fields:
        self.assertAlmostEqual(
            getattr(base, name), getattr(summed, name), delta=1)
def test_per_cpu_times_percent_negative(self):
    # see: https://github.com/giampaolo/psutil/issues/645
    psutil.cpu_times_percent(percpu=True)
    # Build an all-zero snapshot per CPU (distinct loop variables avoid
    # the original's 'x' shadowing in the nested comprehension).
    zero_times = [cpu._make([0] * len(cpu._fields))
                  for cpu in psutil.cpu_times(percpu=True)]
    with mock.patch('psutil.cpu_times', return_value=zero_times):
        for cpu in psutil.cpu_times_percent(percpu=True):
            for percent in cpu:
                self._test_cpu_percent(percent, None, None)
def cpu_times(pid):
    """Return the cpu_times() namedtuple for the process with *pid*."""
    proc = psutil.Process(pid)
    return proc.cpu_times()
def times():
    """Thin wrapper returning the system-wide psutil.cpu_times()."""
    snapshot = psutil.cpu_times()
    return snapshot
def get_cpu_usage(self):
    """Return dict with overall CPU usage.

    Keys are the selected cpu_times() fields plus 'total' (their sum,
    rounded to 2 decimals). On failure the error is logged and whatever
    was collected so far (possibly {}) is returned.
    """
    usage = {}
    try:
        fields = ('user', 'system', 'idle', 'nice', 'iowait', 'irq',
                  'softirq', 'steal')
        usage = {key: value
                 for key, value in psutil.cpu_times()._asdict().items()
                 if key in fields}
        usage['total'] = round(sum(usage.values()), 2)
    except Exception:
        # Bug fix: the original bare ``except:`` also swallowed
        # SystemExit/KeyboardInterrupt. Exception keeps the deliberate
        # best-effort behavior without masking interpreter exits.
        exception('Error getting CPU usage info')
    return usage
def _get_cpu_stats(self):
    """Return a snapshot dict with processor count, cumulative CPU
    times and per-CPU load percentages."""
    return {
        'processors': ps.cpu_count(),
        'times': ps.cpu_times(),
        'load': ps.cpu_percent(percpu=True),
    }
def cpu_times():
    """Forever push system-wide CPU time gauges to statsd, sleeping
    GRANULARITY seconds between samples."""
    client = statsd.StatsClient(STATSD_HOST, 8125,
                                prefix=PREFIX + 'system.cpu')
    # Same fields, same order, same metric names as before.
    fields = ('user', 'nice', 'system', 'idle', 'iowait', 'irq',
              'softirq', 'steal', 'guest', 'guest_nice')
    while True:
        snapshot = psutil.cpu_times()
        for name in fields:
            client.gauge('system_wide.times.' + name,
                         getattr(snapshot, name))
        time.sleep(GRANULARITY)
def get_host_cpu_stats(self):
    """Return a dict of host CPU statistics (kernel/idle/user/iowait
    seconds as ints, plus CPU frequency).

    Bug fix: the original called psutil.cpu_times() once per field, so
    the four values came from four different snapshots (and cost three
    redundant system calls). A single snapshot keeps them consistent.
    """
    # Positional layout relied upon here: 0=user, 2=system(kernel),
    # 3=idle, 4=iowait. NOTE(review): these indices are platform
    # dependent (they match Linux); named attributes would be safer.
    times = psutil.cpu_times()
    return {
        'kernel': int(times[2]),
        'idle': int(times[3]),
        'user': int(times[0]),
        'iowait': int(times[4]),
        'frequency': _get_cpu_info().get('cpu mhz', 0)
    }
def getMethods(self):
    """Return the exposed method names: the base service list plus the
    CPU accessors."""
    extra = [
        "cpu_percent",
        "cpu_times",
    ]
    return ServiceBase.getMethods() + extra
def cpu_times(self, percpu=False):
    """Delegate to psutil.cpu_times(), forwarding the percpu flag."""
    return psutil.cpu_times(percpu=percpu)
def read_cpu_usage(self):
    """Return a list with one {'user', 'system', 'idle'} dict per CPU.

    Rewritten from list(map(lambda ...)) to a list comprehension: the
    idiomatic form (PEP 8), with no lambda or explicit list() wrapper.
    """
    return [{'user': t.user, 'system': t.system, 'idle': t.idle}
            for t in psutil.cpu_times(percpu=True)]
def check(self): data = {} # 20160725 windows???load?? if platform.system() != 'Windows': load = os.getloadavg() data.update({'load.1': load[0], 'load.5': load[1], 'load.15': load[2]}) data.update({"cpu.used_total": int(psutil.cpu_percent())}) # ????CPU???? per_cpu = psutil.cpu_percent(percpu=True) # ????CPU0??? data.update({'cpu.cpu{0}_used'.format(i): int(val) for i,val in enumerate(per_cpu)}) # ??CPU??? new_cpu_times = psutil.cpu_times() if self.last_cpu_times is not None: last_total_time = reduce(lambda s,x:s+x, self.last_cpu_times) now_total_time = reduce(lambda s,x:s+x, new_cpu_times) total_time = now_total_time - last_total_time data['cpu.used_sy'] = self._get_cpu_time('system', total_time, new_cpu_times) data['cpu.used_us'] = self._get_cpu_time('user', total_time, new_cpu_times) data['cpu.used_wa'] = self._get_cpu_time('iowait', total_time, new_cpu_times) # data['cpu.used_id'] = self._get_cpu_time('idle', total_time, new_cpu_times) # data['cpu.used_ni'] = self._get_cpu_time('nice', total_time, new_cpu_times) # data['cpu.used_hi'] = self._get_cpu_time('irq', total_time, new_cpu_times) # data['cpu.used_si'] = self._get_cpu_time('softirq', total_time, new_cpu_times) # data['cpu.used_st'] = self._get_cpu_time('steal', total_time, new_cpu_times) else:# ????? self.last_cpu_times = new_cpu_times gevent.sleep(0.1) new_cpu_times = psutil.cpu_times() last_total_time = reduce(lambda s,x:s+x, self.last_cpu_times) now_total_time = reduce(lambda s,x:s+x, new_cpu_times) total_time = now_total_time - last_total_time data['cpu.used_sy'] = self._get_cpu_time('system', total_time, new_cpu_times) data['cpu.used_us'] = self._get_cpu_time('user', total_time, new_cpu_times) data['cpu.used_wa'] = self._get_cpu_time('iowait', total_time, new_cpu_times) self.last_cpu_times = new_cpu_times return data
def get_other_iostat(self):
    """Compute iostat-style disk metrics (w/s, r/s, KB/s, await, svctm,
    util, queue-time %) accumulated over all disks, from the delta
    between the previous and current psutil.disk_io_counters()
    snapshots.

    Returns a (metrics_dict, disk_count) tuple. On the very first call
    there is no baseline yet, so state is primed and ({}, 0) returned.
    NOTE(review): uses dict.iteritems(), i.e. this is Python 2 code.
    """
    curr_stat = psutil.disk_io_counters(True)  # True => per-disk dict
    # Average CPU time per core, used as the elapsed-time base below.
    curr_cpu_time = self.sum_cpu_time(psutil.cpu_times()) / self.cpu_count
    if self.last_cpu_time == 0:  # first call: prime the baseline
        self.last_stat = curr_stat
        self.last_cpu_time = curr_cpu_time
        return {}, 0
    data_per_disk = {k: 0 for k in self.metric_define}
    count = 0
    ts = curr_cpu_time - self.last_cpu_time  # interval length (seconds)
    for disk, nval in curr_stat.iteritems():
        oval = self.last_stat.get(disk)  # previous counters for this disk
        if not oval:
            continue
        total_time = nval.write_time - oval.write_time + nval.read_time - oval.read_time
        total_count = nval.write_count - oval.write_count + nval.read_count - oval.read_count
        if not total_count:  # no IO on this disk during the interval
            continue
        data_per_disk['io.w_s'] += (nval.write_count - oval.write_count) / ts
        data_per_disk['io.wkbyte_s'] += (nval.write_bytes - oval.write_bytes) / 1024 / ts
        data_per_disk['io.r_s'] += (nval.read_count - oval.read_count) / ts
        data_per_disk['io.rkbyte_s'] += (nval.read_bytes - oval.read_bytes) / 1024 / ts
        data_per_disk['io.await'] += total_time / total_count if total_count else 0.0
        if hasattr(oval, 'busy_time'):  # busy_time needs Linux + psutil >= 4.0.0
            data_per_disk['io.svctm'] += (nval.busy_time - oval.busy_time) / total_count if total_count else 0.0
            io_util = (nval.busy_time - oval.busy_time) * 100.0 / (ts*1000)
            if io_util > data_per_disk['io.util']:  # keep the maximum, capped at 100
                data_per_disk['io.util'] = io_util if io_util < 100 else 100
        data_per_disk['io.queue_time_percent'] = (data_per_disk['io.await'] - data_per_disk['io.svctm']) * 100 / data_per_disk['io.await'] if data_per_disk['io.await'] else 0
        count += 1
    # Current snapshot becomes the baseline for the next call.
    self.last_stat = curr_stat
    self.last_cpu_time = curr_cpu_time
    return data_per_disk, count