我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用multiprocessing.TimeoutError()。
def get(self, timeout=None):
    """
    Block until the remote result is available and hand it back.

    The result is read from ``self._q``. When ``timeout`` is not None and
    nothing arrives within that many seconds, the queue's ``Empty`` error is
    translated into ``multiprocessing.TimeoutError``. If the value that
    arrives is itself an exception instance (the remote call failed), it is
    re-raised here instead of being returned.
    """
    try:
        outcome = self._q.get(timeout=timeout)
    except Queue.Empty:
        # Expose the queue-level timeout as the multiprocessing-style error.
        raise multiprocessing.TimeoutError("Timed out")

    if not isinstance(outcome, Exception):
        return outcome
    raise outcome
def wait(self, max_wait_secs=6 * 3600, poll_secs=2):
    """
    Wait for every pooled oplog resolver thread to finish, then stop the pool.

    Does nothing when ``self._pooled`` is empty. Otherwise the pool is closed
    and each pending result in ``self._results`` is polled with
    ``poll_secs`` as the per-result timeout until ``self._pooled`` is empty.

    :param max_wait_secs: upper bound (seconds) on the accumulated poll
        timeouts before giving up.
    :param poll_secs: timeout (seconds) used for each ``get()`` call.
    :raises OperationError: when the accumulated timeouts reach
        ``max_wait_secs``.
    """
    if len(self._pooled) > 0:
        waited_secs = 0
        self._pool.close()
        # NOTE(review): nothing in this method removes entries from
        # self._pooled; presumably a result callback does -- confirm.
        while len(self._pooled):
            logging.debug("Waiting for %i oplog resolver thread(s) to stop" % len(self._pooled))
            try:
                for thread_name in self._pooled:
                    thread = self._results[thread_name]
                    thread.get(poll_secs)
            except TimeoutError:
                if waited_secs < max_wait_secs:
                    waited_secs += poll_secs
                else:
                    # The message previously contained an unformatted "%i".
                    raise OperationError(
                        "Waited more than %i seconds for Oplog resolver! I will assume there is a problem and exit"
                        % max_wait_secs)
        self._pool.terminate()
        logging.debug("Stopped all oplog resolver threads")
        self.stopped = True
        self.running = False
def _multitest_binary_pov(self, pov_path, cb_path, enable_randomness, debug, bitflip, timeout, times): pool = Pool(processes=4) res = [pool.apply_async(self._test_binary_pov, (pov_path, cb_path, enable_randomness, debug, bitflip, timeout)) for _ in range(times)] results = [ ] for r in res: try: results.append(r.get(timeout=timeout + 5)) except TimeoutError: results.append(False) return results
def map(self, func, iterable, chunksize=None):
    """
    Drop-in for the built-in `map()` that stays responsive to
    `KeyboardInterrupt`.

    :param func:
        Callable applied to every item.

    :param iterable:
        The items that `func` is applied to.

    :param chunksize:
        Forwarded unchanged to `map_async()`.
    """
    pending = self.map_async(func, iterable, chunksize)

    # Poll with a timeout instead of blocking forever: a Condition.wait()
    # without a timeout swallows KeyboardInterrupts.
    while True:
        try:
            outcome = pending.get(self.wait_timeout)
        except multiprocessing.TimeoutError:
            continue
        except KeyboardInterrupt:
            self.terminate()
            self.join()
            raise
        else:
            return outcome
def kill(self, retries=3, pause=.1):
    """
    Record the task's final state and SIGTERM the processes named in the PID files.

    The task result is awaited for one second to decide which state is
    recorded ('KILLED', 'CANCELLED' or 'CRASHED'). Every entry of
    ``self.pids`` is then opened as a file, the PID read from it is sent
    SIGTERM, and the file is removed. When the wrapped command is named
    ``run`` (leading underscores ignored), the method calls itself again
    after sleeping ``pause`` seconds, doubling the pause each time, until
    ``retries`` is exhausted.

    :param retries: number of additional kill attempts for a ``run`` command.
    :param pause: seconds to sleep before the next attempt.
    """
    try:
        try:
            self.task.get(1)
            self.__set_info('KILLED', "None")
        except (AttributeError, TimeoutError):
            self.__set_info('CANCELLED', "None")
        except UnicodeEncodeError:
            self.__set_info('CRASHED', "None")
        for pid in self.pids:
            try:
                with open(pid) as f:
                    os.kill(int(f.read().strip()), signal.SIGTERM)
                os.remove(pid)
            except (IOError, OSError):
                # Deliberately best-effort: there may be no PID file, or the
                # process may already have terminated.
                pass
        if self.command.__name__.lstrip('_') == 'run' and retries > 0:
            # Wait so that a follow-up process started by the command can
            # appear before the next attempt. This is needed e.g. with the
            # cooja command (Cooja starts once for a simulation without a
            # malicious mote, then a second time with one).
            time.sleep(pause)
            self.kill(retries - 1, 2 * pause)  # then kill it
    except KeyboardInterrupt:
        # An interrupt during the kill triggers one immediate pass with no
        # retries and no pause.
        self.kill(0, 0)
def exploit_challenges():
    """
    Run ``exploit`` on every challenge in parallel and report which succeeded.

    One worker process is started per challenge. Each result is awaited for
    ``timeout + 1`` seconds (``timeout`` is a module-level value); a
    challenge that times out keeps its default status of ``False``.

    :return: dict mapping challenge name to its exploitable flag.
    """
    challenges = get_challenges_paths()
    status = {n: False for n, _ in challenges}
    if not challenges:
        # Pool(processes=0) raises ValueError; with nothing to exploit the
        # (empty) status map is already the answer.
        return status

    start = time.time()
    results = []
    with Pool(processes=len(challenges)) as pool:
        multiple_results = [pool.apply_async(exploit, (name, path,)) for name, path in challenges]
        for res in multiple_results:
            try:
                results.append(res.get(timeout=timeout + 1))
            except TimeoutError:
                print("Got a timeout.")
    duration = time.time() - start
    print("All challenges exploited in " + str(duration) + " sec.")

    for chall_name, exploitable in results:
        status[chall_name] = exploitable
    return status
