Python os 模块,getloadavg() 实例源码
我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用os.getloadavg()。
def idle_cpu_count(mincpu=1):
    """Estimate number of idle CPUs, for use by multiprocessing code
    needing to determine how many processes can be run without excessive
    load. This function uses :func:`os.getloadavg` which is only available
    under a Unix OS.

    Parameters
    ----------
    mincpu : int
      Minimum number of CPUs to report, independent of actual estimate

    Returns
    -------
    idle : int
      Estimate of number of idle CPUs
    """

    if PY2:
        ncpu = mp.cpu_count()
    else:
        ncpu = os.cpu_count()
    if ncpu is None:
        # os.cpu_count() returns None when the CPU count cannot be
        # determined; fall back to the caller-supplied minimum instead of
        # failing on ``None - float``.
        return mincpu
    # Subtract the (floored) 1-minute load average from the CPU count.
    idle = int(ncpu - np.floor(os.getloadavg()[0]))
    return max(mincpu, idle)
def display_loadavg(scr):
    """Write the "System Load:" label and the three load averages to ``scr``.

    Every field is written with ``curses.color_pair(9)`` through the
    ``write`` helper (presumably ``write(screen, row, column, text, attr)``
    -- confirm against its definition).
    """
    averages = os.getloadavg()
    labels = ((0, 'System'), (7, 'Load:'))
    for offset, text in labels:
        write(scr, 1, offset, text, curses.color_pair(9))
    # 1, 5 and 15 minute values at fixed offsets
    for offset, value in zip((13, 20, 27), averages):
        write(scr, 1, offset, '%.02f' % value, curses.color_pair(9))
def load(self):
    """Return the cached load-average tuple, fetching it on first use."""
    cached = self._load
    if cached:
        return cached
    # Nothing cached yet (falsy value): query the OS and remember the result.
    self._load = os.getloadavg()
    return self._load
def get_load(self):
    """
    Return the 1-minute system load average.

    If the lookup fails, the error is printed and its string form is
    returned instead of a float.
    """
    try:
        data = os.getloadavg()[0]
    except Exception as err:
        # print(...) with a single argument is valid on both Python 2 and 3;
        # the former ``print err`` statement is a SyntaxError on Python 3.
        print(err)
        data = str(err)
    return data
# ----------------end: ???????????????-----------------
# ----------------start: ??CPU????-----------------
def update(self, widgets):
    """Refresh the stored load averages; ``widgets`` is not used here."""
    latest = os.getloadavg()
    self._load = latest
def __init__(self, interval=15, monitor_interval=60*60,
             create_time=time.time, get_load_average=os.getloadavg):
    """Store the timing settings and the injectable clock/load callables.

    ``create_time`` and ``get_load_average`` default to ``time.time`` and
    ``os.getloadavg``; they are kept as attributes so other callables can
    be substituted. The list of recorded load averages starts empty.
    """
    self._interval, self._monitor_interval = interval, monitor_interval
    self._create_time = create_time
    self._load_averages = list()
    self._get_load_average = get_load_average
def run(self):
    """Add the 1-minute load average as the "System load" header.

    Returns whatever ``succeed(None)`` produces.
    """
    one_minute = os.getloadavg()[0]
    self._sysinfo.add_header("System load", str(one_minute))
    return succeed(None)
