The following 50 code examples, extracted from open-source Python projects, illustrate how to use multiprocessing.current_process().
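Before the extracted examples, here is a minimal, self-contained sketch of the basic pattern (the worker function and process names are illustrative, not taken from any project below): current_process() returns the Process object for the calling process, whose .name and .pid identify which worker is running.

import multiprocessing

def worker(task_id):
    # current_process() gives the Process object of the calling process;
    # .name and .pid tell us which worker handled this task.
    proc = multiprocessing.current_process()
    print('task %d handled by %s (pid %d)' % (task_id, proc.name, proc.pid))

if __name__ == '__main__':
    procs = [multiprocessing.Process(target=worker, args=(i,), name='worker-%d' % i)
             for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()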
def serve_forever(self):
    '''
    Run the server forever
    '''
    current_process()._manager_server = self
    try:
        try:
            while 1:
                try:
                    c = self.listener.accept()
                except (OSError, IOError):
                    continue
                t = threading.Thread(target=self.handle_request, args=(c,))
                t.daemon = True
                t.start()
        except (KeyboardInterrupt, SystemExit):
            pass
    finally:
        self.stop = 999
        self.listener.close()
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    server = getattr(current_process(), '_manager_server', None)

    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]
    else:
        incref = (
            kwds.pop('incref', True) and
            not getattr(current_process(), '_inheriting', False)
            )
        return func(token, serializer, incref=incref, **kwds)

#
# Functions to create proxies and proxy types
#
def _get_listener():
    global _listener

    if _listener is None:
        _lock.acquire()
        try:
            if _listener is None:
                debug('starting listener and thread for sending handles')
                _listener = Listener(authkey=current_process().authkey)
                t = threading.Thread(target=_serve)
                t.daemon = True
                t.start()
        finally:
            _lock.release()

    return _listener
def run(self):
    """
    Entry point for the live plotting when started as a separate process.
    This starts the loop
    """
    self.entity_name = current_process().name
    plogger.info("Starting new thread %s", self.entity_name)

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.SUB)
    self.socket.connect("tcp://localhost:%d" % self.port)
    topic = pickle.dumps(self.var_name, protocol=pickle.HIGHEST_PROTOCOL)
    self.socket.setsockopt(zmq.SUBSCRIBE, topic)
    plogger.info("Subscribed to topic %s on port %d", self.var_name, self.port)

    self.init(**self.init_kwargs)
    # Reference to animation required so that GC doesn't clean it up.
    # WILL NOT work if you remove it!!!!!
    # See: http://matplotlib.org/api/animation_api.html
    ani = animation.FuncAnimation(self.fig, self.loop, interval=100)
    self.plt.show()
def run_optical_flow(vid_item, dev_id=0):
    vid_path = vid_item[0]
    vid_id = vid_item[1]
    vid_name = vid_path.split('/')[-1].split('.')[0]
    out_full_path = os.path.join(out_path, vid_name)
    try:
        os.mkdir(out_full_path)
    except OSError:
        pass

    current = current_process()
    dev_id = (int(current._identity[0]) - 1) % NUM_GPU
    image_path = '{}/img'.format(out_full_path)
    flow_x_path = '{}/flow_x'.format(out_full_path)
    flow_y_path = '{}/flow_y'.format(out_full_path)

    cmd = os.path.join(df_path + 'build/extract_gpu') + ' -f {} -x {} -y {} -i {} -b 20 -t 1 -d {} -s 1 -o {} -w {} -h {}'.format(
        quote(vid_path), quote(flow_x_path), quote(flow_y_path), quote(image_path),
        dev_id, out_format, new_size[0], new_size[1])

    os.system(cmd)
    print '{} {} done'.format(vid_id, vid_name)
    sys.stdout.flush()
    return True
def run_warp_optical_flow(vid_item, dev_id=0):
    vid_path = vid_item[0]
    vid_id = vid_item[1]
    vid_name = vid_path.split('/')[-1].split('.')[0]
    out_full_path = os.path.join(out_path, vid_name)
    try:
        os.mkdir(out_full_path)
    except OSError:
        pass

    current = current_process()
    dev_id = (int(current._identity[0]) - 1) % NUM_GPU
    flow_x_path = '{}/flow_x'.format(out_full_path)
    flow_y_path = '{}/flow_y'.format(out_full_path)

    cmd = os.path.join(df_path + 'build/extract_warp_gpu') + ' -f {} -x {} -y {} -b 20 -t 1 -d {} -s 1 -o {}'.format(
        vid_path, flow_x_path, flow_y_path, dev_id, out_format)

    os.system(cmd)
    print 'warp on {} {} done'.format(vid_id, vid_name)
    sys.stdout.flush()
    return True
def parallel_cone(pipe, cells, time, cone_input, cone_layer, Vis_dark, Vis_resting_potential):
    # Initialize array of cone_response copying cone_input
    cone_response = cone_input

    for cell in cells:
        if multiprocessing.current_process().name == "root":
            progress = 100 * (cell - cells[0]) / len(cells)
            stdout.write("\r progress: %d %%" % progress)
            stdout.flush()

        # Time-driven simulation
        for t in np.arange(0, time):
            # Update dynamics of the model
            cone_layer[cell].feedInput(cone_input[cell, t])
            cone_layer[cell].update()
            # Record response
            cone_response[cell, t] = (cone_layer[cell].LF_taum.last_values[0] -
                cone_layer[cell].LF_tauh.last_values[0] - Vis_dark - Vis_resting_potential)

    pipe.send(cone_response[cells, :])
    pipe.close()

#! ================
#! Class runNetwork
#! ================
def _process_wrapper(function, upwards, profile, *args, **kwargs):
    """Wrap a process with additional features."""
    try:
        if profile:
            _run_profiler(function, *args, **kwargs)
        else:
            function(*args, **kwargs)
    except Exception:
        process = multiprocessing.current_process()
        info = sys.exc_info()
        exception = traceback.format_exception(
            info[0], info[1], info[2].tb_next)
        _send_message(upwards, _MESSAGE_ERROR,
                      process_id=process.pid,
                      process_name=process.name,
                      message=''.join(exception).rstrip())
    finally:
        upwards.close()
def fix_multiprocessing(**kwargs):
    ## Fix `AttributeError: 'Process' object has no attribute '_authkey'`
    try:
        current_process()._authkey
    except AttributeError:
        current_process()._authkey = current_process()._config['authkey']

    ## Fix `AttributeError: 'Process' object has no attribute '_daemonic'`
    ## Also: `https://github.com/celery/celery/issues/1709`
    try:
        current_process()._daemonic
    except AttributeError:
        # current_process()._daemonic = current_process()._config.get('daemon', False)
        current_process()._daemonic = False

    ## Fix `AttributeError: 'Process' object has no attribute '_tempdir'`
    try:
        current_process()._tempdir
    except AttributeError:
        current_process()._tempdir = None
def create_html_from_pypi(max_pkgs=MAX_PKGS):
    p = multiprocessing.current_process()
    print('Starting process:', p.name, p.pid)
    sys.stdout.flush()
    try:
        max_pkgs = int(sys.argv[1])
    except (IndexError, ValueError):
        max_pkgs = MAX_PKGS
    print(max_pkgs)
    packages = get_from_pypi(max_pkgs)
    print(time.time() - start, 'seconds,', len(packages), 'packages.')
    # with open('index.html', 'w') as out_file:
    #     out_file.write(create_html(packages))  # read_packages(max_pkgs)))
    print(time.time() - start, 'seconds')
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()
    return 42

# start a separate process to gather data from PyPI in the background
def run(self):
    if not self.profile:
        self.realrun()
        return

    try:
        import cProfile as profile
    except:
        import profile
    prof = profile.Profile()
    try:
        profile.Profile.runcall(prof, self.realrun)
    finally:
        logfile = "profile-parse-%s.log" % multiprocessing.current_process().name
        prof.dump_stats(logfile)
def start(self):
    self.results = self.load_cached()
    self.processes = []
    if self.toparse:
        bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)

        def init():
            Parser.bb_cache = self.bb_cache
            bb.utils.set_process_name(multiprocessing.current_process().name)
            multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, exitpriority=1)
            multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, exitpriority=1)

        self.feeder_quit = multiprocessing.Queue(maxsize=1)
        self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes)
        self.jobs = multiprocessing.Queue(maxsize=self.num_processes)
        self.result_queue = multiprocessing.Queue()
        self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit)
        self.feeder.start()
        for i in range(0, self.num_processes):
            parser = Parser(self.jobs, self.result_queue, self.parser_quit, init,
                            self.cooker.configuration.profile)
            parser.start()
            self.process_names.append(parser.name)
            self.processes.append(parser)

        self.results = itertools.chain(self.results, self.parse_generator())
def __init__(self, token, serializer, manager=None,
             authkey=None, exposed=None, incref=True):
    BaseProxy._mutex.acquire()
    try:
        tls_idset = BaseProxy._address_to_local.get(token.address, None)
        if tls_idset is None:
            tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
            BaseProxy._address_to_local[token.address] = tls_idset
    finally:
        BaseProxy._mutex.release()

    # self._tls is used to record the connection used by this
    # thread to communicate with the manager at token.address
    self._tls = tls_idset[0]

    # self._idset is used to record the identities of all shared
    # objects for which the current process owns references and
    # which are in the manager at token.address
    self._idset = tls_idset[1]

    self._token = token
    self._id = self._token.id
    self._manager = manager
    self._serializer = serializer
    self._Client = listener_client[serializer][1]

    if authkey is not None:
        self._authkey = AuthenticationString(authkey)
    elif self._manager is not None:
        self._authkey = self._manager._authkey
    else:
        self._authkey = current_process().authkey

    if incref:
        self._incref()

    util.register_after_fork(self, BaseProxy._after_fork)
def client_run(name, x, y, wait, external_lock=None):
    server_proc = Server(('localhost', 8080), authkey=b'none')
    server_proc.connect()
    S = server_proc.Struct()
    # Either use SyncManager.Lock(), or explicitly pass in a global mp.Lock()
    # object as args to client processes
    if not external_lock:
        print('No external_lock passed. Using server_proc.Lock() instead.')
    with external_lock or server_proc.my_lock():
        for i in range(5):
            S.update(x + i, y + i)
            if MEMORY_PROFILING:
                print(mp.current_process().pid, name, 'updated', i)
            else:
                print(name, *S.get())
            time.sleep(wait)
def __init__(self, host, db, user, password, ak="DW2CwL3B3271CiVyw7GdBsfR"):
    logging.debug("Constructor ak:%s" % ak)
    self.baiduAPIService = BaiduMapAPIService(ak)
    self.baiduMapDAO = BaiduMapDAO(host, db, user, password)
    self.around_facilities_distance = []
    self.around_data = {}
    self.around_facilities_zuobiao = []
    self.around_facilities_zhoubiansheshibiaozuoliebiao = []
    self.facilities_ditance = []
    self.maxdistance_and_hotelzuobiao = []
    self.hotelname_and_zuobiao = []
    self.disigeziduan = []
    self.bed = {}

# def __del__(self):
#     print "... Destructor BaiduMapSnatcherService... %s" % multiprocessing.current_process().name
def worker(work_queue, done_queue):
    spinner = spinning_cursor()
    p = current_process()
    for nif_path in iter(work_queue.get, 'STOP'):
        sys.stdout.write("\r\b\033[K{0} [{1}][{2}][{3}]".format(
            next(spinner), work_queue.qsize(), p.name, nif_path))
        sys.stdout.flush()
        assets = []
        try:
            # assets.append('DEADBEEF')
            assets = retrieve_assets_from_nif(nif_path)
        except Exception:
            pass
        done_queue.put((nif_path, assets))
    done_queue.put('STOP')
    return True
def bflipper(tokens):
    mutated_tokens = []
    procnum = int(multiprocessing.current_process().name)
    threadnum = int(threading.current_thread().name)
    # Figure out how to spread threads in a sensible manner
    mystart = procnum * max((config_hodor.iterations / config_hodor.procs), 8)
    for item in tokens:
        buf = bytearray(item) if isinstance(item, str) else item
        if len(buf) == 0:
            mutated_tokens.append(buf)  # Nothing to do
            continue
        # This is supposed to deal with iterations > buflen in a sane way
        # Should just loop through and flip more bits at once
        myflip = config_hodor.mutator["bflipper"]["flipmode"] + (mystart + threadnum) / (len(buf) * 8)
        flipme = (threadnum / 8) + (mystart / 8)
        if flipme >= len(buf):
            flipme = flipme % len(buf)
        for j in range(myflip):
            buf[flipme] ^= (1 << ((threadnum + j) % 8))  # Minor bug here, will do one extra xor on myflip>1
        mutated_tokens.append(buf)
    return mutated_tokens

# Quid pro quo, swap out old tokens for user specified tokens
def craw(self, lock, count):
    while 1:
        next_task = self.task.get()
        if next_task is None:
            self.task.task_done()
            continue
        # print(self.urls.new_urls)
        # new_url = self.urls.get_new_url()
        # print("%s craw %d : %s" % (multiprocessing.current_process().name, count, new_url))
        # new_html = self.downloader.download(new_url)
        # new_urls, new_data = self.parser.parse(new_url, new_html)
        # self.urls.add_new_urls(new_urls)
        # self.outputer.collect_data(new_data)
        # self.outputer.output_html()
        # count += 1
        new_url = next_task.a
        print("%s craw %d : %s" % (multiprocessing.current_process().name, count, new_url))
        new_html = self.downloader.download(new_url)
        new_urls, new_data = self.parser.parse(new_url, new_html)
        for i in range(len(new_urls)):
            self.task.put(Task(new_urls[i]))
        self.outputer.collect_data(new_data)
        self.outputer.output_html()
        self.task.task_done()
        count += 1
def submit(config, user, run_id, pids):
    """
    Submits pipeline defined by 'config' as user 'user'.
    Dumps the config in a temp. file that is removed after successful completion.
    Returns exit code, stdout, and stderr.
    """
    pids[run_id] = mp.current_process().pid

    (fd, tmp_cfg) = tempfile.mkstemp(prefix='pypers_', suffix='.cfg', text=True)
    os.fchmod(fd, 0644)
    with os.fdopen(fd, 'w') as fh:
        json.dump(config, fh)

    cmd = [which('np_submit.py'), '-i', tmp_cfg]
    (ec, err, out) = run_as(cmd=cmd, user=user)
    if ec == 0:
        os.unlink(tmp_cfg)
        return (err, out)
    else:
        raise Exception('Unable to execute cmd %s:\n%s\n%s' % (cmd, err, out))
def print_statistic(self):
    now = time.time()
    if now - self.checkpoint > self.statistic_interval:
        count = self.count.value
        self.count.value = 0
        delta = now - self.checkpoint
        self.checkpoint = now

        if now - self.checkpoint > 3 * self.statistic_interval:
            log.info("inserted {} rows in the past {}s".format(count, round(delta, 3)))
        else:
            log.info(
                "delta:{}s count:{} speed:{}/s qsize:{} qfull:{} P:{} Th:{}".format(
                    round(delta, 3), count, round(count / delta, 2),
                    self.queue.qsize(), self.queue.full(),
                    multiprocessing.current_process().name,
                    threading.current_thread().name,
                ))
def run(self):
    while True:
        try:
            next_task = self.task_queue.get()
            if not next_task:
                # print("%s Poisoned" % multiprocessing.current_process().name, file=sys.stderr)
                self.task_queue.task_done()
                break
            try:
                result = next_task()
                self.result_queue.put(result)
            except Exception as e:
                if self.exception_handling == ExceptionHandling.IGNORE:
                    # print("%s Exception: %s" % (multiprocessing.current_process().name, e), file=sys.stderr)
                    # print("%s IGNORE error" % multiprocessing.current_process().name, file=sys.stderr)
                    pass
                elif self.exception_handling == ExceptionHandling.THROW:
                    # Caution
                    self.task_queue.task_done()
                    raise e
                else:
                    # Special Token
                    self.result_queue.put(self.exception_handling)
            self.task_queue.task_done()
        except Exception as e:
            raise e
    pass
def __init__(self, address=None, authkey=None, serializer='pickle'):
    if authkey is None:
        authkey = current_process().authkey
    self._address = address     # XXX not final address if eg ('', 0)
    self._authkey = AuthenticationString(authkey)
    self._state = State()
    self._state.value = State.INITIAL
    self._serializer = serializer
    self._Listener, self._Client = listener_client[serializer]
def _connect(self):
    util.debug('making connection to manager')
    name = current_process().name
    if threading.current_thread().name != 'MainThread':
        name += '|' + threading.current_thread().name
    conn = self._Client(self._token.address, authkey=self._authkey)
    dispatch(conn, None, 'accept_connection', (name,))
    self._tls.connection = conn
def rebuild_handle(pickled_data):
    address, handle, inherited = pickled_data
    if inherited:
        return handle
    sub_debug('rebuilding handle %d', handle)
    conn = Client(address, authkey=current_process().authkey)
    conn.send((handle, os.getpid()))
    new_handle = recv_handle(conn)
    conn.close()
    return new_handle

#
# Register `_multiprocessing.Connection` with `ForkingPickler`
#
def cleanup_and_exit(self, code, frame):
    if not current_process().name == "MainProcess":
        return

    logging.info("Starting cleanup procedure! Stopping running threads")

    # TODO Move submodules into self that populates as used?
    submodules = ['replset', 'sharding', 'backup', 'oplogtailer', 'archive', 'upload']
    for submodule_name in submodules:
        try:
            submodule = getattr(self, submodule_name)
            if submodule:
                submodule.close()
        except Exception:
            continue

    if self.manager:
        self.manager.shutdown()
    if self.db:
        self.db.close()

    if self.notify:
        try:
            self.notify.notify("%s: backup '%s/%s' failed! Error: '%s'" % (
                self.program_name,
                self.config.backup.name,
                self.backup_time,
                self.last_error_msg
            ))
            self.notify.run()
            self.notify.close()
        except Exception, e:
            logging.error("Error from notifier: %s" % e)

    logging.info("Cleanup complete, exiting")

    if self.logger:
        self.logger.rotate()
        self.logger.close()

    self.release_lock()
    sys.exit(1)
def write(self, data):
    # note that these pids are in the form of current_process()._identity
    # rather than OS pids
    from multiprocessing import current_process
    pid = current_process()._identity
    self.__queue.put((pid, data))