我们从 Python 开源项目中提取了以下 8 个代码示例，用于说明如何使用 multiprocessing.dummy 模块。
def concurrent_get_release_status(targets, timeout=4):
    """Query the release status of several targets using a thread pool.

    Args:
        targets (list of tuples): a list of (host, target_path) pairs.
        timeout: forwarded unchanged to ``get_release_status`` for each
            target.

    Returns:
        list: one ``get_release_status`` result per entry of ``targets``,
        in the same order; ``[]`` when ``targets`` is empty.
    """
    if len(targets) == 0:
        return []

    # workaround for http://bugs.python.org/issue7980
    # (presumably: load _strptime here, before any worker thread needs it)
    import _strptime  # noqa

    def fetch_status(pair):
        remote_host, remote_path = pair
        return get_release_status(remote_host, remote_path, timeout)

    # Never start more than 20 workers, nor more workers than targets.
    worker_count = min(20, len(targets))
    workers = multiprocessing.dummy.Pool(worker_count)
    try:
        statuses = workers.map(fetch_status, targets, chunksize=1)
    finally:
        workers.close()
    return statuses
def dns_bulk_resolve(candidates, reverse=False, ip_version=None, threads=50):
    """
    Resolve a list of host names to IPs or, if reverse is true, IPs to
    host names.  Return a map of each result keyed to its candidate.

    Args:
        candidates: sized collection of host names (or of IPs when
            ``reverse`` is true).
        reverse: resolve IPs to host names instead of names to IPs.
        ip_version: IP version to resolve to; ``None`` means 4.  Must be
            left as ``None`` when ``reverse`` is true.
        threads: upper bound on the number of worker threads.

    Returns:
        dict: one entry per candidate, as produced by the worker
        function; empty when ``candidates`` is empty.

    Raises:
        ValueError: if ``reverse`` is true and ``ip_version`` is given.
        (``__check_ip_version__`` may raise as well -- its behaviour is
        not visible from here.)

    WARNING: This function will create a pool of up to 'threads'
    threads.
    """
    # This is based loosely on http://stackoverflow.com/a/34377198

    if reverse and ip_version is not None:
        raise ValueError("Unable to force IP version when reverse-resolving")

    if ip_version is None:
        ip_version = 4
    __check_ip_version__(ip_version)

    result = {}

    if len(candidates) == 0:
        return result

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    candidate_args = [(candidate, ip_version) for candidate in candidates]
    resolver = __reverser__ if reverse else __forwarder__

    pool = multiprocessing.dummy.Pool(
        processes=min(len(candidates), threads)
    )
    try:
        for candidate, answer in pool.imap(resolver, candidate_args,
                                           chunksize=1):
            result[candidate] = answer
    finally:
        # Close the pool even when a resolver raises, so the worker
        # threads are told to exit instead of being leaked.
        pool.close()

    return result
def api_ping_list(hosts, bind=None, timeout=None, threads=10):
    """
    Ping a list of hosts in parallel and return a dict mapping each
    host to its status.

    Args:
        hosts: sized collection of hosts to ping.
        bind: passed through to ``api_ping`` as ``bind``.
        timeout: passed through to ``api_ping`` as ``timeout``.
        threads: upper bound on the number of worker threads.

    Returns:
        dict: ``{host: up}`` where ``up`` is the first element of the
        pair returned by ``api_ping``; ``{}`` when ``hosts`` is empty.
    """
    if len(hosts) == 0:
        return {}

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    def ping_one(host):
        # Only the up/down flag is kept; the second element is discarded.
        up, _ = api_ping(host, bind=bind, timeout=timeout)
        return (host, up)

    pool = multiprocessing.dummy.Pool(processes=min(len(hosts), threads))
    try:
        return dict(pool.imap(ping_one, hosts, chunksize=1))
    finally:
        # Close the pool even when a ping raises, so the worker threads
        # are told to exit instead of being leaked.
        pool.close()
