Python zmq 模块，device() 实例源码
我们从 Python 开源项目中提取了以下 37 个代码示例，用于说明如何使用 zmq.device()。
def main():
    """Run a blocking QUEUE device between two TCP endpoints.

    An XREP socket is bound on port 5559 (client side) and an XREQ socket on
    port 5560 (server side); ``zmq.device`` is then called with both.  If
    anything raises, the exception info is printed, and every socket/context
    that was actually created is released before returning.
    """
    # Start from None so the cleanup below never hits an unbound name when
    # context/socket creation or bind() fails part-way through.
    context = frontend = backend = None
    try:
        context = zmq.Context(1)
        # Client-facing socket.
        # NOTE(review): XREP/XREQ look like the legacy spellings of
        # ROUTER/DEALER -- confirm against the installed pyzmq before renaming.
        frontend = context.socket(zmq.XREP)
        frontend.bind("tcp://*:5559")
        # Server-facing socket.
        backend = context.socket(zmq.XREQ)
        backend.bind("tcp://*:5560")
        zmq.device(zmq.QUEUE, frontend, backend)
    except (Exception, KeyboardInterrupt):
        # Keep the best-effort shutdown (Ctrl-C included), but no longer
        # swallow SystemExit/GeneratorExit the way a bare ``except:`` did.
        for val in sys.exc_info():
            print(val)
        print("Desativa a fila")
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def zmq_streamer():
    """Run a blocking STREAMER device between a PUSH and a PULL socket.

    The PUSH socket is bound on ``zmq_queue_port_push`` and the PULL socket on
    ``zmq_queue_port_pull`` (both names come from the enclosing module).  Any
    ``Exception`` is printed; sockets and context that were actually created
    are always released.
    """
    # Pre-bind to None so a failure before/while creating a socket does not
    # turn into a NameError inside ``finally`` that hides the real error.
    context = frontend = backend = None
    try:
        context = zmq.Context()
        # Socket facing clients
        frontend = context.socket(zmq.PUSH)
        frontend.bind("tcp://*:%s" % (zmq_queue_port_push))
        # Socket facing services
        backend = context.socket(zmq.PULL)
        backend.bind("tcp://*:%s" % (zmq_queue_port_pull))
        zmq.device(zmq.STREAMER, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def main(pub_port=None, sub_port=None):
    '''main of forwarder

    Binds a SUB socket (subscribed to everything) on ``pub_port`` and a PUB
    socket on ``sub_port``, then runs a blocking FORWARDER device between
    them.  ``KeyboardInterrupt`` ends the device quietly.

    :param sub_port: port for subscribers; ``get_sub_port()`` is used when None
    :param pub_port: port for publishers; ``get_pub_port()`` is used when None
    '''
    # Pre-bind to None: the port lookups and socket setup all run inside the
    # ``try``, so ``finally`` may execute before any of these exist.
    context = frontend = backend = None
    try:
        if sub_port is None:
            sub_port = get_sub_port()
        if pub_port is None:
            pub_port = get_pub_port()
        context = zmq.Context(1)
        frontend = context.socket(zmq.SUB)
        backend = context.socket(zmq.PUB)
        frontend.bind('tcp://*:{pub_port}'.format(pub_port=pub_port))
        # An empty prefix subscribes to every message.
        frontend.setsockopt(zmq.SUBSCRIBE, b'')
        backend.bind('tcp://*:{sub_port}'.format(sub_port=sub_port))
        zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
        pass
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def main():
    """Run a blocking STREAMER device: PULL on port 5559, PUSH on port 5560.

    Any ``Exception`` is printed followed by a shutdown notice; sockets and
    context that were actually created are always released.
    """
    # Pre-bind to None so cleanup is safe even if setup fails early.
    context = frontend = backend = None
    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.PULL)
        frontend.bind("tcp://*:5559")
        # Socket facing services
        backend = context.socket(zmq.PUSH)
        backend.bind("tcp://*:5560")
        zmq.device(zmq.STREAMER, frontend, backend)
    except Exception as e:
        # ``except ... as`` and single-argument print() parse on both
        # Python 2.6+ and Python 3, unlike the former ``except X, e`` form.
        print(e)
        print("bringing down zmq device")
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def main():
    """Run a blocking FORWARDER device: SUB on port 5559, PUB on port 5560.

    The SUB socket subscribes to every message.  Any ``Exception`` is logged
    via ``logging.error``; sockets and context that were actually created are
    always released.
    """
    # Pre-bind to None so cleanup is safe even if setup fails early.
    context = frontend = backend = None
    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.SUB)
        frontend.bind("tcp://*:5559")
        # Empty bytes prefix = subscribe to everything; a bytes literal is
        # accepted on Python 2 and 3 alike.
        frontend.setsockopt(zmq.SUBSCRIBE, b"")
        # Socket facing services
        backend = context.socket(zmq.PUB)
        backend.bind("tcp://*:5560")
        zmq.device(zmq.FORWARDER, frontend, backend)
    except Exception as e:
        logging.error(e)
        print("let it crash")
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def device(device_type, isocket, osocket):
    """Relay multipart messages between two sockets forever (gevent-compatible).

    Unlike the true zmq.device, this does not release the GIL.

    Parameters
    ----------
    device_type : (QUEUE, FORWARDER, STREAMER)
        Accepted for signature compatibility; the value is never used.
    isocket : Socket
        The Socket instance for the incoming traffic.
    osocket : Socket
        The Socket instance for the outbound traffic.  Passing ``-1`` makes
        the device use ``isocket`` for both directions.
    """
    poller = Poller()
    if osocket == -1:
        osocket = isocket
    for sock in (isocket, osocket):
        poller.register(sock, zmq.POLLIN)
    # Each pass forwards at most one message per direction, inbound first.
    routes = ((isocket, osocket), (osocket, isocket))
    while True:
        ready = dict(poller.poll())
        for source, sink in routes:
            if source in ready:
                sink.send_multipart(source.recv_multipart())
