我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用zmq.SUBSCRIBE。
def run(self):
    """
    Entry point of the live plot when it is launched as a separate process.

    Connects a SUB socket to the local publisher, subscribes to the topic
    derived from ``self.var_name``, initialises the plot and then starts
    the animation loop.
    """
    self.entity_name = current_process().name
    plogger.info("Starting new thread %s", self.entity_name)

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.SUB)
    endpoint = "tcp://localhost:%d" % self.port
    self.socket.connect(endpoint)

    # The subscription topic is the pickled variable name.
    pickled_name = pickle.dumps(self.var_name,
                                protocol=pickle.HIGHEST_PROTOCOL)
    self.socket.setsockopt(zmq.SUBSCRIBE, pickled_name)
    plogger.info("Subscribed to topic %s on port %d",
                 self.var_name, self.port)

    self.init(**self.init_kwargs)

    # Do NOT drop this seemingly unused name: the animation object must stay
    # referenced or the garbage collector will clean it up and the plot
    # stops updating.
    # See: http://matplotlib.org/api/animation_api.html
    ani = animation.FuncAnimation(self.fig, self.loop, interval=100)
    self.plt.show()
def rec(port):
    """Bind a SUB socket on ``port`` and feed every message to ``rec_frame``.

    The socket subscribes with an empty prefix and is wrapped in a
    ``ZMQStream`` whose receive callback is ``rec_frame``. The call then
    runs the IOLoop and never returns.

    :param port: TCP port to bind on all interfaces.
    """
    import time  # local import: only needed for the idle loop below

    zmq_ctx = zmq.Context()
    s = zmq_ctx.socket(zmq.SUB)
    s.bind('tcp://*:{port}'.format(port=port))
    s.setsockopt(zmq.SUBSCRIBE, b"")
    stream = ZMQStream(s)
    stream.on_recv_stream(rec_frame)
    ioloop.IOLoop.instance().start()
    # start() only returns once the loop has been stopped. Keep blocking the
    # caller as before, but sleep instead of spinning a core at 100% CPU.
    while True:
        time.sleep(1)
def test_unicode_sockopts(self):
    """Check setting and getting socket options with unicode strings."""
    text = "tést"
    if str is not unicode:
        # str and unicode are distinct types here, so decode the literal.
        text = text.decode('utf8')
    pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
    self.assertEqual(sub.send_unicode, sub.send_unicode)
    self.assertEqual(pub.recv_unicode, pub.recv_unicode)

    # Plain setsockopt must reject unicode values.
    for option in (zmq.SUBSCRIBE, zmq.IDENTITY):
        self.assertRaises(TypeError, sub.setsockopt, option, text)
    sub.setsockopt_unicode(zmq.IDENTITY, text, 'utf16')
    self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, text)
    sub.setsockopt_unicode(zmq.SUBSCRIBE, text)

    self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
    self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)

    # The identity must round-trip through both getter flavours.
    raw_identity = sub.getsockopt(zmq.IDENTITY)
    decoded_identity = raw_identity.decode('utf16')
    unicode_identity = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
    self.assertEqual(decoded_identity, unicode_identity)

    time.sleep(0.1)  # wait for connection/subscription
    pub.send_unicode(text, zmq.SNDMORE)
    pub.send_unicode(text * 2, encoding='latin-1')
    first_part = sub.recv_unicode()
    self.assertEqual(text, first_part)
    second_part = sub.recv_unicode(encoding='latin-1')
    self.assertEqual(text * 2, second_part)
def test_init_iface(self):
    """A PUBHandler built from an interface address publishes log records."""
    import time

    logger = self.logger
    ctx = self.context

    # Built without a context argument, the handler must not use ours.
    handler = handlers.PUBHandler(self.iface)
    self.assertTrue(handler.ctx is not ctx)
    self.sockets.append(handler.socket)
    # handler.ctx.term()

    # Built with an explicit context, the handler must use exactly that one.
    handler = handlers.PUBHandler(self.iface, self.context)
    self.sockets.append(handler.socket)
    self.assertTrue(handler.ctx is ctx)
    handler.setLevel(logging.DEBUG)
    handler.root_topic = self.topic
    logger.addHandler(handler)

    subscriber = ctx.socket(zmq.SUB)
    self.sockets.append(subscriber)
    subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
    subscriber.connect(self.iface)
    time.sleep(0.25)

    sent_text = 'message'
    logger.info(sent_text)
    received = subscriber.recv_multipart()
    (topic, payload) = received
    self.assertEqual(topic, b'zmq.INFO')
    self.assertEqual(payload, b(sent_text) + b'\n')
    logger.removeHandler(handler)