def state_report_engine(self):
    """
    Emit a host heartbeat in a loop until ``Utils.exit_flag`` is set.

    After initialising the connection and loading interface/disk
    information once, each iteration sleeps two seconds and then sends a
    heartbeat carrying node id, cpu, memory, interfaces, disks, load
    averages, the timestamp taken at start-up and available memory.
    Errors inside an iteration are logged and the loop continues.
    """
    self.init_conn()

    # Load interface and disk information once before entering the loop.
    self.update_interfaces()
    self.update_disks()

    boot_time = ji.Common.ts()

    while True:
        if Utils.exit_flag:
            msg = 'Thread state_report_engine say bye-bye'
            # print(...) form works on both Python 2 and 3.
            print(msg)
            logger.info(msg=msg)
            return

        thread_status['state_report_engine'] = ji.JITime.now_date_time()

        # noinspection PyBroadException
        try:
            time.sleep(2)

            # Refresh interface/disk info when the current timestamp is a
            # multiple of 60 (presumably once a minute -- assumes ts() is
            # in whole seconds; TODO confirm).
            if ji.Common.ts() % 60 == 0:
                self.update_interfaces()
                self.update_disks()

            host_event_emit.heartbeat(message={'node_id': self.node_id, 'cpu': self.cpu, 'memory': self.memory,
                                               'interfaces': self.interfaces, 'disks': self.disks,
                                               'system_load': os.getloadavg(), 'boot_time': boot_time,
                                               'memory_available': psutil.virtual_memory().available})

        except Exception:
            # Catch Exception rather than everything, so SystemExit and
            # KeyboardInterrupt can still stop this otherwise endless loop.
            logger.error(traceback.format_exc())
            log_emit.error(traceback.format_exc())
def heart_beat(self):
    """Send ``load#<1-minute load average>`` to the server in an endless loop.

    The datagram goes to ``(self.server_ip, self.tx_port)`` and the loop
    pauses 0.25 seconds between sends. This method never returns.
    """
    destination_of = lambda: (self.server_ip, self.tx_port)
    while True:
        one_minute = os.getloadavg()[0]
        self.socket.sendto("load#" + str(one_minute), destination_of())
        sleep(0.25)
def cpu_usage():
    """Return a one-line summary of the load averages and the uptime.

    The uptime is the difference between now and ``psutil.boot_time()``,
    with the fractional seconds cut off.
    """
    # load average, uptime
    now = datetime.now()
    booted_at = datetime.fromtimestamp(psutil.boot_time())
    uptime_text = str(now - booted_at).split('.')[0]
    averages = tuple(os.getloadavg())
    return "Ld:%.1f %.1f %.1f Up: %s" % (averages + (uptime_text,))
def get_os_internals():  # noqa: D103
    """Return ``(label, value)`` pairs for the ``os`` facilities that exist.

    Each entry is included only when the ``os`` module has the matching
    attribute, so the result differs between platforms. Order is fixed.
    """
    # (os attribute to probe, label, zero-argument value producer)
    probes = (
        ('getcwd', "Current Working Directory", lambda: os.getcwd()),
        ('getegid', "Effective Group ID", lambda: os.getegid()),
        ('geteuid', "Effective User ID", lambda: os.geteuid()),
        ('getgid', "Group ID", lambda: os.getgid()),
        ('getuid', "User ID", lambda: os.getuid()),
        ('getgroups', "Group Membership",
         lambda: ', '.join(str(group) for group in os.getgroups())),
        # repr() makes the separator printable; strip the surrounding quotes
        ('linesep', "Line Seperator", lambda: repr(os.linesep)[1:-1]),
        ('pathsep', "Path Seperator", lambda: os.pathsep),
        ('getloadavg', "Load Avarage",
         lambda: ', '.join(str(round(avg, 2)) for avg in os.getloadavg())),
    )
    collected = []
    for attribute, label, produce in probes:
        if hasattr(os, attribute):
            collected.append((label, produce()))
    return collected
def main():
    """Print the results of several process/system queries from ``os``.

    Note that ``os.nice(1)`` changes the priority of the current process.
    """
    queries = (
        lambda: os.nice(0),       # get relative process priority
        lambda: os.nice(1),       # change relative priority
        lambda: os.times(),       # process times: system, user etc...
        lambda: os.isatty(0),     # is the file descriptor arg a tty?(0 = stdin)
        lambda: os.isatty(4),     # 4 is just an arbitrary test value
        lambda: os.getloadavg(),  # UNIX only - number of processes in queue
        lambda: os.cpu_count(),   # New in Python 3.4
    )
    # Run in order: the first query must read the priority before the
    # second one changes it.
    for query in queries:
        print(query())