def run_device(self):
    """Build the sockets and hand them to ``device``.

    Not meant to be called directly; call ``self.start()`` instead, just
    like a Thread.
    """
    socket_pair = self._setup_sockets()
    ins, outs = socket_pair
    device(self.device_type, ins, outs)
def start(self):
    """Launch the device by delegating to ``self.run()`` and return its result.

    Subclasses may override this to use another launcher.
    """
    outcome = self.run()
    return outcome
def test_core(self):
    """Check that the core names can be imported from the top-level ``zmq`` package."""
    # One import statement; names are looked up in the same order as before.
    from zmq import (
        Context,
        Socket,
        Poller,
        Frame,
        constants,
        device,
        proxy,
        Stopwatch,
        zmq_version,
        zmq_version_info,
        pyzmq_version,
        pyzmq_version_info,
    )
def test_devices(self):
    """Check that ``zmq.devices`` and its submodules can be imported."""
    import zmq.devices
    from zmq.devices import (
        basedevice,
        monitoredqueue,
        monitoredqueuedevice,
    )
def device(device_type, frontend, backend, gevent=False):
    """Run a zmq device, using the ``zmq.green`` implementation when asked.

    ``device_type``, ``frontend`` and ``backend`` are passed straight through
    to the selected ``device`` function; its return value is discarded.
    ``gevent`` selects ``zmq.green.device`` when truthy, ``zmq.device``
    otherwise.
    """
    if not gevent:
        from zmq import device as run_impl
    else:
        from zmq.green import device as run_impl
    run_impl(device_type, frontend, backend)