def _ParallelSymbolizeBacktrace(backtrace):
    """
    Symbolize every backtrace entry in parallel, one pool worker per CPU.

    :param backtrace: iterable of entries passed to ``_SymbolizeEntry``.
    :return: list of symbolized entries; ``[]`` when nothing was produced
        or the run was interrupted with SIGINT; a single-element list with
        a timeout notice when ``SYMBOLIZATION_TIMEOUT_SECS`` elapses first.
    """
    # Disable handling of SIGINT during sub-process creation, to prevent
    # sub-processes from consuming Ctrl-C signals, rather than the parent
    # process doing so.
    saved_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        p = multiprocessing.Pool(multiprocessing.cpu_count())
    finally:
        # Restore the signal handler for the parent process, even when the
        # pool could not be created.
        signal.signal(signal.SIGINT, saved_sigint_handler)

    symbolized = []
    try:
        result = p.map_async(_SymbolizeEntry, backtrace)
        symbolized = result.get(SYMBOLIZATION_TIMEOUT_SECS)
        if not symbolized:
            return []
    except multiprocessing.TimeoutError:
        return ['(timeout error occurred during symbolization)']
    except KeyboardInterrupt:  # SIGINT
        # Swallowed on purpose; the pool is torn down in ``finally``.
        pass
    finally:
        # Previously the workers were only terminated on SIGINT, so every
        # other path (success, timeout) leaked the whole pool.
        p.terminate()
        p.join()

    return symbolized
def _get_result(self,resultl): result='' # get result from list for cur in resultl: try: result+=cur.get(timeout=self._timeout) except TimeoutError as e: continue # deal with result if result=='': result='no dir or file' else: result=result[:-1] return result
def _get_result(self,resultl): result='' # get result from list for cur in resultl: try: result+=cur.get(timeout=self._timeout) except TimeoutError as e: continue # deal with result if result=='': result='no subdomain' else: result=result[:-1] return result
def _get_result(self,resultl): result='' # get result from list for cur in resultl: try: result+=cur.get(timeout=self._timeout) except TimeoutError as e: continue # deal with result if result=='': result='nothing here' else: result=result[:-1] return result
def _get_resultl(self,resultl): retl=[] # get result from eflist for cur in resultl: try: retl.extend(cur.get(timeout=self._timeout)) except TimeoutError as e: continue return retl
def parse_structure(self, structure_file, timeout, model=None,
                    parser=RegularStructureParser):
    """
    Parse a single structure file through C{self.parse_async} and return
    the parsed structure. Any exception captured for that file (including
    a timeout) is re-raised here.

    @param structure_file: structure file to parse
    @type structure_file: str
    @param timeout: raise multiprocessing.TimeoutError if C{timeout} seconds
                    elapse before the parser completes its job
    @type timeout: int
    @param model: forwarded unchanged to C{self.parse_async}
    @param parser: any implementing L{AbstractStructureParser} class
    @type parser: type

    @return: parsed structure, or None if C{parse_async} returned nothing
    @rtype: L{csb.structure.Structure}
    """
    outcomes = self.parse_async([structure_file], timeout, model, parser)

    if len(outcomes) == 0:
        return None

    first = outcomes[0]
    if first.exception is not None:
        raise first.exception
    return first.result
def abortable_worker(func, *args, **kwargs):
    """
    Run ``func(*args)`` on a helper thread and give up after a timeout.

    Only the ``timeout`` keyword is consumed (seconds, ``None`` waits
    forever); no keyword arguments are forwarded to ``func``.

    :return: the value returned by ``func``, or ``("null",)`` on timeout.
    """
    timeout = kwargs.get('timeout', None)
    p = multiprocessing.dummy.Pool(1)
    try:
        res = p.apply_async(func, args=args)
        # Wait timeout seconds for func to complete.
        return res.get(timeout)
    except multiprocessing.TimeoutError:
        return ("null",)
    finally:
        # Previously the pool was never closed, leaking one worker thread
        # per call. close() does not block: the worker exits once the
        # submitted call has finished.
        p.close()
def test_map_chunksize(self):
    """map_async() with an explicit chunksize must not stall on an empty list."""
    try:
        pending = self.pool.map_async(sqr, [], chunksize=1)
        pending.get(timeout=TIMEOUT1)
    except multiprocessing.TimeoutError:
        self.fail("pool.map_async with chunksize stalled on null list")
def test_async_timeout(self):
    """get() must raise TimeoutError after roughly TIMEOUT2 seconds."""
    pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
    timed_get = TimingWrapper(pending.get)
    self.assertRaises(multiprocessing.TimeoutError, timed_get, timeout=TIMEOUT2)
    self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)
def pnk_request(self, url):
    """
    Run ``self.pnk_request_raw(url)`` under a hard wall-clock limit.

    The request runs on a helper thread and is awaited for
    ``self.hard_timeout`` seconds.

    :return: whatever ``pnk_request_raw`` returns.
    :raises ConnectionError: when the hard timeout expires.
    """
    pool = ThreadPool(processes=1)
    try:
        async_result = pool.apply_async(self.pnk_request_raw, (url,))
        try:
            ret_val = async_result.get(timeout=self.hard_timeout)
        except TimeoutError:
            traceback.print_exc()
            # raise requests ConnectionError for easier handling if there's
            # a hard timeout
            raise ConnectionError("Request received a hard timeout")
        return ret_val
    finally:
        # Previously a new pool was leaked on every request. close() does
        # not block; the helper thread exits once its call returns.
        pool.close()