def api_has_services(hosts, timeout=5, bind=None, threads=10):
    """
    Check in parallel whether each host offers bwctl and pscheduler.

    Args:
        hosts: sized collection of host names to check.
        timeout: passed through to each check function as ``timeout``.
        bind: passed through to each check function as ``bind``.
        threads: upper bound on the number of worker threads.

    Returns:
        dict: a hash of host names and results, shaped as
        ``{host: {"bwctl": state, "pscheduler": state}}`` where each
        state is whatever ``api_has_bwctl`` / ``api_has_pscheduler``
        returned for that host; ``{}`` when ``hosts`` is empty.
    """
    # With no hosts the pool below would be asked for zero workers, which
    # Pool() rejects with ValueError; there is nothing to check anyway.
    if len(hosts) == 0:
        return {}

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    def check_one(arg):
        host, service, function = arg
        return (host, service, function(host, timeout=timeout, bind=bind))

    args = []
    result = {}
    for host in hosts:
        args.extend([
            (host, "bwctl", api_has_bwctl),
            (host, "pscheduler", api_has_pscheduler)
        ])
        # Pre-seed both services so every host has a complete entry.
        result[host] = {
            "bwctl": None,
            "pscheduler": None
        }

    pool = multiprocessing.dummy.Pool(processes=min(len(hosts), threads))
    try:
        for host, service, state in pool.imap(check_one, args, chunksize=1):
            result[host][service] = state
    finally:
        # Close the pool even when a check raises, so the worker threads
        # are told to exit instead of being leaked.
        pool.close()

    return result
def _all(func, hosts):
    '''
    Internal function that runs ``func`` once per host on a thread pool.

    Args:
        func: callable taking a single host.
        hosts: iterable of hosts to pass to ``func``.

    Returns:
        list: the results of ``func``, in the same order as ``hosts``.
    '''
    # threads should likely scale with cores or interfaces
    cpus = multiprocessing.cpu_count()
    threads = 4 * cpus
    log.debug('multi._all cpus count={}, thread count={}'.format(cpus, threads))
    pool = multiprocessing.dummy.Pool(threads)
    try:
        # map() already returns a list, so no copy is needed.
        return pool.map(func, hosts)
    finally:
        # Without this every call would leave its worker threads behind.
        pool.close()
def __init__(self, dataset, feedin_shape, collate_fn=default_collate,
             threads=1, shuffle=False):
    """Set up the loader state and pick the batch-fetching strategy.

    Args:
        dataset: sized collection of samples (``len()`` is taken of it).
        feedin_shape: mapping providing the 'data', 'label' and
            'batch_size' entries.
        collate_fn: factory that is called with ``feedin_shape``; its
            return value is stored as ``self.collate_fn``.
        threads: when greater than 1 a thread pool of that size is
            created and the multi-thread batch getter is selected.
        shuffle: stored on the instance for later use.
    """
    super(DataLoader, self).__init__()

    self.dataset = dataset
    self.threads = threads
    self.collate_fn = collate_fn(feedin_shape)

    # shape related variables
    self.data_shapes = feedin_shape['data']
    self.label_shapes = feedin_shape['label']
    self.batch_size = feedin_shape['batch_size']

    # loader related variables
    self.current = 0
    sample_count = len(self.dataset)
    self.total = sample_count
    # NOTE(review): the attribute name is spelled "shuflle"; it is kept
    # as-is because code outside this method may read it -- confirm
    # before renaming.
    self.shuflle = shuffle
    self.map_index = list(range(sample_count))

    # prepare for loading: single-thread getter unless a pool is requested
    self.get_batch = self.get_batch_single_thread
    if self.threads > 1:
        # thread-backed pool (multiprocessing.dummy), not real processes
        import multiprocessing.dummy as thread_backend
        self.pool = thread_backend.Pool(self.threads)
        self.get_batch = self.get_batch_multi_thread

    self.reset()
def test_main(run=None):
    """Build and run the process, thread and manager test suites.

    Creates the shared pools and starts the manager used by the mixin
    classes, runs every collected test case through ``run``, and tears
    the pools and manager down again -- also when ``run`` raises.

    Args:
        run: callable that receives the assembled ``unittest.TestSuite``;
            defaults to ``test.test_support.run_unittest``.

    Raises:
        unittest.SkipTest: on Linux, when creating an RLock raises
            OSError.
    """
    if sys.platform.startswith("linux"):
        try:
            # Probe only: the lock itself is not needed afterwards.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear down even if the run fails, so worker processes, threads
        # and the manager are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool