Python zmq 模块,POLLIN 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.POLLIN。
def receive_message(self, event, event_data, listener_data):
    """
    Waits until an incoming end point is readable and reads from every end point that is ready.

    Only end points of type PULL or REP are polled. Each ready end point is passed, together with its name, to
    ``self._receive_message``.

    :param * event: Not used.
    :param * event_data: Not used.
    :param * listener_data: Not used.
    """
    del event, event_data, listener_data

    # Register every socket that can receive messages.
    incoming = zmq.Poller()
    for end_point in self.__end_points.values():
        if end_point.type in [zmq.PULL, zmq.REP]:
            incoming.register(end_point, zmq.POLLIN)

    # No timeout is given: this call returns only once a socket reports an event.
    ready = dict(incoming.poll())

    for name, end_point in self.__end_points.items():
        if end_point in ready:
            self._receive_message(name, end_point)
# ------------------------------------------------------------------------------------------------------------------
def no_barking(self, seconds):
    """
    Sleeps for a while and then polls the incoming sockets a few times with a 1 ms timeout.

    According to the original author, incoming file descriptors report 'ready for reading' during ZMQ start up
    although no message is available; these throw-away polls are meant to absorb those reports.

    :param int seconds: The number of seconds to give the other ZMQ thread to start up.
    """
    sleep(seconds)

    # One round fewer than the number of end points (no rounds at all for zero or one end point).
    rounds = len(self.end_points) - 1
    for _ in range(rounds):
        incoming = [end_point for end_point in self.end_points.values()
                    if end_point.type in [zmq.PULL, zmq.REP]]
        # A fresh poller is built for every round.
        poller = zmq.Poller()
        for end_point in incoming:
            poller.register(end_point, zmq.POLLIN)
        poller.poll(1)
# ----------------------------------------------------------------------------------------------------------------------
def run(self):
    """Thread body: start the authenticator, then serve the pipe and the ZAP socket until stopped.

    The loop ends when polling raises ``zmq.ZMQError`` or when ``_handle_pipe()``
    returns a true value. Afterwards the pipe is closed and the authenticator stopped.
    """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            ready = dict(poller.poll())
        except zmq.ZMQError:
            # polling was interrupted: stop serving
            break

        # a true result from the pipe handler is the request to terminate
        if ready.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if ready.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_poll(self):
    """Check the future returned by ``socket.poll``: 0 on timeout, POLLIN once a message is waiting."""
    @gen.coroutine
    def test():
        push, pull = self.create_bound_pair(zmq.PUSH, zmq.PULL)

        # zero timeout: the result can be read straight away and reports no events
        immediate = pull.poll(timeout=0)
        self.assertEqual(immediate.result(), 0)

        # short timeout, nothing sent: pending at first, then resolves to 0
        pending = pull.poll(timeout=1)
        assert not pending.done()
        events = yield pending
        self.assertEqual(events, 0)

        # long timeout: a message sent meanwhile resolves the poll with POLLIN
        pending = pull.poll(timeout=1000)
        assert not pending.done()
        yield push.send_multipart([b'hi', b'there'])
        events = yield pending
        self.assertEqual(events, zmq.POLLIN)

        received = yield pull.recv_multipart()
        self.assertEqual(received, [b'hi', b'there'])
    self.loop.run_sync(test)