def pnk_request(url):
    """
    Run ``pnk_request_raw(url)`` under a hard wall-clock limit.

    The limit (seconds) is read from the ``hard_timeout`` option of the
    ``punkcrawler`` section of ``conf``.

    :return: whatever ``pnk_request_raw`` returns.
    :raises ConnectionError: when the hard timeout expires.
    """
    pool = ThreadPool(processes=1)
    try:
        async_result = pool.apply_async(pnk_request_raw, (url,))
        try:
            ret_val = async_result.get(timeout=int(conf.get("punkcrawler", "hard_timeout")))
        except TimeoutError:
            traceback.print_exc()
            pnk_log(mod, "Received hard timeout, raising timeout exception")
            # raise requests ConnectionError for easier handling if there's
            # a hard timeout
            raise ConnectionError("Request received a hard timeout")
        return ret_val
    finally:
        # Previously a new pool was leaked on every request. close() does
        # not block; the helper thread exits once its call returns.
        pool.close()
def check_requirements(self):
    """
    Check the gtk3agg backend requirements in a worker process.

    :return: the message produced by ``backend_gtk3agg_internal_check``, or
        an "unknown" notice when no process pool can be created.
    :raises CheckFailed: on Travis, on a 10 second timeout, or when the
        check reports failure.
    """
    if 'TRAVIS' in os.environ:
        raise CheckFailed("Can't build with Travis")

    # This check needs to be performed out-of-process, because
    # importing gi and then importing regular old pygtk afterward
    # segfaults the interpreter.
    try:
        p = multiprocessing.Pool()
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit.
        return "unknown (can not use multiprocessing to determine)"

    try:
        res = p.map_async(backend_gtk3agg_internal_check, [0])
        success, msg = res.get(timeout=10)[0]
    except multiprocessing.TimeoutError:
        # No result returned. Probably hanging, terminate the process.
        p.terminate()
        raise CheckFailed("Check timed out")
    except:
        # Some other error: release the pool and let it propagate. (The
        # assignments that used to sit here were dead stores.)
        p.close()
        raise
    else:
        p.close()
    finally:
        p.join()

    if success:
        return msg
    else:
        raise CheckFailed(msg)
def check_requirements(self):
    """
    Check the gtk3cairo backend requirements in a worker process.

    :return: the message produced by ``backend_gtk3cairo_internal_check``,
        or an "unknown" notice when no process pool can be created.
    :raises CheckFailed: on Travis, on a 10 second timeout, or when the
        check reports failure.
    """
    if 'TRAVIS' in os.environ:
        raise CheckFailed("Can't build with Travis")

    # This check needs to be performed out-of-process, because
    # importing gi and then importing regular old pygtk afterward
    # segfaults the interpreter.
    try:
        p = multiprocessing.Pool()
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit.
        return "unknown (can not use multiprocessing to determine)"

    try:
        res = p.map_async(backend_gtk3cairo_internal_check, [0])
        success, msg = res.get(timeout=10)[0]
    except multiprocessing.TimeoutError:
        # No result returned. Probably hanging, terminate the process.
        p.terminate()
        raise CheckFailed("Check timed out")
    except:
        # Some other error: release the pool and let it propagate. (The
        # assignment that used to sit here was a dead store.)
        p.close()
        raise
    else:
        p.close()
    finally:
        p.join()

    if success:
        return msg
    else:
        raise CheckFailed(msg)
def timeout(seconds):
    """
    Generator body of a timeout guard built on the profiling interval timer.

    Installs a SIGPROF handler that raises ``TimeoutError`` and arms
    ``ITIMER_PROF`` for ``seconds``. On exit the timer is disarmed and the
    SIGPROF handler that was active beforehand is restored.

    NOTE(review): ``ITIMER_PROF`` counts time the process spends executing
    (user and system), not wall-clock time, so a block that merely sleeps
    will not trip it -- confirm this is intended.

    :param seconds: timer interval passed to ``signal.setitimer``.
    """
    def handler(*args, **kwargs):
        logger.debug("TimeoutError in timeout context manager handler.")
        raise TimeoutError("Timeout after {} seconds".format(seconds))

    previous = signal.signal(signal.SIGPROF, handler)
    try:
        signal.setitimer(signal.ITIMER_PROF, seconds)
        yield
    finally:
        signal.setitimer(signal.ITIMER_PROF, 0)
        # Previously our handler stayed installed after the block finished.
        # ``previous`` is None when the old handler was not set from Python,
        # in which case it cannot be reinstalled here.
        if previous is not None:
            signal.signal(signal.SIGPROF, previous)
def test_fetch_cvimage_from_url_timeout(monkeypatch):
    """A fetch that outlasts its 1 second limit must raise TimeoutError."""

    def slow_get(*args, **kwargs):
        # Stand-in for requests.get that takes longer than the limit below.
        time.sleep(3)

    monkeypatch.setattr(requests, 'get', slow_get)

    with pytest.raises(TimeoutError):
        classifiers.fetch_cvimage_from_url(
            'this url is ignored',
            timeout_max_timeout=1)
def timeout(max_timeout):
    """
    Timeout decorator, parameter in seconds.

    The decorated callable runs on a helper thread. Callers may override
    the limit per call with the ``timeout_max_timeout`` keyword, which is
    consumed by the wrapper and never forwarded; a falsy override falls
    back to ``max_timeout``.

    :raises multiprocessing.TimeoutError: from the wrapper, when the call
        does not finish within the limit.
    """
    def timeout_decorator(f):
        """Wrap the original function."""
        @functools.wraps(f)
        def func_wrapper(self, *args, **kwargs):
            """Closure for function."""
            # Pop the override BEFORE submitting. Previously the same dict
            # was handed to the worker thread first and popped afterwards,
            # so ``f`` could race and receive an unexpected keyword.
            limit = kwargs.pop('timeout_max_timeout', max_timeout) or max_timeout
            pool = multiprocessing.pool.ThreadPool(processes=1)
            try:
                async_result = pool.apply_async(f, (self,) + args, kwargs)
                # raises a TimeoutError if execution exceeds the limit
                return async_result.get(limit)
            finally:
                # Previously one pool was leaked per call.
                pool.close()
        return func_wrapper
    return timeout_decorator