def collect_system_metadata(metadata):
    """Fill ``metadata`` in place with platform, load, hostname and boot info.

    Keys written when the information is available: ``platform``,
    ``load_avg_1min``, ``runnable_threads``, ``hostname``, ``boot_time``
    and ``uptime``. Nothing is returned.
    """
    metadata['platform'] = platform.platform(True, False)
    if sys.platform.startswith('linux'):
        collect_linux_metadata(metadata)

    # on linux, load average over 1 minute
    for entry in read_proc("loadavg"):
        columns = entry.split()
        metadata['load_avg_1min'] = float(columns[0])

        # The 4th column is read as "<runnable>/<other>"; keep the part
        # before the slash.
        if len(columns) < 4 or '/' not in columns[3]:
            continue
        runnable, _, _ = columns[3].partition('/')
        metadata['runnable_threads'] = int(runnable)

    # Fall back to os.getloadavg() when the proc file gave nothing.
    if 'load_avg_1min' not in metadata and hasattr(os, 'getloadavg'):
        metadata['load_avg_1min'] = os.getloadavg()[0]

    # Hostname
    name = socket.gethostname()
    if name:
        metadata['hostname'] = name

    # Boot time: first "btime " line of the "stat" proc file, else psutil.
    boot_time = None
    for entry in read_proc("stat"):
        if entry.startswith("btime "):
            boot_time = int(entry[6:])
            break

    if boot_time is None and psutil:
        boot_time = psutil.boot_time()

    if boot_time is None:
        return

    booted_at = datetime.datetime.fromtimestamp(boot_time)
    metadata['boot_time'] = format_datetime(booted_at)
    metadata['uptime'] = time.time() - boot_time
def _get_system_stats_(self):
    """Return ``{'load': [one, five, fifteen]}`` from ``os.getloadavg()``."""
    averages = os.getloadavg()
    return {'load': [averages[0], averages[1], averages[2]]}
def check(self):
    """Collect load-average and CPU usage metrics into a flat dict.

    Keys produced: ``load.1``/``load.5``/``load.15`` (skipped on Windows),
    ``cpu.used_total``, one ``cpu.cpu<N>_used`` per CPU, and
    ``cpu.used_sy``/``cpu.used_us``/``cpu.used_wa`` computed through
    ``self._get_cpu_time`` from the change in total CPU time since the
    previous sample. ``self.last_cpu_times`` is updated before returning.
    """
    data = {}

    # Load averages are not collected on Windows.
    if platform.system() != 'Windows':
        one, five, fifteen = os.getloadavg()
        data['load.1'] = one
        data['load.5'] = five
        data['load.15'] = fifteen

    data["cpu.used_total"] = int(psutil.cpu_percent())

    # Usage of each individual CPU, numbered from 0.
    for index, percent in enumerate(psutil.cpu_percent(percpu=True)):
        data['cpu.cpu{0}_used'.format(index)] = int(percent)

    current_times = psutil.cpu_times()
    if self.last_cpu_times is None:
        # No previous sample: record this one as the baseline, wait
        # briefly and sample again so there is an interval to measure.
        self.last_cpu_times = current_times
        gevent.sleep(0.1)
        current_times = psutil.cpu_times()

    elapsed_total = sum(current_times) - sum(self.last_cpu_times)

    # Other fields (idle, nice, irq, softirq, steal) are not reported.
    for key, field in (('cpu.used_sy', 'system'),
                       ('cpu.used_us', 'user'),
                       ('cpu.used_wa', 'iowait')):
        data[key] = self._get_cpu_time(field, elapsed_total, current_times)

    self.last_cpu_times = current_times
    return data