def test_timeout(self):
    """make sure Poller.poll timeout has the right units (milliseconds)."""
    # s2 is not used below; only s1 is registered with the poller
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def elapsed_for(timeout):
        # nothing is sent in this test, so the poll is expected to run into its timeout
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    # a fraction of a millisecond must not be mistaken for seconds
    elapsed = elapsed_for(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = elapsed_for(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = elapsed_for(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def poll(self, timeout=None, flags=_zmq.POLLIN):
    """poll the socket for events

    A one-off poller is created for this socket alone. The poller's own future
    (a list of ``(socket, events)`` pairs) is unwrapped into a second future
    that carries just this socket's event mask, or 0 when no event was reported.

    :param timeout: passed unchanged to the poller's ``poll``.
    :param flags: events to register for (defaults to ``POLLIN``).
    :returns: a Future for the poll results.
    :raises ZMQError: with ``ENOTSUP`` when the socket is already closed.
    """
    if self.closed:
        raise _zmq.ZMQError(_zmq.ENOTSUP)

    p = self._poller_class()
    p.register(self, flags)
    f = p.poll(timeout)

    future = self._Future()

    def unwrap_result(f):
        # the outer future may already have been resolved or cancelled by the caller
        if future.done():
            return
        error = f.exception()
        if error:
            # fixed: the original called the misspelled ``f.exeception()``, which
            # raised AttributeError here instead of forwarding the poll failure
            future.set_exception(error)
        else:
            evts = dict(f.result())
            future.set_result(evts.get(self, 0))

    f.add_done_callback(unwrap_result)
    return future
def register(self, socket, address, alias=None, handler=None):
    """Record a socket under its alias, address and itself, and optionally attach a handler.

    ``self.socket`` and ``self.address`` each receive three entries, keyed by
    the alias, the address and the socket object. When no alias is given the
    address doubles as alias.

    With a handler, the socket is also registered with ``self.poller`` for
    POLLIN; addresses whose ``kind`` is ``'SUB'`` or ``'SYNC_SUB'`` go through
    ``self.subscribe``, all others through ``self._set_handler``.

    Raises AssertionError if the address is already registered.
    """
    assert not self.registered(address), 'Socket is already registered!'

    if not alias:
        alias = address

    # same key order as before: alias, address, socket / alias, socket, address
    for key in (alias, address, socket):
        self.socket[key] = socket
    for key in (alias, socket, address):
        self.address[key] = address

    if handler is None:
        return

    self.poller.register(socket, zmq.POLLIN)
    if address.kind in ('SUB', 'SYNC_SUB'):
        self.subscribe(socket, handler)
    else:
        self._set_handler(socket, handler)
def connect(self):
    """Open a REQ socket connected to ``self.endpoint`` and prepare a poller for replies.

    Stores the context, socket and poller on the instance and sets
    ``is_connected``. Raises RuntimeError when the context or the socket
    evaluates as false after creation.
    """
    self.context = zmq.Context()
    if not self.context:
        raise RuntimeError('Failed to create ZMQ context!')

    self.socket = self.context.socket(zmq.REQ)
    if not self.socket:
        raise RuntimeError('Failed to create ZMQ socket!')

    # requester side: connect to the endpoint
    self.socket.connect(self.endpoint)

    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN)

    self.is_connected = True
def connect(self):
    """Open a PULL socket bound to ``self.endpoint`` and prepare a poller for incoming messages.

    Stores the context, socket and poller on the instance and sets
    ``is_connected``. Raises RuntimeError when the context or the socket
    evaluates as false after creation.
    """
    self.context = zmq.Context()
    if not self.context:
        raise RuntimeError('Failed to create ZMQ context!')

    self.socket = self.context.socket(zmq.PULL)
    if not self.socket:
        raise RuntimeError('Failed to create ZMQ socket!')

    # receiving side: bind to the endpoint
    self.socket.bind(self.endpoint)

    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN)

    self.is_connected = True
def connect(self):
    """Open a REP socket bound to ``self.endpoint`` and prepare a poller for incoming requests.

    Stores the context, socket and poller on the instance and sets
    ``is_connected``. Raises RuntimeError when the context or the socket
    evaluates as false after creation.
    """
    self.context = zmq.Context()
    if not self.context:
        raise RuntimeError('Failed to create ZMQ context!')

    self.socket = self.context.socket(zmq.REP)
    if not self.socket:
        raise RuntimeError('Failed to create ZMQ socket!')

    # replying side: bind to the endpoint
    self.socket.bind(self.endpoint)

    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN)

    self.is_connected = True
def run(self):
    """Create the event loop and sockets, then run the loop until it is stopped.

    A ROUTER socket is bound on ``status_port`` and a PUB socket on
    ``data_port`` (all interfaces). Only the status socket is registered for
    polling; ``poll_sockets()`` is scheduled as a task on the loop. Both
    sockets are closed and the context destroyed when the loop exits.
    """
    self._loop = zmq.asyncio.ZMQEventLoop()
    asyncio.set_event_loop(self._loop)

    self.context = zmq.asyncio.Context()
    self.status_sock = self.context.socket(zmq.ROUTER)
    self.data_sock = self.context.socket(zmq.PUB)
    endpoints = (
        (self.status_sock, self.status_port),
        (self.data_sock, self.data_port),
    )
    for sock, port in endpoints:
        sock.bind("tcp://*:%s" % port)

    self.poller = zmq.asyncio.Poller()
    self.poller.register(self.status_sock, zmq.POLLIN)

    self._loop.create_task(self.poll_sockets())
    try:
        self._loop.run_forever()
    finally:
        # release the sockets before tearing down the context
        self.status_sock.close()
        self.data_sock.close()
        self.context.destroy()
def loop(self):
    """Receive multipart messages while ``self.running`` is true and re-emit them as signals.

    Frame 0 is the message type and frame 1 a name. A ``"done"`` message emits
    ``finished(True)``. A ``"data"`` message is followed by (metadata, data)
    frame pairs; each metadata frame is JSON with a ``dtype`` entry used to view
    the data frame as an array. The name and the arrays are emitted as one tuple
    through ``message``. The socket is closed when the loop ends.
    """
    while self.running:
        ready = dict(self.poller.poll(50))
        if ready.get(self.socket) != zmq.POLLIN:
            continue

        frames = self.socket.recv_multipart()
        kind = frames[0].decode()
        name = frames[1].decode()

        if kind == "done":
            self.finished.emit(True)
        elif kind == "data":
            payload = [name]
            # walk the complete (metadata, data) pairs that follow the two header frames
            for raw_meta, raw_data in zip(frames[2::2], frames[3::2]):
                meta = json.loads(raw_meta.decode())
                payload.append(np.frombuffer(raw_data, dtype=meta['dtype']))
            self.message.emit(tuple(payload))

    self.socket.close()
def run(self):
    """ Poll the internal subscriber and the puller until stopping is requested. """
    sockets = SupvisorsZmq(self.supvisors)

    # register both sockets for incoming data
    watched = (sockets.internal_subscriber.socket, sockets.puller.socket)
    poller = zmq.Poller()
    for sock in watched:
        poller.register(sock, zmq.POLLIN)

    while not self.stopping():
        socks = dict(poller.poll(500))
        # the stop condition is tested again after the poll: if Supervisor is
        # stopping, any XML-RPC call would block this thread, and the other
        # because of the join
        if self.stopping():
            continue
        self.check_requests(sockets, socks)
        self.check_events(sockets.internal_subscriber, socks)

    # close resources gracefully (puller first, then the subscriber)
    for sock in reversed(watched):
        poller.unregister(sock)
    sockets.close()
