Python zmq 模块,proxy() 实例源码
我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用zmq.proxy()。
def router_main(_, pidx, args):
log = get_logger('examples.zmqserver.extra', pidx)
ctx = zmq.Context()
ctx.linger = 0
in_sock = ctx.socket(zmq.PULL)
in_sock.bind('tcp://*:5000')
out_sock = ctx.socket(zmq.PUSH)
out_sock.bind('ipc://example-events')
try:
log.info('router proxy started')
zmq.proxy(in_sock, out_sock)
except KeyboardInterrupt:
pass
except:
log.exception('unexpected error')
finally:
log.info('router proxy terminated')
in_sock.close()
out_sock.close()
ctx.term()
def run(self):
self.log.debug("Broker starts XPUB:{}, XSUB:{}"
.format(self.xpub_url, self.xsub_url))
# self.proxy.start()
poller = zmq.Poller()
poller.register(self.xpub, zmq.POLLIN)
poller.register(self.xsub, zmq.POLLIN)
self.running = True
while self.running:
events = dict(poller.poll(1000))
if self.xpub in events:
message = self.xpub.recv_multipart()
self.log.debug("subscription message: {}".format(message[0]))
self.xsub.send_multipart(message)
if self.xsub in events:
message = self.xsub.recv_multipart()
self.log.debug("publishing message: {}".format(message))
self.xpub.send_multipart(message)
def run_device(self):
ins,outs,mons = self._setup_sockets()
zmq.proxy(ins, outs, mons)
def _proxyThread(logger, master, frontend, backend, url_frontend, url_backend):
logger.debug("Routing from " + url_frontend + " to " + url_backend)
zmq.proxy(frontend, backend)
def run_device(self):
ins,outs,mons = self._setup_sockets()
zmq.proxy(ins, outs, mons)
def run_device(self):
ins,outs,mons = self._setup_sockets()
zmq.proxy(ins, outs, mons)
def run_device(self):
ins,outs,mons = self._setup_sockets()
zmq.proxy(ins, outs, mons)
def run_device(self):
ins,outs,mons = self._setup_sockets()
zmq.proxy(ins, outs, mons)
def run_device(self):
ins,outs,mons = self._setup_sockets()
zmq.proxy(ins, outs, mons)
def __init__(self, xpub="tcp://127.0.0.1:8990",
xsub="tcp://127.0.0.1:8989"):
self.log = logging.getLogger("{module}.{name}".format(
module=self.__class__.__module__, name=self.__class__.__name__))
super(Broker, self).__init__()
self.running = False
self.xpub_url = xpub
self.xsub_url = xsub
self.ctx = zmq.Context()
self.xpub = self.ctx.socket(zmq.XPUB)
self.xpub.bind(self.xpub_url)
self.xsub = self.ctx.socket(zmq.XSUB)
self.xsub.bind(self.xsub_url)
# self.proxy = zmq.proxy(xpub, xsub)
def _start_mpbs_broker(self):
'''
Multi-Publisher-Multi-Subscriber
'''
context = zmq.Context()
front = context.socket(zmq.SUB) # @UndefinedVariable
front.setsockopt(zmq.SUBSCRIBE, b"") # @UndefinedVariable
front.bind(self.frontend)
back = context.socket(zmq.PUB) # @UndefinedVariable
back.bind(self.backend)
zmq.proxy(front, back) # @UndefinedVariable
def _start_mpup_broker(self):
'''
Multi-Pusher-Mutli-Puller
'''
context = zmq.Context()
front = context.socket(zmq.PULL) # @UndefinedVariable
front.bind(self.frontend)
back = context.socket(zmq.PUSH) # @UndefinedVariable
back.bind(self.backend)
zmq.proxy(front, back) # @UndefinedVariable
def _start_mrer_broker(self):
'''
Multi-Req-Mutli-Rep
'''
zc = zmq.Context()
front = zc.socket(zmq.ROUTER) # @UndefinedVariable
front.bind(self.frontend)
back = zc.socket(zmq.DEALER) # @UndefinedVariable
back.bind(self.backend)
zmq.proxy(front, back) # @UndefinedVariable
def start_forwarder(pub_port, receive_port, mon_port=None, backend_socket=None, frontend_socket=None):
"""Start a zeromq proxy for forwarding messages from TCP socket to zmq PUB socket
:param int pub_port: port number to use for publishing messages to workers
:param int receive_port: port number to use for receiving messages
:param int mon_port (optional): port to use for monitor socket
:param str backend_socket (optionnal): socket type to use for backend socket
:param str frontend_socket (optionnal): socket type to use for frontend socket
"""
context = zmq.Context()
if frontend_socket is not None:
try:
frontend_socket = getattr(zmq, frontend_socket.upper())
except AttributeError:
frontend_socket = zmq.PUB
log.warning("Bad frontend type provided :{}\nForwarder will use default PUB type".format(frontend_socket))
else:
frontend_socket = zmq.PUB
frontend = context.socket(frontend_socket)
frontend.bind("tcp://*:{}".format(pub_port))
if backend_socket is not None:
try:
backend_socket = getattr(zmq, backend_socket.upper())
except AttributeError:
backend_socket = zmq.STREAM
log.warning("Bad backend type provided :{}\nForwarder will use default STREAM type".format(backend_socket))
else:
backend_socket = zmq.STREAM
backend = context.socket(backend_socket)
backend.bind("tcp://*:{}".format(receive_port))
if mon_port is not None:
monitor = context.socket(zmq.PUB)
monitor.bind("tcp://*:{}".format(mon_port))
log.info("Starting forwarder")
log.info("frontend: {}\tbackend: {}\tmonitor: {}".format(pub_port, receive_port, mon_port))
zmq.proxy(frontend, backend, monitor)
else:
log.info("Starting forwarder")
log.info("frontend: {}\tbackend: {}".format(pub_port, receive_port))
zmq.proxy(frontend, backend)