Python os 模块:cpu_count() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 os.cpu_count()。
def cpu_count_physical():
    """Return the number of physical cores in the system.

    Parses ``<procfs>/cpuinfo`` section by section (sections are
    separated by blank lines), remembers the ``cpu cores`` value seen for
    each ``physical id`` and returns the sum of those values.

    Returns None when no section carries both fields, mimicking
    os.cpu_count().
    """
    mapping = {}
    current_info = {}

    def close_section():
        # A section only counts when it reported both fields.
        if (b'physical id' in current_info and
                b'cpu cores' in current_info):
            mapping[current_info[b'physical id']] = \
                current_info[b'cpu cores']
        current_info.clear()

    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        for line in f:
            line = line.strip().lower()
            if not line:
                # blank line: the current section is complete
                close_section()
            elif line.startswith((b'physical id', b'cpu cores')):
                # Split on the colon alone so that the amount of
                # whitespace around it does not matter.
                key, sep, value = line.partition(b':')
                if sep:
                    current_info[key.strip()] = int(value)
    # The last section is not necessarily followed by a blank line.
    close_section()
    # mimic os.cpu_count()
    return sum(mapping.values()) or None
def cpu_count(logical=True):
    """Return the number of CPUs in the system.

    By default the logical CPU count is returned. Pass ``logical=False``
    to count physical cores only (e.g. hyper thread CPUs are excluded).
    The platform helper's result is returned unchanged; presumably that
    is None when the count cannot be determined.

    NOTE(review): upstream documentation describes the result as cached
    (clearable via ``psutil.cpu_count.cache_clear()``); no caching
    decorator is visible on this definition -- confirm.
    """
    counter = (_psplatform.cpu_count_logical if logical
               else _psplatform.cpu_count_physical)
    return counter()
def cpu_count_physical():
    """Return the number of physical cores in the system.

    Parses ``<procfs>/cpuinfo`` section by section (sections are
    separated by blank lines), remembers the ``cpu cores`` value seen for
    each ``physical id`` and returns the sum of those values.

    Returns None when no section carries both fields, mimicking
    os.cpu_count().
    """
    mapping = {}
    current_info = {}

    def close_section():
        # A section only counts when it reported both fields.
        if (b'physical id' in current_info and
                b'cpu cores' in current_info):
            mapping[current_info[b'physical id']] = \
                current_info[b'cpu cores']
        current_info.clear()

    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        for line in f:
            line = line.strip().lower()
            if not line:
                # blank line: the current section is complete
                close_section()
            elif line.startswith((b'physical id', b'cpu cores')):
                # Split on the colon alone so that the amount of
                # whitespace around it does not matter.
                key, sep, value = line.partition(b':')
                if sep:
                    current_info[key.strip()] = int(value)
    # The last section is not necessarily followed by a blank line.
    close_section()
    # mimic os.cpu_count()
    return sum(mapping.values()) or None
def cpu_count(logical=True):
    """Return the number of CPUs in the system.

    By default the logical CPU count is returned. Pass ``logical=False``
    to count physical cores only (e.g. hyper thread CPUs are excluded).
    The platform helper's result is returned unchanged; presumably that
    is None when the count cannot be determined.

    NOTE(review): upstream documentation describes the result as cached
    (clearable via ``psutil.cpu_count.cache_clear()``); no caching
    decorator is visible on this definition -- confirm.
    """
    counter = (_psplatform.cpu_count_logical if logical
               else _psplatform.cpu_count_physical)
    return counter()
def startEvents(self):
    """Start the worker threads and begin processing events.

    One thread is started for each of the parent, feedback and final
    queues, plus ``max(1, cpu_count() - 1)`` threads for the child queue.
    Afterwards ``_put_parent()`` and ``on_started()`` are invoked.

    Raises:
        RuntimeError: if the queue state is not ``QueueStatus.un_init``.
    """
    if self.state != QueueStatus.un_init:
        raise RuntimeError("At present the queue can only be started once!")

    self._thread_parent = Thread(target=self._worker, args=(self.queue_parent,))
    self._thread_parent.start()

    # cpu_count() may report None when the count is undetermined; treat
    # that like a single CPU instead of failing on the comparison.
    cpus = cpu_count() or 1
    self._child_worker_count = max(1, cpus - 1)
    self._thread_children = [
        Thread(target=self._worker, args=(self.queue_child,))
        for _ in range(self._child_worker_count)
    ]
    for child in self._thread_children:
        child.start()

    self._thread_feedback = Thread(target=self._worker, args=(self.queue_feedback,))
    self._thread_feedback.start()

    self._thread_final = Thread(target=self._worker, args=(self.queue_final,))
    self._thread_final.start()

    self._put_parent()
    self.on_started()
