The following are 50 code examples, extracted from open-source Python projects, that illustrate how to use multiprocessing.pool().
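Before the project-specific examples, here is a minimal sketch of the basic pattern, using only the standard library; the worker function square and the inputs are illustrative placeholders, not taken from any of the projects below.

import multiprocessing


def square(x):
    # Trivial worker executed in the pool's worker processes.
    return x * x


if __name__ == "__main__":
    # Create a pool of worker processes and distribute the calls across them.
    with multiprocessing.Pool(processes=4) as pool:
        print(pool.map(square, range(10)))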
def get_container_id_mapping(pool, compose_cmd):
    service_names = subprocess.check_output(
        compose_cmd + ["config", "--services"]
    )
    service_names = service_names.strip().decode("utf-8").split("\n")

    id_mapping = {
        name: pool.apply_async(pool_container_id, (name, compose_cmd))
        for name in service_names
    }

    while not all(future.ready() for future in id_mapping.values()):
        time.sleep(0.1)

    for name, future in list(id_mapping.items()):
        if not future.successful():
            raise RuntimeError("Cannot get ID of service {0}".format(name))
        id_mapping[name] = future.get()

    return id_mapping
def test_pool_worker_lifetime_early_close(self):
    # Issue #10332: closing a pool whose workers have limited lifetimes
    # before all the tasks completed would make join() hang.
    p = multiprocessing.Pool(3, maxtasksperchild=1)
    results = []
    for i in range(6):
        results.append(p.apply_async(sqr, (i, 0.3)))
    p.close()
    p.join()
    # check the results
    for (j, res) in enumerate(results):
        self.assertEqual(res.get(), sqr(j))

#
# Test of creating a customized manager class
#
def _repopulate_pool(self):
    """Bring the number of pool processes up to the specified number,
    for use after reaping workers which have exited.
    """
    for i in range(self._processes - len(self._pool)):
        # changed worker -> clean_worker
        args = (self._inqueue, self._outqueue, self._initializer, self._initargs,
                self._maxtasksperchild)
        if hasattr(self, '_wrap_exception'):
            args += (self._wrap_exception,)
        w = self.Process(target=clean_worker, args=args)
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        util.debug('added worker')
def __init__(self, comm=None, loadbalance=False, debug=False,
             wait_on_start = True, exit_on_end = True,
             cores_per_task = 1, **kwargs):
    if MPI is None:
        raise ImportError("Please install mpi4py")

    self.comm = MPI.COMM_WORLD if comm is None else comm
    self.rank = self.comm.Get_rank()
    if cores_per_task > 1:
        self.size = max(1, self.comm.Get_size() // cores_per_task)
    else:
        self.size = self.comm.Get_size() - 1
    self.function = _error_function
    self.loadbalance = loadbalance
    self.debug = debug
    if self.size == 0:
        raise ValueError("Tried to create an MPI pool, but there "
                         "was only one MPI process available. "
                         "Need at least two.")
    self.exit_on_end = exit_on_end

    # Enter main loop for workers?
    if wait_on_start:
        if self.is_worker():
            self.wait()
def Pool(pool = 'AnyPool', **kwargs):
    '''
    Chooses between the different pools.
    If ``pool == 'AnyPool'``, chooses based on availability.

    '''
    if pool == 'MPIPool':
        return MPIPool(**kwargs)
    elif pool == 'MultiPool':
        return MultiPool(**kwargs)
    elif pool == 'SerialPool':
        return SerialPool(**kwargs)
    elif pool == 'AnyPool':
        if MPIPool.enabled():
            return MPIPool(**kwargs)
        elif MultiPool.enabled():
            return MultiPool(**kwargs)
        else:
            return SerialPool(**kwargs)
    else:
        raise ValueError('Invalid pool ``%s``.' % pool)
def load_embeddings_mp(path, word_dim, processes=None):
    if processes is None:
        processes = multiprocessing.cpu_count()
    pool = mp.Pool(processes,
                   initializer=_mp_initialize,
                   initargs=(word_dim,))
    with open(path, "r") as f:
        iterator = chunks(f, n=processes,
                          k=processes * 10000)
        ret = {}
        for batches in iterator:
            results = pool.map_async(_mp_process, batches)
            results = results.get()
            results = aggregate_dicts(*results)
            ret.update(results)
    return ret
def process_profilelog(fn, pout = None):
    # Either call with a list of filenames and set pout or a filename and optionally pout.
    if not pout:
        pout = fn + '.processed'
    pout = open(pout, 'w')

    import pstats
    if isinstance(fn, list):
        p = pstats.Stats(*fn, stream=pout)
    else:
        p = pstats.Stats(fn, stream=pout)
    p.sort_stats('time')
    p.print_stats()
    p.print_callers()
    p.sort_stats('cumulative')
    p.print_stats()

    pout.flush()
    pout.close()

#
# Was present to work around multiprocessing pool bugs in python < 2.7.3
#
def multiprocessingpool(*args, **kwargs):
    import multiprocessing.pool
    #import multiprocessing.util
    #multiprocessing.util.log_to_stderr(10)
    # Deal with a multiprocessing bug where signals to the processes would be delayed until the work
    # completes. Putting in a timeout means the signals (like SIGINT/SIGTERM) get processed.
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    multiprocessing.pool.IMapIterator.next = wrapper(multiprocessing.pool.IMapIterator.next)

    return multiprocessing.Pool(*args, **kwargs)
def test_imap_handle_iterable_exception(self):
    if self.TYPE == 'manager':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
    for i in range(3):
        self.assertEqual(next(it), i*i)
    self.assertRaises(SayWhenError, it.next)

    # SayWhenError seen at start of problematic chunk's results
    it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
    for i in range(6):
        self.assertEqual(next(it), i*i)
    self.assertRaises(SayWhenError, it.next)

    it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
    for i in range(4):
        self.assertEqual(next(it), i*i)
    self.assertRaises(SayWhenError, it.next)
def test_imap_unordered_handle_iterable_exception(self):
    if self.TYPE == 'manager':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    it = self.pool.imap_unordered(sqr,
                                  exception_throwing_generator(10, 3),
                                  1)
    expected_values = map(sqr, range(10))
    with self.assertRaises(SayWhenError):
        # imap_unordered makes it difficult to anticipate the SayWhenError
        for i in range(10):
            value = next(it)
            self.assertIn(value, expected_values)
            expected_values.remove(value)

    it = self.pool.imap_unordered(sqr,
                                  exception_throwing_generator(20, 7),
                                  2)
    expected_values = map(sqr, range(20))
    with self.assertRaises(SayWhenError):
        for i in range(20):
            value = next(it)
            self.assertIn(value, expected_values)
            expected_values.remove(value)
def test_pool_worker_lifetime_early_close(self):
    # Issue #10332: closing a pool whose workers have limited lifetimes
    # before all the tasks completed would make join() hang.
    p = multiprocessing.Pool(3, maxtasksperchild=1)
    results = []
    for i in range(6):
        results.append(p.apply_async(sqr, (i, 0.3)))
    p.close()
    p.join()
    # check the results
    for (j, res) in enumerate(results):
        self.assertEqual(res.get(), sqr(j))

#
# Test that manager has expected number of shared objects left
#
def test_imap_handle_iterable_exception(self):
    if self.TYPE == 'manager':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
    for i in range(3):
        self.assertEqual(next(it), i*i)
    self.assertRaises(SayWhenError, it.__next__)

    # SayWhenError seen at start of problematic chunk's results
    it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
    for i in range(6):
        self.assertEqual(next(it), i*i)
    self.assertRaises(SayWhenError, it.__next__)

    it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
    for i in range(4):
        self.assertEqual(next(it), i*i)
    self.assertRaises(SayWhenError, it.__next__)
def test_imap_unordered_handle_iterable_exception(self):
    if self.TYPE == 'manager':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    it = self.pool.imap_unordered(sqr,
                                  exception_throwing_generator(10, 3),
                                  1)
    expected_values = list(map(sqr, list(range(10))))
    with self.assertRaises(SayWhenError):
        # imap_unordered makes it difficult to anticipate the SayWhenError
        for i in range(10):
            value = next(it)
            self.assertIn(value, expected_values)
            expected_values.remove(value)

    it = self.pool.imap_unordered(sqr,
                                  exception_throwing_generator(20, 7),
                                  2)
    expected_values = list(map(sqr, list(range(20))))
    with self.assertRaises(SayWhenError):
        for i in range(20):
            value = next(it)
            self.assertIn(value, expected_values)
            expected_values.remove(value)
def test_unpickleable_result(self):
    from multiprocessing.pool import MaybeEncodingError
    p = multiprocessing.Pool(2)

    # Make sure we don't lose pool processes because of encoding errors.
    for iteration in range(20):

        scratchpad = [None]
        def errback(exc):
            scratchpad[0] = exc

        res = p.apply_async(unpickleable_result, error_callback=errback)
        self.assertRaises(MaybeEncodingError, res.get)
        wrapped = scratchpad[0]
        self.assertTrue(wrapped)
        self.assertIsInstance(scratchpad[0], MaybeEncodingError)
        self.assertIsNotNone(wrapped.exc)
        self.assertIsNotNone(wrapped.value)

    p.close()
    p.join()
def as_bulk_resolve(candidates, threads=50):
    """
    Resolve a list of IPs to AS information.

    Returns a map of each result as a tuple of (ASN, owner) keyed to
    its candidate.  Returns None if no ASN could be found or
    (ASN, None) if an ASN was found but no owner is available.

    WARNING: This function will create a pool of up to 'threads'
    threads.
    """
    result = {}
    pool = multiprocessing.pool.ThreadPool(
        processes=min(len(candidates), threads))
    for ip, as_ in pool.imap(
            __asresolve__,
            candidates,
            chunksize=1):
        result[ip] = as_
    pool.close()
    return result
def build_extensions(self):
    if build_concurrency > 1:
        self.check_extensions_list(self.extensions)

        import multiprocessing.pool
        multiprocessing.pool.ThreadPool(processes=build_concurrency).map(self.build_extension, self.extensions)
    else:
        build_ext.build_extensions(self)
def _setup_extensions(self):
    # We defer extension setup until this command to leverage 'setup_requires' pulling in Cython before we
    # attempt to import anything
    self.extensions = []

    if try_murmur3:
        self.extensions.append(murmur3_ext)

    if try_libev:
        self.extensions.append(libev_ext)

    if try_cython:
        try:
            from Cython.Build import cythonize
            cython_candidates = ['cluster', 'concurrent', 'connection', 'cqltypes', 'metadata',
                                 'pool', 'protocol', 'query', 'util']
            compile_args = [] if is_windows else ['-Wno-unused-function']
            self.extensions.extend(cythonize(
                [Extension('cassandra.%s' % m, ['cassandra/%s.py' % m],
                           extra_compile_args=compile_args)
                 for m in cython_candidates],
                nthreads=build_concurrency,
                exclude_failures=True))

            self.extensions.extend(cythonize(NoPatchExtension("*", ["cassandra/*.pyx"], extra_compile_args=compile_args),
                                             nthreads=build_concurrency))
        except Exception:
            sys.stderr.write("Failed to cythonize one or more modules. These will not be compiled as extensions (optional).\n")
def fill_cache(self, repair_incorrect: bool = False) -> None:
    with Pool(processes=multiprocessing.cpu_count()) as pool:
        total = len(self.labeled_spectrograms)
        not_yet_cached = [s for s in self.labeled_spectrograms if not s.is_cached()]
        to_calculate = self.labeled_spectrograms if repair_incorrect else not_yet_cached
        log("Filling cache with {} spectrograms: {} already cached, {} to calculate.".format(
            total, total - len(not_yet_cached), len(to_calculate)))
        for index, labeled_spectrogram in enumerate(to_calculate):
            pool.apply_async(_repair_cached_spectrogram_if_incorrect if repair_incorrect else _cache_spectrogram,
                             (labeled_spectrogram,))

        pool.close()
        pool.join()
def clean_worker(*args, **kwargs):
    import gc
    multiprocessing.pool.worker(*args, **kwargs)
    # Regular multiprocessing workers don't fully clean up after themselves,
    # so we have to explicitly trigger garbage collection to make sure that all
    # destructors are called...
    gc.collect()
def __repr__(self):
    return "<Close pool message>"
def close(self):
    """
    Just send a message off to all the pool members which contains
    the special :class:`_close_pool_message` sentinel.

    """
    if self.is_master():
        for i in range(self.size):
            self.comm.isend(_close_pool_message(), dest=i + 1)
def main():
    process_pool_context = multiprocessing.get_context('spawn')
    pool = multiprocessing.pool.Pool(
        processes=2,
        context=process_pool_context,
    )

    multiprocessing_manager = multiprocessing.Manager()
    multiprocessing_queue = multiprocessing_manager.Queue(
        maxsize=test_queue_size,
    )

    start = time.time()
    for i in range(test_queue_size):
        multiprocessing_queue.put(b'1')
    end = time.time()
    print('queue INSERTION:')
    print(end-start)

    pool.apply(
        func=consume_queue,
        args=(multiprocessing_queue,),
        kwds={},
    )

    regular_queue = queue.Queue()
    start = time.time()
    for i in range(test_queue_size):
        regular_queue.put(b'1')
    end = time.time()
    print('queue INSERTION:')
    print(end-start)
    consume_queue(regular_queue)
def test_apply(self):
    papply = self.pool.apply
    self.assertEqual(papply(sqr, (5,)), sqr(5))
    self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
def test_map(self):
    pmap = self.pool.map
    self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
    self.assertEqual(pmap(sqr, range(100), chunksize=20),
                     map(sqr, range(100)))
def test_map_unplicklable(self):
    # Issue #19425 -- failure to pickle should not cause a hang
    if self.TYPE == 'threads':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))
    class A(object):
        def __reduce__(self):
            raise RuntimeError('cannot pickle')
    with self.assertRaises(RuntimeError):
        self.pool.map(sqr, [A()]*10)
def test_map_chunksize(self):
    try:
        self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
    except multiprocessing.TimeoutError:
        self.fail("pool.map_async with chunksize stalled on null list")
def test_async(self):
    res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
    get = TimingWrapper(res.get)
    self.assertEqual(get(), 49)
    self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
def test_imap(self):
    it = self.pool.imap(sqr, range(10))
    self.assertEqual(list(it), map(sqr, range(10)))

    it = self.pool.imap(sqr, range(10))
    for i in range(10):
        self.assertEqual(it.next(), i*i)
    self.assertRaises(StopIteration, it.next)

    it = self.pool.imap(sqr, range(1000), chunksize=100)
    for i in range(1000):
        self.assertEqual(it.next(), i*i)
    self.assertRaises(StopIteration, it.next)
def test_imap_unordered(self):
    it = self.pool.imap_unordered(sqr, range(1000))
    self.assertEqual(sorted(it), map(sqr, range(1000)))

    it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
    self.assertEqual(sorted(it), map(sqr, range(1000)))
def test_unpickleable_result(self):
    from multiprocessing.pool import MaybeEncodingError
    p = multiprocessing.Pool(2)

    # Make sure we don't lose pool processes because of encoding errors.
    for iteration in range(20):
        res = p.apply_async(unpickleable_result)
        self.assertRaises(MaybeEncodingError, res.get)

    p.close()
    p.join()
def test_number_of_objects(self):
    EXPECTED_NUMBER = 1                   # the pool object is still alive
    multiprocessing.active_children()     # discard dead process objs
    gc.collect()                          # do garbage collection
    refs = self.manager._number_of_objects()
    debug_info = self.manager._debug_info()
    if refs != EXPECTED_NUMBER:
        print self.manager._debug_info()
        print debug_info

    self.assertEqual(refs, EXPECTED_NUMBER)

#
# Test of creating a customized manager class
#
def test_import(self):
    modules = [
        'multiprocessing', 'multiprocessing.connection',
        'multiprocessing.heap', 'multiprocessing.managers',
        'multiprocessing.pool', 'multiprocessing.process',
        'multiprocessing.synchronize', 'multiprocessing.util'
        ]
    if HAS_REDUCTION:
        modules.append('multiprocessing.reduction')

    if c_int is not None:
        # This module requires _ctypes
        modules.append('multiprocessing.sharedctypes')

    for name in modules:
        __import__(name)
        mod = sys.modules[name]

        for attr in getattr(mod, '__all__', ()):
            self.assertTrue(
                hasattr(mod, attr),
                '%r does not have attribute %r' % (mod, attr)
                )

#
# Quick test that logging works -- does not test logging output
#
def pool_in_process():
    pool = multiprocessing.Pool(processes=4)
    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])