def shutdown_listener(cls):
    """
    Ask the listener to stop by queueing ``None``, then wait up to three
    seconds for something to come back on the same queue.

    Does nothing when ``cls._queue`` is falsy. Waiting is best-effort: if
    nothing arrives in time the method returns silently.
    """
    # Imported locally so the block works on both Python 3 and Python 2.
    try:
        from queue import Empty
    except ImportError:
        from Queue import Empty

    if cls._queue:
        cls._queue.put(None)
        try:
            cls._queue.get(timeout=3)
        except (Empty, TimeoutError):
            # Queue.get(timeout=...) signals expiry with ``Empty``; catching
            # only TimeoutError (as before) let that exception escape.
            # NOTE(review): assumes cls._queue follows the standard Queue
            # API -- confirm against where it is created.
            pass
def run(self, result):
    """
    Run every subsuite on a worker pool and replay the outcomes on ``result``.

    Workers report ``(subsuite_index, events)`` pairs, consumed through
    imap_unordered so outcomes show up as soon as they are available.

    To keep pickling problems between processes to a minimum:

    - workers send back numeric indexes into self.subsuites, not tests
    - tracebacks are made picklable with tblib, when it is installed

    Even with tblib, errors may still occur for dynamically created
    exception classes such Model.DoesNotExist which cannot be unpickled.
    """
    if tblib is not None:
        tblib.pickling_support.install()

    counter = multiprocessing.Value(ctypes.c_int, 0)
    pool = multiprocessing.Pool(
        processes=self.processes,
        initializer=self.init_worker.__func__,
        initargs=[counter])
    jobs = [
        (index, subsuite, self.failfast)
        for index, subsuite in enumerate(self.subsuites)
    ]
    test_results = pool.imap_unordered(self.run_subsuite.__func__, jobs)

    while not result.shouldStop:
        try:
            # Short timeout so result.shouldStop is re-checked regularly.
            subsuite_index, events = test_results.next(timeout=0.1)
        except multiprocessing.TimeoutError:
            continue
        except StopIteration:
            pool.close()
            break

        tests = list(self.subsuites[subsuite_index])
        for event in events:
            handler = getattr(result, event[0], None)
            if handler is None:
                continue
            handler(tests[event[1]], *event[2:])
    else:
        # Only reached when the loop ended because result.shouldStop is
        # set (not via ``break``): abandon the remaining work.
        pool.terminate()

    pool.join()
    return result
def shard_download(self, pointer, shard_index, bucket_id, file_id):
    """
    Download one shard from the farmer described by ``pointer``.

    The download runs on a helper thread and is awaited for
    ``self.client.timeout`` seconds. On timeout a new pointer that excludes
    the current farmer is requested and the download is retried with it.

    :param pointer: mapping with ``farmer`` (``address``, ``port``,
        ``nodeID``), ``hash`` and ``token`` entries.
    :param shard_index: index of the shard within the file.
    :param bucket_id: bucket identifier, used when a new pointer is needed.
    :param file_id: file identifier, used when a new pointer is needed.
    :return: the value returned by ``self.retrieve_shard_file``; ``None``
        when an IOError or any other unexpected error was logged instead.
    """
    self.__logger.debug('Beginning download proccess...')
    tp = None
    try:
        self.__logger.debug('Starting download threads...')
        self.__logger.debug('Downloading shard at index %s ...', shard_index)

        url = 'http://{address}:{port}/shards/{hash}?token={token}'.format(
            address=pointer.get('farmer')['address'],
            port=str(pointer.get('farmer')['port']),
            hash=pointer['hash'],
            token=pointer['token'])
        self.__logger.debug(url)

        tp = ThreadPool(processes=1)
        async_result = tp.apply_async(
            self.retrieve_shard_file,
            (url, shard_index))
        shard = async_result.get(self.client.timeout)

        self.__logger.debug('Shard downloaded')
        self.__logger.debug('Shard at index %s downloaded successfully', shard_index)
        return shard

    except IOError as e:
        self.__logger.error('Perm error %s', e)
        if str(e) == str(13):
            self.__logger.error("""Error while saving or reading file or temporary file. Probably this is caused by insufficient permisions. Please check if you have permissions to write or read from selected directories.""")

    except TimeoutError:
        self.__logger.warning('Aborting shard %s download due to timeout' % shard_index)
        tp.terminate()
        self.__logger.warning('Try with a new pointer')
        new_pointer = self.client.file_pointers(
            bucket_id=bucket_id,
            file_id=file_id,
            limit='1',
            skip=str(shard_index),
            exclude=str([pointer['farmer']['nodeID']]))
        self.__logger.debug('Found new pointer')
        return self.shard_download(new_pointer[0], shard_index, bucket_id, file_id)

    except Exception as e:
        self.__logger.error(e)
        self.__logger.error('Unhandled')

    finally:
        # Previously the pool was only released on timeout, leaking one
        # helper thread per downloaded shard. close() is a no-op on a pool
        # that has already been terminated.
        if tp is not None:
            tp.close()