async def wait_cgroup(sock, execute_task, time_limit_ns, memory_limit_bytes, process_limit):
    """Wait for ``execute_task`` while enforcing limits through a cgroup.

    A fresh ``CGroup`` gets the memory and process limits, then accepts
    ``sock``. The loop compares the larger of the cgroup CPU usage and the
    per-CPU share of elapsed idle time against ``time_limit_ns`` and waits
    for the task for at most the remaining time (plus WAIT_JITTER_NS).

    Returns:
        A ``(time_usage_ns, memory_usage_bytes)`` tuple, either once the
        time budget is exhausted or once the task has completed.

    The cgroup is always killed and closed on exit.
    """
    # This must be a coroutine: the body awaits, which is a SyntaxError
    # inside a plain ``def``.
    cgroup = CGroup()
    try:
        cgroup.memory_limit_bytes = memory_limit_bytes
        cgroup.pids_max = process_limit
        await cgroup.accept(sock)
        start_idle = _get_idle()
        ncpu = cpu_count()

        while True:
            cpu_usage_ns = cgroup.cpu_usage_ns
            idle_usage_ns = int((_get_idle() - start_idle) / ncpu * 1e9)
            time_usage_ns = max(cpu_usage_ns, idle_usage_ns)
            time_remain_ns = time_limit_ns - time_usage_ns
            if time_remain_ns <= 0:
                return time_usage_ns, cgroup.memory_usage_bytes
            try:
                # shield() keeps the task itself from being cancelled when
                # the wait times out.
                await wait_for(shield(execute_task), (time_remain_ns + WAIT_JITTER_NS) / 1e9)
                return cgroup.cpu_usage_ns, cgroup.memory_usage_bytes
            except TimeoutError:
                # NOTE(review): presumably this name is asyncio's
                # TimeoutError imported by the file -- confirm.
                pass
    finally:
        # Keep killing until kill() reports nothing left, then release.
        while cgroup.kill():
            await sleep(.001)
        cgroup.close()
def cpu_count_physical():
    """Return the number of physical cores in the system.

    Parses ``<procfs>/cpuinfo`` section by section (sections are
    separated by blank lines), remembers the ``cpu cores`` value seen for
    each ``physical id`` and returns the sum of those values.

    Returns None when no section carries both fields, mimicking
    os.cpu_count().
    """
    mapping = {}
    current_info = {}

    def close_section():
        # A section only counts when it reported both fields.
        if (b'physical id' in current_info and
                b'cpu cores' in current_info):
            mapping[current_info[b'physical id']] = \
                current_info[b'cpu cores']
        current_info.clear()

    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        for line in f:
            line = line.strip().lower()
            if not line:
                # blank line: the current section is complete
                close_section()
            elif line.startswith((b'physical id', b'cpu cores')):
                # Split on the colon alone so that the amount of
                # whitespace around it does not matter.
                key, sep, value = line.partition(b':')
                if sep:
                    current_info[key.strip()] = int(value)
    # The last section is not necessarily followed by a blank line.
    close_section()
    # mimic os.cpu_count()
    return sum(mapping.values()) or None
def cpu_count(logical=True):
    """Return the number of CPUs in the system.

    By default the logical CPU count is returned. Pass ``logical=False``
    to count physical cores only (e.g. hyper thread CPUs are excluded).
    The platform helper's result is returned unchanged; presumably that
    is None when the count cannot be determined.

    NOTE(review): upstream documentation describes the result as cached
    (clearable via ``psutil.cpu_count.cache_clear()``); no caching
    decorator is visible on this definition -- confirm.
    """
    counter = (_psplatform.cpu_count_logical if logical
               else _psplatform.cpu_count_physical)
    return counter()
def cpu_count_physical():
    """Return the number of physical cores in the system.

    Parses ``<procfs>/cpuinfo`` section by section (sections are
    separated by blank lines), remembers the ``cpu cores`` value seen for
    each ``physical id`` and returns the sum of those values.

    Returns None when no section carries both fields, mimicking
    os.cpu_count().
    """
    mapping = {}
    current_info = {}

    def close_section():
        # A section only counts when it reported both fields.
        if (b'physical id' in current_info and
                b'cpu cores' in current_info):
            mapping[current_info[b'physical id']] = \
                current_info[b'cpu cores']
        current_info.clear()

    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        for line in f:
            line = line.strip().lower()
            if not line:
                # blank line: the current section is complete
                close_section()
            elif line.startswith((b'physical id', b'cpu cores')):
                # Split on the colon alone so that the amount of
                # whitespace around it does not matter.
                key, sep, value = line.partition(b':')
                if sep:
                    current_info[key.strip()] = int(value)
    # The last section is not necessarily followed by a blank line.
    close_section()
    # mimic os.cpu_count()
    return sum(mapping.values()) or None
def cpu_count(logical=True):
    """Return the number of CPUs in the system.

    By default the logical CPU count is returned. Pass ``logical=False``
    to count physical cores only (e.g. hyper thread CPUs are excluded).
    The platform helper's result is returned unchanged; presumably that
    is None when the count cannot be determined.

    NOTE(review): upstream documentation describes the result as cached
    (clearable via ``psutil.cpu_count.cache_clear()``); no caching
    decorator is visible on this definition -- confirm.
    """
    counter = (_psplatform.cpu_count_logical if logical
               else _psplatform.cpu_count_physical)
    return counter()
