我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用psutil.cpu_count()。
def tell_system_status(self): import psutil import platform import datetime os, name, version, _, _, _ = platform.uname() version = version.split('-')[0] cores = psutil.cpu_count() cpu_percent = psutil.cpu_percent() memory_percent = psutil.virtual_memory()[2] disk_percent = psutil.disk_usage('/')[3] boot_time = datetime.datetime.fromtimestamp(psutil.boot_time()) running_since = boot_time.strftime("%A %d. %B %Y") response = "I am currently running on %s version %s. " % (os, version) response += "This system is named %s and has %s CPU cores. " % (name, cores) response += "Current disk_percent is %s percent. " % disk_percent response += "Current CPU utilization is %s percent. " % cpu_percent response += "Current memory utilization is %s percent. " % memory_percent response += "it's running since %s." % running_since return response
def check(request): return { 'hostname': socket.gethostname(), 'ips': ips, 'cpus': psutil.cpu_count(), 'uptime': timesince(datetime.fromtimestamp(psutil.boot_time())), 'memory': { 'total': filesizeformat(psutil.virtual_memory().total), 'available': filesizeformat(psutil.virtual_memory().available), 'used': filesizeformat(psutil.virtual_memory().used), 'free': filesizeformat(psutil.virtual_memory().free), 'percent': psutil.virtual_memory().percent }, 'swap': { 'total': filesizeformat(psutil.swap_memory().total), 'used': filesizeformat(psutil.swap_memory().used), 'free': filesizeformat(psutil.swap_memory().free), 'percent': psutil.swap_memory().percent } }
def __init__(self, cpu_histogram): super().__init__() # set up the graphical elements layout = QGridLayout(self) self.setLayout(layout) fig = Figure() layout.addWidget(FigureCanvas(fig)) # do the plotting ax = fig.add_subplot(1, 1, 1) # 1x1 grid, first subplot ax.set_title('CPU Usage Histogram (%s Cores/%s Threads)' % (psutil.cpu_count(False), psutil.cpu_count(True))) ax.set_ylabel('Count') ax.set_xlabel('CPU %') ax.grid(True) xs = range(0, 101) ax.plot(xs, [cpu_histogram[x] for x in xs]) ax.xaxis.set_major_locator(MultipleLocator(10.)) self.show()
def cpu_load(): count = psutil.cpu_count() condition_cpu_loop = True mem = Memory.Free_Space() if mem < 200: print "Achtung, sehr wenig Speicher frei!" while (condition_cpu_loop == True): cpu_load = psutil.cpu_percent(interval=cpu_interval) print(cpu_load) cpu_load_finish = cpu_load if(cpu_load > cpu_load_warning): condition_cpu_loop = False print("Warning Warning") print Pids.Pi(count, cpu_load_finish) return(cpu_load)
def __init__(self): self.conn = None self.dirty_scene = False self.guest = None self.guest_mapping_by_uuid = dict() self.hostname = ji.Common.get_hostname() self.node_id = uuid.getnode() self.cpu = psutil.cpu_count() self.memory = psutil.virtual_memory().total self.interfaces = dict() self.disks = dict() self.guest_callbacks = list() self.interval = 60 # self.last_host_cpu_time = dict() self.last_host_traffic = dict() self.last_host_disk_io = dict() self.last_guest_cpu_time = dict() self.last_guest_traffic = dict() self.last_guest_disk_io = dict() self.ts = ji.Common.ts() self.ssh_client = None
def wait_for_idle(self, timeout=30): """Wait for the system to go idle for at least 2 seconds""" import monotonic import psutil logging.debug("Waiting for Idle...") cpu_count = psutil.cpu_count() if cpu_count > 0: target_pct = 50. / float(cpu_count) idle_start = None end_time = monotonic.monotonic() + timeout idle = False while not idle and monotonic.monotonic() < end_time: self.alive() check_start = monotonic.monotonic() pct = psutil.cpu_percent(interval=0.5) if pct <= target_pct: if idle_start is None: idle_start = check_start if monotonic.monotonic() - idle_start > 2: idle = True else: idle_start = None
def wait_for_idle(self): """Wait for no more than 50% of a single core used for 500ms""" import psutil logging.debug("Waiting for Idle...") cpu_count = psutil.cpu_count() if cpu_count > 0: target_pct = 50. / float(cpu_count) idle_start = None end_time = monotonic.monotonic() + self.START_BROWSER_TIME_LIMIT idle = False while not idle and monotonic.monotonic() < end_time: check_start = monotonic.monotonic() pct = psutil.cpu_percent(interval=0.1) if pct <= target_pct: if idle_start is None: idle_start = check_start if monotonic.monotonic() - idle_start >= 0.4: idle = True else: idle_start = None
def get_logical_cpu_count(): if psutil is not None: # Number of logical CPUs cpu_count = psutil.cpu_count() elif hasattr(os, 'cpu_count'): # Python 3.4 cpu_count = os.cpu_count() else: cpu_count = None try: import multiprocessing except ImportError: pass else: try: cpu_count = multiprocessing.cpu_count() except NotImplementedError: pass if cpu_count is not None and cpu_count < 1: return None return cpu_count
def __init__(self, run_config, exe_path, max_cpu_time, max_memory, test_case_id, submission_dir, spj_version, spj_config, output=False): self._run_config = run_config self._exe_path = exe_path self._max_cpu_time = max_cpu_time self._max_memory = max_memory self._max_real_time = self._max_cpu_time * 3 self._test_case_id = test_case_id self._test_case_dir = os.path.join(TEST_CASE_DIR, test_case_id) self._submission_dir = submission_dir self._pool = Pool(processes=psutil.cpu_count()) self._test_case_info = self._load_test_case_info() self._spj_version = spj_version self._spj_config = spj_config self._output = output if self._spj_version and self._spj_config: self._spj_exe = os.path.join(SPJ_EXE_DIR, self._spj_config["exe_name"].format(spj_version=self._spj_version)) if not os.path.exists(self._spj_exe): raise JudgeClientError("spj exe not found")
def status(cmd, message, args): os_icon, os_color = get_os_icon() general_text = f'Latency: **{int(cmd.bot.latency * 1000)}ms**' general_text += f'\nPlatform: **{sys.platform.upper()}**' general_text += f'\nStarted: **{arrow.get(psutil.boot_time()).humanize()}**' cpu_clock = psutil.cpu_freq() if cpu_clock: cpu_clock = cpu_clock.current else: cpu_clock = 'Unknown' cpu_text = f'Count: **{psutil.cpu_count()} ({psutil.cpu_count(logical=False)})**' cpu_text += f'\nUsage: **{psutil.cpu_percent()}%**' cpu_text += f'\nClock: **{cpu_clock} MHz**' used_mem = humanfriendly.format_size(psutil.virtual_memory().available, binary=True) total_mem = humanfriendly.format_size(psutil.virtual_memory().total, binary=True) mem_text = f'Used: **{used_mem}**' mem_text += f'\nTotal: **{total_mem}**' mem_text += f'\nPercent: **{int(100 - psutil.virtual_memory().percent)}%**' response = discord.Embed(color=os_color) response.set_author(name=socket.gethostname(), icon_url=os_icon) response.add_field(name='General', value=general_text) response.add_field(name='CPU', value=cpu_text) response.add_field(name='Memory', value=mem_text) await message.channel.send(embed=response)
def get_cpu_info(self): res = {} try: # windows 2008 multiprocessing.cpu_count()????? from cpuinfo import cpuinfo info = cpuinfo.get_cpu_info() res['name'] = info['brand'] res['hz'] = info['hz_advertised'] res['brand'] = info['brand'] res['architecture'] = info['arch'] res['physical_cores'] = psutil.cpu_count(logical=False) res['logical_cores'] = psutil.cpu_count() except: res = {} self.logger.error(traceback.format_exc()) return res
def test_cpu_freq(self): def check_ls(ls): for nt in ls: self.assertLessEqual(nt.current, nt.max) for name in nt._fields: value = getattr(nt, name) self.assertIsInstance(value, (int, long, float)) self.assertGreaterEqual(value, 0) ls = psutil.cpu_freq(percpu=True) if TRAVIS and not ls: return assert ls, ls check_ls([psutil.cpu_freq(percpu=False)]) if LINUX: self.assertEqual(len(ls), psutil.cpu_count())
def getLoadAverage(): if linux: import multiprocessing k = 1.0 k /= multiprocessing.cpu_count() if os.path.exists('/proc/loadavg'): return [float(open('/proc/loadavg').read().split()[x]) * k for x in range(3)] else: tokens = subprocess.check_output(['uptime']).split() return [float(x.strip(',')) * k for x in tokens[-3:]] if mswindows: # TODO(Guodong Ding) get this field data like on Linux for Windows # print psutil.cpu_percent() # print psutil.cpu_times_percent() # print psutil.cpu_times() # print psutil.cpu_stats() return "%.2f%%" % psutil.cpu_percent()
def num_cpus(self): # NOTE: use cpu_count if present (16.04 support) if hasattr(psutil, 'cpu_count'): return psutil.cpu_count() else: return psutil.NUM_CPUS
def show_platform(): ''' Show information on platform''' swrite('\n=== SYSTEM ===\n\n') try: linux_distribution = platform.linux_distribution() except: linux_distribution = "N/A" swrite(""" dist: %s linux_distribution: %s system: %s machine: %s platform: %s uname: %s version: %s mac_ver: %s memory: %s number of CPU: %s """ % ( str(platform.dist()), linux_distribution, platform.system(), platform.machine(), platform.platform(), platform.uname(), platform.version(), platform.mac_ver(), psutil.virtual_memory(), str(psutil.cpu_count()) ))
def help_cpus(self): msg = QMessageBox() msg.setIcon(QMessageBox.Question) msg.setText("Setting the number of CPUs") msg.setWindowTitle("Number of CPUs") msg.setInformativeText("SpyKING CIRCUS can use several CPUs " "either locally or on multiple machines " "using MPI (see documentation) " "\n" "\n" "You have %d local CPUs available" %psutil.cpu_count() ) msg.setStandardButtons(QMessageBox.Close) msg.setDefaultButton(QMessageBox.Close) answer = msg.exec_()
def _num_cpus(): ''' Compatibility wrapper for calculating the number of CPU's a unit has. @returns: int: number of CPU cores detected ''' try: return psutil.cpu_count() except AttributeError: return psutil.NUM_CPUS
def benchmark(self): '''Benchmark''' process = psutil.Process() memory = process.memory_info().rss / 2 ** 20 process.cpu_percent() embed = discord.Embed(color = clients.bot_color) embed.add_field(name = "RAM", value = "{:.2f} MiB".format(memory)) embed.add_field(name = "CPU", value = "Calculating CPU usage..") message, embed = await self.bot.say(embed = embed) await asyncio.sleep(1) cpu = process.cpu_percent() / psutil.cpu_count() embed.set_field_at(1, name = "CPU", value = "{}%".format(cpu)) await self.bot.edit_message(message, embed = embed)
def beta_phylogenetic_alt(table: BIOMV210Format, phylogeny: NewickFormat, metric: str, n_jobs: int=1, variance_adjusted: bool=False, alpha: float=None, bypass_tips: bool=False) -> skbio.DistanceMatrix: metrics = phylogenetic_metrics_alt_dict() generalized_unifrac = 'generalized_unifrac' if metric not in metrics: raise ValueError("Unknown metric: %s" % metric) if alpha is not None and metric != generalized_unifrac: raise ValueError('The alpha parameter is only allowed when the choice' ' of metric is generalized_unifrac') # this behaviour is undefined, so let's avoid a seg fault cpus = psutil.cpu_count(logical=False) if n_jobs > cpus: raise ValueError('The value of n_jobs cannot exceed the number of ' 'processors (%d) available in this system.' % cpus) if metric == generalized_unifrac: alpha = 1.0 if alpha is None else alpha f = partial(metrics[metric], alpha=alpha) else: f = metrics[metric] # unifrac processes tables and trees should be filenames return f(str(table), str(phylogeny), threads=n_jobs, variance_adjusted=variance_adjusted, bypass_tips=bypass_tips)
def clean(self): ram = self.cleaned_data.get('ram', None) cores = self.cleaned_data.get('cores', None) if cores: cores = random_sample(range(0, cpu_count()), cores) cores = ','.join(map(str, cores)) self.cleaned_data['cores'] = cores else: cores = cpu_count() cores = str(list(range(0, cores))).strip('[]').replace(" ", "") self.cleaned_data['cores'] = cores if ram: ram = int(ram) self.cleaned_data['ram'] = ram else: ram = int(DHost.memory('total')) self.cleaned_data['ram'] = ram if self.ssh_users: for ssh_user in self.ssh_users: user_obj = User.objects.get(email=ssh_user) if not user_obj.ssh_pub_key: raise forms.ValidationError("SSH key not found") return self.cleaned_data
def cpu(self, type): if type == 'count': return psutil.cpu_count() elif type == 'cpu_percent': return psutil.cpu_percent(interval=1)
def get(self, request, show=False): if show == 'all': show = True cpu_count = get_cpu_count() return ('processes.html', { 'panel_pid': prism.settings.PANEL_PID, 'cpu_count': cpu_count[0], 'cpu_count_logical': cpu_count[1], 'ram': psutil.virtual_memory()[0], 'processes': self.get_processes(show, lambda x: x['memory_percent'] + x['cpu_percent']) })
def get(self, request, process_id): try: p = psutil.Process(process_id) except: return ('system.SystemProcessesView') process = p.as_dict(attrs=['pid', 'name', 'cwd', 'exe', 'username', 'nice', 'cpu_percent', 'cpu_affinity', 'memory_full_info', 'memory_percent', 'status', 'cpu_times', 'threads', 'io_counters', 'open_files', 'create_time', 'cmdline', 'connections']) process['connections'] = p.connections(kind='all') cpu_count = get_cpu_count() return ('process.html', { 'panel_pid': prism.settings.PANEL_PID, 'process_id': process_id, 'cpu_count': cpu_count[0], 'cpu_count_logical': cpu_count[1], 'ram': psutil.virtual_memory()[0], 'proc': process })
def get_cpu_count(): cpu_count = psutil.cpu_count(logical=False) return (cpu_count, psutil.cpu_count(logical=True) - cpu_count)
def _fit_regressor_stump_threaded(X, y, sample_weight, argsorted_X=None): Y = y.flatten() if sample_weight is None: sample_weight = np.ones(shape=(X.shape[0],), dtype='float') / (X.shape[0],) else: sample_weight /= np.sum(sample_weight) classifier_result = [] with cfut.ThreadPoolExecutor(max_workers=psutil.cpu_count()) as tpe: futures = [] if argsorted_X is not None: for dim in six.moves.range(X.shape[1]): futures.append( tpe.submit(_regressor_learn_one_dimension, dim, X[:, dim], Y, sample_weight, argsorted_X[:, dim])) else: for dim in six.moves.range(X.shape[1]): futures.append(tpe.submit(_regressor_learn_one_dimension, dim, X[:, dim], Y, sample_weight)) for future in cfut.as_completed(futures): classifier_result.append(future.result()) # Sort the returned data after lowest error. classifier_result = sorted(classifier_result, key=itemgetter(1)) best_result = classifier_result[0] return { 'best_dim': int(best_result[0]), 'min_value': float(best_result[1]), 'threshold': float(best_result[2]), 'coefficient': float(best_result[3]), 'constant': float(best_result[4]), }
def _fit_regressor_stump_c_ext_threaded(X, y, sample_weight, argsorted_X=None): if c_classifiers is None: return _fit_regressor_stump_threaded(X, y, sample_weight, argsorted_X) Y = y.flatten() if sample_weight is None: sample_weight = np.ones(shape=(X.shape[0],), dtype='float') / (X.shape[0],) else: sample_weight /= np.sum(sample_weight) classifier_result = [] with cfut.ThreadPoolExecutor(max_workers=psutil.cpu_count()) as tpe: futures = [] if argsorted_X is not None: for dim in six.moves.range(X.shape[1]): futures.append( tpe.submit(_regressor_c_learn_one_dimension, dim, X[:, dim], Y, sample_weight, argsorted_X[:, dim])) else: for dim in six.moves.range(X.shape[1]): futures.append(tpe.submit(_regressor_c_learn_one_dimension, dim, X[:, dim], Y, sample_weight)) for future in cfut.as_completed(futures): classifier_result.append(future.result()) # Sort the returned data after lowest error. classifier_result = sorted(classifier_result, key=itemgetter(1)) best_result = classifier_result[0] return { 'best_dim': int(best_result[0]), 'min_value': float(best_result[1]), 'threshold': float(best_result[2]), 'coefficient': float(best_result[3]), 'constant': float(best_result[4]), }
def get_sys_info(): """Display info on system and output as nice HTML""" spacer = "<tr><td> </td><td> </td></tr>" html = '<h3>System Information for {}</h3>'.format(platform.node()) html += '<table>' html += '<tr><td align="left">Python Executable</td>' html += '<td>{}</td></tr>'.format(sys.executable) html += '<tr><td>Kernel PID</td><td>{}</td></tr>'.format( psutil.Process().pid) mem = psutil.virtual_memory() html += '<tr><td>Total System Memory</td>' html += '<td>{:.4} Mb</td></td>'.format(mem.total/1024**3) html += '<tr><td>Total Memory Used</td>' html += '<td>{:.4} Mb</td></td>'.format(mem.used/1024**3) html += '<tr><td>Total Memory Free</td>' html += '<td>{:.4} Mb</td></td>'.format(mem.free/1024**3) html += '<tr><td>Number of CPU Cores</td><td>{}</td></tr>'.format( psutil.cpu_count()) html += '<tr><td>Current CPU Load</td><td>{} %</td></tr>'.format( psutil.cpu_percent(1, False)) html += '</table>' return HTML(html)
def count(self): return psutil.cpu_count()