def getSystemInfo():
    """
    Get a dictionary with some system/server info

    :return: dict with keys "unix", "connectedUsers", "matches", "uptime",
             "cpuUsage", "totalMemory", "usedMemory", "loadAverage"
    """
    data = {
        "unix": runningUnderUnix(),
        "connectedUsers": len(glob.tokens.tokens),
        "matches": len(glob.matches.matches),
    }

    # General stats: break the time since glob.startTime down into
    # days / hours / minutes / seconds.
    remaining = time.time()-glob.startTime
    parts = []
    for unit_seconds in (86400, 3600, 60):
        whole_units = math.floor(remaining/unit_seconds)
        remaining -= whole_units*unit_seconds
        parts.append(whole_units)
    parts.append(math.floor(remaining))
    data["uptime"] = "{}d {}h {}m {}s".format(*parts)

    data["cpuUsage"] = psutil.cpu_percent()
    memory = psutil.virtual_memory()
    for key, amount in (("totalMemory", memory.total), ("usedMemory", memory.active)):
        data[key] = "{0:.2f}".format(amount/1074000000)

    # Unix only stats; a zero tuple is reported elsewhere.
    data["loadAverage"] = os.getloadavg() if data["unix"] else (0,0,0)
    return data
def getloadavg():
    """Return the three load averages as one space-separated string."""
    one, five, fifteen = os.getloadavg()
    return '{} {} {}'.format(one, five, fifteen)
def heartbeat(self, pool=None, node=None, ttl=None, **doc):
    """Build a heartbeat document and pass it to ``self.services.heartbeat``.

    ``pool``, ``node`` and ``ttl`` are required; an ``Exception`` is raised
    if any of them is None. Extra keyword arguments become fields of the
    document; ``segment`` and ``port`` are read from them if present.

    :return: the result of ``self.services.heartbeat(doc)``
    """
    if None in (pool, node, ttl):
        raise Exception('"pool", "node" and "ttl" are required arguments.')

    heartbeat_id = "%s:%s:%s" % (pool, node, doc.get('segment'))
    doc['id'] = heartbeat_id
    logging.info("Setting Heartbeat ID to [%s]" % heartbeat_id)

    doc.update(role=pool, node=node, ttl=ttl)
    doc['load'] = os.getloadavg()[1]  # load average over last 5 mins
    logging.info('Heartbeat: role[%s] node[%s] at IP %s:%s with ttl %s' % (pool, node, node, doc.get('port'), ttl))
    return self.services.heartbeat(doc)
def bulk_heartbeat(self, ids):
    """Update heartbeat time and load for ``ids`` in the ``services`` table.

    Ids that are not found in the table afterwards get an individual
    ``self.heartbeat`` call instead. Each id is expected to have the form
    ``pool:node:segment``.
    """
    heartbeat_fields = { 'last_heartbeat': r.now(), 'load': os.getloadavg()[1] }
    self.rethinker.table('services').get_all(*ids).update(heartbeat_fields).run()

    # send a non-bulk heartbeat for each id we *didn't* just update
    present_ids = self.rethinker.table('services').get_all(*ids).get_field('id').run()
    for service_id in set(ids) - set(present_ids):
        pool, node, segment = service_id.split(":")
        if pool == 'trough-write':
            port = settings['WRITE_PORT']
        else:
            port = settings['READ_PORT']
        url = 'http://%s:%s/?segment=%s' % (node, port, segment)
        ttl = round(settings['SYNC_LOOP_TIMING'] * 4)
        self.heartbeat(pool=pool, node=node, segment=segment, port=port, url=url, ttl=ttl)
def print_status():
    """Return a multi-line status report: mode, uptime, load, CPU and fuzzers.

    The first line gives the mode ("Distributed" when the file
    ``.scff/distributed`` exists, otherwise ``CPU_CORES * fuzzer``), the
    uptime, the 1-minute load average cut to four characters and the CPU
    percentage. It is followed by one line per running fuzzer process, or
    by a notice when there is none.
    """
    if os.path.isfile(".scff/distributed"):
        mode = "Distributed"
    else:
        mode = str(CPU_CORES) + " * " + fuzzer

    parts = [
        "\nMode: " + mode + " \tUptime: " + get_uptime() + " \tLoad: "
        + str(os.getloadavg()[0])[:4] + "\tCPU:"
        + str(int(psutil.cpu_percent(interval=0.2)))
        + "%"
    ]

    # Query the process list once so the emptiness check and the loop see
    # the same set of processes.
    fuzzers = getRunningFuzzers()
    if len(fuzzers) < 1:
        parts.append("\n\t\t*** No running fuzzers found! ***")
        return "".join(parts)

    # One-letter code per process state; anything else is shown as "D".
    status_codes = {"sleeping": "S", "running": "R", "stopped": "T"}

    parts.append("\nS CMDLINE PID CPU% MEM%")
    for proc in fuzzers:
        # Read the state once: repeated status() calls could each observe
        # a different state while the process keeps running.
        status = status_codes.get(proc.status(), "D")
        cmdline = list2str(proc.cmdline())
        # Pad the PID so it lines up after a command line truncated to 42
        # characters by the ``{:.42}`` field.
        padded_pid = " " * (45 - min(len(cmdline), 42)) + str(proc.pid)
        parts.append(
            "\n{} {:.42} {} {} {}".format(
                status,
                cmdline,
                padded_pid,
                proc.cpu_percent(interval=0.1),
                str(round(proc.memory_percent(), 2)))
        )
    return "".join(parts)