def info():
    """Collect a summary of the host's architecture, CPU, kernel and memory.

    Values come from os.uname(), sys.byteorder, os.cpu_count() and the
    ``lscpu()`` / ``meminfo()`` helpers; sizes and the CPU frequency are
    passed through ``parse_size`` / ``parse_float``.

    Returns:
        dict: the collected fields keyed by name.
    """
    kernel = os.uname()
    cpu_facts = lscpu()
    mem_facts = meminfo()

    summary = {
        'arch': kernel.machine,
        'byte_order': sys.byteorder,
    }
    # One entry per cache level reported by lscpu.
    for level in ('L1d', 'L1i', 'L2', 'L3'):
        summary['cpu_cache_' + level] = parse_size(cpu_facts.get(level + ' cache'))
    summary['cpu_count'] = os.cpu_count()
    summary['cpu_mhz'] = parse_float(cpu_facts.get('CPU MHz'))
    summary['cpu_name'] = cpu_facts.get('Model name')
    summary['kernel'] = kernel.sysname
    summary['kernel_release'] = kernel.release
    summary['kernel_version'] = kernel.version
    summary['memory'] = parse_size(mem_facts.get('MemTotal'))
    return summary
def oparallel(std_data, final_meas, groups=None, cfg=None):
    """Calculate the hospital group scores for each LVM.

    Each group is handed to ``worker`` in a process pool as the tuple
    ``(std_data, final_meas[group], group, cfg)``.

    Args:
        std_data: data passed unchanged to every worker.
        final_meas: mapping indexed by group.
        groups: groups to process; overridden by ``cfg.GROUPS`` when
            ``cfg`` is given.
        cfg: optional configuration object, also forwarded to each worker.

    Returns:
        An iterator produced by ``zip(*results)``, i.e. the per-group
        worker results transposed.
    """
    if cfg is not None:
        groups = cfg.GROUPS
    # Use all CPUs; fall back to one process when the count is unknown.
    nproc = os.cpu_count() or 1
    group_data = [(std_data, final_meas[g], g, cfg) for g in groups]
    # The context manager shuts the pool down once the map has finished,
    # so worker processes are not left behind.
    with multiprocessing.Pool(nproc) as pool:
        r = pool.map(worker, group_data)
    return zip(*r)
def idle_cpu_count(mincpu=1):
    """Estimate number of idle CPUs, for use by multiprocessing code
    needing to determine how many processes can be run without excessive
    load. This function uses :func:`os.getloadavg` which is only available
    under a Unix OS.

    Parameters
    ----------
    mincpu : int
      Minimum number of CPUs to report, independent of actual estimate

    Returns
    -------
    idle : int
      Estimate of number of idle CPUs; ``mincpu`` when the CPU count
      cannot be determined
    """
    ncpu = mp.cpu_count() if PY2 else os.cpu_count()
    if ncpu is None:
        # os.cpu_count() returns None when the count is undetermined, in
        # which case no estimate is possible.
        return mincpu
    # The CPU count minus the first os.getloadavg() entry, floored.
    idle = int(ncpu - np.floor(os.getloadavg()[0]))
    return max(mincpu, idle)
def cpu_count_physical():
    """Return the number of physical cores in the system.

    Parses ``<procfs>/cpuinfo`` section by section (sections are
    separated by blank lines), remembers the ``cpu cores`` value seen for
    each ``physical id`` and returns the sum of those values.

    Returns None when no section carries both fields, mimicking
    os.cpu_count().
    """
    mapping = {}
    current_info = {}

    def close_section():
        # A section only counts when it reported both fields.
        if (b'physical id' in current_info and
                b'cpu cores' in current_info):
            mapping[current_info[b'physical id']] = \
                current_info[b'cpu cores']
        current_info.clear()

    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        for line in f:
            line = line.strip().lower()
            if not line:
                # blank line: the current section is complete
                close_section()
            elif line.startswith((b'physical id', b'cpu cores')):
                # Split on the colon alone so that the amount of
                # whitespace around it does not matter.
                key, sep, value = line.partition(b':')
                if sep:
                    current_info[key.strip()] = int(value)
    # The last section is not necessarily followed by a blank line.
    close_section()
    # mimic os.cpu_count()
    return sum(mapping.values()) or None
def cpu_count(logical=True):
    """Return the number of CPUs in the system.

    By default the logical CPU count is returned. Pass ``logical=False``
    to count physical cores only (e.g. hyper thread CPUs are excluded).
    The platform helper's result is returned unchanged; presumably that
    is None when the count cannot be determined.

    NOTE(review): upstream documentation describes the result as cached
    (clearable via ``psutil.cpu_count.cache_clear()``); no caching
    decorator is visible on this definition -- confirm.
    """
    counter = (_psplatform.cpu_count_logical if logical
               else _psplatform.cpu_count_physical)
    return counter()
def get_torch_num_workers(num_workers: int) -> int:
    """Turn an int into a useful number of workers for a pytorch DataLoader.

    -1 means "use all CPU's", -2 means "use all but 1 CPU", etc.

    Note: 0 is interpreted by pytorch as doing data loading in the main
    process, while any positive number spawns a new process. We do not
    allow more processes to spawn than there are CPU's.

    Args:
        num_workers: requested worker count; negative values are relative
            to the number of CPUs as described above.

    Returns:
        The worker count clamped to the range ``0 .. number of CPUs``.
    """
    # Treat an undetermined CPU count (None) as a single CPU.
    num_cpu = cpu_count() or 1
    if num_workers < 0:
        n_workers = num_cpu + 1 + num_workers
        if n_workers < 0:
            # -(num_workers + 1) is the (positive) number of CPUs the
            # caller asked to leave unused.
            print("Warning: {} fewer workers than the number of CPU's were specified, but there are only {} CPU's; "
                  "running data loading in the main process (num_workers = 0).".format(-(num_workers + 1), num_cpu))
        num_workers = max(0, n_workers)
    if num_workers > num_cpu:
        print("Warning, `num_workers` is {} but only {} CPU's are available; "
              "using this number instead".format(num_workers, num_cpu))
    return min(num_workers, num_cpu)