def test_init_socket(self):
    """A PUBHandler built from an existing PUB socket reuses it and its context."""
    import time

    pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
    logger = self.logger

    handler = handlers.PUBHandler(pub)
    handler.setLevel(logging.DEBUG)
    handler.root_topic = self.topic
    logger.addHandler(handler)

    self.assertTrue(handler.socket is pub)
    # The handler context must be both the socket's and the test's context.
    for expected_ctx in (pub.context, self.context):
        self.assertTrue(handler.ctx is expected_ctx)

    sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
    time.sleep(0.1)

    sent_text = 'message'
    logger.info(sent_text)
    received = sub.recv_multipart()
    (topic, payload) = received
    self.assertEqual(topic, b'zmq.INFO')
    self.assertEqual(payload, b(sent_text) + b'\n')
    logger.removeHandler(handler)
def test_root_topic(self):
    """Changing the handler's root topic hides records from the first subscriber."""
    logger, handler, sub = self.connect_handler()
    handler.socket.bind(self.iface)

    # Second subscriber, subscribed with an empty prefix.
    catch_all = sub.context.socket(zmq.SUB)
    self.sockets.append(catch_all)
    catch_all.connect(self.iface)
    catch_all.setsockopt(zmq.SUBSCRIBE, b'')

    handler.root_topic = b'twoonly'
    sent_text = 'ignored'
    logger.info(sent_text)

    # The original subscriber must have nothing to read...
    self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
    # ...while the second subscriber gets the record under the new topic.
    received = catch_all.recv_multipart()
    topic, payload = received
    self.assertEqual(topic, b'twoonly.INFO')
    self.assertEqual(payload, b(sent_text) + b'\n')
    logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue wired to three freshly bound sockets.

    :param mon_sub: subscription prefix set on the monitor SUB socket.
    :param in_prefix: prefix handed to the device for its "in" side.
    :param out_prefix: prefix handed to the device for its "out" side.
    :return: the ``(alice, bob, mon)`` sockets; they are also registered in
        ``self.sockets``.
    """
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)

    loopback = 'tcp://127.0.0.1'
    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)
    alice_port = alice.bind_to_random_port(loopback)
    bob_port = bob.bind_to_random_port(loopback)
    mon_port = mon.bind_to_random_port(loopback)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    self.device.connect_in("tcp://127.0.0.1:%i" % alice_port)
    self.device.connect_out("tcp://127.0.0.1:%i" % bob_port)
    self.device.connect_mon("tcp://127.0.0.1:%i" % mon_port)
    self.device.start()
    time.sleep(.2)

    try:
        # This non-blocking read is currently necessary to ensure that no
        # monitor messages are dropped; see LIBZMQ-248 for more info.
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        pass

    created = [alice, bob, mon]
    self.sockets.extend(created)
    return alice, bob, mon
def _subscribe_to_topic(self, alias: str, topic: Union[bytes, str]):
    '''
    Perform the actual ZeroMQ subscription of the socket known by `alias`
    to the given topic.

    Only meaningful for SUB/SYNC_SUB sockets. The handler is not set by
    this function.

    Raises NotImplementedError when the address registered under `alias`
    is neither an AgentAddress nor an AgentChannel.
    '''
    topic = topic_to_bytes(topic)
    address = self.address[alias]

    if isinstance(address, AgentAddress):
        self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic)
        return

    if isinstance(address, AgentChannel):
        # For channels the subscription goes on the receiver socket and the
        # topic is prefixed with the channel uuid.
        sub_address = address.receiver
        prefixed_topic = address.uuid + topic
        self.socket[sub_address].setsockopt(zmq.SUBSCRIBE, prefixed_topic)
        return

    raise NotImplementedError('Unsupported address type %s!' % address)
def create_socket(port):
    """
    Build a ZeroMQ SUB socket bound to ``port`` on all interfaces.

    Exits the process with status 1 when the bind fails. On success the
    socket is subscribed with an empty prefix and returned together with
    its context as ``(socket, context)``.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    endpoint = "tcp://*:%s" % port
    try:
        socket.bind(endpoint)
    except zmq.error.ZMQError:
        print("Address already in use")
        sys.exit(1)
    else:
        socket.setsockopt(zmq.SUBSCRIBE, b"")
        print("Start node-masternode Subscribe")
    return socket, context