def f_print_linux_status(save_as):
    """Print a "Linux Overview" table of CPU, memory, swap and load figures.

    :param save_as: ``"txt"`` prints through ``f_print_title`` and
        ``f_print_table_body``; ``"html"`` prints through
        ``f_print_table_html``. Any other value prints nothing.
    """
    # ---- collect data ----
    # scputimes(user, nice, system, idle, iowait, irq, softirq, steal, guest, guest_nice)
    cpu_times = psutil.cpu_times()
    # svmem(total, available, percent, used, free, active, inactive, buffers, cached, shared)
    mem = psutil.virtual_memory()
    # sswap(total, used, free, percent, sin, sout)
    swap = psutil.swap_memory()

    # load
    try:
        load = os.getloadavg()
    except (OSError, AttributeError):
        # Load average unavailable: use placeholders so the table rows
        # below can still be built instead of raising KeyError.
        stats = {'min1': 'N/A', 'min5': 'N/A', 'min15': 'N/A'}
    else:
        stats = {'min1': load[0], 'min5': load[1], 'min15': load[2]}

    # ---- render ----
    # Per-column style specs handed to the table helpers, keyed by
    # 1-based column number.
    style1 = {1: ' ,6,l', 2: ' ,10,r',3: ' ,6,l', 4: ' ,10,r',5: ' ,6,l', 6: ' ,6,r',7: ' ,8,l',8: ' ,6,r',9: ' ,6,l', 10: ' ,6,r',11: ' ,6,l', 12: ' ,5,r',}
    style = {1: ' ,l', 2: ' ,r',3: ' ,l', 4: ' ,r',5: ' ,l', 6: ' ,r',7: ' ,l',8: ' ,r',9: ' ,l', 10: ' ,r',11: ' ,l', 12: ' ,r',}

    rows=[
        ["CPU", str(psutil.cpu_percent(interval=1))+'%',"nice", cpu_times.nice,"MEM", str(mem.percent) + '%',"active", str(mem.active/1024/1024) + 'M',"SWAP", str(swap.percent)+'%',"LOAD", str(psutil.cpu_count())+'core'],
        ["user", cpu_times.user,"irq", cpu_times.irq,"total", str(mem.total/1024/1024)+'M',"inactive", str(mem.inactive/1024/1024) + 'M',"total", str(swap.total/1024/1024) + 'M',"1 min", stats["min1"]],
        ["system", cpu_times.system,"iowait", cpu_times.iowait,"used", str(mem.used/1024/1024)+'M',"buffers", str(mem.buffers/1024/1024) + 'M',"used", str(swap.used/1024/1024) + 'M',"5 min", stats["min5"]],
        ["idle", cpu_times.idle,"steal", cpu_times.steal,"free", str(mem.free/1024/1024) + 'M',"cached", str(mem.cached/1024/1024) + 'M',"free", str(swap.free/1024/1024) + 'M',"15 min", stats["min15"]]
    ]

    title = "Linux Overview"
    if save_as == "txt":
        f_print_title(title)
        f_print_table_body(rows, style1,' ')
    elif save_as == "html":
        f_print_table_html(rows, title, style)