#####################################################################
# Iterator utils #
#####################################################################
def __init__(self, j=os.cpu_count(), max_utilization=100):
    """
    Initialize a parallel loop object.

    :param j: The maximum number of parallel jobs. None (which is what
        os.cpu_count() yields when the count is undetermined) falls back
        to the detected CPU count, or 1.
    :param max_utilization: The maximum CPU utilization. Above this no more new jobs
        will be started.
    """
    if j is None:
        # The shared slots array below needs a real size.
        j = os.cpu_count() or 1
    self._j = j
    self._max_utilization = max_utilization
    # This gets initialized to 0, may be set to 1 anytime, but must not be reset to 0 ever;
    # thus, no locking is needed when accessing
    self._break = multiprocessing.sharedctypes.Value('i', 0, lock=False)
    self._lock = multiprocessing.Condition()
    # Shared int array of length j.
    self._slots = multiprocessing.sharedctypes.Array('i', j, lock=False)
    # NOTE(review): presumably primes psutil's CPU-utilization sampling so
    # that later readings have a reference point -- confirm.
    psutil.cpu_percent(None)
# Beware! this is running in a new process now. state is shared with fork,
# but only changes to shared objects will be visible in parent.
def __init__(self, test, *, cache=None, split=config_splitters.zeller,
             proc_num=os.cpu_count(), max_utilization=100,
             config_iterator=config_iterators.forward):
    """
    Set up a CombinedParallelDD object.

    :param test: Callable tester object.
    :param cache: Cache object to use.
    :param split: Splitter method that breaks a configuration up into n parts.
    :param proc_num: Level of parallelization.
    :param max_utilization: Maximum accepted CPU utilization.
    :param config_iterator: Generator function yielding config indices in an
        arbitrary order.
    """
    # Everything except the iterator is handled by the base initializer.
    AbstractParallelDD.__init__(
        self, test, split, proc_num, max_utilization, cache=cache)
    self._config_iterator = config_iterator
def __init__(self, test, *, cache=None, split=config_splitters.zeller,
             proc_num=os.cpu_count(), max_utilization=100,
             subset_first=True, subset_iterator=config_iterators.forward, complement_iterator=config_iterators.forward):
    """
    Set up a ParallelDD object.

    :param test: Callable tester object.
    :param cache: Cache object to use.
    :param split: Splitter method that breaks a configuration up into n parts.
    :param proc_num: Level of parallelization.
    :param max_utilization: Maximum accepted CPU utilization.
    :param subset_first: Whether the reduction starts with the subset based
        approach.
    :param subset_iterator: Generator function yielding config indices in an
        arbitrary order.
    :param complement_iterator: Generator function yielding config indices in an
        arbitrary order.
    """
    # The base initializer takes care of the shared parallel-DD settings.
    AbstractParallelDD.__init__(
        self, test, split, proc_num, max_utilization, cache=cache)
    self._subset_first = subset_first
    self._subset_iterator = subset_iterator
    self._complement_iterator = complement_iterator
def get_logical_cpu_count():
    """Return the number of logical CPUs, or None if it cannot be determined.

    Sources are tried by preference: psutil (when the module-level
    ``psutil`` name is not None), then os.cpu_count() where the ``os``
    module provides it, and finally multiprocessing.cpu_count().
    A result below 1 is reported as None.
    """
    if psutil is not None:
        # Number of logical CPUs
        count = psutil.cpu_count()
    elif hasattr(os, 'cpu_count'):
        # Python 3.4
        count = os.cpu_count()
    else:
        count = None
        try:
            import multiprocessing
        except ImportError:
            pass
        else:
            try:
                count = multiprocessing.cpu_count()
            except NotImplementedError:
                pass

    return None if (count is not None and count < 1) else count
def cpu_count_physical():
    """Return the number of physical cores in the system.

    Parses ``<procfs>/cpuinfo`` section by section (sections are
    separated by blank lines), remembers the ``cpu cores`` value seen for
    each ``physical id`` and returns the sum of those values.

    Returns None when no section carries both fields, mimicking
    os.cpu_count().
    """
    mapping = {}
    current_info = {}

    def close_section():
        # A section only counts when it reported both fields.
        if (b'physical id' in current_info and
                b'cpu cores' in current_info):
            mapping[current_info[b'physical id']] = \
                current_info[b'cpu cores']
        current_info.clear()

    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        for line in f:
            line = line.strip().lower()
            if not line:
                # blank line: the current section is complete
                close_section()
            elif line.startswith((b'physical id', b'cpu cores')):
                # Split on the colon alone so that the amount of
                # whitespace around it does not matter.
                key, sep, value = line.partition(b':')
                if sep:
                    current_info[key.strip()] = int(value)
    # The last section is not necessarily followed by a blank line.
    close_section()
    # mimic os.cpu_count()
    return sum(mapping.values()) or None