def device(device_type, isocket, osocket):
    """Relay multipart messages between two sockets forever (gevent-compatible).

    Unlike the true zmq.device, this does not release the GIL.

    Parameters
    ----------
    device_type : (QUEUE, FORWARDER, STREAMER)
        Accepted for signature compatibility; the value is never used.
    isocket : Socket
        The Socket instance for the incoming traffic.
    osocket : Socket
        The Socket instance for the outbound traffic.  Passing ``-1`` makes
        the device use ``isocket`` for both directions.
    """
    poller = Poller()
    if osocket == -1:
        osocket = isocket
    for sock in (isocket, osocket):
        poller.register(sock, zmq.POLLIN)
    # Each pass forwards at most one message per direction, inbound first.
    routes = ((isocket, osocket), (osocket, isocket))
    while True:
        ready = dict(poller.poll())
        for source, sink in routes:
            if source in ready:
                sink.send_multipart(source.recv_multipart())
def run_device(self):
    """Build the sockets and hand them to ``device``.

    Not meant to be called directly; call ``self.start()`` instead, just
    like a Thread.
    """
    socket_pair = self._setup_sockets()
    ins, outs = socket_pair
    device(self.device_type, ins, outs)
def start(self):
    """Launch the device by delegating to ``self.run()`` and return its result.

    Subclasses may override this to use another launcher.
    """
    outcome = self.run()
    return outcome
def test_core(self):
    """Check that the core names can be imported from the top-level ``zmq`` package."""
    # One import statement; names are looked up in the same order as before.
    from zmq import (
        Context,
        Socket,
        Poller,
        Frame,
        constants,
        device,
        proxy,
        Stopwatch,
        zmq_version,
        zmq_version_info,
        pyzmq_version,
        pyzmq_version_info,
    )
def test_devices(self):
    """Check that ``zmq.devices`` and its submodules can be imported."""
    import zmq.devices
    from zmq.devices import (
        basedevice,
        monitoredqueue,
        monitoredqueuedevice,
    )
def run_device(self):
    """Build the sockets and hand them to ``device``.

    Not meant to be called directly; call ``self.start()`` instead, just
    like a Thread.
    """
    socket_pair = self._setup_sockets()
    ins, outs = socket_pair
    device(self.device_type, ins, outs)
def start(self):
    """Launch the device by delegating to ``self.run()`` and return its result.

    Subclasses may override this to use another launcher.
    """
    outcome = self.run()
    return outcome
def test_core(self):
    """Check that the core names can be imported from the top-level ``zmq`` package."""
    # One import statement; names are looked up in the same order as before.
    from zmq import (
        Context,
        Socket,
        Poller,
        Frame,
        constants,
        device,
        proxy,
        Stopwatch,
        zmq_version,
        zmq_version_info,
        pyzmq_version,
        pyzmq_version_info,
    )
def test_devices(self):
    """Check that ``zmq.devices`` and its submodules can be imported."""
    import zmq.devices
    from zmq.devices import (
        basedevice,
        monitoredqueue,
        monitoredqueuedevice,
    )
def device(device_type, isocket, osocket):
    """Relay multipart messages between two sockets forever (gevent-compatible).

    Unlike the true zmq.device, this does not release the GIL.

    Parameters
    ----------
    device_type : (QUEUE, FORWARDER, STREAMER)
        Accepted for signature compatibility; the value is never used.
    isocket : Socket
        The Socket instance for the incoming traffic.
    osocket : Socket
        The Socket instance for the outbound traffic.  Passing ``-1`` makes
        the device use ``isocket`` for both directions.
    """
    poller = Poller()
    if osocket == -1:
        osocket = isocket
    for sock in (isocket, osocket):
        poller.register(sock, zmq.POLLIN)
    # Each pass forwards at most one message per direction, inbound first.
    routes = ((isocket, osocket), (osocket, isocket))
    while True:
        ready = dict(poller.poll())
        for source, sink in routes:
            if source in ready:
                sink.send_multipart(source.recv_multipart())
def start(self):
    """Launch the device by delegating to ``self.run()`` and return its result.

    Subclasses may override this to use another launcher.
    """
    outcome = self.run()
    return outcome