def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2,
                track_cpu_count=False, short=False):
    '''Return system load average.

    Highlights using ``system_load_good``, ``system_load_bad`` and
    ``system_load_ugly`` highlighting groups, depending on the thresholds
    passed to the function.

    :param str format:
        format string, receives ``avg`` as an argument
    :param float threshold_good:
        threshold for gradient level 0: any normalized load average below this
        value will have this gradient level.
    :param float threshold_bad:
        threshold for gradient level 100: any normalized load average above this
        value will have this gradient level. Load averages between
        ``threshold_good`` and ``threshold_bad`` receive gradient level that
        indicates relative position in this interval:
        (``100 * (cur-good) / (bad-good)``).
        Note: both parameters are checked against normalized load averages.
    :param bool track_cpu_count:
        if True powerline will continuously poll the system to detect changes
        in the number of CPUs.
    :param bool short:
        if True only the sys load over last 1 minute will be displayed.

    Divider highlight group used: ``background:divider``.

    Highlight groups used: ``system_load_gradient`` (gradient) or ``system_load``.
    '''
    global cpu_count

    def gradient_for(load_per_cpu):
        # Clamp to 0 below the good threshold and to 100 from the bad
        # threshold upwards; interpolate linearly in between.
        if load_per_cpu < threshold_good:
            return 0
        if load_per_cpu < threshold_bad:
            return (load_per_cpu - threshold_good) * 100.0 / (threshold_bad - threshold_good)
        return 100

    try:
        # Re-query the CPU count only when it is unknown or tracking is on.
        if cpu_count is None or track_cpu_count:
            cpu_count = _cpu_count()
        cpu_num = cpu_count
    except NotImplementedError:
        pl.warn('Unable to get CPU count: method is not implemented')
        return None

    segments = []
    for avg in os.getloadavg():
        segments.append({
            'contents': format.format(avg=avg),
            'highlight_groups': ['system_load_gradient', 'system_load'],
            'divider_highlight_group': 'background:divider',
            'gradient_level': gradient_for(avg / cpu_num),
        })
        if short:
            # Only the first (1-minute) value is wanted.
            return segments
    # Separate the first two values from the one that follows each.
    segments[0]['contents'] += ' '
    segments[1]['contents'] += ' '
    return segments
def gather_data(path="./"):
    """Collect disk, uptime, load, memory and CPU temperature information.

    :param path: path whose filesystem is inspected with ``os.statvfs``.
    :return: dict with keys ``disk_total``, ``disk_used``, ``disk_free``,
        ``disk_percent``, ``timenow``, ``uptime_sec``, ``uptime_str``,
        ``load_ave1``, ``load_ave5``, ``load_ave15``, ``memtotal``,
        ``memfree``, ``memavail`` and ``cpu_temp``. A memory field is None
        when ``/proc/meminfo`` has no such entry, and ``cpu_temp`` is None
        when ``vcgencmd`` produced no ``key=value`` output.
    """
    print("Interorgating pi about it's status...")
    timenow = datetime.datetime.now()

    # check storage space
    st = os.statvfs(path)
    free = st.f_bavail * st.f_frsize
    total = st.f_blocks * st.f_frsize
    used = (st.f_blocks - st.f_bfree) * st.f_frsize
    try:
        percent = (float(used) / total) * 100
    except ZeroDivisionError:
        percent = 0

    # check up time
    with open('/proc/uptime', 'r') as f:
        uptime_seconds = float(f.readline().split()[0])
    uptime_string = str(datetime.timedelta(seconds=uptime_seconds))

    # system load averages for 1, 5 and 15 min
    load_ave1, load_ave5, load_ave15 = os.getloadavg()

    # check memory info; default to None so a missing entry does not leave
    # a variable unbound
    meminfo = {'MemTotal': None, 'MemAvailable': None, 'MemFree': None}
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            fields = line.split(":")
            if fields[0] in meminfo and len(fields) > 1:
                meminfo[fields[0]] = fields[1].strip()

    # check cpu temp with '/opt/vc/bin/vcgencmd measure_temp'; the value is
    # whatever follows the first '=' in its output
    with os.popen('/opt/vc/bin/vcgencmd measure_temp') as pipe:
        raw_temp = pipe.read().strip()
    temp_fields = raw_temp.split('=')
    # No '=' means the tool is missing or failed: report None instead of
    # raising IndexError.
    cpu_temp = temp_fields[1] if len(temp_fields) > 1 else None

    # send back data in a dictionary
    return {'disk_total': total,
            'disk_used': used,
            'disk_free': free,
            'disk_percent': round(percent, 1),
            'timenow': timenow,
            'uptime_sec': uptime_seconds,
            'uptime_str': uptime_string.split('.')[0],
            'load_ave1': load_ave1,
            'load_ave5': load_ave5,
            'load_ave15': load_ave15,
            'memtotal': meminfo['MemTotal'],
            'memfree': meminfo['MemFree'],
            'memavail': meminfo['MemAvailable'],
            'cpu_temp': cpu_temp
            }