def cpu_count(logical=True):
    """Return the number of CPUs in the system.

    By default the logical CPU count is returned. Pass ``logical=False``
    to count physical cores only (e.g. hyper thread CPUs are excluded).
    The platform helper's result is returned unchanged; presumably that
    is None when the count cannot be determined.

    NOTE(review): upstream documentation describes the result as cached
    (clearable via ``psutil.cpu_count.cache_clear()``); no caching
    decorator is visible on this definition -- confirm.
    """
    counter = (_psplatform.cpu_count_logical if logical
               else _psplatform.cpu_count_physical)
    return counter()
def cpu_count(logical=True):
    """Return the number of CPUs in the system.

    By default the logical CPU count is returned. Pass ``logical=False``
    to count physical cores only (e.g. hyper thread CPUs are excluded).
    The platform helper's result is returned unchanged; presumably that
    is None when the count cannot be determined.

    NOTE(review): upstream documentation describes the result as cached
    (clearable via ``psutil.cpu_count.cache_clear()``); no caching
    decorator is visible on this definition -- confirm.
    """
    counter = (_psplatform.cpu_count_logical if logical
               else _psplatform.cpu_count_physical)
    return counter()
def cpu_count_physical():
    """Return the number of physical cores in the system.

    Parses ``<procfs>/cpuinfo`` section by section (sections are
    separated by blank lines), remembers the ``cpu cores`` value seen for
    each ``physical id`` and returns the sum of those values.

    Returns None when no section carries both fields, mimicking
    os.cpu_count().
    """
    mapping = {}
    current_info = {}

    def close_section():
        # A section only counts when it reported both fields.
        if (b'physical id' in current_info and
                b'cpu cores' in current_info):
            mapping[current_info[b'physical id']] = \
                current_info[b'cpu cores']
        current_info.clear()

    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        for line in f:
            line = line.strip().lower()
            if not line:
                # blank line: the current section is complete
                close_section()
            elif line.startswith((b'physical id', b'cpu cores')):
                # Split on the colon alone so that the amount of
                # whitespace around it does not matter.
                key, sep, value = line.partition(b':')
                if sep:
                    current_info[key.strip()] = int(value)
    # The last section is not necessarily followed by a blank line.
    close_section()
    # mimic os.cpu_count()
    return sum(mapping.values()) or None
def cpu_count(logical=True):
    """Return the number of CPUs in the system.

    By default the logical CPU count is returned. Pass ``logical=False``
    to count physical cores only (e.g. hyper thread CPUs are excluded).
    The platform helper's result is returned unchanged; presumably that
    is None when the count cannot be determined.

    NOTE(review): upstream documentation describes the result as cached
    (clearable via ``psutil.cpu_count.cache_clear()``); no caching
    decorator is visible on this definition -- confirm.
    """
    counter = (_psplatform.cpu_count_logical if logical
               else _psplatform.cpu_count_physical)
    return counter()
def cpu_count_physical():
    """Return the number of physical cores in the system.

    Parses ``<procfs>/cpuinfo`` section by section (sections are
    separated by blank lines), remembers the ``cpu cores`` value seen for
    each ``physical id`` and returns the sum of those values.

    Returns None when no section carries both fields, mimicking
    os.cpu_count().
    """
    mapping = {}
    current_info = {}

    def close_section():
        # A section only counts when it reported both fields.
        if (b'physical id' in current_info and
                b'cpu cores' in current_info):
            mapping[current_info[b'physical id']] = \
                current_info[b'cpu cores']
        current_info.clear()

    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        for line in f:
            line = line.strip().lower()
            if not line:
                # blank line: the current section is complete
                close_section()
            elif line.startswith((b'physical id', b'cpu cores')):
                # Split on the colon alone so that the amount of
                # whitespace around it does not matter.
                key, sep, value = line.partition(b':')
                if sep:
                    current_info[key.strip()] = int(value)
    # The last section is not necessarily followed by a blank line.
    close_section()
    # mimic os.cpu_count()
    return sum(mapping.values()) or None
def cpu_count(logical=True):
    """Return the number of CPUs in the system.

    By default the logical CPU count is returned. Pass ``logical=False``
    to count physical cores only (e.g. hyper thread CPUs are excluded).
    The platform helper's result is returned unchanged; presumably that
    is None when the count cannot be determined.

    NOTE(review): upstream documentation describes the result as cached
    (clearable via ``psutil.cpu_count.cache_clear()``); no caching
    decorator is visible on this definition -- confirm.
    """
    counter = (_psplatform.cpu_count_logical if logical
               else _psplatform.cpu_count_physical)
    return counter()
