我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.PULL。
def __init__(self, opts=None): if opts is None: self.opts = self.process_config(CONFIG_LOCATION) else: self.opts = opts self.ctx = zmq.Context() self.pub_socket = self.ctx.socket(zmq.PUB) self.pub_socket.bind('tcp://127.0.0.1:2000') self.loop = zmq.eventloop.IOLoop.instance() self.pub_stream = zmq.eventloop.zmqstream.ZMQStream(self.pub_socket, self.loop) # Now create PULL socket over IPC to listen to reactor self.pull_socket = self.ctx.socket(zmq.PULL) self.pull_socket.bind('ipc:///tmp/reactor.ipc') self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(self.pull_socket, self.loop) self.pull_stream.on_recv(self.republish)
def receive_message(self, event, event_data, listener_data): """ Receives a messages from another processes. :param * event: Not used. :param * event_data: Not used. :param * listener_data: Not used. """ del event, event_data, listener_data # Make a poller for all incoming sockets. poller = zmq.Poller() for socket in self.__end_points.values(): if socket.type in [zmq.PULL, zmq.REP]: poller.register(socket, zmq.POLLIN) # Wait for socket is ready for reading. socks = dict(poller.poll()) for name, socket in self.__end_points.items(): if socket in socks: self._receive_message(name, socket) # ------------------------------------------------------------------------------------------------------------------
def __register_sockets(self): """ Registers ZMQ sockets for communication with other processes in Enarksh. """ config = Config.get() # Register socket for receiving asynchronous incoming messages. self.__message_controller.register_end_point('pull', zmq.PULL, config.get_spawner_pull_end_point()) # Register socket for sending asynchronous messages to the controller. self.__message_controller.register_end_point('controller', zmq.PUSH, config.get_controller_pull_end_point()) # Register socket for sending asynchronous messages to the logger. self.__message_controller.register_end_point('logger', zmq.PUSH, config.get_logger_pull_end_point()) # ------------------------------------------------------------------------------------------------------------------
def __register_sockets(self): """ Registers ZMQ sockets for communication with other processes in Enarksh. """ config = Config.get() # Register socket for receiving asynchronous incoming messages. self.message_controller.register_end_point('pull', zmq.PULL, config.get_controller_pull_end_point()) # Create socket for lockstep incoming messages. self.message_controller.register_end_point('lockstep', zmq.REP, config.get_controller_lockstep_end_point()) # Create socket for sending asynchronous messages to the spanner. self.message_controller.register_end_point('spawner', zmq.PUSH, config.get_spawner_pull_end_point()) # Create socket for sending asynchronous messages to the logger. self.message_controller.register_end_point('logger', zmq.PUSH, config.get_logger_pull_end_point()) # ------------------------------------------------------------------------------------------------------------------
def register_end_point(self, name, socket_type, end_point): """ Registers an end point. :param str name: The name of the end point. :param int socket_type: The socket type, one of - zmq.PULL for asynchronous incoming messages - zmq.REP for lockstep incoming messages - zmq.PUSH for asynchronous outgoing messages :param str end_point: The end point. """ socket = self.__zmq_context.socket(socket_type) self.__end_points[name] = socket if socket_type in [zmq.PULL, zmq.REP]: socket.bind(end_point) elif socket_type == zmq.PUSH: socket.connect(end_point) else: raise ValueError("Unknown socket type {0}".format(socket_type)) # ------------------------------------------------------------------------------------------------------------------
def test_shadow_pyczmq(self): try: from pyczmq import zctx, zsocket except Exception: raise SkipTest("Requires pyczmq") ctx = zctx.new() ca = zsocket.new(ctx, zmq.PUSH) cb = zsocket.new(ctx, zmq.PULL) a = zmq.Socket.shadow(ca) b = zmq.Socket.shadow(cb) a.bind("inproc://a") b.connect("inproc://a") a.send(b'hi') rcvd = self.recv(b) self.assertEqual(rcvd, b'hi') # Travis can't handle how much memory PyPy uses on this test
def test_cyclic_destroy(self): """ctx.destroy should succeed when cyclic ref prevents gc""" # test credit @dln (GH #137): class CyclicReference(object): def __init__(self, parent=None): self.parent = parent def crash(self, sock): self.sock = sock self.child = CyclicReference(self) def crash_zmq(): ctx = self.Context() sock = ctx.socket(zmq.PULL) c = CyclicReference() c.crash(sock) ctx.destroy() crash_zmq()
def test_poll(self): @gen.coroutine def test(): a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL) f = b.poll(timeout=0) self.assertEqual(f.result(), 0) f = b.poll(timeout=1) assert not f.done() evt = yield f self.assertEqual(evt, 0) f = b.poll(timeout=1000) assert not f.done() yield a.send_multipart([b'hi', b'there']) evt = yield f self.assertEqual(evt, zmq.POLLIN) recvd = yield b.recv_multipart() self.assertEqual(recvd, [b'hi', b'there']) self.loop.run_sync(test)
def test_poll(self): @asyncio.coroutine def test(): a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL) f = b.poll(timeout=0) yield from asyncio.sleep(0) self.assertEqual(f.result(), 0) f = b.poll(timeout=1) assert not f.done() evt = yield from f self.assertEqual(evt, 0) f = b.poll(timeout=1000) assert not f.done() yield from a.send_multipart([b'hi', b'there']) evt = yield from f self.assertEqual(evt, zmq.POLLIN) recvd = yield from b.recv_multipart() self.assertEqual(recvd, [b'hi', b'there']) self.loop.run_until_complete(test())
def _process_single_event(self, socket): """ Process a socket's event. Parameters ---------- socket : zmq.Socket Socket that generated the event. """ data = socket.recv() address = self.address[socket] if address.kind == 'SUB': self._process_sub_event(socket, address, data) elif address.kind == 'PULL': self._process_pull_event(socket, address, data) elif address.kind == 'REP': self._process_rep_event(socket, address, data) else: self._process_single_event_complex(address, socket, data)
def _process_pull_event(self, socket, addr, data): """ Process a PULL socket's event. Parameters ---------- socket : zmq.Socket Socket that generated the event. addr : AgentAddress AgentAddress associated with the socket that generated the event. data : bytes Data received on the socket. """ message = deserialize_message(message=data, serializer=addr.serializer) handler = self.handler[socket] if not isinstance(handler, (list, dict, tuple)): handler = [handler] for h in handler: h(self, message)
def test_agentchannel_async_rep(): """ Test basic ASYNC_REP AgentChannel operations: initialization, equivalence and basic methods. """ receiver = AgentAddress('ipc', 'addr0', 'PULL', 'server', 'pickle') channel = AgentChannel('ASYNC_REP', receiver=receiver, sender=None) # Equivalence assert channel == AgentChannel('ASYNC_REP', receiver=receiver, sender=None) assert not channel == 'foo' assert channel != 'foo' # Basic methods assert channel.twin() == AgentChannel('ASYNC_REQ', sender=receiver.twin(), receiver=None) # Other attributes assert hasattr(channel, 'uuid') assert channel.transport == 'ipc' assert channel.serializer == 'pickle'
def test_agentchannel_sync_pub(): """ Test basic SYNC_PUB AgentChannel operations: initialization, equivalence and basic methods. """ sender = AgentAddress('ipc', 'addr0', 'PUB', 'server', 'pickle') receiver = AgentAddress('ipc', 'addr0', 'PULL', 'server', 'pickle') channel = AgentChannel('SYNC_PUB', sender=sender, receiver=receiver) # Equivalence assert channel == AgentChannel('SYNC_PUB', sender=sender, receiver=receiver) assert not channel == 'foo' assert channel != 'foo' # Basic methods assert channel.twin() == AgentChannel('SYNC_SUB', sender=receiver.twin(), receiver=sender.twin()) # Other attributes assert hasattr(channel, 'uuid') assert channel.transport == 'ipc' assert channel.serializer == 'pickle'
def connect(self): self.context = zmq.Context() if not self.context: raise RuntimeError('Failed to create ZMQ context!') self.socket = self.context.socket(zmq.PULL) if not self.socket: raise RuntimeError('Failed to create ZMQ socket!') self.socket.bind(self.endpoint) self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN) self.is_connected = True
def router_main(_, pidx, args): log = get_logger('examples.zmqserver.extra', pidx) ctx = zmq.Context() ctx.linger = 0 in_sock = ctx.socket(zmq.PULL) in_sock.bind('tcp://*:5000') out_sock = ctx.socket(zmq.PUSH) out_sock.bind('ipc://example-events') try: log.info('router proxy started') zmq.proxy(in_sock, out_sock) except KeyboardInterrupt: pass except: log.exception('unexpected error') finally: log.info('router proxy terminated') in_sock.close() out_sock.close() ctx.term()
def worker_main(loop, pidx, args): log = get_logger('examples.zmqserver.worker', pidx) router = await aiozmq.create_zmq_stream( zmq.PULL, connect='ipc://example-events') async def process_incoming(router): while True: try: data = await router.read() except aiozmq.ZmqStreamClosed: break log.info(data) task = loop.create_task(process_incoming(router)) log.info('started') yield router.close() await task log.info('terminated')
def reset(self): self.status = READY context = zmq.Context() self._socket1 = context.socket(zmq.PUSH) self._socket1.bind(self._address1) self._socket1.set_hwm(32) self._socket2 = context.socket(zmq.PULL) self._socket2.set_hwm(32) self._socket2.RCVTIMEO = 1 self._socket2.bind(self._address2) self._prev_drained = False self._sub_drained = False self._conn1_send_count = 0 self._conn1_recv_count = {} self._conn2_send_count = {} self._conn2_recv_count = 0 self._retry_count = 0
def reset(self): self.status = READY context = zmq.Context() self._socket = context.socket(zmq.PULL) self._socket.RCVTIMEO = 1 sync_socket = context.socket(zmq.PUSH) while self._ports['conn1'] is None or self._ports['sync_conn1'] is None: sleep(0.01) # Handshake with main process self._socket.connect(self._address + ':' + str(self._ports['conn1'])) sync_socket.connect(self._address + ':' + str(self._ports['sync_conn1'])) packet = msgpack.dumps(b'SYNC') sync_socket.send(packet) sync_socket.close() self._num_recv = 0 self._drained = False
def zmq_streamer(): try: context = zmq.Context() # Socket facing clients frontend = context.socket(zmq.PUSH) frontend.bind("tcp://*:%s" % (zmq_queue_port_push)) # Socket facing services backend = context.socket(zmq.PULL) backend.bind("tcp://*:%s" % (zmq_queue_port_pull)) zmq.device(zmq.STREAMER, frontend, backend) except Exception as e: print(e) print("bringing down zmq device") finally: frontend.close() backend.close() context.term()
def generator_from_zmq_pull(context, host): socket = context.socket(zmq.PULL) # TODO: Configure socket with clean properties to avoid message overload. if host.endswith('/'): host = host[:-1] print_item("+", "Binding ZMQ pull socket : " + colorama.Fore.CYAN + "{0}".format(host) + colorama.Style.RESET_ALL) socket.bind(host) while True: try: message = socket.recv(flags=zmq.NOBLOCK) except zmq.Again as e: message = None if message is None: yield None # NOTE: We have to make the generator non blocking. else: task = json.loads(message) yield task
def __init__(self, push, pull, redis_conf): super(MinerClient, self).__init__() print("Connecting to Redis cache {} ...".format(redis_conf)) redis_host, redis_port, redis_db = redis_conf.split(":") self.redis = redis.StrictRedis(host=redis_host, port=int(redis_port), db=int(redis_db)) self.redis.setnx('transaction', 0) # NOTE: Expiration times for pending/processed tasks in seconds. self.transaction_expiration = 60 * 60 self.result_expiration = 60 * 10 context = zmq.Context() print("Connecting to push socket '{}' ...".format(push)) self.push = context.socket(zmq.PUSH) self.push.connect(push) print("Binding to pull socket '{}' ...".format(pull)) self.pull = context.socket(zmq.PULL) self.pull.bind(pull)
def test_tcp_push_socket(event_loop, socket_factory, connect_or_bind): pull_socket = socket_factory.create(zmq.PULL) connect_or_bind(pull_socket, 'tcp://127.0.0.1:3333', reverse=True) def run(): assert pull_socket.poll(1000) == zmq.POLLIN message = pull_socket.recv_multipart() assert message == [b'hello', b'world'] with run_in_background(run) as event: async with azmq.Context(loop=event_loop) as context: socket = context.socket(azmq.PUSH) connect_or_bind(socket, 'tcp://127.0.0.1:3333') await socket.send_multipart([b'hello', b'world']) while not event.is_set(): await asyncio.sleep(0.1)
def test_shadow_pyczmq(self): try: from pyczmq import zctx, zsocket except Exception: raise SkipTest("Requires pyczmq") ctx = zctx.new() ca = zsocket.new(ctx, zmq.PUSH) cb = zsocket.new(ctx, zmq.PULL) a = zmq.Socket.shadow(ca) b = zmq.Socket.shadow(cb) a.bind("inproc://a") b.connect("inproc://a") a.send(b'hi') rcvd = self.recv(b) self.assertEqual(rcvd, b'hi')
def test_shadow_pyczmq(self): try: from pyczmq import zctx, zsocket, zstr except Exception: raise SkipTest("Requires pyczmq") ctx = zctx.new() a = zsocket.new(ctx, zmq.PUSH) zsocket.bind(a, "inproc://a") ctx2 = self.Context.shadow_pyczmq(ctx) b = ctx2.socket(zmq.PULL) b.connect("inproc://a") zstr.send(a, b'hi') rcvd = self.recv(b) self.assertEqual(rcvd, b'hi') b.close()
def execute_command_streamer(): from oldspeak.console.parsers.streamer import parser args = parser.parse_args(get_sub_parser_argv()) bootstrap_conf_with_gevent(args) device = Device(zmq.STREAMER, zmq.PULL, zmq.PUSH) device.bind_in(args.pull) device.bind_out(args.push) if args.pull_hwm: device.setsockopt_in(zmq.RCVHWM, args.pull_hwm) if args.push_hwm: device.setsockopt_out(zmq.SNDHWM, args.push_hwm) print "oldspeak streamer started" print "date", datetime.utcnow().isoformat() print "pull", (getattr(args, 'pull')) print "push", (getattr(args, 'push')) device.start()
def consumer(): consumer_id = random.randrange(1,10005) print "I am consumer #%s" % (consumer_id) context = zmq.Context() # recieve work consumer_receiver = context.socket(zmq.PULL) consumer_receiver.connect("tcp://127.0.0.1:5557") # send work consumer_sender = context.socket(zmq.PUSH) consumer_sender.connect("tcp://127.0.0.1:5558") while True: work = consumer_receiver.recv_json() data = work['num'] result = { 'consumer' : consumer_id, 'num' : data} if data%2 == 0: consumer_sender.send_json(result)
def __init__(self, name, send_qsize=0, mode='ipc'): self._name = name self._conn_info = None self._context_lock = threading.Lock() self._context = zmq.Context() self._tosock = self._context.socket(zmq.ROUTER) self._frsock = self._context.socket(zmq.PULL) self._tosock.set_hwm(10) self._frsock.set_hwm(10) self._dispatcher = CallbackManager() self._send_queue = queue.Queue(maxsize=send_qsize) self._rcv_thread = None self._snd_thread = None self._mode = mode assert mode in ('ipc', 'tcp')
def _setup_ipc(self): ''' Subscribe to the pub IPC and publish the messages on the right transport. ''' self.ctx = zmq.Context() log.debug('Setting up the publisher puller') self.sub = self.ctx.socket(zmq.PULL) self.sub.bind(PUB_IPC_URL) try: self.sub.setsockopt(zmq.HWM, self.opts['hwm']) # zmq 2 except AttributeError: # zmq 3 self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
def main(): try: context = zmq.Context(1) # Socket facing clients frontend = context.socket(zmq.PULL) frontend.bind("tcp://*:5559") # Socket facing services backend = context.socket(zmq.PUSH) backend.bind("tcp://*:5560") zmq.device(zmq.STREAMER, frontend, backend) except Exception, e: print e print "bringing down zmq device" finally: pass frontend.close() backend.close() context.term()
def init_recv_socket(self): logger.info("Initalizing receive socket") self.context = zmq.Context() self.socket = self.context.socket(zmq.PULL) logger.info("Initalized receive socket") while not self.isInterruptionRequested(): try: time.sleep(0.1) logger.info("Trying to get a connection to gnuradio...") self.socket.connect("tcp://{0}:{1}".format(self.ip, self.gr_port)) logger.info("Got connection") break except (ConnectionRefusedError, ConnectionResetError): continue except Exception as e: logger.error("Unexpected error", str(e))
def test_send_1k_push_pull(self): down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL) eventlet.sleep() done = eventlet.Event() def tx(): tx_i = 0 while tx_i <= 1000: tx_i += 1 down.send(str(tx_i).encode()) def rx(): while True: rx_i = up.recv() if rx_i == b"1000": done.send(0) break eventlet.spawn(tx) eventlet.spawn(rx) final_i = done.wait() self.assertEqual(final_i, 0)
def __init__(self, opts=None): if opts is None: self.opts = self.process_config(CONFIG_LOCATION) else: self.opts = opts return # General setup of ZeroMQ self.ctx = zmq.Context() self.loop = zmq.eventloop.IOLoop.instance() # Begin setup of PULL socket self.pull_socket = self.ctx.socket(zmq.PULL) self.pull_socket.bind('tcp://127.0.0.1:2001') self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(self.pull_socket, self.loop) self.pull_stream.on_recv(self.stream_decode) # Begin setup of PUSH socket for IPC to publisher self.push_socket = self.ctx.socket(zmq.PUSH) self.push_socket.connect('ipc:///tmp/reactor.ipc') self.push_stream = zmq.eventloop.zmqstream.ZMQStream(self.push_socket, self.loop) self.actions = loader.load_actions(self.opts, '/home/mp/devel/eventdriventalk/actions') self.registers = loader.load_registers(self.opts, '/home/mp/devel/eventdriventalk/registers') self.rules = loader.load_registers(self.opts, '/home/mp/devel/eventdriventalk/rules')
def msg_server(self): import zmq import time context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://*:%s"%config.ZMQ_PORT) logging.info("zmq msg_server start...") while not self.is_terminated: # Wait for next request from client message = socket.recv() logging.info("new pull message: %s", message) self.process_message(message) time.sleep (1) # Do some 'work'
def main(self): """ The main of the logger. """ config = Config.get() # Startup logger. self.__startup() # Register our socket for asynchronous incoming messages. self.__message_controller.register_end_point('pull', zmq.PULL, config.get_logger_pull_end_point()) # Register supported message types self.__message_controller.register_message_type(HaltMessage.MESSAGE_TYPE) self.__message_controller.register_message_type(LogFileMessage.MESSAGE_TYPE) # Register message received event handlers. self.__message_controller.register_listener(HaltMessage.MESSAGE_TYPE, HaltMessageEventHandler.handle) self.__message_controller.register_listener(LogFileMessage.MESSAGE_TYPE, LogFileMessageEventHandler.handle) # Register other event handlers. self.__event_controller.event_queue_empty.register_listener(self.__message_controller.receive_message) # Run the event loop. self.__event_controller.loop() # Shutdown logger. self.__shutdown() # ------------------------------------------------------------------------------------------------------------------
def test_context_manager(self): url = 'inproc://a' with self.Context() as ctx: with ctx.socket(zmq.PUSH) as a: a.bind(url) with ctx.socket(zmq.PULL) as b: b.connect(url) msg = b'hi' a.send(msg) rcvd = self.recv(b) self.assertEqual(rcvd, msg) self.assertEqual(b.closed, True) self.assertEqual(a.closed, True) self.assertEqual(ctx.closed, True)
def test_identity(self): s = self.context.socket(zmq.PULL) self.sockets.append(s) ident = b'identity\0\0' s.identity = ident self.assertEqual(s.get(zmq.IDENTITY), ident)
def test_shadow(self): p = self.socket(zmq.PUSH) p.bind("tcp://127.0.0.1:5555") p2 = zmq.Socket.shadow(p.underlying) self.assertEqual(p.underlying, p2.underlying) s = self.socket(zmq.PULL) s2 = zmq.Socket.shadow(s.underlying) self.assertNotEqual(s.underlying, p.underlying) self.assertEqual(s.underlying, s2.underlying) s2.connect("tcp://127.0.0.1:5555") sent = b'hi' p2.send(sent) rcvd = self.recv(s2) self.assertEqual(rcvd, sent)
def test_recv_multipart(self): @gen.coroutine def test(): a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL) f = b.recv_multipart() assert not f.done() yield a.send(b'hi') recvd = yield f self.assertEqual(recvd, [b'hi']) self.loop.run_sync(test)