我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用multiprocessing.process()。
def _validate(self): """Raise exception if this handle is closed or not registered to be used in the current process. Intended to be called before every operation on `self._fd`. Reveals wrong usage of this module in the context of multiple processes. Might prevent tedious debugging sessions. Has little performance impact. """ if self._closed: raise GIPCClosed( "GIPCHandle has been closed before.") if os.getpid() != self._legit_pid: raise GIPCError( "GIPCHandle %s not registered for current process %s." % ( self, os.getpid()))
def _winapi_childhandle_after_createprocess_child(self): """Called on Windows in the child process after the CreateProcess() system call. This is required for making the handle usable in the child. """ if WINAPI_HANDLE_TRANSFER_STEAL: # In this case the handle has not been inherited by the child # process during CreateProcess(). Steal it from the parent. new_winapihandle = multiprocessing.reduction.steal_handle( self._parent_pid, self._parent_winapihandle) del self._parent_winapihandle del self._parent_pid # Restore C file descriptor with (read/write)only flag. self._fd = msvcrt.open_osfhandle(new_winapihandle, self._fd_flag) return # In this case the handle has been inherited by the child process during # the CreateProcess() system call. Get C file descriptor from Windows # file handle. self._fd = msvcrt.open_osfhandle( self._inheritable_winapihandle, self._fd_flag) del self._inheritable_winapihandle
def run_trial(self, trial_num, param): ''' algo step 2, construct and run Trial with the next param args trial_num, param must be provided externally, otherwise they will not progress within mp.process ''' experiment_spec = self.compose_experiment_spec(param) trial = self.Trial( experiment_spec, trial_num=trial_num, times=self.times, num_of_trials=self.num_of_trials, run_timestamp=self.run_timestamp, experiment_id_override=self.experiment_id_override) trial_data = trial.run() del trial import gc gc.collect() debug_mem_usage() return trial_data # retrieve the trial_num, param, fitness_score from trial_data
def start(self): # Start grabbing SIGCHLD within libev event loop. gevent.get_hub().loop.install_sigchld() # Run new process (based on `fork()` on POSIX-compliant systems). super(_GProcess, self).start() # The occurrence of SIGCHLD is recorded asynchronously in libev. # This guarantees proper behavior even if the child watcher is # started after the child exits. Start child watcher now. self._sigchld_watcher = gevent.get_hub().loop.child(self.pid) self._returnevent = gevent.event.Event() self._sigchld_watcher.start( self._on_sigchld, self._sigchld_watcher) log.debug("SIGCHLD watcher for %s started.", self.pid)
def __repr__(self): """Based on original __repr__ from CPython 3.4's mp package. Reasons for re-implementing: * The original code would invoke os.waitpid() through _popen.poll(). This is forbidden in the context of gipc. This method instead reads the exitcode property which is set asynchronously by a libev child watcher callback. * The original code distinguishes 'initial' state from 'started' state. This is not necessary, as gipc starts processes right away. * This method removes the `if self is _current_process` check without changing output behavior (that's still 'started' status). """ exitcodedict = multiprocessing.process._exitcode_to_name status = 'started' if self._parent_pid != os.getpid(): status = 'unknown' elif self.exitcode is not None: status = self.exitcode if status == 0: status = 'stopped' elif isinstance(status, int): status = 'stopped[%s]' % exitcodedict.get(status, status) return '<%s(%s, %s%s)>' % ( type(self).__name__, self._name, status, self.daemon and ' daemon' or '' )
def join(self, timeout=None): """ Wait cooperatively until child process terminates or timeout occurs. :arg timeout: ``None`` (default) or a a time in seconds. The method simply returns upon timeout expiration. The state of the process has to be identified via ``is_alive()``. """ assert self._parent_pid == os.getpid(), "I'm not parent of this child." assert self._popen is not None, 'Can only join a started process.' if not WINDOWS: # Resemble multiprocessing's join() method while replacing # `self._popen.wait(timeout)` with # `self._returnevent.wait(timeout)` self._returnevent.wait(timeout) if self._popen.returncode is not None: if hasattr(multiprocessing.process, '_children'): # This is for Python 3.4. kids = multiprocessing.process._children else: # For Python 2.6, 2.7, 3.3. kids = multiprocessing.process._current_process._children kids.discard(self) return with gevent.Timeout(timeout, False): while self.is_alive(): # This frequency seems reasonable, but that's not 100 % certain. gevent.sleep(0.01) # Clean up after child as designed by Process class (non-blocking). super(_GProcess, self).join(timeout=0)
def _set_legit_process(self): log.debug("Legitimate %s for current process.", self) self._legit_pid = os.getpid()
def _filter_handles(l): """Iterate through `l`, filter and yield `_GIPCHandle` instances. """ for o in l: if isinstance(o, _GIPCHandle): yield o elif isinstance(o, _GIPCDuplexHandle): yield o._writer yield o._reader # Container for keeping track of valid `_GIPCHandle`s in current process.
def bind_port(sock, host=HOST): """Bind the socket to a free port and return the port number. Relies on ephemeral ports in order to ensure we are using an unbound port. This is important as many tests may be running simultaneously, especially in a buildbot environment. This method raises an exception if the sock.family is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR or SO_REUSEPORT set on it. Tests should *never* set these socket options for TCP/IP sockets. The only case for setting these options is testing multicasting via multiple UDP sockets. Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. on Windows), it will be set on the socket. This will prevent anyone else from bind()'ing to our host/port for the duration of the test. """ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: if hasattr(socket, 'SO_REUSEADDR'): if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: raise TestFailed("tests should never set the SO_REUSEADDR " \ "socket option on TCP/IP sockets!") if hasattr(socket, 'SO_REUSEPORT'): try: if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: raise TestFailed("tests should never set the SO_REUSEPORT " \ "socket option on TCP/IP sockets!") except socket.error: # Python's socket module was compiled using modern headers # thus defining SO_REUSEPORT but this process is running # under an older kernel that does not support SO_REUSEPORT. pass if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) sock.bind((host, 0)) port = sock.getsockname()[1] return port
def temp_umask(umask): """Context manager that temporarily sets the process umask.""" oldmask = os.umask(umask) try: yield finally: os.umask(oldmask)
def strip_python_stderr(stderr): """Strip the stderr of a Python process from potential debug output emitted by the interpreter. This will typically be run on the result of the communicate() method of a subprocess.Popen object. """ stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip() return stderr
def _memory_watchdog(start_evt, finish_evt, period=10.0): """A function which periodically watches the process' memory consumption and prints it out. """ # XXX: because of the GIL, and because the very long operations tested # in most bigmem tests are uninterruptible, the loop below gets woken up # much less often than expected. # The polling code should be rewritten in raw C, without holding the GIL, # and push results onto an anonymous pipe. try: page_size = os.sysconf('SC_PAGESIZE') except (ValueError, AttributeError): try: page_size = os.sysconf('SC_PAGE_SIZE') except (ValueError, AttributeError): page_size = 4096 procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) try: f = open(procfile, 'rb') except IOError as e: warnings.warn('/proc not available for stats: {}'.format(e), RuntimeWarning) sys.stderr.flush() return with f: start_evt.set() old_data = -1 while not finish_evt.wait(period): f.seek(0) statm = f.read().decode('ascii') data = int(statm.split()[5]) if data != old_data: old_data = data print(" ... process data size: {data:.1f}G" .format(data=data * page_size / (1024 ** 3)))
def strip_python_stderr(stderr): """Strip the stderr of a Python process from potential debug output emitted by the interpreter. This will typically be run on the result of the communicate() method of a subprocess.Popen object. """ stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip() return stderr
def get_multiprocessing_process__dangling(self): if not multiprocessing: return None # This copies the weakrefs without making any strong reference return multiprocessing.process._dangling.copy()
def restore_multiprocessing_process__dangling(self, saved): if not multiprocessing: return multiprocessing.process._dangling.clear() multiprocessing.process._dangling.update(saved)
def bind_port(sock, host=HOST): """Bind the socket to a free port and return the port number. Relies on ephemeral ports in order to ensure we are using an unbound port. This is important as many tests may be running simultaneously, especially in a buildbot environment. This method raises an exception if the sock.family is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR or SO_REUSEPORT set on it. Tests should *never* set these socket options for TCP/IP sockets. The only case for setting these options is testing multicasting via multiple UDP sockets. Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. on Windows), it will be set on the socket. This will prevent anyone else from bind()'ing to our host/port for the duration of the test. """ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: if hasattr(socket, 'SO_REUSEADDR'): if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: raise TestFailed("tests should never set the SO_REUSEADDR " \ "socket option on TCP/IP sockets!") if hasattr(socket, 'SO_REUSEPORT'): try: if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: raise TestFailed("tests should never set the SO_REUSEPORT " \ "socket option on TCP/IP sockets!") except OSError: # Python's socket module was compiled using modern headers # thus defining SO_REUSEPORT but this process is running # under an older kernel that does not support SO_REUSEPORT. pass if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) sock.bind((host, 0)) port = sock.getsockname()[1] return port
def temp_umask(umask): """Context manager that temporarily sets the process umask.""" oldmask = os.umask(umask) try: yield finally: os.umask(oldmask) # TEST_HOME_DIR refers to the top level directory of the "test" package # that contains Python's regression test suite
def get_multiprocessing_process__dangling(self): if not multiprocessing: return None # Unjoined process objects can survive after process exits multiprocessing.process._cleanup() # This copies the weakrefs without making any strong reference return multiprocessing.process._dangling.copy()