def workers(master_host, master_port, relay_socket_path, num_workers):
    """Fork a relay process and a set of worker processes.

    The first forked child runs a ``RelayClient`` between the master redis
    (host/port) and the relay redis (unix socket). The parent then forks
    ``num_workers`` children (``os.cpu_count()`` when ``num_workers`` is
    falsy), each running ``run_worker`` against the relay with a shared
    noise table, and finally calls ``os.wait()`` once.
    """
    relay_redis_cfg = {'unix_socket_path': relay_socket_path}
    master_redis_cfg = {'host': master_host, 'port': master_port}

    # Start the relay (in the child only)
    in_relay_child = os.fork() == 0
    if in_relay_child:
        RelayClient(master_redis_cfg, relay_redis_cfg).run()
        return

    # Start the workers; the noise table is created before forking so
    # that every worker uses the same one.
    noise = SharedNoiseTable()
    num_workers = num_workers or os.cpu_count()
    logging.info('Spawning {} workers'.format(num_workers))
    for _ in range(num_workers):
        in_worker_child = os.fork() == 0
        if in_worker_child:
            run_worker(relay_redis_cfg, noise=noise)
            return
    os.wait()
def _get_thread_pool(self):
    """Return the thread pool, creating it on first use.

    The pool is built with ``os.cpu_count()`` as its size argument and
    stored on ``self._thread_pool``.
    """
    pool = self._thread_pool
    if not pool:
        # lazily initialized
        pool = self._thread_pool = ThreadPool(os.cpu_count())
    return pool
def main():
    """Run one ``MyProcess`` to completion, then one per CPU concurrently.

    Prints the PID of the main process first. The second batch starts
    ``os.cpu_count()`` processes and joins them all afterwards.
    """
    main_pid = multiprocessing.current_process().pid
    print("Main Process PID: {}".format(main_pid))

    # A single process, started and awaited on its own.
    single = MyProcess()
    single.start()
    single.join()

    # One process per CPU: start them all before joining any.
    batch = []
    for _ in range(os.cpu_count()):
        proc = MyProcess()
        batch.append(proc)
        proc.start()
    for proc in batch:
        proc.join()
def cpu_count_logical():
    """Return the number of logical CPUs in the system.

    Tries, in order: ``os.sysconf("SC_NPROCESSORS_ONLN")``, counting the
    ``processor`` lines of ``<procfs>/cpuinfo`` and counting the ``cpuN``
    entries of ``<procfs>/stat``. Returns None when every method comes up
    empty, mimicking os.cpu_count().
    """
    try:
        return os.sysconf("SC_NPROCESSORS_ONLN")
    except ValueError:
        pass

    # as a second fallback we try to parse /proc/cpuinfo
    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        num = sum(1 for line in f
                  if line.lower().startswith(b'processor'))

    # unknown format (e.g. amrel/sparc architectures), see:
    # https://github.com/giampaolo/psutil/issues/200
    # try to parse /proc/stat as a last resort
    if num == 0:
        # Raw string: '\d' in a plain literal is an invalid escape.
        search = re.compile(r'cpu\d')
        with open_text('%s/stat' % get_procfs_path()) as f:
            for line in f:
                if search.match(line.split(' ')[0]):
                    num += 1

    # mimic os.cpu_count()
    return num or None
def cpu_count_physical():
    """Return the number of physical CPUs in the system.

    The C extension hands back an XML topology string similar to the one
    described at
    http://manpages.ubuntu.com/manpages/precise/man4/smp.4freebsd.html
    or None when "sysctl kern.sched.topology_spec" is not supported on
    this BSD version. Without a usable count the function returns 1 if
    there is exactly one logical CPU, otherwise None (mimicking
    os.cpu_count()).
    """
    closing_tag = "</groups>"
    cores = None
    topology = cext.cpu_count_phys()
    if topology is not None:
        # Drop the padding chars appended after the closing tag.
        end = topology.rfind(closing_tag)
        if end != -1:
            tree = ET.fromstring(topology[:end + len(closing_tag)])
            try:
                cores = len(tree.findall('group/children/group/cpu')) or None
            finally:
                # needed otherwise it will memleak
                tree.clear()
    if cores:
        return cores
    # With a single logical CPU there can only be one physical CPU.
    if cpu_count_logical() == 1:
        return 1
    return cores
def cpu_count_logical():
    """Return the number of logical CPUs in the system.

    The value is read from ``os.sysconf("SC_NPROCESSORS_ONLN")``; None is
    returned when sysconf raises ValueError (mimicking os.cpu_count()).
    """
    try:
        online = os.sysconf("SC_NPROCESSORS_ONLN")
    except ValueError:
        online = None
    return online
def cpu_count_physical():
    """Return the number of physical CPUs in the system.

    The C extension hands back an XML topology string similar to the one
    described at
    http://manpages.ubuntu.com/manpages/precise/man4/smp.4freebsd.html
    or None when "sysctl kern.sched.topology_spec" is not supported on
    this BSD version. Without a usable count the function returns 1 if
    there is exactly one logical CPU, otherwise None (mimicking
    os.cpu_count()).
    """
    closing_tag = "</groups>"
    cores = None
    topology = cext.cpu_count_phys()
    if topology is not None:
        # Drop the padding chars appended after the closing tag.
        end = topology.rfind(closing_tag)
        if end != -1:
            tree = ET.fromstring(topology[:end + len(closing_tag)])
            try:
                cores = len(tree.findall('group/children/group/cpu')) or None
            finally:
                # needed otherwise it will memleak
                tree.clear()
    if cores:
        return cores
    # With a single logical CPU there can only be one physical CPU.
    if cpu_count_logical() == 1:
        return 1
    return cores