def test_core(self):
    """Check that the core names can be imported from the top-level ``zmq`` package."""
    # One import statement; names are looked up in the same order as before.
    from zmq import (
        Context,
        Socket,
        Poller,
        Frame,
        constants,
        device,
        proxy,
        Stopwatch,
        zmq_version,
        zmq_version_info,
        pyzmq_version,
        pyzmq_version_info,
    )
def test_devices(self):
    """Check that ``zmq.devices`` and its submodules can be imported."""
    import zmq.devices
    from zmq.devices import (
        basedevice,
        monitoredqueue,
        monitoredqueuedevice,
    )
def device(device_type, isocket, osocket):
    """Relay multipart messages between two sockets forever (gevent-compatible).

    Unlike the true zmq.device, this does not release the GIL.

    Parameters
    ----------
    device_type : (QUEUE, FORWARDER, STREAMER)
        Accepted for signature compatibility; the value is never used.
    isocket : Socket
        The Socket instance for the incoming traffic.
    osocket : Socket
        The Socket instance for the outbound traffic.  Passing ``-1`` makes
        the device use ``isocket`` for both directions.
    """
    poller = Poller()
    if osocket == -1:
        osocket = isocket
    for sock in (isocket, osocket):
        poller.register(sock, zmq.POLLIN)
    # Each pass forwards at most one message per direction, inbound first.
    routes = ((isocket, osocket), (osocket, isocket))
    while True:
        ready = dict(poller.poll())
        for source, sink in routes:
            if source in ready:
                sink.send_multipart(source.recv_multipart())
def run_device(self):
    """Build the sockets and hand them to ``device``.

    Not meant to be called directly; call ``self.start()`` instead, just
    like a Thread.
    """
    socket_pair = self._setup_sockets()
    ins, outs = socket_pair
    device(self.device_type, ins, outs)
def test_core(self):
    """Check that the core names can be imported from the top-level ``zmq`` package."""
    # One import statement; names are looked up in the same order as before.
    from zmq import (
        Context,
        Socket,
        Poller,
        Frame,
        constants,
        device,
        proxy,
        Stopwatch,
        zmq_version,
        zmq_version_info,
        pyzmq_version,
        pyzmq_version_info,
    )
def test_devices(self):
    """Check that ``zmq.devices`` and its submodules can be imported."""
    import zmq.devices
    from zmq.devices import (
        basedevice,
        monitoredqueue,
        monitoredqueuedevice,
    )
def device(device_type, isocket, osocket):
    """Relay multipart messages between two sockets forever (gevent-compatible).

    Unlike the true zmq.device, this does not release the GIL.

    Parameters
    ----------
    device_type : (QUEUE, FORWARDER, STREAMER)
        Accepted for signature compatibility; the value is never used.
    isocket : Socket
        The Socket instance for the incoming traffic.
    osocket : Socket
        The Socket instance for the outbound traffic.  Passing ``-1`` makes
        the device use ``isocket`` for both directions.
    """
    poller = Poller()
    if osocket == -1:
        osocket = isocket
    for sock in (isocket, osocket):
        poller.register(sock, zmq.POLLIN)
    # Each pass forwards at most one message per direction, inbound first.
    routes = ((isocket, osocket), (osocket, isocket))
    while True:
        ready = dict(poller.poll())
        for source, sink in routes:
            if source in ready:
                sink.send_multipart(source.recv_multipart())
def run_device(self):
    """Build the sockets and hand them to ``device``.

    Not meant to be called directly; call ``self.start()`` instead, just
    like a Thread.
    """
    socket_pair = self._setup_sockets()
    ins, outs = socket_pair
    device(self.device_type, ins, outs)
def start(self):
    """Launch the device by delegating to ``self.run()`` and return its result.

    Subclasses may override this to use another launcher.
    """
    outcome = self.run()
    return outcome
