Python os 模块,nice() 实例源码
我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用os.nice()。
def per_process_init():
# type: () -> None
try:
os.nice(19)
except AttributeError:
# nice is not available everywhere.
pass
except OSError:
# When this program is already running on the nicest level (20) on OS X
# it is not permitted to change the priority.
pass
# A keyboard interrupt disrupts the communication between a
# Python script and its subprocesses when using multiprocessing.
# The child can ignore SIGINT and is properly shut down
# by a pool.terminate() call in case of a keyboard interrupt
# or an early generator exit.
signal.signal(signal.SIGINT, signal.SIG_IGN)
def update_charts(q, event, size):
try:
os.nice(10)
except AttributeError:
logging.warn("can't be nice to windows")
q.put((CRAWL_MESSAGE, 4, 'Chart engine starting...'))
base_map = graphics.create_map()
last_qso_timestamp = 0
q.put((CRAWL_MESSAGE, 4, ''))
try:
while not event.is_set():
t0 = time.time()
last_qso_timestamp = load_data(size, q, base_map, last_qso_timestamp)
t1 = time.time()
delta = t1 - t0
update_delay = config.DATA_DWELL_TIME - delta
if update_delay < 0:
update_delay = config.DATA_DWELL_TIME
logging.debug('Next data update in %f seconds', update_delay)
event.wait(update_delay)
except Exception, e:
logging.exception('Exception in update_charts', exc_info=e)
q.put((CRAWL_MESSAGE, 4, 'Chart engine failed.', graphics.YELLOW, graphics.RED))
def calculate_angle_quality_thread(self,
voi,
gantry,
couch,
calculate_from=0,
stepsize=1.0,
q=None,
avoid=[],
voi_cube=None,
gradient=True):
""" TODO: Documentation
"""
os.nice(1)
for couch_angle in couch:
qual = self.calculate_angle_quality(voi, gantry, couch_angle, calculate_from, stepsize, avoid, voi_cube,
gradient)
q.put({"couch": couch_angle, "gantry": gantry, "data": qual})
def parseConfiguration(self):
# Set log file verbosity
verboselogs = bb.utils.to_boolean(self.data.getVar("BB_VERBOSE_LOGS", False))
if verboselogs:
bb.msg.loggerVerboseLogs = True
# Change nice level if we're asked to
nice = self.data.getVar("BB_NICE_LEVEL", True)
if nice:
curnice = os.nice(0)
nice = int(nice) - curnice
buildlog.verbose("Renice to %s " % os.nice(nice))
if self.recipecaches:
del self.recipecaches
self.multiconfigs = self.databuilder.mcdata.keys()
self.recipecaches = {}
for mc in self.multiconfigs:
self.recipecaches[mc] = bb.cache.CacheData(self.caches_array)
self.handleCollections(self.data.getVar("BBFILE_COLLECTIONS", True))
def make_ggplot_figures(self):
info_filename = self.get_tsv_path()
if not os.path.isfile(info_filename):
print('Could not find tsv file: ' + info_filename)
return
r_script = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'nanopore_read_analysis.R')
print()
print(' '.join(['nanopore_read_analysis.R', info_filename]), flush=True)
command = [r_script, info_filename]
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
preexec_fn=lambda: os.nice(20))
out, err = p.communicate()
p.wait()
script_output_lines = err.decode().splitlines()
for line in script_output_lines:
print(' ' + line)
def start_new_process(args, nice_value=0):
"start a new process in a pty and renice it"
data = {}
data['start_time'] = time.time()
pid, master_fd = pty.fork()
if pid == CHILD:
default_signals()
if nice_value:
os.nice(nice_value)
os.execvp(args[0], [a.encode(cf['_charset'], "replace") for a in args])
else:
data['pid'] = pid
if os.uname()[0] == "Linux":
fcntl.fcntl(master_fd, F_SETFL, O_NONBLOCK)
data['fd'] = master_fd
data['file'] = os.fdopen(master_fd)
data['cmd'] = args
data['buf'] = ""
data['otf'] = 0
data['percent'] = 0
data['elapsed'] = 0
return data
def beNice(very_nice=False):
if very_nice:
value = 10
else:
value = 5
nice(value)
def _child(self, nice_level, child_on_start, child_on_exit):
# right now we need to call a function, but first we need to
# map all IO that might happen
sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
retvalf = self.RETVAL.open("wb")
EXITSTATUS = 0
try:
if nice_level:
os.nice(nice_level)
try:
if child_on_start is not None:
child_on_start()
retval = self.fun(*self.args, **self.kwargs)
retvalf.write(marshal.dumps(retval))
if child_on_exit is not None:
child_on_exit()
except:
excinfo = py.code.ExceptionInfo()
stderr.write(str(excinfo._getreprcrash()))
EXITSTATUS = self.EXITSTATUS_EXCEPTION
finally:
stdout.close()
stderr.close()
retvalf.close()
os.close(1)
os.close(2)
os._exit(EXITSTATUS)
def test_popen_nice(self, makegateway):
gw = makegateway("popen")
def getnice(channel):
import os
if hasattr(os, 'nice'):
channel.send(os.nice(0))
else:
channel.send(None)
remotenice = gw.remote_exec(getnice).receive()
gw.exit()
if remotenice is not None:
gw = makegateway("popen//nice=5")
remotenice2 = gw.remote_exec(getnice).receive()
assert remotenice2 == remotenice + 5
def _child(self, nice_level, child_on_start, child_on_exit):
# right now we need to call a function, but first we need to
# map all IO that might happen
sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
retvalf = self.RETVAL.open("wb")
EXITSTATUS = 0
try:
if nice_level:
os.nice(nice_level)
try:
if child_on_start is not None:
child_on_start()
retval = self.fun(*self.args, **self.kwargs)
retvalf.write(marshal.dumps(retval))
if child_on_exit is not None:
child_on_exit()
except:
excinfo = py.code.ExceptionInfo()
stderr.write(str(excinfo._getreprcrash()))
EXITSTATUS = self.EXITSTATUS_EXCEPTION
finally:
stdout.close()
stderr.close()
retvalf.close()
os.close(1)
os.close(2)
os._exit(EXITSTATUS)
def gzip_fast5s(self):
tarball = self.get_tarball_path()
if os.path.isfile(tarball):
os.remove(tarball)
print('\ngzipping reads into a tar.gz file...', flush=True)
base_dir = self.base_dir
if not base_dir.endswith('/'):
base_dir += '/'
for directory in self.all_dirs:
assert directory.startswith(self.base_dir)
rel_dirs = [x[len(base_dir):] for x in self.all_dirs]
current_dir = os.getcwd()
os.chdir(self.base_dir)
# Use pigz if available to speed up the compression.
pigz_path = shutil.which('pigz')
if pigz_path:
tar_cmd = 'tar cf - ' + ' '.join(rel_dirs) + ' | pigz -9 -p 8 > ' + tarball
else:
tar_cmd = 'tar -czvf ' + tarball + ' '.join(rel_dirs)
tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
preexec_fn=lambda: os.nice(20), shell=True)
_, _ = tar.communicate()
tar.wait()
os.chdir(current_dir)
def tarball_count_is_correct(self):
tarball = self.get_tarball_path()
print('Checking that fast5 file count matches that in ' + os.path.basename(tarball) +
'... ', end='')
fast5_count = len(self.all_fast5_files)
tar_count_cmd = 'pigz -dc ' + tarball + ' | tar tf - | grep -P ".fast5$" | wc -l'
p = subprocess.Popen(tar_count_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
preexec_fn=lambda: os.nice(20), shell=True)
out, _ = p.communicate()
tar_count = int(out.decode())
match = tar_count == fast5_count
if match:
print('all good!')
return match
def nanonetcall(directory, threads):
temp_fastq = 'temp_' + str(random.randint(0, 100000000)) + '.fastq'
nanonetcall_cmd = ['nanonetcall', '--fastq', '--write_events', '--jobs', str(threads),
'--max_len', '100000', directory]
with open(temp_fastq, 'wb') as fastq_out:
p = subprocess.Popen(nanonetcall_cmd, stdout=fastq_out, stderr=subprocess.PIPE,
preexec_fn=lambda: os.nice(20))
_, _ = p.communicate()
p.wait()
remove_if_exists(temp_fastq)
def nanonetcall_2d(directory, threads):
temp_prefix = 'temp_' + str(random.randint(0, 100000000))
nanonet2d_cmd = ['nanonet2d', '--fastq', '--write_events', '--jobs', str(threads),
'--max_len', '100000', directory, temp_prefix]
p = subprocess.Popen(nanonet2d_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
preexec_fn=lambda: os.nice(20))
_, _ = p.communicate()
p.wait()
remove_if_exists(temp_prefix + '_template.fastq')
remove_if_exists(temp_prefix + '_complement.fastq')
remove_if_exists(temp_prefix + '_2d.fastq')
def _child(self, nice_level, child_on_start, child_on_exit):
# right now we need to call a function, but first we need to
# map all IO that might happen
sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
retvalf = self.RETVAL.open("wb")
EXITSTATUS = 0
try:
if nice_level:
os.nice(nice_level)
try:
if child_on_start is not None:
child_on_start()
retval = self.fun(*self.args, **self.kwargs)
retvalf.write(marshal.dumps(retval))
if child_on_exit is not None:
child_on_exit()
except:
excinfo = py.code.ExceptionInfo()
stderr.write(str(excinfo._getreprcrash()))
EXITSTATUS = self.EXITSTATUS_EXCEPTION
finally:
stdout.close()
stderr.close()
retvalf.close()
os.close(1)
os.close(2)
os._exit(EXITSTATUS)
def _child(self, nice_level, child_on_start, child_on_exit):
# right now we need to call a function, but first we need to
# map all IO that might happen
sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
retvalf = self.RETVAL.open("wb")
EXITSTATUS = 0
try:
if nice_level:
os.nice(nice_level)
try:
if child_on_start is not None:
child_on_start()
retval = self.fun(*self.args, **self.kwargs)
retvalf.write(marshal.dumps(retval))
if child_on_exit is not None:
child_on_exit()
except:
excinfo = py.code.ExceptionInfo()
stderr.write(str(excinfo._getreprcrash()))
EXITSTATUS = self.EXITSTATUS_EXCEPTION
finally:
stdout.close()
stderr.close()
retvalf.close()
os.close(1)
os.close(2)
os._exit(EXITSTATUS)
def main():
print(os.nice(0)) # get relative process priority
print(os.nice(1)) # change relative priority
print(os.times()) # process times: system, user etc...
print(os.isatty(0)) # is the file descriptor arg a tty?(0 = stdin)
print(os.isatty(4)) # 4 is just an arbitrary test value
print(os.getloadavg()) # UNIX only - number of processes in queue
print(os.cpu_count()) # New in Python 3.4
def _partition_init_worker():
import signal
signal.signal(signal.SIGINT, signal.SIG_IGN)
import os
os.nice(5)
def init_worker():
import signal
signal.signal(signal.SIGINT, signal.SIG_IGN)
import os
os.nice(5)
def _child(self, nice_level, child_on_start, child_on_exit):
# right now we need to call a function, but first we need to
# map all IO that might happen
sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
retvalf = self.RETVAL.open("wb")
EXITSTATUS = 0
try:
if nice_level:
os.nice(nice_level)
try:
if child_on_start is not None:
child_on_start()
retval = self.fun(*self.args, **self.kwargs)
retvalf.write(marshal.dumps(retval))
if child_on_exit is not None:
child_on_exit()
except:
excinfo = py.code.ExceptionInfo()
stderr.write(str(excinfo._getreprcrash()))
EXITSTATUS = self.EXITSTATUS_EXCEPTION
finally:
stdout.close()
stderr.close()
retvalf.close()
os.close(1)
os.close(2)
os._exit(EXITSTATUS)
def _child(self, nice_level, child_on_start, child_on_exit):
# right now we need to call a function, but first we need to
# map all IO that might happen
sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
retvalf = self.RETVAL.open("wb")
EXITSTATUS = 0
try:
if nice_level:
os.nice(nice_level)
try:
if child_on_start is not None:
child_on_start()
retval = self.fun(*self.args, **self.kwargs)
retvalf.write(marshal.dumps(retval))
if child_on_exit is not None:
child_on_exit()
except:
excinfo = py.code.ExceptionInfo()
stderr.write(str(excinfo._getreprcrash()))
EXITSTATUS = self.EXITSTATUS_EXCEPTION
finally:
stdout.close()
stderr.close()
retvalf.close()
os.close(1)
os.close(2)
os._exit(EXITSTATUS)
def _child(self, nice_level, child_on_start, child_on_exit):
# right now we need to call a function, but first we need to
# map all IO that might happen
sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
retvalf = self.RETVAL.open("wb")
EXITSTATUS = 0
try:
if nice_level:
os.nice(nice_level)
try:
if child_on_start is not None:
child_on_start()
retval = self.fun(*self.args, **self.kwargs)
retvalf.write(marshal.dumps(retval))
if child_on_exit is not None:
child_on_exit()
except:
excinfo = py.code.ExceptionInfo()
stderr.write(str(excinfo._getreprcrash()))
EXITSTATUS = self.EXITSTATUS_EXCEPTION
finally:
stdout.close()
stderr.close()
retvalf.close()
os.close(1)
os.close(2)
os._exit(EXITSTATUS)
def lowerCurrentProcessPriority():
if buildcommon.isWindows():
import win32process, win32api,win32con
win32process.SetPriorityClass(win32api.GetCurrentProcess(), win32process.BELOW_NORMAL_PRIORITY_CLASS)
else:
# on unix, people may run nice before executing the process, so
# only change the priority unilaterally if it's currently at its
# default value
if os.nice(0) == 0:
os.nice(1) # change to 1 below the current level
def set_process_priority(nice: bool=True, ionice: bool=False,
cpulimit: int=0) -> bool:
"""Set process name and cpu priority."""
w = " may delay I/O Operations, not recommended on user-facing GUI!."
try:
if nice:
_old = os.getpriority(os.PRIO_PROCESS, 0)
os.nice(19) # smooth cpu priority
log.debug(f"Process CPUs Priority set: from {_old} to 19.")
elif ionice and which("ionice"):
log.warning("ionice" + w)
cmnd = f"{which('ionice')} --ignore --class 3 --pid {os.getpid()}"
call(cmnd, shell=True) # I/O nice,should work on Linux and Os X
log.debug(f"Process PID {os.getpid()} I/O Priority set to: {cmnd}")
elif cpulimit and which("cpulimit"):
log.warning("cpulimit" + w)
log.debug("Launching 1 background 'cpulimit' child subprocess...")
cpulimit = int(cpulimit if cpulimit > 4 else 5) # makes 5 the min.
command = "{0} --include-children --pid={1} --limit={2}".format(
which("cpulimit"), os.getpid(), cpulimit)
proces = Popen(command, shell=True) # This launch a subprocess.
atexit.register(proces.kill) # Force Kill subprocess at exit.
log.debug(f"Process CPUs Max Limits capped to: {command}.")
except Exception as error:
log.warning(error)
return False # this may fail on windows and its normal, so be silent.
else:
return True