def parse_async(self, structure_files, timeout, model=None,
                parser=RegularStructureParser):
    """
    Submit one parse job per structure file to the worker pool and collect
    the outcomes. The actual degree of parallelism depends on the number of
    workers the pool was created with.

    @note: Avoid passing a large list of structures. Whenever a
           C{TimeoutError} is encountered, the corresponding worker
           process in the pool hangs until it terminates on its own, and
           is unusable in the meantime. With enough timeouts the whole
           pool becomes unusable. Hanging workers are only recycled at
           B{the end} of this method.

    @param structure_files: a list of structure files
    @type structure_files: tuple of str
    @param timeout: raise multiprocessing.TimeoutError if C{timeout} seconds
                    elapse before the parser completes its job
    @type timeout: int
    @param model: forwarded unchanged to each parse job
    @param parser: any implementing L{AbstractStructureParser} class
    @type parser: type

    @return: a list of L{AsyncParseResult} objects
    @rtype: list
    """
    pool = self._pool
    pending = [pool.apply_async(_parse_async, [parser, path, model])
               for path in list(structure_files)]

    results = []
    hanging = False
    for job in pending:
        outcome = AsyncParseResult(None, None)
        try:
            outcome.result = job.get(timeout=timeout)
        except KeyboardInterrupt as ki:
            pool.terminate()
            raise ki
        except Exception as ex:
            # Failures are reported per file rather than aborting the batch.
            outcome.exception = ex
            if isinstance(ex, multiprocessing.TimeoutError):
                hanging = True
        results.append(outcome)

    if hanging:
        self._recycle()

    return results
def modify_without_breaking(bytez, actions=(), seed=None):
    """
    Apply each action to ``bytez`` in a sheltered child process.

    Every action is looked up in ``ACTION_TABLE`` and executed in its own
    child process, which gets five seconds to finish. A child that is
    still running after that is terminated, and whatever is in the shared
    list at that point is used as the result of that action.

    :param bytez: the binary content to modify (bytes).
    :param actions: iterable of ``ACTION_TABLE`` keys, applied in order.
    :param seed: passed to every action.
    :return: the modified bytes; a SHA-256 of the result is printed.
    """
    for action in actions:

        _action = ACTION_TABLE[action]

        # we run manipulation in a child process to shelter
        # our malware model from rare parsing errors in LIEF that
        # may segfault or timeout
        def helper(_action, shared_list):
            # TODO: LIEF is chatty. redirect stdout and stderr to /dev/null

            # for this process, change segfault of the child process
            # to a RuntimeError
            def sig_handler(signum, frame):
                raise RuntimeError
            signal.signal(signal.SIGSEGV, sig_handler)

            bytez = array.array('B', shared_list[:]).tobytes()
            # TODO: LIEF is chatty. redirect output to /dev/null
            if type(_action) is str:
                _action = MalwareManipulator(bytez).__getattribute__(_action)
            else:
                _action = functools.partial(_action, bytez)

            # redirect standard out only in this queue
            try:
                shared_list[:] = _action(seed)
            except (RuntimeError, UnicodeDecodeError, TypeError, lief.not_found) as e:
                # some exceptions that have yet to be handled by public release of LIEF
                print("==== exception in child process ===")
                print(e)
                # shared_bytez remains unchanged

        # communicate with the subprocess through a shared list
        # can't use multiprocessing.Array since the subprocess may need to
        # change the size
        manager = multiprocessing.Manager()
        try:
            shared_list = manager.list()
            shared_list[:] = bytez  # copy bytez to shared array

            p = multiprocessing.Process(target=helper, args=(_action, shared_list))
            p.start()
            p.join(5)  # allow this to take up to 5 seconds...
            if p.is_alive():  # ..then become petulant
                # Process.join() never raises TimeoutError; it just returns
                # when the timeout expires. The old ``except`` clause was
                # dead code, so hung children were never terminated.
                print('==== timeouterror ')
                p.terminate()
                p.join()

            bytez = array.array('B', shared_list[:]).tobytes()  # copy result from child process
        finally:
            # Previously one manager server process was leaked per action.
            manager.shutdown()

    import hashlib
    m = hashlib.sha256()
    m.update(bytez)
    print("new hash: {}".format(m.hexdigest()))
    return bytez
def check_requirements(self):
    '''
    If PyQt4/PyQt5 is already imported, importing PyQt5/PyQt4 will fail
    so we need to test in a subprocess (as for Gtk3).

    Runs ``self.callback`` in a worker process with a 10 second limit and
    falls back to calling it in-process when no pool can be created.

    :return: the message produced by ``self.callback``.
    :raises CheckFailed: on timeout, or when the in-process fallback
        raises RuntimeError.
    '''
    try:
        p = multiprocessing.Pool()

    except Exception:
        # Was a bare ``except:``, which also turned KeyboardInterrupt and
        # SystemExit into an in-process retry.
        # Can't do multiprocessing, fall back to normal approach
        # (this will fail if importing both PyQt4 and PyQt5).
        try:
            # Try in-process
            msg = self.callback(self)
        except RuntimeError:
            raise CheckFailed("Could not import: are PyQt4 & PyQt5 both installed?")
        # Any other exception simply propagates.

    else:
        # Multiprocessing OK
        try:
            res = p.map_async(self.callback, [self])
            msg = res.get(timeout=10)[0]
        except multiprocessing.TimeoutError:
            # No result returned. Probably hanging, terminate the process.
            p.terminate()
            raise CheckFailed("Check timed out")
        except:
            # Some other error: release the pool and let it propagate.
            p.close()
            raise
        else:
            # Clean exit
            p.close()
        finally:
            # Tidy up multiprocessing
            p.join()

    return msg
def run(self, result):
    """
    Run every subsuite on a worker pool and replay the outcomes on ``result``.

    Workers report ``(subsuite_index, events)`` pairs, consumed through
    imap_unordered so outcomes show up as soon as they are available.

    To keep pickling problems between processes to a minimum:

    - workers send back numeric indexes into self.subsuites, not tests
    - tracebacks are made picklable with tblib, when it is installed

    Even with tblib, errors may still occur for dynamically created
    exception classes such Model.DoesNotExist which cannot be unpickled.
    """
    counter = multiprocessing.Value(ctypes.c_int, 0)
    pool = multiprocessing.Pool(
        processes=self.processes,
        initializer=self.init_worker.__func__,
        initargs=[counter])
    work_items = [
        (self.runner_class, index, subsuite, self.failfast)
        for index, subsuite in enumerate(self.subsuites)
    ]
    test_results = pool.imap_unordered(self.run_subsuite.__func__, work_items)

    while True:
        if result.shouldStop:
            # Abandon whatever is still running.
            pool.terminate()
            break

        try:
            # Short timeout so result.shouldStop is re-checked regularly.
            subsuite_index, events = test_results.next(timeout=0.1)
        except multiprocessing.TimeoutError:
            continue
        except StopIteration:
            pool.close()
            break

        tests = list(self.subsuites[subsuite_index])
        for event in events:
            handler = getattr(result, event[0], None)
            if handler is not None:
                handler(tests[event[1]], *event[2:])

    pool.join()
    return result