def __init__(self, loop, logger, config):
    """Store the collaborators, open the SUB socket and schedule the tasks.

    :param loop: event loop on which the background tasks are created.
    :param logger: logger kept as ``self.log``.
    :param config: mapping read for ``["BITCOIND"]["zeromq"]`` and ``["MYSQL"]``.
    """
    print("test")
    self.loop = loop
    self.log = logger
    self.config = config
    self.zmq_url = config["BITCOIND"]["zeromq"]

    self.zmqContext = zmq.asyncio.Context()
    self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
    self.MYSQL_CONFIG = config["MYSQL"]

    # Only "hashblock" is subscribed; the other topics were left disabled:
    # "hashtx", "rawblock", "rawtx".
    self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
    self.zmqSubSocket.connect(self.zmq_url)
    print(self.zmq_url)

    # Schedule the coroutines in the same order as before.
    for start in (self.init_db, self.handle, self.rpctest):
        self.loop.create_task(start())
    # self.loop.create_task(self.mysqltest())
def wait_for_news_from(self, address, topic, wait_for_s):
    """Subscribe to one or more topics and wait for a single publication.

    :param address: one address, or a list of addresses, to subscribe to.
    :param topic: one topic string, or an iterable of topic strings.
    :param wait_for_s: timeout handed to ``_receive_with_timeout``.
    :return: whatever ``_unserialise_for_pubsub`` yields for the received
        message, or ``(None, None)`` when the wait times out or is
        interrupted.
    """
    addresses = address if isinstance(address, list) else [address]
    socket = self.get_socket(addresses, "subscriber")

    topics = [topic] if isinstance(topic, str) else topic
    for name in topics:
        socket.set(zmq.SUBSCRIBE, name.encode(config.ENCODING))

    try:
        raw = self._receive_with_timeout(socket, wait_for_s,
                                         use_multipart=True)
        return _unserialise_for_pubsub(raw)
    except (core.SocketTimedOutError, core.SocketInterruptedError):
        return None, None
def brute_zmq(host, port=5555, user=None, password=None, db=0):
    """Probe a ZeroMQ publisher by trying to receive one message from it.

    :param host: host name or IP address to connect to.
    :param port: TCP port of the publisher.
    :param user: unused by this function.
    :param password: unused by this function.
    :param db: unused by this function.
    :return: True if a message arrived within one second, False otherwise.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not linger on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec

        # Connect
        socket.connect("tcp://%s:%s" % (host, port))

        # Try to receive; any failure (including the timeout) means "no".
        try:
            socket.recv()
            return True
        except Exception:
            return False
    finally:
        # Release the socket AND its context; the context used to be leaked
        # on every call. linger=0 keeps term() from blocking.
        socket.close(linger=0)
        context.term()
def handle_zmq(host, port=5555, extra_config=None):
    """Check whether a ZeroMQ publisher at ``host:port`` delivers a message.

    :param host: host name or IP address to connect to.
    :param port: TCP port of the publisher.
    :param extra_config: unused by this function.
    :return: True if a message arrived within one second, False otherwise.
    """
    # log.debug(" * Connection to ZeroMQ: %s : %s" % (host, port))
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not linger on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec

        # Connect
        socket.connect("tcp://%s:%s" % (host, port))

        # Try to receive; any failure (including the timeout) means "no".
        try:
            socket.recv()
            return True
        except Exception:
            return False
    finally:
        # Release the socket AND its context; the context used to be leaked
        # on every call. linger=0 keeps term() from blocking.
        socket.close(linger=0)
        context.term()
# NOTE(review): the body uses ``async with``/``await``, which is a SyntaxError
# inside a plain ``def``; presumably the original was a coroutine test run
# through an asyncio pytest marker — confirm the decorator at the call site.
async def test_tcp_pub_socket(event_loop, socket_factory, connect_or_bind):
    """A SUB socket subscribed to ``b'a'`` receives from an azmq PUB socket.

    A background thread blocks on the SUB socket and asserts on the frames
    it receives, while this coroutine keeps publishing one matching and one
    non-matching message until the thread is done.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # Keep publishing until the background thread has its message.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])