def cpu_count_logical():
    """Return the number of logical CPUs in the system.

    The value is read from ``os.sysconf("SC_NPROCESSORS_ONLN")``; None is
    returned when sysconf raises ValueError (mimicking os.cpu_count()).
    """
    try:
        online = os.sysconf("SC_NPROCESSORS_ONLN")
    except ValueError:
        online = None
    return online
def download_songs(song_ids: typing.Iterator[str]):
    """Look up and load the given songs using a thread pool.

    Each id is resolved with ``_lookup_gmusic_song``; falsy lookup results
    are skipped. Every remaining song has its ``load()`` method called
    from a pool of ``2 * CPU count`` threads.

    Args:
        song_ids: ids of the songs to download.
    """
    songs = filter(None, map(_lookup_gmusic_song, song_ids))

    def _load(song):
        print("Loading song", song)
        song.load()

    # os.cpu_count() returns None when the count is undetermined.
    max_workers = (os.cpu_count() or 1) * 2
    with ThreadPoolExecutor(max_workers) as thread_pool:
        # Consume the result iterator so that an exception raised inside
        # a worker is re-raised here instead of being dropped.
        for _ in thread_pool.map(_load, songs):
            pass
def defaultNumberOfMakeJobs():
    """Return the default number of parallel make jobs.

    This is the CPU count (1 when it cannot be determined), except that
    machines with more than 24 CPUs default to 16 jobs.
    """
    # os.cpu_count() returns None when the count is undetermined.
    makeJobs = os.cpu_count() or 1
    if makeJobs > 24:
        # don't use up all the resources on shared build systems
        # (you can still override this with the -j command line option)
        makeJobs = 16
    return makeJobs
def task_add_missing_blocks(missing_block_nums,
                            max_procs,
                            max_threads,
                            database_url,
                            steemd_http_url,
                            task_num=5):
    """Add the missing blocks to the database using a process pool.

    The block numbers are split with ``chunkify(..., 10000)`` and every
    chunk is handed to ``block_adder_process_worker`` (bound to the
    database URL, the steemd URL and ``max_threads``). A task banner is
    echoed before the work starts and a success message afterwards.

    Args:
        missing_block_nums: block numbers to add.
        max_procs: pool size; falls back to os.cpu_count(), then 1.
        max_threads: forwarded to each worker as ``max_threads``.
        database_url: first positional argument bound to the worker.
        steemd_http_url: second positional argument bound to the worker.
        task_num: task number shown in the banner.
    """
    task_message = fmt_task_message(
        'Adding missing blocks to db, this may take a while',
        emoji_code_point=u'\U0001F4DD',
        task_num=task_num)
    click.echo(task_message)

    max_workers = max_procs or os.cpu_count() or 1

    map_func = partial(
        block_adder_process_worker,
        database_url,
        steemd_http_url,
        max_threads=max_threads)

    chunks = chunkify(missing_block_nums, 10000)

    with Pool(processes=max_workers) as pool:
        # The per-chunk results are not used; map() blocks until every
        # chunk has been processed.
        pool.map(map_func, chunks)

    success_msg = fmt_success_message('added missing blocks')
    click.echo(success_msg)
def __init__(self, max_workers=0, thread_name_prefix='', max_queue_size=0):
    """Initialize the executor with a bounded work queue.

    :param max_workers: Worker count; a falsy value means os.cpu_count(),
        or 1 if that is unavailable.
    :param thread_name_prefix: Accepted but not forwarded to the base
        initializer.
    :param max_queue_size: Queue capacity; a falsy value means ten times
        ``self._max_workers``.
    """
    worker_count = max_workers or os.cpu_count() or 1
    # NOTE(review): thread_name_prefix is deliberately not passed on; a
    # call forwarding it was disabled in the original code -- confirm why.
    super().__init__(worker_count)

    queue_limit = max_queue_size or self._max_workers * 10
    self.max_queue_size = queue_limit
    if queue_limit > 0:
        # Swap in a queue with a fixed capacity.
        self._work_queue = queue.Queue(queue_limit)
    self.max_queue_size_reached = 0
def __init__(self, map_func, reduce_func, workers=None):
self.map_func = map_func
self.reduce_func = reduce_func
self._workers = os.cpu_count() or 1 if workers is None else workers
def cpu_count_logical():
    """Return the number of logical CPUs in the system.

    Tries, in order: ``os.sysconf("SC_NPROCESSORS_ONLN")``, counting the
    ``processor`` lines of ``<procfs>/cpuinfo`` and counting the ``cpuN``
    entries of ``<procfs>/stat``. Returns None when every method comes up
    empty, mimicking os.cpu_count().
    """
    try:
        return os.sysconf("SC_NPROCESSORS_ONLN")
    except ValueError:
        pass

    # as a second fallback we try to parse /proc/cpuinfo
    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        num = sum(1 for line in f
                  if line.lower().startswith(b'processor'))

    # unknown format (e.g. amrel/sparc architectures), see:
    # https://github.com/giampaolo/psutil/issues/200
    # try to parse /proc/stat as a last resort
    if num == 0:
        # Raw string: '\d' in a plain literal is an invalid escape.
        search = re.compile(r'cpu\d')
        with open_text('%s/stat' % get_procfs_path()) as f:
            for line in f:
                if search.match(line.split(' ')[0]):
                    num += 1

    # mimic os.cpu_count()
    return num or None
def cpu_count_physical():
    """Return the number of physical CPUs in the system.

    The C extension hands back an XML topology string similar to the one
    described at
    http://manpages.ubuntu.com/manpages/precise/man4/smp.4freebsd.html
    or None when "sysctl kern.sched.topology_spec" is not supported on
    this BSD version. Without a usable count the function returns 1 if
    there is exactly one logical CPU, otherwise None (mimicking
    os.cpu_count()).
    """
    closing_tag = "</groups>"
    cores = None
    topology = cext.cpu_count_phys()
    if topology is not None:
        # Drop the padding chars appended after the closing tag.
        end = topology.rfind(closing_tag)
        if end != -1:
            tree = ET.fromstring(topology[:end + len(closing_tag)])
            try:
                cores = len(tree.findall('group/children/group/cpu')) or None
            finally:
                # needed otherwise it will memleak
                tree.clear()
    if cores:
        return cores
    # With a single logical CPU there can only be one physical CPU.
    if cpu_count_logical() == 1:
        return 1
    return cores
def cpu_count_logical():
    """Return the number of logical CPUs in the system.

    The value is read from ``os.sysconf("SC_NPROCESSORS_ONLN")``; None is
    returned when sysconf raises ValueError (mimicking os.cpu_count()).
    """
    try:
        online = os.sysconf("SC_NPROCESSORS_ONLN")
    except ValueError:
        online = None
    return online
def cpu_count_physical():
    """Return the number of physical CPUs in the system.

    The C extension hands back an XML topology string similar to the one
    described at
    http://manpages.ubuntu.com/manpages/precise/man4/smp.4freebsd.html
    or None when "sysctl kern.sched.topology_spec" is not supported on
    this BSD version. Without a usable count the function returns 1 if
    there is exactly one logical CPU, otherwise None (mimicking
    os.cpu_count()).
    """
    closing_tag = "</groups>"
    cores = None
    topology = cext.cpu_count_phys()
    if topology is not None:
        # Drop the padding chars appended after the closing tag.
        end = topology.rfind(closing_tag)
        if end != -1:
            tree = ET.fromstring(topology[:end + len(closing_tag)])
            try:
                cores = len(tree.findall('group/children/group/cpu')) or None
            finally:
                # needed otherwise it will memleak
                tree.clear()
    if cores:
        return cores
    # With a single logical CPU there can only be one physical CPU.
    if cpu_count_logical() == 1:
        return 1
    return cores
def cpu_count_logical():
    """Return the number of logical CPUs in the system.

    The value is read from ``os.sysconf("SC_NPROCESSORS_ONLN")``; None is
    returned when sysconf raises ValueError (mimicking os.cpu_count()).
    """
    try:
        online = os.sysconf("SC_NPROCESSORS_ONLN")
    except ValueError:
        online = None
    return online
async def test_system(json_request, accept):
    """Check the ``/system`` endpoint's response.

    The response must report success, a ``cpu_count`` equal to
    os.cpu_count(), a float ``cpu_mhz`` and an int ``memory``.
    """
    # Declared as a coroutine because the body awaits the request; an
    # ``await`` inside a plain ``def`` does not compile.
    result = await json_request('/system', {}, accept=accept)
    assert result['success']
    assert result['system']['cpu_count'] == os.cpu_count()
    assert isinstance(result['system']['cpu_mhz'], float)
    assert isinstance(result['system']['memory'], int)
def _workers_count():
cpu_count = 0
try:
cpu_count = len(os.sched_getaffinity(0))
except AttributeError:
cpu_count = os.cpu_count()
return cpu_count * 4
def cpu_count_logical():
    """Return the number of logical CPUs in the system.

    Tries, in order: ``os.sysconf("SC_NPROCESSORS_ONLN")``, counting the
    ``processor`` lines of ``<procfs>/cpuinfo`` and counting the ``cpuN``
    entries of ``<procfs>/stat``. Returns None when every method comes up
    empty, mimicking os.cpu_count().
    """
    try:
        return os.sysconf("SC_NPROCESSORS_ONLN")
    except ValueError:
        pass

    # as a second fallback we try to parse /proc/cpuinfo
    with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
        num = sum(1 for line in f
                  if line.lower().startswith(b'processor'))

    # unknown format (e.g. amrel/sparc architectures), see:
    # https://github.com/giampaolo/psutil/issues/200
    # try to parse /proc/stat as a last resort
    if num == 0:
        # Raw string: '\d' in a plain literal is an invalid escape.
        search = re.compile(r'cpu\d')
        with open_text('%s/stat' % get_procfs_path()) as f:
            for line in f:
                if search.match(line.split(' ')[0]):
                    num += 1

    # mimic os.cpu_count()
    return num or None
def test_cpu_count_logical_w_nproc(self):
    """psutil's logical CPU count must equal the output of ``nproc --all``."""
    expected = int(sh("nproc --all"))
    actual = psutil.cpu_count(logical=True)
    self.assertEqual(actual, expected)