def _consume_record(self, record): """De-serialize the message and execute the incoming job. :param record: Record fetched from the Kafka topic. :type record: kafka.consumer.fetcher.ConsumerRecord """ rec = rec_repr(record) self._logger.info('Processing {} ...'.format(rec)) # noinspection PyBroadException try: job = dill.loads(record.value) except Exception: self._logger.warning('{} unloadable. Skipping ...'.format(rec)) else: # Simple check for job validity if not (isinstance(job, Job) and isinstance(job.args, collections.Iterable) and isinstance(job.kwargs, collections.Mapping) and callable(job.func)): self._logger.warning('{} malformed. Skipping ...'.format(rec)) return func, args, kwargs = job.func, job.args, job.kwargs self._logger.info('Running Job {}: {} ...'.format( job.id, func_repr(func, args, kwargs) )) try: timeout = self._timeout or job.timeout if timeout is None: res = func(*args, **kwargs) else: run = self._pool.apply_async(func, args, kwargs) res = run.get(timeout) except mp.TimeoutError: self._logger.error('Job {} timed out after {} seconds.' .format(job.id, job.timeout)) self._exec_callback('timeout', job, None, None, None) except Exception as e: self._logger.exception('Job {} failed: {}'.format(job.id, e)) self._exec_callback('failure', job, None, e, tb.format_exc()) else: self._logger.info('Job {} returned: {}'.format(job.id, res)) self._exec_callback('success', job, res, None, None)
def pidwrapper(num):
    """Run dowork(num), printing this process's PID before and after."""
    print("Process {} starting".format(os.getpid()))
    outcome = dowork(num)
    print("Process {} ending".format(os.getpid()))
    return outcome


if __name__ == "__main__":
    # Inputs for the Fibonacci workers: 0 .. 29.
    myList = range(30)

    # One worker process per input.
    myPool = multiprocessing.Pool(processes=30)

    # Submit everything up front; results are collected below as they
    # become available.
    results = [myPool.apply_async(pidwrapper, (num,)) for num in myList]

    done = False
    visited = [0 for x in myList]
    finalList = [0 for x in myList]
    start = time.time()

    while not done:
        try:
            for i, pending in enumerate(results):
                if visited[i]:
                    continue
                # get() raises TimeoutError while the result is not ready;
                # that aborts this pass and the outer loop starts another.
                elapsed = time.time() - start
                print("Fibonacci number: {}\n\tfinished in: {} seconds\n\tResult: {}".format(i, elapsed, pending.get(timeout=1)))
                visited[i] = 1
                finalList[i] = pending.get()
            done = True
        except multiprocessing.TimeoutError:
            # The result is still being computed; try again on the next pass.
            pass

    print(finalList)
def get_cl_statuses(changes, fine_grained, max_processes=None):
    """Returns a blocking iterable of (cl, status) for given branches.

    If fine_grained is true, this will fetch CL statuses from the server.
    Otherwise, simply indicate if there's a matching url for the given
    branches.

    If max_processes is specified, it is used as the maximum number of
    processes to spawn to fetch CL status from the server. Otherwise 1
    process per branch is spawned.

    Branches whose status does not arrive within 5 seconds of the previous
    one are reported with the status 'error'.

    See GetStatus() for a list of possible statuses.
    """
    # Silence upload.py otherwise it becomes unwieldy.
    upload.verbosity = 0

    if fine_grained:
        # Process one branch synchronously to work through authentication,
        # then spawn processes to process all the other branches in parallel.
        if changes:
            def fetch(cl):
                try:
                    return (cl, cl.GetStatus())
                except:
                    # See http://crbug.com/629863.
                    logging.exception('failed to fetch status for %s:', cl)
                    raise
            yield fetch(changes[0])

            changes_to_fetch = changes[1:]
            if not changes_to_fetch:
                # Exit early if there was only one branch to fetch.
                return

            pool = ThreadPool(
                min(max_processes, len(changes_to_fetch))
                if max_processes is not None
                else max(len(changes_to_fetch), 1))

            fetched_cls = set()
            try:
                it = pool.imap_unordered(fetch, changes_to_fetch).__iter__()
                while True:
                    try:
                        row = it.next(timeout=5)
                    except (multiprocessing.TimeoutError, StopIteration):
                        # StopIteration must not escape a generator body:
                        # under PEP 479 it is turned into RuntimeError.
                        break

                    fetched_cls.add(row[0])
                    yield row
            finally:
                # Previously the pool was never closed.
                pool.close()

            # Add any branches that failed to fetch.
            for cl in set(changes_to_fetch) - fetched_cls:
                yield (cl, 'error')
    else:
        # Do not use GetApprovingReviewers(), since it requires an HTTP request.
        for cl in changes:
            yield (cl, 'waiting' if cl.GetIssueURL() else 'error')
