We extracted the following 50 code examples from open-source Python projects to illustrate how to use gevent.pool.Pool().
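Before the extracted examples, here is a minimal, self-contained sketch of the basic gevent.pool.Pool API. The task function `work`, the pool size, and the sample inputs are illustrative assumptions, not taken from any of the projects quoted below:

import gevent
from gevent.pool import Pool

def work(n):
    # Hypothetical I/O-bound task: yield to the gevent hub briefly, then return a value.
    gevent.sleep(0.1)
    return n * n

pool = Pool(3)                       # at most 3 greenlets run concurrently
results = pool.map(work, range(10))  # blocks until all tasks finish, preserves input order
print(results)

# spawn/join style: schedule tasks individually, then wait for the whole pool to drain.
greenlets = [pool.spawn(work, n) for n in range(10)]
pool.join()
print([g.value for g in greenlets])

Most of the examples that follow use one of these two patterns: Pool.map / Pool.imap_unordered for bounded parallel mapping, or Pool.spawn plus Pool.join for scheduling individual tasks under a concurrency cap.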
def run(options):
    import gluon.main
    if options.password != '<recycle>':
        gluon.main.save_password(options.password, int(options.port))
    if options.logging:
        application = gluon.main.appfactory(wsgiapp=gluon.main.wsgibase,
                                            logfilename='httpserver.log',
                                            profiler_dir=profiler)
    else:
        application = gluon.main.wsgibase
    address = (options.ip, int(options.port))
    workers = options.workers
    spawn = workers and Pool(int(options.workers)) or 'default'
    ssl_args = dict()
    if options.ssl_private_key:
        ssl_args['keyfile'] = options.ssl_private_key
    if options.ssl_certificate:
        ssl_args['certfile'] = options.ssl_certificate
    server = pywsgi.WSGIServer(
        address, application,
        spawn=spawn, log=None,
        **ssl_args
    )
    server.serve_forever()
def _concurrent_execute(self, context, start_req, parser, pool, pool_size):
    queue = Queue()  # request queue

    # Enqueue all of the start requests.
    for r in start_req:
        queue.put_nowait(r)

    if pool is None:
        pool = GeventPool(pool_size)

    greenlets = []
    while True:
        try:
            req = self._check_req(queue.get(timeout=1))
            if req.parser is None:
                req.parser = parser
            greenlets.append(pool.spawn(req, context, queue))
        except Empty:
            break

    return [greenlet.get() for greenlet in greenlets]
def __init__(self, config):
    self.config = config.with_options(self)
    self.crashstorage = self.config('crashstorage_class')(config.with_namespace('crashstorage'))
    self.throttler = Throttler(config)

    # Gevent pool for crashmover workers
    self.crashmover_pool = Pool(size=self.config('concurrent_crashmovers'))

    # Queue for crashmover of crashes to save
    self.crashmover_save_queue = deque()

    # Register hb functions with heartbeat manager
    register_for_heartbeat(self.hb_report_health_stats)
    register_for_heartbeat(self.hb_run_crashmover)

    # Register life function with heartbeat manager
    register_for_life(self.has_work_to_do)
def test_pool(self):
    """Manage listener greenlets, one per socket, with a fixed-size pool."""
    class SocketPool(object):

        def __init__(self):
            self.pool = Pool(1000)
            self.pool.start()

        def listen(self, socket):
            while True:
                socket.recv()

        def add_handler(self, socket):
            if self.pool.full():
                raise Exception("At maximum pool size")
            else:
                self.pool.spawn(self.listen, socket)

        def shutdown(self):
            self.pool.kill()
def main():
    # Deterministic Gevent Pool
    from gevent.pool import Pool
    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]
    print(run1 == run2 == run3 == run4)

    # Non-deterministic Process Pool
    from multiprocessing.pool import Pool
    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]
    run1[0]
    print(run1 == run2 == run3 == run4)
def _pull_from(self, subscription):
    user = self.site.get_user(subscription['username'])
    self.image_cache_handler.get_or_create(username=user.username)
    new_images = self.image_cache_handler.get_the_news(user.images)

    # This needs to run after all images have been sent, because bulk raises an
    # InvalidOperation exception: bulk operations can only be executed once.
    self.image_cache_handler.add_the_images(new_images)

    chat_ids = [s['chat_id'] for s in subscription['subscribers']]

    p = pool.Pool(5)
    for _id in chat_ids:
        p.spawn(self._push_to, _id, new_images)
    p.join()
def set_spawn(self, spawn):
    if spawn == 'default':
        self.pool = None
        self._spawn = self._spawn
    elif hasattr(spawn, 'spawn'):
        self.pool = spawn
        self._spawn = spawn.spawn
    elif isinstance(spawn, integer_types):
        from gevent.pool import Pool
        self.pool = Pool(spawn)
        self._spawn = self.pool.spawn
    else:
        self.pool = None
        self._spawn = spawn
    if hasattr(self.pool, 'full'):
        self.full = self.pool.full
    if self.pool is not None:
        self.pool._semaphore.rawlink(self._start_accepting_if_started)
def __init__(self, database, ignore_rfc1918=True):
    self.normalizers = {}
    # injected instance of database.Database
    self.database = database
    self.enabled = True
    self.ignore_rfc1918 = ignore_rfc1918
    # max number of concurrent mongodb inserters
    self.worker_pool = Pool(5)

    # map normalizers
    for n in basenormalizer.BaseNormalizer.__subclasses__():
        normalizer = n()
        for channel in normalizer.channels:
            if channel in self.normalizers:
                raise Exception('Only one normalizer for each channel allowed ({0}).'.format(channel))
            else:
                self.normalizers[channel] = normalizer
def map(requests, prefetch=True, size=None):
    """Concurrently converts a list of Requests to Responses.

    :param requests: a collection of Request objects.
    :param prefetch: If False, the content will not be downloaded immediately.
    :param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
    """
    requests = list(requests)

    pool = Pool(size) if size else None
    jobs = [send(r, pool) for r in requests]
    gevent.joinall(jobs)

    if prefetch:
        [r.response.content for r in requests]

    return [r.response for r in requests]
def __init__(self, spider):
    self.spider = spider
    self.logger = spider.logger
    self.scheduler = Scheduler(spider)
    self.settings = spider.settings
    max_request_size = self.settings["MAX_REQUEST_SIZE"]
    self.dynamic = self.settings["DYNAMIC_CRAWL"]
    if self.dynamic:
        module_path = DRIVER_MODULE
        module = import_module(module_path)
        init_kwargs = self.settings['DRIVER_INIT_KWARGS']
        self.driver = getattr(module, self.settings.get('DRIVER').title())(**init_kwargs)
    else:
        self.driver = None
    self.driver_sem = BoundedSemaphore(1)
    self.downloader = Downloader(spider, self.driver, self.driver_sem)
    self.pool = Pool(size=max_request_size)
def load(self, table):
    cache = {}

    def save(row):
        (query, values) = self.get_insert(row, table)
        try:
            prepared = cache[query]
        except:
            prepared = self.session.prepare(query)
            cache[query] = prepared
        bound = prepared.bind(values)
        self.session.execute(bound)

    pool = Pool(100)
    i = 0
    print "Loading {}".format(table)
    with ProgressBar(max_value=len(self.dataframe)) as p:
        for _ in pool.imap_unordered(save, self.iter()):
            i += 1
            if i % 10 == 0:
                p.update(i)
def _load_dns_servers(self):
    print '[+] Validate DNS servers ...'
    self.dns_servers = []
    pool = Pool(30)
    for server in open('dict/dns_servers.txt').xreadlines():
        server = server.strip()
        if server:
            pool.apply_async(self._test_server, (server,))
    pool.join()
    self.dns_count = len(self.dns_servers)
    sys.stdout.write('\n')
    print '[+] Found %s available DNS Servers in total' % self.dns_count
    if self.dns_count == 0:
        print '[ERROR] No DNS Servers available.'
        sys.exit(-1)
def test_catch_all_gevented_requests(vts_rec_on, movie_server):
    """Keep this test at the very end to avoid messing up with the rest of the
    tests, since it's monkey patching the network related operations. Maybe
    write a custom pytest order enforcer later."""
    def _job():
        return http_get(movie_server.url)

    from gevent.pool import Pool
    import gevent.monkey
    gevent.monkey.patch_socket(dns=True)

    pool = Pool()
    for x in range(10):
        pool.spawn(_job)
    pool.join()

    assert len(vts_rec_on.cassette) == 10
def __init__(self, name=None, service_group_conf=None, app=None, channel="center", lock=False, auri=None):
    # If no name is given, fall back to the service_group name from service_group_conf.
    if not name:
        if service_group_conf:
            name = get_service_group(service_group_conf).get('service_group')
        if not name:
            raise EnvironmentError('Neither name given nor service_group_conf name given')
    super(WORK_FRAME, self).__init__(name, app=app, channel=channel, auri=auri, lock=lock)
    self.command_q = "{0}-{1}".format(self.name, self.id)
    # Command queue of the frame: if it has no consumer for roughly 20s, command_q expires.
    self.create_queue(self.command_q, ttl=15, args={'x-expires': 20000})
    self.command_prefix = "skyeye-rpc-{0}.".format(self.name)
    self.join(self.command_q, "{0}*".format(self.command_prefix))
    self.init_command()
    self.command_pool = Pool(100)
    self.service_group_conf = service_group_conf
def send_multi_concurrency(self, messages):
    greenlets = []
    _pool = pool.Pool(self.concurrency)
    for message in messages:
        if self.sleep_interval > 0:
            time.sleep(self.sleep_interval)
        greenlets.append(_pool.spawn(self.send, message))
    _pool.join()
    results = []
    for g in greenlets:
        results.append(g.value)
    return results
def __init__(self):
    self.pool_size = None
    self.pool = Pool(self.pool_size)
    self.session = requests.Session()
    self.timeout = 10
    self.url = None
    self.response = None
def run(app):
    linkero.printWellcome()
    linkero.createDB()
    if linkero.config["SSL"]["activate"]:
        gevent_server = WSGIServer((linkero.config["host"]["ip"],
                                    int(os.environ.get('PORT', linkero.config["host"]["port"]))), app,
                                   spawn=Pool(linkero.config["gevent"]["spawn"]),
                                   log='default' if (linkero.config["gevent"]["accessLog"] == True) else None,
                                   keyfile=linkero.config["SSL"]["key"],
                                   certfile=linkero.config["SSL"]["certificate"])
    else:
        gevent_server = WSGIServer((linkero.config["host"]["ip"],
                                    int(os.environ.get('PORT', linkero.config["host"]["port"]))), app,
                                   spawn=Pool(linkero.config["gevent"]["spawn"]),
                                   log='default' if (linkero.config["gevent"]["accessLog"] == True) else None)
    gevent_server.serve_forever()
def gevent(app, address, **options):
    options = options['options']
    workers = options.workers
    from gevent import pywsgi
    from gevent.pool import Pool
    pywsgi.WSGIServer(address, app, spawn=workers and Pool(
        int(options.workers)) or 'default', log=None).serve_forever()
def __init__(self):
    self.pool = Pool(1000)
    self.pool.start()
def __init__(self, func, pool_size=100, timeout=None):
    # XXX: Is it necessary to patch all? I know we need at least socket,
    # ssl, dns, and signal. There may be calls inside boto/botocore that
    # require more patching.
    super(GeventWorker, self).__init__(func, pool_size=pool_size, timeout=timeout)
    self.logger = get_logger(__name__)
    self.pool = Pool(size=pool_size)
def run(self, seedfile, progress_queue, output_queue):
    task_total = count_file_linenum(seedfile)
    proc_name = current_process().name
    sys.stdout = ProcessIO(output_queue)

    def progress_tracking(greenlet):
        count = getattr(progress_tracking, 'count', 0) + 1
        setattr(progress_tracking, 'count', count)
        progress_queue.put((proc_name, count, task_total))
        return greenlet

    po = pool.Pool(self.pool_size)
    with open(seedfile) as f:
        for line in f:
            g = po.apply_async(func=self.pool_task_with_timeout,
                               args=(line, ),
                               kwds=None,
                               callback=self.callback)
            g.link(progress_tracking)
            po.add(g)
    try:
        po.join()
    except (KeyboardInterrupt, SystemExit) as ex:
        print(str(ex))
        po.kill()
def __init__(self, listener, locals=None, banner=None, **server_args):
    """
    :keyword locals: If given, a dictionary of "builtin" values that will be
        available at the top-level.
    :keyword banner: If given, a string that will be printed to each connecting user.
    """
    group = Pool(greenlet_class=_Greenlet_stdreplace)  # no limit on number
    StreamServer.__init__(self, listener, spawn=group, **server_args)
    _locals = {'__doc__': None, '__name__': '__console__'}
    if locals:
        _locals.update(locals)
    self.locals = _locals
    self.banner = banner
    self.stderr = sys.stderr
def use_gevent_with_queue():
    queue = Queue()
    pool = Pool(5)
    for p in range(1, 7):
        queue.put(p)
    while pool.free_count():
        sleep(0.1)
        pool.spawn(save_search_result_with_queue, queue)
    pool.join()
def use_gevent_with_queue():
    queue = Queue()
    pool = Pool(5)
    for p in range(1, 7):
        put_new_page(p, queue)
    while pool.free_count():
        sleep(0.1)
        pool.spawn(save_search_result_with_queue, queue)
    pool.join()
def serve_forever(listener):
    WSGIServer(listener, application, spawn=Pool(), log=None).serve_forever()
def __init__(self, queue, db_proxy_num, myip):
    self.crawl_pool = Pool(THREADNUM)
    self.queue = queue
    self.db_proxy_num = db_proxy_num
    self.myip = myip
def __init__(self, options):
    self.options = options
    self.blockip = None
    self.keywords = False
    self.pool = Pool(self.options['threads_count'])
    self.document = self.options['target'].replace('.', '_') + '.txt'
    socket.setdefaulttimeout(self.options['timeout'])  # set the global socket timeout
def bulk_originate(self, request_uuid_list):
    if request_uuid_list:
        self.log.info("BulkCall for RequestUUIDs %s" % str(request_uuid_list))
        job_pool = pool.Pool(len(request_uuid_list))
        [job_pool.spawn(self.spawn_originate, request_uuid)
         for request_uuid in request_uuid_list]
        return True
    self.log.error("BulkCall Failed -- No RequestUUID !")
    return False
def __init__(self, worker_func, task_count, worker_count=-1):
    # initialize completion task worker pool
    # if number of workers is not specified, set it to the number of CPUs
    if worker_count == -1:
        worker_count = cpu_count()
    self.worker_pool = pool.Pool(size=worker_count)
    self.worker_pool_closed = False

    # store requested task count and callback function
    self.task_count = task_count
    self.worker_func = worker_func
def test_concurrency_with_delayed_url(self):
    dh = DownloadHandler(self.spider, self.driver, self.driver_sem)
    n = 5
    pool = Pool(n)
    urls = []
    for i in range(n):
        urls.append(HTTPBIN_URL + '/delay/1')
    time_start = time.time()
    pool.map(dh.fetch, [Request(url) for url in urls])
    time_total = time.time() - time_start
    self.assertLess(time_total, n)
def test_dynamic_request_concurrency(self):
    self.driver = webdriver.PhantomJS()
    dh = DownloadHandler(self.spider, self.driver, self.driver_sem)
    n = 5
    pool = Pool(n)
    urls = []
    for i in range(n):
        urls.append(HTTPBIN_URL + '/delay/1')
    time1 = time.time()
    pool.map(dh.fetch, [Request(url, dynamic=True, wait=5) for url in urls])
    self.assertGreater(time.time() - time1, n)
    self.driver.close()
def run(self, proxylist):
    if len(proxylist) == 0:
        return []
    pool = Pool(VALIDATE_CONFIG['THREAD_NUM'])
    self.result = filter(lambda x: x, pool.map(self.valid, proxylist))
    return self.result
def run_task_in_gevent(url_list, poc_file_dict):
    # url_list: the list of target URLs to scan
    poc = Poc_Launcher()
    pool = Pool(100)
    for target in url_list:
        for plugin_type, poc_files in poc_file_dict.iteritems():
            for poc_file in poc_files:
                if target and poc_file:
                    target = fix_target(target)
                    pool.add(gevent.spawn(poc.poc_verify, target, plugin_type, poc_file))
    pool.join()
def new_parallel(self, function, *params):
    '''
    Register a new thread executing a parallel method.
    '''
    # Create a pool if not created (processes or Gevent...)
    if self.ppool is None:
        if core_type == 'thread':
            from multiprocessing.pool import ThreadPool
            self.ppool = ThreadPool(500)
        else:
            from gevent.pool import Pool
            self.ppool = Pool(500)

    # Add the new task to the pool
    self.ppool.apply_async(function, *params)
def run_bugscan(url_list):
    from tools.pocs.bugscan import Bugscan
    PLUGINS_DIR = 'D:\\Projects\\xlcscan\\tools\\pocs\\'
    poc = Bugscan()
    pool = Pool(100)
    for target in url_list:
        for poc_file in bugscan_name_list:
            if target and poc_file:
                target = fix_target(target)
                poc_file = PLUGINS_DIR + 'bugscan' + '\\' + poc_file
                pool.add(gevent.spawn(poc.run, target, poc_file))
    pool.join()