def t_hello_process(self, *args, **kwargs):
    """
    The 'hello' action is merely to 'speak' with the server. The server
    can return current date/time, echo back a string, query the startup
    command line args, etc.

    This method is a simple means of checking if the server is "up" and
    running.

    :param args: unused.
    :param kwargs: ``request`` is read; its ``meta`` dict may carry
        ``askAbout`` (``'timestamp'``, ``'sysinfo'`` or ``'echoBack'``) and,
        for ``'echoBack'``, the ``echoBack`` message itself.
    :return: ``{'d_ret': <answer dict>, 'status': <bool>}``; ``status`` is
        True only when a recognised ``askAbout`` was answered. A missing
        ``request`` or ``meta`` yields an empty answer with status False.
    """

    def primary_inet_address():
        # Prefer the first non-loopback address the hostname resolves to.
        for ip in socket.gethostbyname_ex(socket.gethostname())[2]:
            if not ip.startswith("127."):
                return ip
        # Otherwise use the local address of a UDP socket connected to
        # 8.8.8.8:53; always close the socket, also when connect() fails.
        probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            probe.connect(('8.8.8.8', 53))
            return probe.getsockname()[0]
        finally:
            probe.close()

    self.dp.qprint("In hello process...")
    b_status = False
    d_ret = {}

    # Tolerate a missing request/meta instead of failing on an unbound name.
    d_request = kwargs.get('request') or {}
    d_meta = d_request.get('meta') or {}

    if 'askAbout' in d_meta:
        str_askAbout = d_meta['askAbout']
        d_ret['name'] = self.within.str_name
        d_ret['version'] = self.within.str_version

        if str_askAbout == 'timestamp':
            str_timeStamp = datetime.datetime.today().strftime('%Y%m%d%H%M%S.%f')
            d_ret['timestamp'] = {'now': str_timeStamp}
            b_status = True

        if str_askAbout == 'sysinfo':
            d_sysinfo = {}
            d_sysinfo['system'] = platform.system()
            d_sysinfo['machine'] = platform.machine()
            d_sysinfo['platform'] = platform.platform()
            d_sysinfo['uname'] = platform.uname()
            d_sysinfo['version'] = platform.version()
            d_sysinfo['memory'] = psutil.virtual_memory()
            d_sysinfo['cpucount'] = multiprocessing.cpu_count()
            d_sysinfo['loadavg'] = os.getloadavg()
            d_sysinfo['cpu_percent'] = psutil.cpu_percent()
            d_sysinfo['hostname'] = socket.gethostname()
            d_sysinfo['inet'] = primary_inet_address()
            d_ret['sysinfo'] = d_sysinfo
            b_status = True

        if str_askAbout == 'echoBack':
            d_ret['echoBack'] = {'msg': d_meta['echoBack']}
            b_status = True

    return {'d_ret': d_ret,
            'status': b_status}