# NOTE(review): the body uses ``async with``/``await``, which is a SyntaxError
# inside a plain ``def``; presumably the original was a coroutine test run
# through an asyncio pytest marker — confirm the decorator at the call site.
async def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq XPUB socket sees the subscribe/unsubscribe frames of a SUB peer.

    The XPUB side first expects the ``b'\\1a'`` frame, then publishes until
    the background reader has received its message, and finally expects
    the ``b'\\0a'`` frame once the SUB socket is closed.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XPUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # Frame received after the SUB peer subscribed to b'a'.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\1a']

            # Keep publishing until the background thread has its message.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])

            # Frame received after the SUB peer went away.
            sub_socket.close()
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\0a']
def sub_task(self, name, ip_port_sub):
    """Receive ``<topic> <integer>`` messages until ``self.shutdown`` is set.

    Every message is split on whitespace into a topic and a number; the
    message counter ``self.sub_msg_cnt`` is updated and each message is
    printed.

    :param name: unused by this function.
    :param ip_port_sub: ``host:port`` of the publisher; the string ``"0"``
        disables the subscriber and makes the call return immediately.
    """
    if ip_port_sub == "0":
        return

    ctx = zmq.Context()
    # subscribe socket
    socket_sub = ctx.socket(zmq.SUB)
    try:
        socket_sub.connect("tcp://%s" % ip_port_sub)
        # The option value must be bytes: a text string raises TypeError
        # on Python 3.
        socket_sub.setsockopt(zmq.SUBSCRIBE, b'')

        total_value = 0
        self.sub_msg_cnt = 0
        while not self.shutdown:
            string = socket_sub.recv()
            topic, messageData = string.split()
            total_value += int(messageData)
            self.sub_msg_cnt += 1
            print("SUB:: [%d] %s %s" % (self.sub_msg_cnt, topic, messageData))
    finally:
        # Release the socket and context once the loop ends or fails.
        socket_sub.close(linger=0)
        ctx.term()
def execute_command_forwarder():
    """Parse the command-line arguments and start a SUB -> PUB forwarder device.

    The device binds its input side to ``args.subscriber`` and its output
    side to ``args.publisher``, subscribes the input side with an empty
    prefix and applies the optional high-water marks before starting.
    """
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
    device.bind_in(args.subscriber)
    device.bind_out(args.publisher)
    device.setsockopt_in(zmq.SUBSCRIBE, b'')
    if args.subscriber_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.subscriber_hwm)
    if args.publisher_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.publisher_hwm)

    # Single pre-formatted argument per print: identical output whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    print("oldspeak forwarder started")
    print("date %s" % (datetime.utcnow().isoformat(),))
    print("subscriber %s" % (args.subscriber,))
    print("publisher %s" % (args.publisher,))
    device.start()
def _data_listener(self):
    """Forward server messages to ``_on_server_message`` via queued Qt calls.

    When a file path is given as first command-line argument, every line of
    that file is decoded as JSON and dispatched first. Afterwards the
    method connects a SUB socket to ``tcp://localhost:9876`` and dispatches
    every JSON message it receives, forever.
    """
    if len(sys.argv) > 1:
        # Context manager so the file is closed once it has been replayed.
        with open(sys.argv[1]) as replay_file:
            for line in replay_file:
                QtCore.QMetaObject.invokeMethod(
                    self, "_on_server_message",
                    QtCore.Qt.QueuedConnection,
                    QtCore.Q_ARG(dict, json.loads(line)))

    port = 9876
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:%d" % port)
    # The option value must be bytes: a text string raises TypeError on
    # Python 3.
    socket.setsockopt(zmq.SUBSCRIBE, b'')

    while True:
        msg = socket.recv_json()
        try:
            QtCore.QMetaObject.invokeMethod(
                self, "_on_server_message",
                QtCore.Qt.QueuedConnection,
                QtCore.Q_ARG(dict, msg))
        except AttributeError:
            # Deliberately ignored, as before.
            # NOTE(review): presumably guards against Qt objects already
            # torn down at shutdown — confirm.
            pass
def main(pub_port=None, sub_port=None):
    '''main of forwarder

    Binds a SUB frontend on ``pub_port`` and a PUB backend on ``sub_port``
    and runs a ZeroMQ forwarder device between them until interrupted.

    :param sub_port: port for subscribers (defaults to ``get_sub_port()``)
    :param pub_port: port for publishers (defaults to ``get_pub_port()``)
    '''
    # Pre-bind the names so the cleanup below cannot fail with an unbound
    # local (masking the real error) when setup stops half-way.
    context = frontend = backend = None
    try:
        if sub_port is None:
            sub_port = get_sub_port()
        if pub_port is None:
            pub_port = get_pub_port()
        context = zmq.Context(1)
        frontend = context.socket(zmq.SUB)
        backend = context.socket(zmq.PUB)
        frontend.bind('tcp://*:{pub_port}'.format(pub_port=pub_port))
        frontend.setsockopt(zmq.SUBSCRIBE, b'')
        backend.bind('tcp://*:{sub_port}'.format(sub_port=sub_port))
        zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
        pass
    finally:
        # Only release what was actually created.
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def run(white_point):
    """Subscribe to the discovered downstream and drive a blink1 device.

    :param white_point: forwarded to ``blink1`` as its ``white_point``.
    """
    config = discover()
    downstream_url = config['downstream']

    socket = context.socket(zmq.SUB)
    socket.connect(downstream_url)
    log.info("Connecting to %s" % downstream_url)
    socket.setsockopt_string(zmq.SUBSCRIBE, DEFAULT_CHANNEL)

    stream = ZMQStream(socket)
    loop = ioloop.IOLoop.instance()
    with blink1(white_point=white_point) as device:
        receiver = Receiver(device, loop)
        # Incoming messages go to the receiver; the throbber is scheduled
        # on the loop before it starts.
        stream.on_recv(receiver.recieve)
        loop.add_callback(receiver.throbber)
        loop.start()