def check_events(self, subscriber, socks):
    """ Forward external Supervisor events to main thread.

    Nothing happens unless ``socks`` reports exactly POLLIN for the
    subscriber's socket. A message that cannot be received is reported on
    stderr and dropped; otherwise it is JSON-encoded and pushed as a
    ``RemoteCommEvents.SUPVISORS_EVENT``.

    :param subscriber: object exposing ``socket`` and ``receive()``.
    :param socks: mapping of socket to polled event mask.
    """
    if socks.get(subscriber.socket) != zmq.POLLIN:
        return
    try:
        message = subscriber.receive()
    except Exception:
        # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit pass through.
        # ``stderr.write`` replaces the Python 2 only ``print >> stderr`` statement,
        # which raises TypeError on Python 3; the emitted text is the same.
        stderr.write('[ERROR] failed to get data from subscriber\n')
    else:
        # The events received are not processed directly in this thread
        # because it would conflict with the processing in the
        # Supervisor thread, as they use the same data.
        # That's why a RemoteCommunicationEvent is used to push the
        # event in the Supervisor thread.
        self.send_remote_comm_event(
            RemoteCommEvents.SUPVISORS_EVENT,
            json.dumps(message))
def support_test_send_to_multiple_addresses(self, address1, address2):
    """Listen on two addresses and answer the first one that receives a message.

    Both listener sockets are bound and polled for up to 2000 ms. The socket
    that got a message (the first address is checked first) consumes it and
    replies with its own serialised address. RuntimeError is raised when neither
    socket received anything. Both sockets are closed in every case.
    """
    poller = zmq.Poller()
    socket1 = self.context.socket(roles['listener'])
    socket2 = self.context.socket(roles['listener'])
    listeners = ((socket1, address1), (socket2, address2))
    try:
        for listener, address in listeners:
            listener.bind("tcp://%s" % address)
        for listener, _ in listeners:
            poller.register(listener, zmq.POLLIN)

        ready = dict(poller.poll(2000))
        for listener, address in listeners:
            if listener in ready:
                listener.recv()
                listener.send(nw0.sockets._serialise(address))
                break
        else:
            raise RuntimeError("Nothing found")
    finally:
        socket1.close()
        socket2.close()
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
    """Set up a worker: identity, ROUTER socket, poller, redis connection and logger.

    :param data_dir: existing directory holding the data files.
    :param redis_url: URL handed to ``redis.from_url``.
    :param loglevel: level applied to the worker's child logger.
    :raises Exception: when ``data_dir`` is not an existing directory.
    """
    # isdir() is already false for a path that does not exist
    if not os.path.isdir(data_dir):
        raise Exception("Datadir %s is not a valid directory" % data_dir)

    # random 8-byte identity, hex encoded
    self.worker_id = binascii.hexlify(os.urandom(8))
    self.node_name = socket.gethostname()
    self.data_dir = data_dir
    self.data_files = set()

    ctx = zmq.Context()
    self.socket = ctx.socket(zmq.ROUTER)
    self.socket.setsockopt(zmq.LINGER, 500)
    self.socket.identity = self.worker_id

    # the worker polls for both readable and writable state
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)

    self.redis_server = redis.from_url(redis_url)
    self.controllers = {}  # Keep a dict of timestamps when you last spoke to controllers
    self.check_controllers()

    self.last_wrm = 0
    self.start_time = time.time()
    self.logger = bqueryd.logger.getChild('worker ' + self.worker_id)
    self.logger.setLevel(loglevel)
    self.msg_count = 0

    # term_signal() is called here; whatever it returns is installed as the SIGTERM handler
    signal.signal(signal.SIGTERM, self.term_signal())
def go(self):
    """Main loop: heartbeat, reap dead workers, service socket events and process sink results.

    Runs while ``self.is_running`` is true. A KeyboardInterrupt triggers
    ``self.kill()``; any other exception is logged with its traceback and the
    loop continues. On exit the controller's pid and address files are removed
    from ``RUNFILES_LOCATION`` if they exist.
    """
    self.logger.info('[#############################>. Starting .<#############################]')

    while self.is_running:
        try:
            time.sleep(0.001)
            self.heartbeat()
            self.free_dead_workers()
            for _sock, events in self.poller.poll(timeout=POLLING_TIMEOUT):
                # both flags may be set in one event mask
                if events & zmq.POLLIN:
                    self.handle_in()
                if events & zmq.POLLOUT:
                    self.handle_out()
            self.process_sink_results()
        except KeyboardInterrupt:
            self.logger.debug('Keyboard Interrupt')
            self.kill()
        except:
            # deliberately broad: one failing iteration must not end the loop
            self.logger.error("Exception %s" % traceback.format_exc())

    self.logger.info('Stopping')
    run_files = (
        os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'),
        os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address'),
    )
    for run_file in run_files:
        if os.path.exists(run_file):
            os.remove(run_file)