def get_cl_statuses(changes, fine_grained, max_processes=None):
    """Returns a blocking iterable of (cl, status) for given branches.

    If fine_grained is true, this will fetch CL statuses from the server.
    Otherwise, simply indicate if there's a matching url for the given
    branches.

    If max_processes is specified, it is used as the maximum number of
    processes to spawn to fetch CL status from the server. Otherwise 1
    process per branch is spawned.

    Branches whose status does not arrive within 5 seconds of the previous
    one are reported with the status 'error'.

    See GetStatus() for a list of possible statuses.
    """
    # Silence upload.py otherwise it becomes unwieldy.
    upload.verbosity = 0

    if not changes:
        # Was ``raise StopIteration()``, which PEP 479 turns into a
        # RuntimeError inside a generator; a plain return ends it cleanly.
        return

    if not fine_grained:
        # Fast path which doesn't involve querying codereview servers.
        # Do not use GetApprovingReviewers(), since it requires an HTTP request.
        for cl in changes:
            yield (cl, 'waiting' if cl.GetIssueURL() else 'error')
        return

    # First, sort out authentication issues.
    logging.debug('ensuring credentials exist')
    for cl in changes:
        cl.EnsureAuthenticated(force=False, refresh=True)

    def fetch(cl):
        try:
            return (cl, cl.GetStatus())
        except:
            # See http://crbug.com/629863.
            logging.exception('failed to fetch status for %s:', cl)
            raise

    threads_count = len(changes)
    if max_processes:
        threads_count = max(1, min(threads_count, max_processes))
    logging.debug('querying %d CLs using %d threads', len(changes), threads_count)

    pool = ThreadPool(threads_count)
    fetched_cls = set()
    try:
        it = pool.imap_unordered(fetch, changes).__iter__()
        while True:
            try:
                cl, status = it.next(timeout=5)
            except (multiprocessing.TimeoutError, StopIteration):
                # StopIteration (all results consumed) must not escape the
                # generator body either; see PEP 479.
                break
            fetched_cls.add(cl)
            yield cl, status
    finally:
        pool.close()

    # Add any branches that failed to fetch.
    for cl in set(changes) - fetched_cls:
        yield (cl, 'error')
def main():
    """
    Query every schedd in parallel and persist per-schedd checkpoints.

    Progress is read from and merged back into ``checkpoint.factory.json``.
    The whole run is bounded by ``TIMEOUT_MINS`` (plus a 10 second grace
    period for collecting results); schedds that time out keep their
    previous checkpoint.
    """
    try:
        with open("checkpoint.factory.json") as checkpoint_file:
            checkpoint = json.load(checkpoint_file)
    except Exception:
        # Missing or unreadable checkpoint: start from scratch.
        checkpoint = {}

    starttime = time.time()

    pool = multiprocessing.Pool(processes=10)
    future = pool.apply_async(get_schedds)
    schedd_ads = future.get(TIMEOUT_MINS*60)
    print("There are %d schedds to query." % len(schedd_ads))

    futures = []
    for schedd_ad in schedd_ads:
        name = schedd_ad["Name"]
        last_completion = checkpoint.get(name, 0)
        future = pool.apply_async(process_schedd, (starttime, last_completion, schedd_ad))
        futures.append((name, future))
    pool.close()

    timed_out = False
    for name, future in futures:
        time_remaining = TIMEOUT_MINS*60+10 - (time.time() - starttime)
        if time_remaining > 0:
            try:
                last_completion = future.get(time_remaining)
                if name:
                    # Key by this future's schedd name. Previously the
                    # stale ``schedd_ad`` left over from the submission
                    # loop was used, so every result landed on the last
                    # schedd's entry.
                    checkpoint[name] = last_completion
            except multiprocessing.TimeoutError:
                print("Schedd %s timed out; ignoring progress." % name)
        else:
            timed_out = True
            break
    if timed_out:
        pool.terminate()
    pool.join()

    # Re-read the file and keep the newest value per schedd.
    try:
        with open("checkpoint.factory.json") as checkpoint_file:
            checkpoint_new = json.load(checkpoint_file)
    except Exception:
        checkpoint_new = {}
    for key, val in checkpoint.items():
        if (key not in checkpoint_new) or (val > checkpoint_new[key]):
            checkpoint_new[key] = val

    # Write to a temporary file first, then rename it over the checkpoint.
    fd, tmpname = tempfile.mkstemp(dir=".", prefix="checkpoint.factory.json.new")
    fd = os.fdopen(fd, "w")
    json.dump(checkpoint_new, fd)
    fd.close()
    os.rename(tmpname, "checkpoint.factory.json")

    print("Total processing time: %.2f mins" % ((time.time()-starttime)/60.))
def main():
    """
    Query every schedd in parallel and persist per-schedd checkpoints.

    Progress is read from and written back to ``checkpoint2.json``. The
    whole run is bounded by ``TIMEOUT_MINS`` (plus a 10 second grace period
    for collecting results); schedds that time out keep their previous
    checkpoint.
    """
    try:
        with open("checkpoint2.json") as checkpoint_file:
            checkpoint = json.load(checkpoint_file)
    except Exception:
        # Missing or unreadable checkpoint: start from scratch.
        checkpoint = {}

    starttime = time.time()

    pool = multiprocessing.Pool(processes=10)
    future = pool.apply_async(get_schedds)
    schedd_ads = future.get(TIMEOUT_MINS*60)
    print("There are %d schedds to query." % len(schedd_ads))

    futures = []
    for schedd_ad in schedd_ads:
        name = schedd_ad["Name"]
        last_completion = checkpoint.get(name, 0)
        future = pool.apply_async(process_schedd, (starttime, last_completion, schedd_ad))
        futures.append((name, future))

    timed_out = False
    for name, future in futures:
        time_remaining = TIMEOUT_MINS*60+10 - (time.time() - starttime)
        if time_remaining > 0:
            try:
                last_completion = future.get(time_remaining)
                # Key by the schedd's name. Previously the literal string
                # "name" was used, so all schedds shared one entry and the
                # per-schedd lookup above never found its value.
                checkpoint[name] = last_completion
            except multiprocessing.TimeoutError:
                print("Schedd %s timed out; ignoring progress." % name)
        else:
            timed_out = True
            break
    if timed_out:
        pool.terminate()
    else:
        pool.close()
    pool.join()

    # Write to a side file first, then rename it over the checkpoint.
    with open("checkpoint2.json.new", "w") as fd:
        json.dump(checkpoint, fd)
    os.rename("checkpoint2.json.new", "checkpoint2.json")

    print("Total processing time: %.2f mins" % ((time.time()-starttime)/60.))