def startup_local_client():
    '''
    Start a local ZMQ client that receives the published messages.

    The SUB socket is stored in the module-level ``TEST_CLIENT``.
    '''
    global TEST_CLIENT
    time.sleep(2)
    endpoint = 'tcp://{addr}:{port}'.format(
        addr=NAPALM_LOGS_TEST_PUB_ADDR,
        port=NAPALM_LOGS_TEST_PUB_PORT)
    context = zmq.Context()
    TEST_CLIENT = context.socket(zmq.SUB)
    TEST_CLIENT.connect(endpoint)
    TEST_CLIENT.setsockopt(zmq.SUBSCRIBE, b'')

# Startup the local ZMQ client.
def set_topic(self, name, topic):
    """shortcut to :py:meth:`SocketManager.set_socket_option` with
    ``zmq.SUBSCRIBE`` as the option.

    :param name: the name of the socket where data will pass through
    :param topic: the subscription topic; byte strings are used as-is and
        text strings are encoded as UTF-8

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
      >>>
      >>> # subscribe only to topics beginning with "logs"
      >>> sockets.set_topic('events', 'logs')
      >>> event = sockets.recv_event_safe('events')
      >>> event.topic, event.data
      'logs:2016-06-20', {'stdout': 'hello world'}
    """
    if isinstance(topic, bytes):
        safe_topic = topic
    elif hasattr(topic, 'encode'):
        # Text string: bytes(text) without an encoding raises TypeError on
        # Python 3, so encode explicitly.
        safe_topic = topic.encode('utf-8')
    else:
        safe_topic = bytes(topic)
    self.set_socket_option(name, self.zmq.SUBSCRIBE, safe_topic)
def feedback_loop(self, *args):
    """Receive broadcaster feedback messages forever and dispatch them.

    Every multipart message with exactly three frames is passed to
    ``self.on_feedback_msg``; any other frame count is reported and the
    message is dropped.

    :param args: ignored.
    """
    # feedback socket
    ctx = zmq.Context()
    socket = ctx.socket(zmq.SUB)
    # The option value must be bytes: a text string raises TypeError on
    # Python 3.
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    socket.connect(config.get("broadcaster-feedback-url", "tcp://localhost:9110"))
    # Single pre-formatted argument per print: identical output whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    print("brc feedback channel connected")
    while True:
        # Collects all frames of one message, replacing the manual
        # recv()/RCVMORE loop.
        msg = socket.recv_multipart()
        print("feedback msg")
        if len(msg) == 3:
            self.on_feedback_msg(*msg)
        else:
            print("bad feedback message %d" % len(msg))
def status_loop(self, *args):
    """Receive broadcaster status messages forever and track the node count.

    Each message is decoded as one little-endian unsigned 64-bit integer.
    On success ``self.last_status`` is set to the current time; on failure
    the count is treated as 0. A change of the count is printed and stored
    in ``self.last_nodes``.

    :param args: ignored.
    """
    # status socket
    # Single pre-formatted argument per print: identical output whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    print("connect brc feedback")
    ctx = zmq.Context()
    socket = ctx.socket(zmq.SUB)
    # The option value must be bytes: a text string raises TypeError on
    # Python 3.
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    # NOTE(review): this is the same config key that feedback_loop reads,
    # only the default port differs — confirm it is not a copy-paste slip.
    socket.connect(config.get("broadcaster-feedback-url", "tcp://localhost:9112"))
    print("brc status channel connected")
    while True:
        msg = socket.recv()
        nodes = 0
        try:
            nodes = struct.unpack("<Q", msg)[0]
            self.last_status = time.time()
        except struct.error:
            # Only malformed payloads are expected here; a bare except
            # would also swallow KeyboardInterrupt and SystemExit.
            print("bad nodes data %s" % (msg,))
        if nodes != self.last_nodes:
            print("brc hosts %s" % (nodes,))
            self.last_nodes = nodes