Python zmq 模块,XPUB 实例源码
我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用zmq.XPUB。
def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
sub_socket = socket_factory.create(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
frames = sub_socket.recv_multipart()
assert frames == [b'a', b'message']
with run_in_background(run) as thread_done_event:
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.XPUB)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'\1a']
while not thread_done_event.is_set():
await socket.send_multipart([b'a', b'message'])
await socket.send_multipart([b'b', b'wrong'])
sub_socket.close()
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'\0a']
def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
xpub_socket = socket_factory.create(zmq.XPUB)
connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
# Wait one second for the subscription to arrive.
assert xpub_socket.poll(1000) == zmq.POLLIN
topic = xpub_socket.recv_multipart()
assert topic == [b'\x01a']
xpub_socket.send_multipart([b'a', b'message'])
if connect_or_bind == 'connect':
assert xpub_socket.poll(1000) == zmq.POLLIN
topic = xpub_socket.recv_multipart()
assert topic == [b'\x00a']
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.SUB)
await socket.subscribe(b'a')
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'a', b'message']
def test_tcp_xsub_socket(event_loop, socket_factory, connect_or_bind):
xpub_socket = socket_factory.create(zmq.XPUB)
connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
# Wait one second for the subscription to arrive.
assert xpub_socket.poll(1000) == zmq.POLLIN
topic = xpub_socket.recv_multipart()
assert topic == [b'\x01a']
xpub_socket.send_multipart([b'a', b'message'])
if connect_or_bind == 'connect':
assert xpub_socket.poll(1000) == zmq.POLLIN
topic = xpub_socket.recv_multipart()
assert topic == [b'\x00a']
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.XSUB)
await socket.send_multipart([b'\x01a'])
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'a', b'message']
def run(self):
self.log.debug("Broker starts XPUB:{}, XSUB:{}"
.format(self.xpub_url, self.xsub_url))
# self.proxy.start()
poller = zmq.Poller()
poller.register(self.xpub, zmq.POLLIN)
poller.register(self.xsub, zmq.POLLIN)
self.running = True
while self.running:
events = dict(poller.poll(1000))
if self.xpub in events:
message = self.xpub.recv_multipart()
self.log.debug("subscription message: {}".format(message[0]))
self.xsub.send_multipart(message)
if self.xsub in events:
message = self.xsub.recv_multipart()
self.log.debug("publishing message: {}".format(message))
self.xpub.send_multipart(message)
def __init__(self, xpub="tcp://127.0.0.1:8990",
xsub="tcp://127.0.0.1:8989"):
self.log = logging.getLogger("{module}.{name}".format(
module=self.__class__.__module__, name=self.__class__.__name__))
super(Broker, self).__init__()
self.running = False
self.xpub_url = xpub
self.xsub_url = xsub
self.ctx = zmq.Context()
self.xpub = self.ctx.socket(zmq.XPUB)
self.xpub.bind(self.xpub_url)
self.xsub = self.ctx.socket(zmq.XSUB)
self.xsub.bind(self.xsub_url)
# self.proxy = zmq.proxy(xpub, xsub)
def start_dispatch_thread():
global INITED, DISPATCHER
if INITED:
return
DISPATCHER = zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.XSUB, zmq.XPUB)
DISPATCHER.bind_in(INTERNAL_SOCKET)
DISPATCHER.connect_out(CHANGES_SOCKET)
DISPATCHER.setsockopt_in(zmq.IDENTITY, b'XSUB')
DISPATCHER.setsockopt_out(zmq.IDENTITY, b'XPUB')
DISPATCHER.start()
#Fix weird nosetests problems. TODO: find and fix underlying problem
sleep(0.01)
INITED = True
def __init__(self, address, base_port):
self.ctx = zmq.Context()
self.loop = IOLoop.instance()
self.stats = {
'started': time(),
'spiders_out_recvd': 0,
'spiders_in_recvd': 0,
'db_in_recvd': 0,
'db_out_recvd': 0,
'sw_in_recvd': 0,
'sw_out_recvd': 0
}
socket_config = SocketConfig(address, base_port)
if socket_config.is_ipv6:
self.ctx.setsockopt(zmq.IPV6, True)
spiders_in_s = self.ctx.socket(zmq.XPUB)
spiders_out_s = self.ctx.socket(zmq.XSUB)
sw_in_s = self.ctx.socket(zmq.XPUB)
sw_out_s = self.ctx.socket(zmq.XSUB)
db_in_s = self.ctx.socket(zmq.XPUB)
db_out_s = self.ctx.socket(zmq.XSUB)
spiders_in_s.bind(socket_config.spiders_in())
spiders_out_s.bind(socket_config.spiders_out())
sw_in_s.bind(socket_config.sw_in())
sw_out_s.bind(socket_config.sw_out())
db_in_s.bind(socket_config.db_in())
db_out_s.bind(socket_config.db_out())
self.spiders_in = ZMQStream(spiders_in_s)
self.spiders_out = ZMQStream(spiders_out_s)
self.sw_in = ZMQStream(sw_in_s)
self.sw_out = ZMQStream(sw_out_s)
self.db_in = ZMQStream(db_in_s)
self.db_out = ZMQStream(db_out_s)
self.spiders_out.on_recv(self.handle_spiders_out_recv)
self.sw_out.on_recv(self.handle_sw_out_recv)
self.db_out.on_recv(self.handle_db_out_recv)
self.sw_in.on_recv(self.handle_sw_in_recv)
self.db_in.on_recv(self.handle_db_in_recv)
self.spiders_in.on_recv(self.handle_spiders_in_recv)
logging.basicConfig(format="%(asctime)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
self.logger = logging.getLogger("distributed_frontera.messagebus"
".zeromq.broker.Server")
self.logger.info("Using socket: {}:{}".format(socket_config.ip_addr,
socket_config.base_port))