def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2,
                track_cpu_count=False, short=False):
    '''Return system load average.

    Highlights using ``system_load_good``, ``system_load_bad`` and
    ``system_load_ugly`` highlighting groups, depending on the thresholds
    passed to the function.

    :param str format:
        format string, receives ``avg`` as an argument
    :param float threshold_good:
        threshold for gradient level 0: any normalized load average below this
        value will have this gradient level.
    :param float threshold_bad:
        threshold for gradient level 100: any normalized load average above this
        value will have this gradient level. Load averages between
        ``threshold_good`` and ``threshold_bad`` receive gradient level that
        indicates relative position in this interval:
        (``100 * (cur-good) / (bad-good)``).
        Note: both parameters are checked against normalized load averages.
    :param bool track_cpu_count:
        if True powerline will continuously poll the system to detect changes
        in the number of CPUs.
    :param bool short:
        if True only the sys load over last 1 minute will be displayed.

    Divider highlight group used: ``background:divider``.

    Highlight groups used: ``system_load_gradient`` (gradient) or ``system_load``.
    '''
    global cpu_count
    try:
        # Look the CPU count up again only if unknown or tracking is requested.
        if track_cpu_count or cpu_count is None:
            cpu_count = _cpu_count()
        cpu_num = cpu_count
    except NotImplementedError:
        pl.warn('Unable to get CPU count: method is not implemented')
        return None

    threshold_span = threshold_bad - threshold_good
    result = []
    for load_average in os.getloadavg():
        per_cpu = load_average / cpu_num
        # Start from the "bad" level and lower it when the normalized load
        # is under one of the thresholds.
        level = 100
        if per_cpu < threshold_good:
            level = 0
        elif per_cpu < threshold_bad:
            level = (per_cpu - threshold_good) * 100.0 / threshold_span
        entry = {
            'contents': format.format(avg=load_average),
            'highlight_groups': ['system_load_gradient', 'system_load'],
            'divider_highlight_group': 'background:divider',
            'gradient_level': level,
        }
        result.append(entry)
        if short:
            # Short form: stop after the 1-minute value.
            return result
    # A trailing space separates the first two values from their successor.
    for position in (0, 1):
        result[position]['contents'] += ' '
    return result
def print_header(procs_status, num_procs):
    """Print system-related info, above the process list."""

    def usage_bar(percent):
        # A bar of up to 40 characters: 4 "|" per 10 percent, padded with
        # spaces to the full width.
        filled = "|" * int((float(percent) / 10 * 4))
        return filled, " " * (40 - len(filled))

    def in_megabytes(amount):
        return str(int(amount / 1024 / 1024)) + "M"

    # cpu usage
    per_cpu = psutil.cpu_percent(interval=0, percpu=True)
    for cpu_num, perc in enumerate(per_cpu):
        filled, padding = usage_bar(perc)
        print_line(" CPU%-2s [%s%s] %5s%%" % (cpu_num, filled, padding, perc))

    # memory usage
    mem = psutil.virtual_memory()
    filled, padding = usage_bar(mem.percent)
    print_line(" Mem [%s%s] %5s%% %6s / %s" % (
        filled, padding,
        mem.percent,
        in_megabytes(mem.used),
        in_megabytes(mem.total),
    ))

    # swap usage
    swap = psutil.swap_memory()
    filled, padding = usage_bar(swap.percent)
    print_line(" Swap [%s%s] %5s%% %6s / %s" % (
        filled, padding,
        swap.percent,
        in_megabytes(swap.used),
        in_megabytes(swap.total),
    ))

    # processes number and status; entries starting with "run"/"sle" sort first
    st = ["%s=%s" % (state, count)
          for state, count in procs_status.items() if count]
    st.sort(key=lambda x: x[:3] in ('run', 'sle'), reverse=1)
    print_line(" Processes: %s (%s)" % (num_procs, ', '.join(st)))

    # load average, uptime
    now = datetime.datetime.now()
    uptime = now - datetime.datetime.fromtimestamp(psutil.boot_time())
    av1, av2, av3 = os.getloadavg()
    print_line(" Load average: %.2f %.2f %.2f Uptime: %s"
               % (av1, av2, av3, str(uptime).split('.')[0]))