async def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq SUB socket subscribes to topic ``a`` and receives a message published by an XPUB peer.

    The XPUB peer runs in a background helper: it waits for the subscription
    frame, then publishes one two-frame message.

    Declared ``async def`` because the body uses ``async with`` and ``await``,
    which are a SyntaxError inside a plain ``def``.
    NOTE(review): presumably run through an asyncio-aware test marker/plugin
    that is not visible in this listing - confirm.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        # NOTE(review): compares the fixture itself with a string - confirm intent.
        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.SUB)
            await socket.subscribe(b'a')
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_xsub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq XSUB socket sends a raw subscription frame and receives a message published by an XPUB peer.

    The XPUB peer runs in a background helper: it waits for the subscription
    frame, then publishes one two-frame message.

    Declared ``async def`` because the body uses ``async with`` and ``await``,
    which are a SyntaxError inside a plain ``def``.
    NOTE(review): presumably run through an asyncio-aware test marker/plugin
    that is not visible in this listing - confirm.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        # NOTE(review): compares the fixture itself with a string - confirm intent.
        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XSUB)
            # XSUB subscribes by sending the subscription frame itself
            await socket.send_multipart([b'\x01a'])
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_push_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq PUSH socket delivers one two-frame message to a PULL peer.

    The PULL peer runs in a background helper. After sending, the test keeps
    the azmq context open and sleeps in 0.1 s steps until the helper's event is
    set.

    Declared ``async def`` because the body uses ``async with`` and ``await``,
    which are a SyntaxError inside a plain ``def``.
    NOTE(review): presumably run through an asyncio-aware test marker/plugin
    that is not visible in this listing - confirm.
    """
    pull_socket = socket_factory.create(zmq.PULL)
    connect_or_bind(pull_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        assert pull_socket.poll(1000) == zmq.POLLIN
        message = pull_socket.recv_multipart()
        assert message == [b'hello', b'world']

    with run_in_background(run) as event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUSH)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])

            # keep the sending context alive until the background side signals
            while not event.is_set():
                await asyncio.sleep(0.1)
async def test_tcp_pair_socket(event_loop, socket_factory, connect_or_bind):
    """Two PAIR sockets exchange one message in each direction.

    The peer runs in a background helper: it receives the greeting and answers
    with its own two-frame message, which the azmq socket must receive within
    one second.

    Declared ``async def`` because the body uses ``async with`` and ``await``,
    which are a SyntaxError inside a plain ``def``.
    NOTE(review): presumably run through an asyncio-aware test marker/plugin
    that is not visible in this listing - confirm.
    """
    pair_socket = socket_factory.create(zmq.PAIR)
    connect_or_bind(pair_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        assert pair_socket.poll(1000) == zmq.POLLIN
        message = pair_socket.recv_multipart()
        assert message == [b'hello', b'world']
        pair_socket.send_multipart([b'my', b'message'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PAIR)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])
            message = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert message == [b'my', b'message']
def run(self):
    """Thread body: start the authenticator, then serve the pipe and the ZAP socket until stopped.

    The loop ends when polling raises ``zmq.ZMQError`` or when ``_handle_pipe()``
    returns a true value. Afterwards the pipe is closed and the authenticator stopped.
    """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            ready = dict(poller.poll())
        except zmq.ZMQError:
            # polling was interrupted: stop serving
            break

        # a true result from the pipe handler is the request to terminate
        if ready.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if ready.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def run(self):
    """Thread body: start the authenticator, then serve the pipe and the ZAP socket until stopped.

    The loop ends when polling raises ``zmq.ZMQError`` or when ``_handle_pipe()``
    returns a true value. Afterwards the pipe is closed and the authenticator stopped.
    """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            ready = dict(poller.poll())
        except zmq.ZMQError:
            # polling was interrupted: stop serving
            break

        # a true result from the pipe handler is the request to terminate
        if ready.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if ready.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_timeout(self):
    """make sure Poller.poll timeout has the right units (milliseconds)."""
    # s2 is not used below; only s1 is registered with the poller
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def elapsed_for(timeout):
        # nothing is sent in this test, so the poll is expected to run into its timeout
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    # a fraction of a millisecond must not be mistaken for seconds
    elapsed = elapsed_for(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = elapsed_for(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = elapsed_for(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def run(self):
    """Thread body: start the authenticator, then serve the pipe and the ZAP socket until stopped.

    The loop ends when polling raises ``zmq.ZMQError`` or when ``_handle_pipe()``
    returns a true value. Afterwards the pipe is closed and the authenticator stopped.
    """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            ready = dict(poller.poll())
        except zmq.ZMQError:
            # polling was interrupted: stop serving
            break

        # a true result from the pipe handler is the request to terminate
        if ready.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if ready.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_timeout(self):
    """make sure Poller.poll timeout has the right units (milliseconds)."""
    # s2 is not used below; only s1 is registered with the poller
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def elapsed_for(timeout):
        # nothing is sent in this test, so the poll is expected to run into its timeout
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    # a fraction of a millisecond must not be mistaken for seconds
    elapsed = elapsed_for(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = elapsed_for(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = elapsed_for(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def run(self):
    """Thread body: start the authenticator, then serve the pipe and the ZAP socket until stopped.

    The loop ends when polling raises ``zmq.ZMQError`` or when ``_handle_pipe()``
    returns a true value. Afterwards the pipe is closed and the authenticator stopped.
    """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            ready = dict(poller.poll())
        except zmq.ZMQError:
            # polling was interrupted: stop serving
            break

        # a true result from the pipe handler is the request to terminate
        if ready.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if ready.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_timeout(self):
    """make sure Poller.poll timeout has the right units (milliseconds)."""
    # s2 is not used below; only s1 is registered with the poller
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def elapsed_for(timeout):
        # nothing is sent in this test, so the poll is expected to run into its timeout
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    # a fraction of a millisecond must not be mistaken for seconds
    elapsed = elapsed_for(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = elapsed_for(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = elapsed_for(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def run(self):
    """Thread body: start the authenticator, then serve the pipe and the ZAP socket until stopped.

    The loop ends when polling raises ``zmq.ZMQError`` or when ``_handle_pipe()``
    returns a true value. Afterwards the pipe is closed and the authenticator stopped.
    """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            ready = dict(poller.poll())
        except zmq.ZMQError:
            # polling was interrupted: stop serving
            break

        # a true result from the pipe handler is the request to terminate
        if ready.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if ready.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_timeout(self):
    """make sure Poller.poll timeout has the right units (milliseconds)."""
    # s2 is not used below; only s1 is registered with the poller
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def elapsed_for(timeout):
        # nothing is sent in this test, so the poll is expected to run into its timeout
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    # a fraction of a millisecond must not be mistaken for seconds
    elapsed = elapsed_for(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = elapsed_for(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = elapsed_for(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def run(self):
    """Relay messages between the XPUB and XSUB sockets while ``self.running`` is true.

    Whatever arrives on the XPUB side is forwarded to the XSUB side and vice
    versa. Each poll waits at most 1000 ms, so a cleared ``running`` flag is
    noticed after the current poll returns.
    """
    self.log.debug("Broker starts XPUB:{}, XSUB:{}"
                   .format(self.xpub_url, self.xsub_url))
    # self.proxy.start()
    poller = zmq.Poller()
    for side in (self.xpub, self.xsub):
        poller.register(side, zmq.POLLIN)

    self.running = True
    while self.running:
        ready = dict(poller.poll(1000))

        if self.xpub in ready:
            # XPUB -> XSUB; only the first frame is logged
            frames = self.xpub.recv_multipart()
            self.log.debug("subscription message: {}".format(frames[0]))
            self.xsub.send_multipart(frames)

        if self.xsub in ready:
            # XSUB -> XPUB
            frames = self.xsub.recv_multipart()
            self.log.debug("publishing message: {}".format(frames))
            self.xpub.send_multipart(frames)
def __init__(self, port, pipeline=100, host='localhost', log_file=None):
    """Create a new ZMQDealer object.

    Connects a DEALER socket to ``tcp://host:port``, registers it for POLLIN
    and, when a log file is given, prepares a clean slate for it.

    :param port: TCP port to connect to.
    :param pipeline: value assigned to the socket's ``hwm``.
    :param host: host name to connect to.
    :param log_file: optional log file path; its directory is created when
        missing and an existing file at that path is deleted.
    """
    context = zmq.Context.instance()
    # noinspection PyUnresolvedReferences
    self.socket = context.socket(zmq.DEALER)
    self.socket.hwm = pipeline
    self.socket.connect('tcp://%s:%d' % (host, port))
    self._log_file = log_file

    self.poller = zmq.Poller()
    # noinspection PyUnresolvedReferences
    self.poller.register(self.socket, zmq.POLLIN)

    if self._log_file is None:
        return

    self._log_file = os.path.abspath(self._log_file)
    # If log file directory does not exists, create it
    parent_dir = os.path.dirname(self._log_file)
    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir)
    # clears any existing log
    if os.path.exists(self._log_file):
        os.remove(self._log_file)
def __init__(self, bind_address, linger=-1, poll_timeout=2, loop=None):
    """Create an asyncio ROUTER socket bound to ``bind_address`` with a POLLIN poller.

    :param bind_address: address the ROUTER socket is bound to.
    :param linger: value for the socket's LINGER option.
    :param poll_timeout: stored on the instance as ``poll_timeout``.
    :param loop: stored on the instance as ``loop``.
    """
    self.bind_address = bind_address
    self.loop = loop
    self.poll_timeout = poll_timeout

    self.context = zmq.asyncio.Context()
    self.socket = self.context.socket(zmq.ROUTER)
    self.socket.setsockopt(zmq.LINGER, linger)

    self.in_poller = zmq.asyncio.Poller()
    self.in_poller.register(self.socket, zmq.POLLIN)

    # the message is logged just before the bind call itself
    log.info('Bound to: ' + self.bind_address)
    self.socket.bind(self.bind_address)

    self._kill = False
def full_req_transceiver(zmq_url, data):
    """Send one msgpack-encoded request, wait for the reply, then tear the connection down.

    :param zmq_url: URL for the socket to connect to.
    :param data: The data to send.
    :returns: The unpacked response.
    """
    # TODO: Harden this
    # TODO: Add linger and POLLIN support : https://github.com/zeromq/pyzmq/issues/132
    ctx, socket = get_ctx_and_connect_req_socket(zmq_url)

    socket.send_multipart([msgpack.packb(data)])
    # blocks until the peer answers; no timeout is applied
    reply = msgpack.unpackb(socket.recv(), encoding='utf-8')

    socket.close()
    ctx.term()
    return reply
def transceiver(self, payload):
    """Send a msgpack-encoded payload and return the checked reply.

    When ``self.response_timeout`` is set, the reply is awaited for that many
    seconds (converted to milliseconds for the poller).

    :param payload: A dict representing the message to send.
    :returns: the result of ``self.check_and_return`` on the unpacked response.
    :raises IOError: when no reply arrives within the timeout.
    """
    # TODO: Harden this
    # TODO: Add linger and POLLIN support :
    # https://github.com/zeromq/pyzmq/issues/132
    request = msgpack.packb(payload)
    # blocks
    self.socket.send_multipart([request])

    if self.response_timeout and not self.poller.poll(self.response_timeout * 1000):
        raise IOError('Timeout while waiting for server response')

    # blocks
    reply = self.socket.recv()
    unpacked = msgpack.unpackb(reply, encoding='utf-8')
    return self.check_and_return(unpacked)
def __init__(self, targname, cfg, isServer=False):
# Set up one ZeroMQ end of the connection: a DEALER socket connected to
# cfg['server'] for a client, or a ROUTER socket bound on all interfaces for a
# server. The port comes from cfg['port'] and defaults to 7677. In both cases
# the socket is registered with a poller for POLLIN.
# NOTE(review): indentation was lost in this listing, so it cannot be told
# whether the backend/inactivity setup at the end belongs to the server branch
# only or runs for both roles - confirm against the original file.
self.targname = targname
self.cfg = cfg
self.isServer = isServer
self.fnCallName = ''
self.ctx = zmq.Context()
# linger of 100 on the context and on the socket created below
self.ctx.linger = 100
if not self.isServer:
# client: DEALER socket connecting out to the configured server
self.sock = self.ctx.socket(zmq.DEALER)
self.sock.linger = 100
self.sock.connect('tcp://%s:%s' % (self.cfg['server'],self.cfg.get('port',7677))) # this times out with EINVAL when no internet
self.poller = zmq.Poller()
self.poller.register(self.sock, zmq.POLLIN)
else:
# server: ROUTER socket bound on every interface
self.sock = self.ctx.socket(zmq.ROUTER)
self.sock.linger = 100
self.sock.bind('tcp://*:%s' % (self.cfg.get('port',7677)))
self.poller = zmq.Poller()
self.poller.register(self.sock, zmq.POLLIN)
# backend class is looked up by cfg['backend'] and instantiated with the target name and config
self.be = GetBackend(self.cfg['backend'])(self.targname, self.cfg)
# creation time; presumably the reference point for the inactivity limit - TODO confirm
self.inTime = time.time()
# cfg['inactivelimit'] converted to int, 0 when absent
self.inactiveLimit = int(self.cfg.get('inactivelimit',0))
# Python 2 print statement
print 'inactivelimit ',self.inactiveLimit
def spin_once(self, polling_sec=0.010):
    '''Deliver every queued message to the callback, then return.

    The socket is polled repeatedly, each time waiting at most ``polling_sec``
    seconds. Every received message is passed to the callback; the method
    returns as soon as a poll does not report exactly POLLIN for the socket.

    You have to handle KeyboardInterrupt (Ctrl-C) manually.

    Example:

    >>> def callback(msg):
    ...   print msg
    >>> sub = jps.Subscriber('topic_name', callback)
    >>> try:
    ...   while True:
    ...     sub.spin_once()
    ...     time.sleep(0.1)
    ... except KeyboardInterrupt:
    ...   pass
    '''
    # the poller expects milliseconds
    timeout_ms = polling_sec * 1000
    while True:
        ready = dict(self._poller.poll(timeout_ms))
        if ready.get(self._socket) != zmq.POLLIN:
            return
        received = self._socket.recv()
        self._callback(received)
def set_topic(self, name, topic):
    """shortcut to :py:meth:`SocketManager.set_socket_option` with ``zmq.SUBSCRIBE``

    :param name: the name of the socket where data will pass through
    :param topic: the topic prefix to subscribe to; text is encoded as UTF-8,
        bytes are used unchanged

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
      >>>
      >>> # subscribe only to topics beginning with "logs"
      >>> sockets.set_topic('events', 'logs')
      >>> event = sockets.recv_event_safe('events')
      >>> event.topic, event.data
      'logs:2016-06-20', {'stdout': 'hello world'}
    """
    if isinstance(topic, bytes):
        safe_topic = topic
    elif hasattr(topic, 'encode'):
        # ``bytes(text)`` raises TypeError on Python 3 (no encoding given), so
        # text topics such as the one in the example are encoded explicitly
        safe_topic = topic.encode('utf-8')
    else:
        # anything else keeps the original conversion
        safe_topic = bytes(topic)
    self.set_socket_option(name, self.zmq.SUBSCRIBE, safe_topic)
def ensure_and_bind(self, socket_name, socket_type, address, polling_mechanism):
    """Make sure a socket exists, bind it to the given address and engage the poller.

    Convenience wrapper that calls ``.get_or_create()``, ``.bind()`` and
    ``.engage()`` in that order.

    returns whatever ``.bind()`` returned (the socket itself).

    :param socket_name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :param address: a valid zeromq address (i.e: inproc://whatevs)
    :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``
    """
    # the socket returned by get_or_create() is not needed here
    self.get_or_create(socket_name, socket_type, polling_mechanism)
    bound_socket = self.bind(socket_name, address, polling_mechanism)
    self.engage()
    return bound_socket
def ready(self, name, polling_mechanism, timeout=None):
    """Poll all sockets and report whether the named socket is ready for exactly ``polling_mechanism``.

    returns the socket if the polled state equals ``polling_mechanism``,
    otherwise ``None``

    :param name: the socket name
    :param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT``
    :param timeout: the polling timeout in milliseconds handed to
      ``.engage()``; when ``None``, ``self.timeout`` is used if it is truthy
    """
    socket = self.get_by_name(name)

    # an explicit timeout always wins; the instance default only applies when it is truthy
    if timeout is None and self.timeout:
        effective_timeout = self.timeout
    else:
        effective_timeout = timeout

    polled = self.engage(effective_timeout)
    available_mechanism = polled.pop(socket, None)
    if polling_mechanism == available_mechanism:
        return socket
def get_or_create(self, name, socket_type, polling_mechanism):
    """Return the named socket, creating it first when unknown, and register it for polling.

    The socket is registered with ``polling_mechanism`` on every call, whether
    or not it had to be created.

    returns the socket itself.

    :param name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``)
    """
    already_known = name in self.sockets
    if not already_known:
        self.create(name, socket_type)

    sock = self.get_by_name(name)
    self.register_socket(sock, polling_mechanism)
    return sock
def _send_raw(self, serialized):
    """Send one serialized message to the peer and wait for its answer.

    A fresh socket is created for the exchange. If an answer arrives within
    ``self._timeout`` seconds it is passed to ``on_message``; otherwise the
    timeout is logged and the peer is removed from the transport. The socket is
    cleaned up in both cases.
    """
    self.create_socket()
    self._socket.send_string(serialized, zmq.NOBLOCK)

    poller = zmq.Poller()
    poller.register(self._socket, zmq.POLLIN)
    answered = poller.poll(self._timeout * 1000)

    if not answered:
        self._transport.log("Peer " + self._address + " timed out.")
        self.cleanup_socket()
        self._transport.remove_peer(self._address)
        return

    reply = self._socket.recv()
    self.on_message(reply)
    self.cleanup_socket()
def zmq_request(self, msg_type, msg_content, timeout=__DEFAULT_REQUEST_TIMEOUT):
    """Send one request to the local server and wait for its reply.

    The reply is awaited in steps of ``__POLL_INTERVAL`` until the accumulated
    waiting time exceeds ``timeout``.

    Returns the content of the reply message, or ``False`` when no reply
    arrived in time. A new context and REQ socket are created on every call and
    are not closed here.
    """
    # new socket to talk to server
    self.__socket = zmq.Context().socket(zmq.REQ)
    self.__socket.connect("tcp://localhost:" + ZMQPort.RQ)

    # poller used to check whether the socket has a message waiting
    poller = zmq.Poller()
    poller.register(self.__socket, zmq.POLLIN)

    send_flatbuf_msg(self.__socket, msg_type, msg_content)

    attempts = 0
    while attempts * self.__POLL_INTERVAL <= timeout:
        ready = dict(poller.poll(self.__POLL_INTERVAL))
        if ready.get(self.__socket) == zmq.POLLIN:
            raw_reply = self.__socket.recv()
            reply = TransMsg.GetRootAsTransMsg(raw_reply, 0)
            return reply.Content()
        attempts += 1
    return False
def register(self, queue, handler, flags=zmq.POLLIN):
    """
    Add *queue* to the task's poller and remember *handler* for it.

    The queue is registered with ``self.poller`` for the given *flags* and the
    handler is stored in ``self.handlers`` under the queue.

    :param zmq.Socket queue:
        The queue to poll.

    :param handler:
        The function or method associated with *queue*; per the original
        documentation it is called with *queue* as its only argument.

    :param int flags:
        The flags to match in the queue poller (defaults to ``POLLIN``).
    """
    # register first: the handler is only recorded once polling is set up
    self.poller.register(queue, flags)
    self.handlers[queue] = handler
def watch_queue(self, queue, callback, flags=zmq.POLLIN):
    """
    Start watching a zmq *queue* and remember *callback* for it.

    With the default *flags* (``POLLIN``) the queue is watched for something to
    read; with ``POLLOUT`` for being available to write. Per the original
    documentation no parameters are passed to the callback.

    :param queue:
        The zmq queue to poll.

    :param callback:
        The function to call when the poll is successful.

    :param int flags:
        The condition to monitor on the queue (defaults to ``POLLIN``).

    :returns: *queue*.
    :raises ValueError: if *queue* is already being watched.
    """
    already_watched = queue in self._queue_callbacks
    if already_watched:
        raise ValueError('already watching %r' % queue)

    self._poller.register(queue, flags)
    self._queue_callbacks[queue] = callback
    return queue
def watch_file(self, fd, callback, flags=zmq.POLLIN):
    """
    Start watching a file for the condition in *flags* and remember *callback* for it.

    An integer *fd* is first wrapped in a file object with ``os.fdopen``. The
    file object is registered with the poller, while the callback is stored
    under the file's descriptor number.

    :param fd:
        The file-like object, or fileno to monitor.

    :param callback:
        The function to call when the file has data available.

    :param int flags:
        The condition to monitor on the file (defaults to ``POLLIN``).

    :returns: the file object that was registered.
    """
    watched = os.fdopen(fd) if isinstance(fd, int) else fd
    self._poller.register(watched, flags)
    self._queue_callbacks[watched.fileno()] = callback
    return watched
def test_getsockopt_events(self):
    """The EVENTS socket option must agree with what a poller reports.

    One DEALER is checked for POLLOUT; after it sends an empty message, its
    peer is checked for POLLIN.
    """
    sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
    eventlet.sleep()

    def assert_ready(sock, flag):
        # a fresh poller must report the socket, and EVENTS must carry the flag
        poller = zmq.Poller()
        poller.register(sock, flag)
        sock_map = poller.poll(100)
        self.assertEqual(len(sock_map), 1)
        events = sock.getsockopt(zmq.EVENTS)
        self.assertEqual(events & flag, flag)

    assert_ready(sender, zmq.POLLOUT)
    sender.send(b'')
    assert_ready(receiver, zmq.POLLIN)
def update_view(self):
"""Drain pending sensor messages from the poller and feed them to the views.