def main():
    """
    Query every schedd in parallel and persist per-schedd checkpoints.

    Progress is read from and merged back into ``checkpoint.json``. The
    whole run is bounded by ``TIMEOUT_MINS`` (plus a 10 second grace period
    for collecting results); schedds that time out keep their previous
    checkpoint.
    """
    try:
        with open("checkpoint.json") as checkpoint_file:
            checkpoint = json.load(checkpoint_file)
    except Exception:
        # Missing or unreadable checkpoint: start from scratch.
        checkpoint = {}

    starttime = time.time()

    pool = multiprocessing.Pool(processes=10)
    future = pool.apply_async(get_schedds)
    schedd_ads = future.get(TIMEOUT_MINS*60)
    print("There are %d schedds to query." % len(schedd_ads))

    futures = []
    for schedd_ad in schedd_ads:
        name = schedd_ad["Name"]
        last_completion = checkpoint.get(name, 0)
        future = pool.apply_async(process_schedd, (starttime, last_completion, schedd_ad))
        futures.append((name, future))
    pool.close()

    timed_out = False
    for name, future in futures:
        time_remaining = TIMEOUT_MINS*60+10 - (time.time() - starttime)
        if time_remaining > 0:
            try:
                last_completion = future.get(time_remaining)
                # Key by this future's schedd name. Previously the stale
                # ``schedd_ad`` left over from the submission loop was
                # used, so every result landed on the last schedd's entry.
                checkpoint[name] = last_completion
            except multiprocessing.TimeoutError:
                print("Schedd %s timed out; ignoring progress." % name)
        else:
            timed_out = True
            break
    if timed_out:
        pool.terminate()
    pool.join()

    # Re-read the file and keep the newest value per schedd.
    try:
        with open("checkpoint.json") as checkpoint_file:
            checkpoint_new = json.load(checkpoint_file)
    except Exception:
        checkpoint_new = {}
    for key, val in checkpoint.items():
        if (key not in checkpoint_new) or (val > checkpoint_new[key]):
            checkpoint_new[key] = val

    # Write to a side file first, then rename it over the checkpoint.
    with open("checkpoint.json.new", "w") as fd:
        json.dump(checkpoint_new, fd)
    os.rename("checkpoint.json.new", "checkpoint.json")

    print("Total processing time: %.2f mins" % ((time.time()-starttime)/60.))
def get_cl_statuses(changes, fine_grained, max_processes=None):
    """Returns a blocking iterable of (cl, status) for given branches.

    If fine_grained is true, this will fetch CL statuses from the server.
    Otherwise, simply indicate if there's a matching url for the given
    branches.

    If max_processes is specified, it is used as the maximum number of
    processes to spawn to fetch CL status from the server. Otherwise 1
    process per branch is spawned.

    Branches whose status does not arrive within 5 seconds of the previous
    one are reported with the status 'error'.

    See GetStatus() for a list of possible statuses.
    """
    # Silence upload.py otherwise it becomes unwieldy.
    upload.verbosity = 0

    if not changes:
        # Was ``raise StopIteration()``, which PEP 479 turns into a
        # RuntimeError inside a generator; a plain return ends it cleanly.
        return

    if not fine_grained:
        # Fast path which doesn't involve querying codereview servers.
        # Do not use get_approving_reviewers(), since it requires an HTTP request.
        for cl in changes:
            yield (cl, 'waiting' if cl.GetIssueURL() else 'error')
        return

    # First, sort out authentication issues.
    logging.debug('ensuring credentials exist')
    for cl in changes:
        cl.EnsureAuthenticated(force=False, refresh=True)

    def fetch(cl):
        try:
            return (cl, cl.GetStatus())
        except:
            # See http://crbug.com/629863.
            logging.exception('failed to fetch status for %s:', cl)
            raise

    threads_count = len(changes)
    if max_processes:
        threads_count = max(1, min(threads_count, max_processes))
    logging.debug('querying %d CLs using %d threads', len(changes), threads_count)

    pool = ThreadPool(threads_count)
    fetched_cls = set()
    try:
        it = pool.imap_unordered(fetch, changes).__iter__()
        while True:
            try:
                cl, status = it.next(timeout=5)
            except (multiprocessing.TimeoutError, StopIteration):
                # StopIteration (all results consumed) must not escape the
                # generator body either; see PEP 479.
                break
            fetched_cls.add(cl)
            yield cl, status
    finally:
        pool.close()

    # Add any branches that failed to fetch.
    for cl in set(changes) - fetched_cls:
        yield (cl, 'error')
def map(self, func, iterable, chunksize=None, callback=None):
    """
    Equivalent to the built-in ``map()`` function and
    :meth:`multiprocessing.pool.Pool.map()`, without catching
    ``KeyboardInterrupt``.

    Parameters
    ----------
    func : callable
        Applied to each element of ``iterable``.
    iterable : iterable
        The items to process.
    chunksize : int, optional
        Forwarded unchanged to ``map_async()``.
    callback : callable, optional
        When given, it is wrapped in ``CallbackWrapper`` and passed to
        ``map_async()`` as its callback.

    Returns
    -------
    results
        Whatever the asynchronous result's ``get()`` returns.
    """
    wrapped = None if callback is None else CallbackWrapper(callback)

    pending = self.map_async(func, iterable, chunksize=chunksize,
                             callback=wrapped)

    # Poll with a timeout instead of blocking forever: a Condition.wait()
    # without a timeout swallows KeyboardInterrupts.
    while True:
        try:
            outcome = pending.get(self.wait_timeout)
        except multiprocessing.TimeoutError:
            continue
        except KeyboardInterrupt:
            self.terminate()
            self.join()
            raise
        else:
            return outcome