Python zmq 模块,ROUTER 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.ROUTER。
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def run(self):
self._loop = zmq.asyncio.ZMQEventLoop()
asyncio.set_event_loop(self._loop)
self.context = zmq.asyncio.Context()
self.status_sock = self.context.socket(zmq.ROUTER)
self.data_sock = self.context.socket(zmq.PUB)
self.status_sock.bind("tcp://*:%s" % self.status_port)
self.data_sock.bind("tcp://*:%s" % self.data_port)
self.poller = zmq.asyncio.Poller()
self.poller.register(self.status_sock, zmq.POLLIN)
self._loop.create_task(self.poll_sockets())
try:
self._loop.run_forever()
finally:
self.status_sock.close()
self.data_sock.close()
self.context.destroy()
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
if not os.path.exists(data_dir) or not os.path.isdir(data_dir):
raise Exception("Datadir %s is not a valid directory" % data_dir)
self.worker_id = binascii.hexlify(os.urandom(8))
self.node_name = socket.gethostname()
self.data_dir = data_dir
self.data_files = set()
context = zmq.Context()
self.socket = context.socket(zmq.ROUTER)
self.socket.setsockopt(zmq.LINGER, 500)
self.socket.identity = self.worker_id
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)
self.redis_server = redis.from_url(redis_url)
self.controllers = {} # Keep a dict of timestamps when you last spoke to controllers
self.check_controllers()
self.last_wrm = 0
self.start_time = time.time()
self.logger = bqueryd.logger.getChild('worker ' + self.worker_id)
self.logger.setLevel(loglevel)
self.msg_count = 0
signal.signal(signal.SIGTERM, self.term_signal())
def handle_in(self):
self.msg_count_in += 1
data = self.socket.recv_multipart()
binary, sender = None, None # initialise outside for edge cases
if len(data) == 3:
if data[1] == '': # This is a RPC call from a zmq.REQ socket
sender, _blank, msg_buf = data
self.handle_rpc(sender, msg_factory(msg_buf))
return
sender, msg_buf, binary = data
elif len(data) == 2: # This is an internode call from another zmq.ROUTER, a Controller or Worker
sender, msg_buf = data
msg = msg_factory(msg_buf)
if binary:
msg['data'] = binary
if sender in self.others:
self.handle_peer(sender, msg)
else:
self.handle_worker(sender, msg)
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def __init__(self, port=None, min_port=5000, max_port=9999, pipeline=100, log_file=None):
"""Create a new ZMQDealer object.
"""
context = zmq.Context.instance()
# noinspection PyUnresolvedReferences
self.socket = context.socket(zmq.ROUTER)
self.socket.hwm = pipeline
if port is not None:
self.socket.bind('tcp://*:%d' % port)
self.port = port
else:
self.port = self.socket.bind_to_random_port('tcp://*', min_port=min_port, max_port=max_port)
self.addr = None
self._log_file = log_file
if self._log_file is not None:
self._log_file = os.path.abspath(self._log_file)
# If log file directory does not exists, create it
log_dir = os.path.dirname(self._log_file)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# clears any existing log
if os.path.exists(self._log_file):
os.remove(self._log_file)
def __init__(self, bind_address, linger=-1, poll_timeout=2, loop=None):
self.bind_address = bind_address
self.loop = loop
self.context = zmq.asyncio.Context()
self.poll_timeout = poll_timeout
self.socket = self.context.socket(zmq.ROUTER)
self.socket.setsockopt(zmq.LINGER, linger)
self.in_poller = zmq.asyncio.Poller()
self.in_poller.register(self.socket, zmq.POLLIN)
log.info('Bound to: ' + self.bind_address)
self.socket.bind(self.bind_address)
self._kill = False
def __init__(self, targname, cfg, isServer=False):
self.targname = targname
self.cfg = cfg
self.isServer = isServer
self.fnCallName = ''
self.ctx = zmq.Context()
self.ctx.linger = 100
if not self.isServer:
self.sock = self.ctx.socket(zmq.DEALER)
self.sock.linger = 100
self.sock.connect('tcp://%s:%s' % (self.cfg['server'],self.cfg.get('port',7677))) # this times out with EINVAL when no internet
self.poller = zmq.Poller()
self.poller.register(self.sock, zmq.POLLIN)
else:
self.sock = self.ctx.socket(zmq.ROUTER)
self.sock.linger = 100
self.sock.bind('tcp://*:%s' % (self.cfg.get('port',7677)))
self.poller = zmq.Poller()
self.poller.register(self.sock, zmq.POLLIN)
self.be = GetBackend(self.cfg['backend'])(self.targname, self.cfg)
self.inTime = time.time()
self.inactiveLimit = int(self.cfg.get('inactivelimit',0))
print 'inactivelimit ',self.inactiveLimit
def __init__(self, address, port, logger):
"""
Initialize new instance with given address and port.
:param address: String representation of IP address
to listen to or a hostname.
:param port: String port where to listen.
:param logger: System logger
"""
self._logger = logger
context = zmq.Context()
self._receiver = context.socket(zmq.ROUTER)
self._receiver.setsockopt(zmq.IDENTITY, b"recodex-monitor")
address = "tcp://{}:{}".format(address, port)
self._receiver.bind(address)
self._logger.info("zeromq server initialized at {}".format(address))
def __init__(self, name, send_qsize=0, mode='ipc'):
self._name = name
self._conn_info = None
self._context_lock = threading.Lock()
self._context = zmq.Context()
self._tosock = self._context.socket(zmq.ROUTER)
self._frsock = self._context.socket(zmq.PULL)
self._tosock.set_hwm(10)
self._frsock.set_hwm(10)
self._dispatcher = CallbackManager()
self._send_queue = queue.Queue(maxsize=send_qsize)
self._rcv_thread = None
self._snd_thread = None
self._mode = mode
assert mode in ('ipc', 'tcp')
def __init__(self, config):
super().__init__(config)
self.paused = False
slave_queue = self.ctx.socket(zmq.ROUTER)
slave_queue.ipv6 = True
slave_queue.bind(config.slave_queue)
self.register(slave_queue, self.handle_slave)
self.status_queue = self.ctx.socket(zmq.PUSH)
self.status_queue.hwm = 10
self.status_queue.connect(const.INT_STATUS_QUEUE)
SlaveState.status_queue = self.status_queue
self.builds_queue = self.ctx.socket(zmq.REQ)
self.builds_queue.hwm = 1
self.builds_queue.connect(config.builds_queue)
self.index_queue = self.ctx.socket(zmq.PUSH)
self.index_queue.hwm = 10
self.index_queue.connect(config.index_queue)
self.db = DbClient(config)
self.fs = FsClient(config)
self.slaves = {}
self.pypi_simple = config.pypi_simple
def __init__(self, config):
super().__init__(config)
self.output_path = Path(config.output_path)
TransferState.output_path = self.output_path
file_queue = self.ctx.socket(zmq.ROUTER)
file_queue.ipv6 = True
file_queue.hwm = TransferState.pipeline_size * 50
file_queue.bind(config.file_queue)
fs_queue = self.ctx.socket(zmq.REP)
fs_queue.hwm = 1
fs_queue.bind(config.fs_queue)
self.register(file_queue, self.handle_file)
self.register(fs_queue, self.handle_fs_request)
self.pending = {} # keyed by slave_id
self.active = {} # keyed by slave address
self.complete = {} # keyed by slave_id
def _receive_message(self):
"""
Internal coroutine for receiving messages
"""
while True:
try:
if self._socket.getsockopt(zmq.TYPE) == zmq.ROUTER:
zmq_identity, msg_bytes = \
yield from self._socket.recv_multipart()
self._received_from_identity(zmq_identity)
self._dispatcher_queue.put_nowait(
(zmq_identity, msg_bytes))
else:
msg_bytes = yield from self._socket.recv()
self._last_message_time = time.time()
self._dispatcher_queue.put_nowait((None, msg_bytes))
except CancelledError:
# The concurrent.futures.CancelledError is caught by asyncio
# when the Task associated with the coroutine is cancelled.
# The raise is required to stop this component.
raise
except Exception as e: # pylint: disable=broad-except
LOGGER.exception("Received a message on address %s that "
"caused an error: %s", self._address, e)
def open(self):
# noinspection PyUnresolvedReferences
self.listener = self.ctx.socket(zmq.ROUTER)
# noinspection PyUnresolvedReferences
# self.poller.register(self.listener, test.POLLIN)
public, secret = self.selfEncKeys
self.listener.curve_secretkey = secret
self.listener.curve_publickey = public
self.listener.curve_server = True
self.listener.identity = self.publicKey
logger.debug(
'{} will bind its listener at {}'.format(self, self.ha[1]))
set_keepalive(self.listener, self.config)
set_zmq_internal_queue_length(self.listener, self.config)
self.listener.bind(
'{protocol}://*:{port}'.format(
port=self.ha[1], protocol=ZMQ_NETWORK_PROTOCOL)
)
def monitored_queue(in_socket, out_socket, mon_socket,
in_prefix=b'in', out_prefix=b'out'):
swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
poller = zmq.Poller()
poller.register(in_socket, zmq.POLLIN)
poller.register(out_socket, zmq.POLLIN)
while True:
events = dict(poller.poll())
if in_socket in events:
_relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
if out_socket in events:
_relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
def test_term_hang(self):
rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
req.setsockopt(zmq.LINGER, 0)
req.send(b'hello', copy=False)
req.close()
rep.close()
self.context.term()
def test_default_mq_args(self):
self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
dev.setsockopt_in(zmq.LINGER, 0)
dev.setsockopt_out(zmq.LINGER, 0)
dev.setsockopt_mon(zmq.LINGER, 0)
# this will raise if default args are wrong
dev.start()
self.teardown_device()
def test_mq_check_prefix(self):
ins = self.context.socket(zmq.ROUTER)
outs = self.context.socket(zmq.DEALER)
mons = self.context.socket(zmq.PUB)
self.sockets.extend([ins, outs, mons])
ins = unicode('in')
outs = unicode('out')
self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)
def Server(context):
context = context or zmq.Context().instance()
# Socket facing clients
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# Socket facing services
backend = context.socket(zmq.ROUTER)
backend.bind("tcp://*:5560")
print "zmq server running on localhost:5559/5560"
poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)
poll_both = zmq.Poller()
poll_both.register(backend, zmq.POLLIN)
poll_both.register(frontend, zmq.POLLIN)
clients = [[]]
while True:
if clients:
sockets = dict(poll_both.poll())
else:
sockets = dict(poll_workers.poll())
if sockets.get(frontend) == ZMQ.POLLIN:
clientRequest = frontend.recv_multipart()
clients.append(clientRequest[0]) #push client into queue
if sockets.get(backend) == ZMQ.POLLIN:
#workers want data
msg = backend.recv_multipart()
workerIdentity = msg[0]
clientIdentity,request = clients.pop(0)
workRequest = [workerIdentity, '',request]
backend.send_multipart(workRequest)
yelpResponse = backend.recv_multipart()[2]
#fulfill frontend request
frontendResponse = [clientIdentity,'',yelpResponse]
frontend.send_multipart(frontendResponse)
def worker(client,location,query):
"""use the yelp api to find the desired place at a location"""
#reuse context if it exists, otherwise make a new one
context = context or zmq.Context.instance()
service = context.socket(zmq.ROUTER)
#identify worker
service.setsockopt(zmq.IDENTITY,b'A')
service.connect("tcp://localhost:5560")
while True:
#send our identity
service.send('')
message = service.recv()
with myLock:
print "yelp worker got:"
print message
if message != "":
response = queryYelp(client, request)
service.send(response)
elif message == "END":
break
# else:
# with myLock:
# print "the server has the wrong identities!"
# break
def __init__(self, app, ctx, port=6606):
self.app = app
self.ctx = ctx
self.publisher = self.ctx.socket(zmq.PUB)
self.publisher.bind("tcp://*:" + str(port))
self.snapshot = ctx.socket(zmq.ROUTER)
self.snapshot.bind("tcp://*:" + str(port+2))
self.snapshot = ZMQStream(self.snapshot)
self.snapshot.on_recv(self.handle_snapshot)
self.seqNr = 0
def __init__(self, cfg, target=None, args=[]):
(self.target,self.args) = (target,args)
# Set up server socket
self.context = zmq.Context()
self.socket = self.context.socket(zmq.ROUTER)
self.port = self.socket.bind_to_random_port('tcp://*', min_port=cfg["port_min"], max_port=cfg["port_max"], max_tries=100)
self.uri = "tcp://127.0.0.1:%d"%(self.port)
print "Controller listening on %s"%(self.uri)
self.start()
def _memoryTask(settings, logger,master, url_setFrontend, url_getFrontend, url_getBackend, url_setBackend):
from Cache import Slab, CacheSlubLRU
# grab settings
slabSize = settings.getSlabSize()
preallocatedPool = settings.getPreallocatedPool()
getterNumber = settings.getGetterThreadNumber()
# initialize cache
cache = CacheSlubLRU(preallocatedPool , slabSize, logger) #set as 10 mega, 1 mega per slab
#log
logger.debug("Memory Process initialized:" + str(preallocatedPool) + "B, get# = " + str(getterNumber))
# Prepare our context and sockets
context = zmq.Context.instance()
# Socket to talk to get
socketGetFrontend = context.socket(zmq.ROUTER)
socketGetFrontend.bind(url_getFrontend)
# Socket to talk to workers
socketGetBackend = context.socket(zmq.DEALER)
socketGetBackend.bind(url_getBackend)
timing = {}
timing["getters"] = []
timing["setters"] = [-1]
Thread(name='MemoryGetProxy',target=_proxyThread, args=(logger, master, socketGetFrontend, socketGetBackend, url_getFrontend, url_getBackend)).start()
for i in range(getterNumber):
timing["getters"].append(-1)
th = Thread(name='MemoryGetter',target=_getThread, args=(i,logger, settings, cache,master,url_getBackend, timing))
th.start()
slaveSetQueue = Queue.Queue()
hostState = {}
hostState["current"] = None
Thread(name='MemoryPerformanceMetricator',target=_memoryMetricatorThread, args=(logger, cache, settings, master, timing)).start()
Thread(name='MemorySlaveSetter',target=_setToSlaveThread, args=(logger,settings, cache,master,url_getBackend, slaveSetQueue, hostState)).start()
_setThread(logger, settings, cache,master,url_setFrontend,slaveSetQueue, hostState, timing)
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO):
self.redis_url = redis_url
self.redis_server = redis.from_url(redis_url)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.ROUTER)
self.socket.setsockopt(zmq.LINGER, 500)
self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1) # Paranoid for debugging purposes
self.socket.setsockopt(zmq.SNDTIMEO, 1000) # Short timeout
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)
self.node_name = socket.gethostname()
self.address = bind_to_random_port(self.socket, 'tcp://' + get_my_ip(), min_port=14300, max_port=14399,
max_tries=100)
with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address'), 'w') as F:
F.write(self.address)
with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'), 'w') as F:
F.write(str(os.getpid()))
self.logger = bqueryd.logger.getChild('controller').getChild(self.address)
self.logger.setLevel(loglevel)
self.msg_count_in = 0
self.rpc_results = [] # buffer of results that are ready to be returned to callers
self.rpc_segments = {} # Certain RPC calls get split and divided over workers, this dict tracks the original RPCs
self.worker_map = {} # maintain a list of connected workers TODO get rid of unresponsive ones...
self.files_map = {} # shows on which workers a file is available on
self.worker_out_messages = {None: []} # A dict of buffers, used to round-robin based on message affinity
self.worker_out_messages_sequence = [None] # used to round-robin the outgoing messages
self.is_running = True
self.last_heartbeat = 0
self.others = {} # A dict of other Controllers running on other DQE nodes
self.start_time = time.time()
def test_term_hang(self):
rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
req.setsockopt(zmq.LINGER, 0)
req.send(b'hello', copy=False)
req.close()
rep.close()
self.context.term()
def test_router_dealer(self):
router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
msg1 = b'message1'
dealer.send(msg1)
ident = self.recv(router)
more = router.rcvmore
self.assertEqual(more, True)
msg2 = self.recv(router)
self.assertEqual(msg1, msg2)
more = router.rcvmore
self.assertEqual(more, False)
def test_mq_check_prefix(self):
ins = self.context.socket(zmq.ROUTER)
outs = self.context.socket(zmq.DEALER)
mons = self.context.socket(zmq.PUB)
self.sockets.extend([ins, outs, mons])
ins = unicode('in')
outs = unicode('out')
self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)
def monitored_queue(in_socket, out_socket, mon_socket,
in_prefix=b'in', out_prefix=b'out'):
swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
poller = zmq.Poller()
poller.register(in_socket, zmq.POLLIN)
poller.register(out_socket, zmq.POLLIN)
while True:
events = dict(poller.poll())
if in_socket in events:
_relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
if out_socket in events:
_relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
def test_term_hang(self):
rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
req.setsockopt(zmq.LINGER, 0)
req.send(b'hello', copy=False)
req.close()
rep.close()
self.context.term()
def test_default_mq_args(self):
self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
dev.setsockopt_in(zmq.LINGER, 0)
dev.setsockopt_out(zmq.LINGER, 0)
dev.setsockopt_mon(zmq.LINGER, 0)
# this will raise if default args are wrong
dev.start()
self.teardown_device()
def test_mq_check_prefix(self):
ins = self.context.socket(zmq.ROUTER)
outs = self.context.socket(zmq.DEALER)
mons = self.context.socket(zmq.PUB)
self.sockets.extend([ins, outs, mons])
ins = unicode('in')
outs = unicode('out')
self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)
def monitored_queue(in_socket, out_socket, mon_socket,
in_prefix=b'in', out_prefix=b'out'):
swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
poller = zmq.Poller()
poller.register(in_socket, zmq.POLLIN)
poller.register(out_socket, zmq.POLLIN)
while True:
events = dict(poller.poll())
if in_socket in events:
_relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
if out_socket in events:
_relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
def test_term_hang(self):
rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
req.setsockopt(zmq.LINGER, 0)
req.send(b'hello', copy=False)
req.close()
rep.close()
self.context.term()
def test_router_dealer(self):
router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
msg1 = b'message1'
dealer.send(msg1)
ident = self.recv(router)
more = router.rcvmore
self.assertEqual(more, True)
msg2 = self.recv(router)
self.assertEqual(msg1, msg2)
more = router.rcvmore
self.assertEqual(more, False)
def test_default_mq_args(self):
self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
dev.setsockopt_in(zmq.LINGER, 0)
dev.setsockopt_out(zmq.LINGER, 0)
dev.setsockopt_mon(zmq.LINGER, 0)
# this will raise if default args are wrong
dev.start()
self.teardown_device()
def test_mq_check_prefix(self):
ins = self.context.socket(zmq.ROUTER)
outs = self.context.socket(zmq.DEALER)
mons = self.context.socket(zmq.PUB)
self.sockets.extend([ins, outs, mons])
ins = unicode('in')
outs = unicode('out')
self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)
def test_term_hang(self):
rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
req.setsockopt(zmq.LINGER, 0)
req.send(b'hello', copy=False)
req.close()
rep.close()
self.context.term()
def test_router_dealer(self):
router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
msg1 = b'message1'
dealer.send(msg1)
ident = self.recv(router)
more = router.rcvmore
self.assertEqual(more, True)
msg2 = self.recv(router)
self.assertEqual(msg1, msg2)
more = router.rcvmore
self.assertEqual(more, False)
def test_mq_check_prefix(self):
ins = self.context.socket(zmq.ROUTER)
outs = self.context.socket(zmq.DEALER)
mons = self.context.socket(zmq.PUB)
self.sockets.extend([ins, outs, mons])
ins = unicode('in')
outs = unicode('out')
self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)
def monitored_queue(in_socket, out_socket, mon_socket,
in_prefix=b'in', out_prefix=b'out'):
swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
poller = zmq.Poller()
poller.register(in_socket, zmq.POLLIN)
poller.register(out_socket, zmq.POLLIN)
while True:
events = dict(poller.poll())
if in_socket in events:
_relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
if out_socket in events:
_relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
def test_term_hang(self):
rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
req.setsockopt(zmq.LINGER, 0)
req.send(b'hello', copy=False)
req.close()
rep.close()
self.context.term()
def test_default_mq_args(self):
self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
dev.setsockopt_in(zmq.LINGER, 0)
dev.setsockopt_out(zmq.LINGER, 0)
dev.setsockopt_mon(zmq.LINGER, 0)
# this will raise if default args are wrong
dev.start()
self.teardown_device()
def test_mq_check_prefix(self):
ins = self.context.socket(zmq.ROUTER)
outs = self.context.socket(zmq.DEALER)
mons = self.context.socket(zmq.PUB)
self.sockets.extend([ins, outs, mons])
ins = unicode('in')
outs = unicode('out')
self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)
def monitored_queue(in_socket, out_socket, mon_socket,
in_prefix=b'in', out_prefix=b'out'):
swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
poller = zmq.Poller()
poller.register(in_socket, zmq.POLLIN)
poller.register(out_socket, zmq.POLLIN)
while True:
events = dict(poller.poll())
if in_socket in events:
_relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
if out_socket in events:
_relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
def test_term_hang(self):
rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
req.setsockopt(zmq.LINGER, 0)
req.send(b'hello', copy=False)
req.close()
rep.close()
self.context.term()
def test_router_dealer(self):
router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
msg1 = b'message1'
dealer.send(msg1)
ident = self.recv(router)
more = router.rcvmore
self.assertEqual(more, True)
msg2 = self.recv(router)
self.assertEqual(msg1, msg2)
more = router.rcvmore
self.assertEqual(more, False)
def test_default_mq_args(self):
self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
dev.setsockopt_in(zmq.LINGER, 0)
dev.setsockopt_out(zmq.LINGER, 0)
dev.setsockopt_mon(zmq.LINGER, 0)
# this will raise if default args are wrong
dev.start()
self.teardown_device()