def test_devices(self):
    """Check that ``zmq.devices`` and its submodules can be imported."""
    import zmq.devices
    from zmq.devices import (
        basedevice,
        monitoredqueue,
        monitoredqueuedevice,
    )
def device(device_type, isocket, osocket):
    """Relay multipart messages between two sockets forever (gevent-compatible).

    Unlike the true zmq.device, this does not release the GIL.

    Parameters
    ----------
    device_type : (QUEUE, FORWARDER, STREAMER)
        Accepted for signature compatibility; the value is never used.
    isocket : Socket
        The Socket instance for the incoming traffic.
    osocket : Socket
        The Socket instance for the outbound traffic.  Passing ``-1`` makes
        the device use ``isocket`` for both directions.
    """
    poller = Poller()
    if osocket == -1:
        osocket = isocket
    for sock in (isocket, osocket):
        poller.register(sock, zmq.POLLIN)
    # Each pass forwards at most one message per direction, inbound first.
    routes = ((isocket, osocket), (osocket, isocket))
    while True:
        ready = dict(poller.poll())
        for source, sink in routes:
            if source in ready:
                sink.send_multipart(source.recv_multipart())
def run_device(self):
    """Build the sockets and hand them to ``device``.

    Not meant to be called directly; call ``self.start()`` instead, just
    like a Thread.
    """
    socket_pair = self._setup_sockets()
    ins, outs = socket_pair
    device(self.device_type, ins, outs)
def start(self):
    """Launch the device by delegating to ``self.run()`` and return its result.

    Subclasses may override this to use another launcher.
    """
    outcome = self.run()
    return outcome
def test_core(self):
    """Check that the core names can be imported from the top-level ``zmq`` package."""
    # One import statement; names are looked up in the same order as before.
    from zmq import (
        Context,
        Socket,
        Poller,
        Frame,
        constants,
        device,
        proxy,
        Stopwatch,
        zmq_version,
        zmq_version_info,
        pyzmq_version,
        pyzmq_version_info,
    )
def main(req_port=None, res_port=None, use_security=False):
    '''main of queue

    Binds an XREP socket on ``req_port`` and an XREQ socket on ``res_port``
    and runs a blocking QUEUE device between them.  ``KeyboardInterrupt``
    ends the device quietly.

    :param req_port: port for clients; ``env.get_req_port()`` when None
    :param res_port: port for servers; ``env.get_res_port()`` when None
    :param use_security: when truthy, create the server certificates if the
        public-key directory is missing and install server/client keys on the
        two sockets through ``Authenticator`` before binding
    '''
    if req_port is None:
        req_port = env.get_req_port()
    if res_port is None:
        res_port = env.get_res_port()
    auth = None
    # Pre-bind to None so ``finally`` never trips over an unbound name when
    # context or socket creation fails.
    context = frontend_service = backend_service = None
    try:
        context = zmq.Context()
        # NOTE(review): XREP/XREQ look like the legacy spellings of
        # ROUTER/DEALER -- confirm against the installed pyzmq before renaming.
        frontend_service = context.socket(zmq.XREP)
        backend_service = context.socket(zmq.XREQ)
        if use_security:
            if not os.path.exists(env.get_server_public_key_dir()):
                create_certificates(env.get_server_public_key_dir())
            auth = Authenticator.instance(env.get_server_public_key_dir())
            auth.set_server_key(
                frontend_service, env.get_server_secret_key_path())
            auth.set_client_key(backend_service, env.get_client_secret_key_path(),
                                env.get_server_public_key_path())
        frontend_service.bind('tcp://*:{req_port}'.format(req_port=req_port))
        backend_service.bind('tcp://*:{res_port}'.format(res_port=res_port))
        zmq.device(zmq.QUEUE, frontend_service, backend_service)
    except KeyboardInterrupt:
        pass
    finally:
        for sock in (frontend_service, backend_service):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
        if use_security and auth is not None:
            auth.stop()