Each message is unpacked as ``(timestamp, angles, accel, tmp)``; the first
three entries of ``angles`` and ``accel`` are used as x/y/z values and the
fourth item is not used.

NOTE(review): indentation was lost in this listing, so it is unclear whether
the final ``self.vis_sensors.update_view()`` runs once after the polling loop
or once per message - confirm against the original file.
"""
while True:
# wait at most 5 ms for events
events = dict(self.poller.poll(5))
# nothing pending any more: stop draining
if not events:
break
for socket in events:
# only sockets whose event mask is exactly POLLIN are read
if events[socket] != zmq.POLLIN:
continue
message = socket.recv_pyobj()
timestamp, angles, accel, tmp = message
x_angle = angles[0]
y_angle = angles[1]
z_angle = angles[2]
x_accel = accel[0]
y_accel = accel[1]
z_accel = accel[2]
# the sensor view gets the raw sample, the other views the individual axes
self.vis_sensors.push_data(timestamp, angles)
self.vis_3d.update_view(x_angle,y_angle,z_angle)
self.beep.beep(x_angle)
self.vis_instrument.update_view(x_accel, y_accel, z_accel)
self.vis_sensors.update_view()
def socket_fitness(self, chrom):
    """Ask a remote evaluator for the fitness of *chrom*, falling back to a local evaluation.

    The request is one ``;``-separated string built from the function's
    settings, the target path (``x:y`` pairs joined by ``,``) and the
    chromosome values. If a reply is readable within 100 ms it is returned as a
    float. Otherwise the socket is closed and unregistered and
    ``self.func(chrom)`` is returned; the next call creates a new socket.
    """
    if self.socket.closed:
        # a previous call gave up on its socket: open and register a new one
        self.socket = self.context.socket(zmq.REQ)
        self.socket.bind(self.socket_port)
        self.poll.register(self.socket, zmq.POLLIN)

    fields = [
        self.func.get_Driving(),
        self.func.get_Follower(),
        self.func.get_Link(),
        self.func.get_Target(),
        self.func.get_ExpressionName(),
        self.func.get_Expression(),
        ','.join(["{}:{}".format(point[0], point[1]) for point in self.targetPath]),
        ','.join([str(gene) for gene in chrom]),
    ]
    self.socket.send_string(';'.join(fields))

    ready = dict(self.poll.poll(100))
    if ready.get(self.socket) == zmq.POLLIN:
        return float(self.socket.recv().decode('utf-8'))

    # no reply in time: drop the socket without lingering and evaluate locally
    self.socket.setsockopt(zmq.LINGER, 0)
    self.socket.close()
    self.poll.unregister(self.socket)
    return self.func(chrom)
def socket_fitness(self, chrom):
    """Ask a remote evaluator for the fitness of *chrom*, falling back to a local evaluation.

    The request is one ``;``-separated string built from the function's
    settings, the target path (``x:y`` pairs joined by ``,``) and the
    chromosome values. If a reply is readable within 100 ms it is returned as a
    float. Otherwise the socket is closed and unregistered and
    ``self.func(chrom)`` is returned; the next call creates a new socket.
    """
    if self.socket.closed:
        # a previous call gave up on its socket: open and register a new one
        self.socket = self.context.socket(zmq.REQ)
        self.socket.bind(self.socket_port)
        self.poll.register(self.socket, zmq.POLLIN)

    fields = [
        self.func.get_Driving(),
        self.func.get_Follower(),
        self.func.get_Link(),
        self.func.get_Target(),
        self.func.get_ExpressionName(),
        self.func.get_Expression(),
        ','.join(["{}:{}".format(point[0], point[1]) for point in self.targetPath]),
        ','.join([str(gene) for gene in chrom]),
    ]
    self.socket.send_string(';'.join(fields))

    ready = dict(self.poll.poll(100))
    if ready.get(self.socket) == zmq.POLLIN:
        return float(self.socket.recv().decode('utf-8'))

    # no reply in time: drop the socket without lingering and evaluate locally
    self.socket.setsockopt(zmq.LINGER, 0)
    self.socket.close()
    self.poll.unregister(self.socket)
    return self.func(chrom)