我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.cpu_percent()。
def tell_system_status(self): import psutil import platform import datetime os, name, version, _, _, _ = platform.uname() version = version.split('-')[0] cores = psutil.cpu_count() cpu_percent = psutil.cpu_percent() memory_percent = psutil.virtual_memory()[2] disk_percent = psutil.disk_usage('/')[3] boot_time = datetime.datetime.fromtimestamp(psutil.boot_time()) running_since = boot_time.strftime("%A %d. %B %Y") response = "I am currently running on %s version %s. " % (os, version) response += "This system is named %s and has %s CPU cores. " % (name, cores) response += "Current disk_percent is %s percent. " % disk_percent response += "Current CPU utilization is %s percent. " % cpu_percent response += "Current memory utilization is %s percent. " % memory_percent response += "it's running since %s." % running_since return response
def pids_active(pids_computer): """ This function find pids of computer and return the valid. """ pid_valid = {} for pid in pids_computer: data = None try: process = psutil.Process(pid) data = {"pid": process.pid, "status": process.status(), "percent_cpu_used": process.cpu_percent(interval=0.0), "percent_memory_used": process.memory_percent()} except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess): data = None if data is not None: pid_valid[process.name()] = data return pid_valid
def stream_host_stats(): while True: net = psutil.net_io_counters(pernic=True) time.sleep(1) net1 = psutil.net_io_counters(pernic=True) net_stat_download = {} net_stat_upload = {} for k, v in net.items(): for k1, v1 in net1.items(): if k1 == k: net_stat_download[k] = (v1.bytes_recv - v.bytes_recv) / 1000. net_stat_upload[k] = (v1.bytes_sent - v.bytes_sent) / 1000. ds = statvfs('/') disk_str = {"Used": ((ds.f_blocks - ds.f_bfree) * ds.f_frsize) / 10 ** 9, "Unused": (ds.f_bavail * ds.f_frsize) / 10 ** 9} yield '[{"cpu":"%s","memory":"%s","memTotal":"%s","net_stats_down":"%s","net_stats_up":"%s","disk":"%s"}],' \ % (psutil.cpu_percent(interval=1), psutil.virtual_memory().used, psutil.virtual_memory().free, \ net_stat_download, net_stat_upload, disk_str)
def doit(channel): cpu_pc = psutil.cpu_percent() mem_avail = psutil.virtual_memory().percent try: response = channel.update({1:cpu_pc, 2:mem_avail}) print cpu_pc print mem_avail_mb print strftime("%a, %d %b %Y %H:%M:%S", localtime()) print response except: print "connection failed" #sleep for 16 seconds (api limit of 15 secs)
def __init__(self): super().__init__() # From the psutil documentation https://pythonhosted.org/psutil/#psutil.cpu_percent: # # Warning the first time this function is called # with interval = 0.0 or None it will return a # meaningless 0.0 value which you are supposed # to ignore. psutil.cpu_percent() # '...is recommended for accuracy that this function be called with at least 0.1 seconds between calls.' time.sleep(0.1) # a sliding window of 60 contiguous 5 sec utilization (up to five minutes) self.cpuutils = collections.deque([psutil.cpu_percent()], maxlen=60) self.system_virtual_memory = psutil.virtual_memory() logger.debug('System Utilization handler initialized.')
def cpu_load(): count = psutil.cpu_count() condition_cpu_loop = True mem = Memory.Free_Space() if mem < 200: print "Achtung, sehr wenig Speicher frei!" while (condition_cpu_loop == True): cpu_load = psutil.cpu_percent(interval=cpu_interval) print(cpu_load) cpu_load_finish = cpu_load if(cpu_load > cpu_load_warning): condition_cpu_loop = False print("Warning Warning") print Pids.Pi(count, cpu_load_finish) return(cpu_load)
def get_stats(): root.authorized() params = {} # number of jobs in queued, running, and completed states params['nq'] = db(jobs.state=='Q').count() params['nr'] = db(jobs.state=='R').count() params['nc'] = db(jobs.state=='C').count() params['cpu'] = psutil.cpu_percent() params['vm'] = psutil.virtual_memory().percent params['disk'] = psutil.disk_usage('/').percent params['cid'] = request.query.cid params['app'] = request.query.app return template("stats", params)
def sysInfoRaw(): import time import psutil cpuPercent = psutil.cpu_percent(interval=1) memPercent = psutil.virtual_memory().percent sda2Percent = psutil.disk_usage('/').percent sda3Percent = psutil.disk_usage('/home').percent seconds = int(time.time()) - int(psutil.boot_time()) m, s = divmod(seconds, 60) h, m = divmod(m, 60) d, h = divmod(h, 24) uptime = [d, h, m, s] return [cpuPercent, memPercent, sda2Percent, sda3Percent, uptime]
def free_cpus(): """ This function ... :return: """ # Get the total number of processors on this system total = multiprocessing.cpu_count() # Get the load of the different processors load = np.array(psutil.cpu_percent(percpu=True))/100.0 # Calculate the the number of full processors (where 2 processors with loads x and y contribute as a processor with load x+y) full = np.sum(load) # Get the number of free processors free = total - full return free # -----------------------------------------------------------------
def wait_for_idle(self, timeout=30): """Wait for the system to go idle for at least 2 seconds""" import monotonic import psutil logging.debug("Waiting for Idle...") cpu_count = psutil.cpu_count() if cpu_count > 0: target_pct = 50. / float(cpu_count) idle_start = None end_time = monotonic.monotonic() + timeout idle = False while not idle and monotonic.monotonic() < end_time: self.alive() check_start = monotonic.monotonic() pct = psutil.cpu_percent(interval=0.5) if pct <= target_pct: if idle_start is None: idle_start = check_start if monotonic.monotonic() - idle_start > 2: idle = True else: idle_start = None
def wait_for_idle(self): """Wait for no more than 50% of a single core used for 500ms""" import psutil logging.debug("Waiting for Idle...") cpu_count = psutil.cpu_count() if cpu_count > 0: target_pct = 50. / float(cpu_count) idle_start = None end_time = monotonic.monotonic() + self.START_BROWSER_TIME_LIMIT idle = False while not idle and monotonic.monotonic() < end_time: check_start = monotonic.monotonic() pct = psutil.cpu_percent(interval=0.1) if pct <= target_pct: if idle_start is None: idle_start = check_start if monotonic.monotonic() - idle_start >= 0.4: idle = True else: idle_start = None
def background_thread(self): """Background thread for monitoring CPU and bandwidth usage""" import psutil last_time = start_time = monotonic.monotonic() last_bytes = self.get_net_bytes() snapshot = {'time': 0, 'cpu': 0.0, 'bw': 0} self.usage_queue.put(snapshot) while self.recording: snapshot = {'bw': 0} snapshot['cpu'] = psutil.cpu_percent(interval=0.1) now = monotonic.monotonic() snapshot['time'] = int((now - start_time) * 1000) # calculate the bandwidth over the last interval in Kbps bytes_in = self.get_net_bytes() if now > last_time: snapshot['bw'] = int((bytes_in - last_bytes) * 8.0 / (now - last_time)) last_time = now last_bytes = bytes_in self.usage_queue.put(snapshot)
def __init__(self, j=os.cpu_count(), max_utilization=100): """ Initialize a parallel loop object. :param j: The maximum number of parallel jobs. :param max_utilization: The maximum CPU utilization. Above this no more new jobs will be started. """ self._j = j self._max_utilization = max_utilization # This gets initialized to 0, may be set to 1 anytime, but must not be reset to 0 ever; # thus, no locking is needed when accessing self._break = multiprocessing.sharedctypes.Value('i', 0, lock=False) self._lock = multiprocessing.Condition() self._slots = multiprocessing.sharedctypes.Array('i', j, lock=False) psutil.cpu_percent(None) # Beware! this is running in a new process now. state is shared with fork, # but only changes to shared objects will be visible in parent.
def _recorder(pid, stop, ival): """Subprocess call function to record cpu and memory.""" t = t0 = time() process = psutil.Process(pid) if stop is None: while True: m = process.memory_info() print(psutil.cpu_percent(), ',', m[0], ',', m[1]) sleep(ival) t = time() else: while t - t0 < stop: m = process.memory_info() print(psutil.cpu_percent(), ',', m[0], ',', m[1]) sleep(ival) t = time()
def cpu_times_percent(): c = statsd.StatsClient(STATSD_HOST, 8125, prefix=PREFIX + 'system.cpu') while True: value = psutil.cpu_percent(interval=1) c.gauge('system_wide.percent', value) cpu_t_percent = psutil.cpu_times_percent(interval=1) c.gauge('system_wide.times_percent.user', cpu_t_percent.user) c.gauge('system_wide.times_percent.nice', cpu_t_percent.nice) c.gauge('system_wide.times_percent.system', cpu_t_percent.system) c.gauge('system_wide.times_percent.idle', cpu_t_percent.idle) c.gauge('system_wide.times_percent.iowait', cpu_t_percent.iowait) c.gauge('system_wide.times_percent.irq', cpu_t_percent.irq) c.gauge('system_wide.times_percent.softirq', cpu_t_percent.softirq) c.gauge('system_wide.times_percent.steal', cpu_t_percent.steal) c.gauge('system_wide.times_percent.guest', cpu_t_percent.guest) c.gauge('system_wide.times_percent.guest_nice', cpu_t_percent.guest_nice) time.sleep(GRANULARITY)
def POST(self): cpuused = psutil.cpu_percent(interval=None, percpu=False) memused = psutil.virtual_memory()[2] diskused = psutil.disk_usage('/')[3] pidcount = len(psutil.pids()) uptimeshell = subprocess.Popen(['uptime'], stderr=subprocess.PIPE,stdout=subprocess.PIPE) uptimeshell.wait() uptime = uptimeshell.communicate() return json.dumps( { "code": 0, "current": { "cpuused": cpuused, "memused": memused, "diskused": diskused, "pidcount": pidcount, "uptime": uptime } } )
def cpuinfo(): men=psutil.virtual_memory() menab=(men.available)/(1024*1024) menta=(men.total)/(1024*1024) menus=(men.used)/(1024*1024) disk=psutil.disk_usage('/') diskta=(disk.total)/(1024*1024*1024) diskus=(disk.used)/(1024*1024*1024) diskfr=(disk.free)/(1024*1024*1024) time = datetime.datetime.now() with open('eggs.csv', 'a') as csvfile: spamwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL) spamwriter.writerow({"??????%s"%time}) spamwriter.writerow({"cpu?????%d%s"%(psutil.cpu_percent(interval=1),"%")}) spamwriter.writerow({"???????%sMB" % int(menta)}) spamwriter.writerow({"?????????%sMB" % int(menus)}) spamwriter.writerow({"????????%sMB" % int(menab)}) spamwriter.writerow({"???????%sG" % int(diskta)}) spamwriter.writerow({"???????%sG" % int(diskus)}) spamwriter.writerow({"???????%sG" % int(diskfr)}) with open('eggs.csv', 'r') as csvfile: spamreader = csv.reader(csvfile, delimiter=' ', quotechar='|') for row in spamreader: print(row)
def monitor(arglar): logger.info("Starting main loop") while True: if arglar.mode in "ca": cpu_reading = max(psutil.cpu_percent(arglar.resolution, percpu=True)) logger.debug("Cpu percents {}".format(cpu_reading)) if cpu_reading > arglar.cpu_crit: logger.debug("Cpu percentage is higher than critical level") print("CPU usage is high", file=arglar.crit_out) elif arglar.cpu_warn is not None and cpu_reading > arglar.cpu_warn: logger.debug("Cpu percentage is higher than warning level") print("CPU usage is critical", file=arglar.warn_out) if arglar.mode in "ra": ram_reading = psutil.virtual_memory().percent logger.debug("Ram percents {}".format(ram_reading)) if ram_reading > arglar.ram_crit: logger.debug("Ram percentage is higher than critical level") print("RAM usage is high", file=arglar.crit_out) elif arglar.ram_warn is not None and ram_reading > arglar.ram_warn: logger.debug("Ram percentage is higher than warning level") print("RAM usage is critical", file=arglar.warn_out) logger.debug("Sleeping") time.sleep(arglar.resolution)
def __init__(self, architecture=None, cores_per_socket=None, cpu_family=None, cpu_op_mode=None, model=None, model_name=None, vendor_id=None, cpu_util_percent=None, *args, **kwargs): super(Cpu, self).__init__(*args, **kwargs) cpu = self._getNodeCpu() self.architecture = architecture or cpu['Architecture'] self.cores_per_socket = cores_per_socket or cpu["CoresPerSocket"] self.cpu_family = cpu_family or cpu["CPUFamily"] self.cpu_op_mode = cpu_op_mode or cpu["CpuOpMode"] self.model = model or cpu["Model"] self.model_name = model_name or cpu["ModelName"] self.vendor_id = vendor_id or cpu["VendorId"] self.cpu_util_percent = cpu_util_percent or str(psutil.cpu_percent()) self.value = 'nodes/{0}/Cpu'
def updateGui(screen, boxes, data): maxy, maxx = screen.getmaxyx() date = str(time.strftime("%c")) screen.addstr(1, maxx - len(date) - 2, date) screen.addstr(3, 15, '%3d' % (psutil.cpu_percent())) # svmem(total=10367352832, available=6472179712, percent=37.6, used=8186245120, free=2181107712, active=4748992512, inactive=2758115328, buffers=790724608, cached=3500347392, shared=787554304) screen.addstr(4, 15, str(psutil.virtual_memory()[4] / (1024 * 1024))) screen.refresh() n = 0 for box in boxes: testspersecond = '%8d' % data[n]["testspersecond"] testcount = '%8d' % data[n]["testcount"] crashcount = '%8d' % data[n]["crashcount"] box.addstr(1, 1, testspersecond) box.addstr(2, 1, testcount) box.addstr(3, 1, crashcount) box.refresh() n += 1
def status(cmd, message, args): os_icon, os_color = get_os_icon() general_text = f'Latency: **{int(cmd.bot.latency * 1000)}ms**' general_text += f'\nPlatform: **{sys.platform.upper()}**' general_text += f'\nStarted: **{arrow.get(psutil.boot_time()).humanize()}**' cpu_clock = psutil.cpu_freq() if cpu_clock: cpu_clock = cpu_clock.current else: cpu_clock = 'Unknown' cpu_text = f'Count: **{psutil.cpu_count()} ({psutil.cpu_count(logical=False)})**' cpu_text += f'\nUsage: **{psutil.cpu_percent()}%**' cpu_text += f'\nClock: **{cpu_clock} MHz**' used_mem = humanfriendly.format_size(psutil.virtual_memory().available, binary=True) total_mem = humanfriendly.format_size(psutil.virtual_memory().total, binary=True) mem_text = f'Used: **{used_mem}**' mem_text += f'\nTotal: **{total_mem}**' mem_text += f'\nPercent: **{int(100 - psutil.virtual_memory().percent)}%**' response = discord.Embed(color=os_color) response.set_author(name=socket.gethostname(), icon_url=os_icon) response.add_field(name='General', value=general_text) response.add_field(name='CPU', value=cpu_text) response.add_field(name='Memory', value=mem_text) await message.channel.send(embed=response)
def monitor(frist_invoke=2): cpu = psutil.cpu_times_percent(interval=frist_invoke, percpu=False) cpu_percent = psutil.cpu_percent(interval=frist_invoke) value_dic = { 'cpu':{ 'cpu.user': cpu.user, 'cpu.nice': cpu.nice, 'cpu.system':cpu.system, 'cpu.idle':cpu.idle, 'cpu.iowait': cpu.iowait, 'cpu.irq': cpu.irq, 'cpu.softirq': cpu.softirq, 'cpu.steal': cpu.steal, 'cpu.guest': cpu.guest, 'cpu.percent': cpu_percent } } return value_dic
def poll(): global lastChecksum, voltage if socketio != None: mempercent = psutil.virtual_memory().percent cpupercent = psutil.cpu_percent(interval=None) cputemp = get_cpu_temperature() secs = int(round(time.time())) # every 10 seconds update voltage (or if it is zero) if(secs % 10 == 0 or voltage == 0): v = robot.getVoltage() # sometimes getVoltage fails and returns 0...don't show it..wait for next read if(v != 0): voltage = v data = {'mp': mempercent, 'cp': cpupercent, 'ct': cputemp, 'v': voltage} checksum = hashlib.sha224(json.dumps(data)).hexdigest() if lastChecksum != checksum: socketio.emit('sysinfo', data) lastChecksum = checksum time.sleep(0.5)
def update(self): """Function to update the entire class information.""" self.cpu["percentage"] = psutil.cpu_percent(interval=0.7) self.boot = datetime.datetime.fromtimestamp(psutil.boot_time()).strftime( "%Y-%m-%d %H:%M:%S") virtual_memory = psutil.virtual_memory() self.memory["used"] = virtual_memory.used self.memory["free"] = virtual_memory.free self.memory["cached"] = virtual_memory.cached net_io_counters = psutil.net_io_counters() self.network["packet_sent"] = net_io_counters.packets_sent self.network["packet_recv"] = net_io_counters.packets_recv disk_usage = psutil.disk_usage('/') self.disk["total"] = int(disk_usage.total/1024) self.disk["used"] = int(disk_usage.used/1024) self.disk["free"] = int(disk_usage.free/1024) self.timestamp = time.time()
def sysinfo(self, ctx): """Several interesting informations about your Host System.""" process = psutil.Process(os.getpid()) memory_usage = process.memory_full_info().uss / 1024**2 avai = psutil.virtual_memory().total / 1024**2 mepro = process.memory_percent() prosys = psutil.cpu_percent() sys = '%s %s' % (platform.linux_distribution(full_distribution_name=1)[0].title(), platform.linux_distribution(full_distribution_name=1)[1]) embed = discord.Embed(title='\N{ELECTRIC LIGHT BULB} Host Info', colour=embedColor(self)) embed.add_field(name='\N{CLOCK FACE THREE OCLOCK} UPTIME', value=getTimeDiff(datetime.datetime.fromtimestamp(int(process.create_time())), datetime.datetime.now())) embed.add_field(name='\N{DESKTOP COMPUTER} SYSTEM', value='{0}, {1}'.format(platform.system(), sys, platform.release())) embed.add_field(name='\N{FLOPPY DISK} MEMORY', value='{:.2f} MiB / {:.2f} MiB\nBot uses: {:.2f}%'.format(memory_usage, avai, mepro)) embed.add_field(name='\N{DVD} CPU', value='{:.2f}%'.format(prosys)) await edit(ctx, embed=embed, ttl=20) # Change Gamestatus - blank is no game
def loop_forever(self): try: while 1: try: self.status_changed = False self.phone_home() except Exception as e: self.logger.error('Failed to contact server. %s' % e) if self.need_restart and not self.container.running_jobs and not self.container.finished_jobs: print 'Restarting...' # End current process and launch a new one with the same command line parameters #os.execvp('python.exe', ['python.exe'] + sys.argv) exit(3) if not self.status_changed: self.cpu_percent = psutil.cpu_percent(interval=self.PHONEHOME_DELAY) except KeyboardInterrupt: pass finally: print 'Exiting...' self.notify_exit()
def getLoadAverage(): if linux: import multiprocessing k = 1.0 k /= multiprocessing.cpu_count() if os.path.exists('/proc/loadavg'): return [float(open('/proc/loadavg').read().split()[x]) * k for x in range(3)] else: tokens = subprocess.check_output(['uptime']).split() return [float(x.strip(',')) * k for x in tokens[-3:]] if mswindows: # TODO(Guodong Ding) get this field data like on Linux for Windows # print psutil.cpu_percent() # print psutil.cpu_times_percent() # print psutil.cpu_times() # print psutil.cpu_stats() return "%.2f%%" % psutil.cpu_percent()
def poll(interval): # sleep some time time.sleep(interval) procs = [] procs_status = {} for p in psutil.process_iter(): try: p.dict = p.as_dict(['username', 'nice', 'memory_info', 'memory_percent', 'cpu_percent', 'cpu_times', 'name', 'status']) try: procs_status[p.dict['status']] += 1 except KeyError: procs_status[p.dict['status']] = 1 except psutil.NoSuchProcess: pass else: procs.append(p) # return processes sorted by CPU percent usage processes = sorted(procs, key=lambda p: p.dict['cpu_percent'], reverse=True) return (processes, procs_status)
def captureSystemPerformance(self): logger.debug("{} capturing system performance".format(self)) timestamp = time.time() cpu = psutil.cpu_percent(interval=None) ram = psutil.virtual_memory() curr_network = self.calculateTraffic() network = curr_network - self.lastKnownTraffic self.lastKnownTraffic = curr_network cpu_data = { 'time': timestamp, 'value': cpu } ram_data = { 'time': timestamp, 'value': ram.percent } traffic_data = { 'time': timestamp, 'value': network } return { 'cpu': cpu_data, 'ram': ram_data, 'traffic': traffic_data }
def destroy(self, path): # fusermount -u or SIGINT aka control-C self.lfs_status = FRDnode.SOC_STATUS_OFFLINE self.librarian(self.lcp('update_node_soc_status', status=FRDnode.SOC_STATUS_OFFLINE, cpu_percent=0, rootfs_percent=0, network_in=0, network_out=0, mem_percent=0)) self.librarian(self.lcp('update_node_mc_status', status=FRDFAModule.MC_STATUS_OFFLINE)) assert threading.current_thread() is threading.main_thread() self.torms.close() del self.torms # helpers
def get_cpu_percent(): return psutil.cpu_percent(interval=1)
def init_with_context(self, context): ram = psutil.virtual_memory().percent cpu = psutil.cpu_percent() green, orange, red, grey = '#00FF38', '#FFB400', '#FF3B00', '#EBEBEB' ram_color = green if ram >= 75: ram_color = red elif ram >= 50: ram_color = orange cpu_color = green if cpu >= 75: cpu_color = red elif cpu >= 50: cpu_color = orange self.cpu_idel = 100 - cpu self.cpu_color = cpu_color self.cpu = cpu self.ram = 100 - ram self.ram_used = ram self.ram_color = ram_color
def run(self): while not self._exit_event.is_set(): self._performance_histogram[int(psutil.cpu_percent() + 0.5)] += 1 self._exit_event.wait(1) # periodically poll
def update(self, old_cpu): return psutil.cpu_percent(interval=None)
def run(self): while not self.shutdown_event.is_set(): try: self.update_value = psutil.cpu_percent(interval=self.interval) except Exception as e: self.exception('Exception while calculating cpu_percent: {0}', str(e))
def render(self, cpu_percent, format='{0:.0f}%', **kwargs): if not cpu_percent: return None return [{ 'contents': format.format(cpu_percent), 'gradient_level': cpu_percent, 'highlight_groups': ['cpu_load_percent_gradient', 'cpu_load_percent'], }]
def render(cpu_percent, pl, format='{0:.0f}%', **kwargs): pl.warn('Module “psutil” is not installed, thus CPU load is not available') return None
def cpu(self, type): if type == 'count': return psutil.cpu_count() elif type == 'cpu_percent': return psutil.cpu_percent(interval=1)
def get_cpu(self): return int(psutil.cpu_percent())
def getCpuMemory(self): """??CPU???????""" cpuPercent = psutil.cpu_percent() memoryPercent = psutil.virtual_memory().percent return u'CPU????%d%% ??????%d%%' % (cpuPercent, memoryPercent)
def update_data(self): """ Background task to add CPU Utilization sample / refresh memory utilization. """ cpu_util = psutil.cpu_percent() self.cpuutils.append(cpu_util) self.system_virtual_memory = psutil.virtual_memory() logger.debug('Updating CPU/Mem Utilization with: {}% / {}%'.format(cpu_util, self.get_memutil()))
def get(self): session = Session() service_count = session.query(Service).count() incidents = session.query(Incident).filter() session.close() memory_stats = psutil.virtual_memory() self.render("incidents.html", version=__version__, max_memory=humanize.naturalsize(memory_stats.total), memory_used=humanize.naturalsize(memory_stats.used), service_count=service_count, cpu_current=psutil.cpu_percent(), memory_percent=(memory_stats.used / memory_stats.total) * 100, incidents=incidents)
def get_server_status(self): system_status = { "cpu": psutil.cpu_percent(), "memory": psutil.virtual_memory().percent, } json_obj = [] json_obj.append(system_status) return json_obj
def process_send_data(socket, context): """ Send all memory, cpu, disk, network data of computer to server(master node) """ while True: try: cpu_percent = psutil.cpu_percent(interval=1, percpu=True) memory_info = psutil.virtual_memory() disk_info = psutil.disk_usage('/') pids_computer = psutil.pids() info_to_send = json.dumps({ "computer_utc_clock": str(datetime.datetime.utcnow()), "computer_clock": str(datetime.datetime.now()), "hostname": platform.node(), "mac_address": mac_address(), "ipv4_interfaces": internet_addresses(), "cpu": { "percent_used": cpu_percent }, "memory": { "total_bytes": memory_info.total, "total_bytes_used": memory_info.used, "percent_used": memory_info.percent }, "disk": { "total_bytes": disk_info.total, "total_bytes_used": disk_info.used, "total_bytes_free": disk_info.free, "percent_used": disk_info.percent }, "process": pids_active(pids_computer) }).encode() #send json data in the channel 'status', although is not necessary to send. socket.send_multipart([b"status", info_to_send]) #time.sleep(0.500) except (KeyboardInterrupt, SystemExit): socket.close() context.term()
def get_sys_info(): """Display info on system and output as nice HTML""" spacer = "<tr><td> </td><td> </td></tr>" html = '<h3>System Information for {}</h3>'.format(platform.node()) html += '<table>' html += '<tr><td align="left">Python Executable</td>' html += '<td>{}</td></tr>'.format(sys.executable) html += '<tr><td>Kernel PID</td><td>{}</td></tr>'.format( psutil.Process().pid) mem = psutil.virtual_memory() html += '<tr><td>Total System Memory</td>' html += '<td>{:.4} Mb</td></td>'.format(mem.total/1024**3) html += '<tr><td>Total Memory Used</td>' html += '<td>{:.4} Mb</td></td>'.format(mem.used/1024**3) html += '<tr><td>Total Memory Free</td>' html += '<td>{:.4} Mb</td></td>'.format(mem.free/1024**3) html += '<tr><td>Number of CPU Cores</td><td>{}</td></tr>'.format( psutil.cpu_count()) html += '<tr><td>Current CPU Load</td><td>{} %</td></tr>'.format( psutil.cpu_percent(1, False)) html += '</table>' return HTML(html)
def show_kernels(): """Show all IPython Kernels on System""" total_mem = psutil.virtual_memory().total html = ('<h3>IPython Notebook Processes on {}</h3>' '<table><tr>' '<th>Username</th><th>PID</th><th>CPU Usage</th>' '<th>Process Memory</th><th>System Memory Used</th><th>Status</th>' '</tr>').format(platform.node()) for proc in psutil.process_iter(): try: pinfo = proc.as_dict(attrs=['pid', 'username', 'cmdline', 'memory_info', 'status']) except psutil.NoSuchProcess: pass else: if any(x in pinfo['cmdline'] for x in ['IPython.kernel', 'ipykernel_launcher']): html += '<tr>' html += '<td>{username}</td><td>{pid}</td>'.format(**pinfo) p = psutil.Process(pinfo['pid']).cpu_percent(0.1) html += '<td>{}%</td>'.format(p) html += '<td>{:.4} Mb</td>'.format(pinfo['memory_info'].vms / 1024**3) html += '<td>{:.3}%</td>'.format(100 *pinfo['memory_info'].vms / total_mem) html += '<td>{}</td>'.format(pinfo['status']) html += '</tr>' html += '</table>' return HTML(html)
def run(self): while True: cpu_times, chrome_reset = self.cpu_times() print(json.dumps({ 'time': time.time(), 'cpu_times': cpu_times, 'cpu_percent': psutil.cpu_percent(percpu=True), 'chrome_reset': chrome_reset, }), flush=True) self.chrome_reset = False time.sleep(self.interval)