Python zmq 模块,POLLOUT 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.POLLOUT。
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
if not os.path.exists(data_dir) or not os.path.isdir(data_dir):
raise Exception("Datadir %s is not a valid directory" % data_dir)
self.worker_id = binascii.hexlify(os.urandom(8))
self.node_name = socket.gethostname()
self.data_dir = data_dir
self.data_files = set()
context = zmq.Context()
self.socket = context.socket(zmq.ROUTER)
self.socket.setsockopt(zmq.LINGER, 500)
self.socket.identity = self.worker_id
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)
self.redis_server = redis.from_url(redis_url)
self.controllers = {} # Keep a dict of timestamps when you last spoke to controllers
self.check_controllers()
self.last_wrm = 0
self.start_time = time.time()
self.logger = bqueryd.logger.getChild('worker ' + self.worker_id)
self.logger.setLevel(loglevel)
self.msg_count = 0
signal.signal(signal.SIGTERM, self.term_signal())
def go(self):
self.logger.info('[#############################>. Starting .<#############################]')
while self.is_running:
try:
time.sleep(0.001)
self.heartbeat()
self.free_dead_workers()
for sock, event in self.poller.poll(timeout=POLLING_TIMEOUT):
if event & zmq.POLLIN:
self.handle_in()
if event & zmq.POLLOUT:
self.handle_out()
self.process_sink_results()
except KeyboardInterrupt:
self.logger.debug('Keyboard Interrupt')
self.kill()
except:
self.logger.error("Exception %s" % traceback.format_exc())
self.logger.info('Stopping')
for x in (os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'),
os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address')):
if os.path.exists(x):
os.remove(x)
def ensure_and_bind(self, socket_name, socket_type, address, polling_mechanism):
"""Ensure that a socket exists, that is *binded* to the given address
and that is registered with the given polling mechanism.
This method is a handy replacement for calling
``.get_or_create()``, ``.bind()`` and then ``.engage()``.
returns the socket itself.
:param socket_name: the socket name
:param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
:param address: a valid zeromq address (i.e: inproc://whatevs)
:param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``
"""
self.get_or_create(socket_name, socket_type, polling_mechanism)
socket = self.bind(socket_name, address, polling_mechanism)
self.engage()
return socket
def ready(self, name, polling_mechanism, timeout=None):
"""Polls all sockets and checks if the socket with the given name is ready for either ``zmq.POLLIN`` or ``zmq.POLLOUT``.
returns the socket if available, or ``None``
:param socket_name: the socket name
:param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT``
:param timeout: the polling timeout in miliseconds that will
be passed to ``zmq.Poller().poll()`` (optional, defaults to
``core.DEFAULT_POLLING_TIMEOUT``)
"""
socket = self.get_by_name(name)
available_mechanism = self.engage(timeout is None and self.timeout or timeout).pop(socket, None)
if polling_mechanism == available_mechanism:
return socket
def wait_until_ready(self, name, polling_mechanism, timeout=None, polling_timeout=None):
"""Briefly waits until the socket is ready to be used, yields to other
greenlets until the socket becomes available.
returns the socket if available within the given timeout, or ``None``
:param socket_name: the socket name
:param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT``
:param timeout: the timeout in seconds (accepts float) in which it
should wait for the socket to become available
(optional, defaults to ``core.DEFAULT_TIMEOUT_IN_SECONDS``)
:param polling_timeout: the polling timeout in miliseconds that will
be passed to ``zmq.Poller().poll()``.
(optional, defaults to ``core.DEFAULT_POLLING_TIMEOUT``)
"""
timeout = timeout is None and self.timeout or timeout
polling_timeout = polling_timeout is None and self.polling_timeout or polling_timeout
start = time.time()
current = start
while current - start < timeout:
self.engage(polling_timeout)
socket = self.ready(name, polling_mechanism, timeout=timeout)
current = time.time()
if socket:
return socket
def get_or_create(self, name, socket_type, polling_mechanism):
"""ensure that a socket exists and is registered with a given
polling_mechanism (POLLIN, POLLOUT or both)
returns the socket itself.
:param name: the socket name
:param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
:param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``)
"""
if name not in self.sockets:
self.create(name, socket_type)
socket = self.get_by_name(name)
self.register_socket(socket, polling_mechanism)
return socket
def get_log_handler(self, socket_name, topic_name='logs'):
"""returns an instance of :py:class:ZMQPubHandler attached to a previously-created socket.
:param socket_name: the name of the socket, previously created with :py:meth:SocketManager.create
:param topic_name: the name of the topic in which the logs will be PUBlished
**Example:**
::
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager()
>>> sockets.ensure_and_bind('logs', zmq.PUB, 'tcp://*:6000', zmq.POLLOUT)
>>> app_logger = sockets.get_logger('logs', logger_name='myapp'))
>>> app_logger.info("Server is up!")
>>> try:
url = sockets.recv_safe('download_queue')
... requests.get(url)
... except:
... app_logger.exception('failed to download url: %s', url)
"""
return ZMQPubHandler(self, socket_name, topic_name)
def test_getsockopt_events(self):
sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
eventlet.sleep()
poll_out = zmq.Poller()
poll_out.register(sock1, zmq.POLLOUT)
sock_map = poll_out.poll(100)
self.assertEqual(len(sock_map), 1)
events = sock1.getsockopt(zmq.EVENTS)
self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT)
sock1.send(b'')
poll_in = zmq.Poller()
poll_in.register(sock2, zmq.POLLIN)
sock_map = poll_in.poll(100)
self.assertEqual(len(sock_map), 1)
events = sock2.getsockopt(zmq.EVENTS)
self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def test_poll(self):
a,b = self.create_bound_pair()
tic = time.time()
evt = a.poll(50)
self.assertEqual(evt, 0)
evt = a.poll(50, zmq.POLLOUT)
self.assertEqual(evt, zmq.POLLOUT)
msg = b'hi'
a.send(msg)
evt = b.poll(50)
self.assertEqual(evt, zmq.POLLIN)
msg2 = self.recv(b)
evt = b.poll(50)
self.assertEqual(evt, 0)
self.assertEqual(msg2, msg)
def test_no_events(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, 0)
self.assertTrue(s1 in poller)
self.assertFalse(s2 in poller)
poller.register(s1, 0)
self.assertFalse(s1 in poller)
def test_pubsub(self):
s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
s2.setsockopt(zmq.SUBSCRIBE, b'')
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN)
# Now make sure that both are send ready.
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(s2 in socks, 0)
# Make sure that s1 stays in POLLOUT after a send.
s1.send(b'msg1')
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
# Make sure that s2 is POLLIN after waiting.
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s2], zmq.POLLIN)
# Make sure that s2 goes into 0 after recv.
s2.recv()
socks = dict(poller.poll())
self.assertEqual(s2 in socks, 0)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def open_connection(self, address, status_port, data_port):
self.statusBar().showMessage("Open session to {}:{}".format(address, status_port), 2000)
socket = self.context.socket(zmq.DEALER)
socket.identity = "Matplotlib_Qt_Client".encode()
socket.connect("tcp://{}:{}".format(address, status_port))
socket.send(b"WHATSUP")
poller = zmq.Poller()
poller.register(socket, zmq.POLLOUT)
time.sleep(0.1)
evts = dict(poller.poll(100))
if socket in evts:
try:
reply, desc = [e.decode() for e in socket.recv_multipart()]
desc = json.loads(desc)
self.statusBar().showMessage("Connection established. Pulling plot information.", 2000)
except:
self.statusBar().showMessage("Could not connect to server.", 2000)
return
else:
self.statusBar().showMessage("Server did not respond.", 2000)
socket.close()
self.construct_plots(desc)
# Actual data listener
if self.listener_thread:
self.Datalistener.running = False
self.listener_thread.quit()
self.listener_thread.wait()
self.listener_thread = QtCore.QThread()
self.Datalistener = DataListener(address, data_port)
self.Datalistener.moveToThread(self.listener_thread)
self.listener_thread.started.connect(self.Datalistener.loop)
self.Datalistener.message.connect(self.data_signal_received)
self.Datalistener.finished.connect(self.stop_listening)
QtCore.QTimer.singleShot(0, self.listener_thread.start)
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO):
self.redis_url = redis_url
self.redis_server = redis.from_url(redis_url)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.ROUTER)
self.socket.setsockopt(zmq.LINGER, 500)
self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1) # Paranoid for debugging purposes
self.socket.setsockopt(zmq.SNDTIMEO, 1000) # Short timeout
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)
self.node_name = socket.gethostname()
self.address = bind_to_random_port(self.socket, 'tcp://' + get_my_ip(), min_port=14300, max_port=14399,
max_tries=100)
with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address'), 'w') as F:
F.write(self.address)
with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'), 'w') as F:
F.write(str(os.getpid()))
self.logger = bqueryd.logger.getChild('controller').getChild(self.address)
self.logger.setLevel(loglevel)
self.msg_count_in = 0
self.rpc_results = [] # buffer of results that are ready to be returned to callers
self.rpc_segments = {} # Certain RPC calls get split and divided over workers, this dict tracks the original RPCs
self.worker_map = {} # maintain a list of connected workers TODO get rid of unresponsive ones...
self.files_map = {} # shows on which workers a file is available on
self.worker_out_messages = {None: []} # A dict of buffers, used to round-robin based on message affinity
self.worker_out_messages_sequence = [None] # used to round-robin the outgoing messages
self.is_running = True
self.last_heartbeat = 0
self.others = {} # A dict of other Controllers running on other DQE nodes
self.start_time = time.time()
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def test_poll(self):
a,b = self.create_bound_pair()
tic = time.time()
evt = a.poll(50)
self.assertEqual(evt, 0)
evt = a.poll(50, zmq.POLLOUT)
self.assertEqual(evt, zmq.POLLOUT)
msg = b'hi'
a.send(msg)
evt = b.poll(50)
self.assertEqual(evt, zmq.POLLIN)
msg2 = self.recv(b)
evt = b.poll(50)
self.assertEqual(evt, 0)
self.assertEqual(msg2, msg)
def test_no_events(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, 0)
self.assertTrue(s1 in poller)
self.assertFalse(s2 in poller)
poller.register(s1, 0)
self.assertFalse(s1 in poller)
def test_pubsub(self):
s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
s2.setsockopt(zmq.SUBSCRIBE, b'')
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN)
# Now make sure that both are send ready.
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(s2 in socks, 0)
# Make sure that s1 stays in POLLOUT after a send.
s1.send(b'msg1')
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
# Make sure that s2 is POLLIN after waiting.
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s2], zmq.POLLIN)
# Make sure that s2 goes into 0 after recv.
s2.recv()
socks = dict(poller.poll())
self.assertEqual(s2 in socks, 0)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def test_poll(self):
a,b = self.create_bound_pair()
tic = time.time()
evt = a.poll(50)
self.assertEqual(evt, 0)
evt = a.poll(50, zmq.POLLOUT)
self.assertEqual(evt, zmq.POLLOUT)
msg = b'hi'
a.send(msg)
evt = b.poll(50)
self.assertEqual(evt, zmq.POLLIN)
msg2 = self.recv(b)
evt = b.poll(50)
self.assertEqual(evt, 0)
self.assertEqual(msg2, msg)
def test_pair(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
# Poll result should contain both sockets
socks = dict(poller.poll())
# Now make sure that both are send ready.
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
# Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
s1.send(b'msg1')
s2.send(b'msg2')
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN)
self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN)
# Make sure that both are in POLLOUT after recv.
s1.recv()
s2.recv()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def test_no_events(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, 0)
self.assertTrue(s1 in poller)
self.assertFalse(s2 in poller)
poller.register(s1, 0)
self.assertFalse(s1 in poller)
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def test_poll(self):
a,b = self.create_bound_pair()
tic = time.time()
evt = a.poll(50)
self.assertEqual(evt, 0)
evt = a.poll(50, zmq.POLLOUT)
self.assertEqual(evt, zmq.POLLOUT)
msg = b'hi'
a.send(msg)
evt = b.poll(50)
self.assertEqual(evt, zmq.POLLIN)
msg2 = self.recv(b)
evt = b.poll(50)
self.assertEqual(evt, 0)
self.assertEqual(msg2, msg)
def test_no_events(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, 0)
self.assertTrue(s1 in poller)
self.assertFalse(s2 in poller)
poller.register(s1, 0)
self.assertFalse(s1 in poller)
def test_pubsub(self):
s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
s2.setsockopt(zmq.SUBSCRIBE, b'')
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN)
# Now make sure that both are send ready.
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(s2 in socks, 0)
# Make sure that s1 stays in POLLOUT after a send.
s1.send(b'msg1')
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
# Make sure that s2 is POLLIN after waiting.
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s2], zmq.POLLIN)
# Make sure that s2 goes into 0 after recv.
s2.recv()
socks = dict(poller.poll())
self.assertEqual(s2 in socks, 0)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def test_poll(self):
a,b = self.create_bound_pair()
tic = time.time()
evt = a.poll(50)
self.assertEqual(evt, 0)
evt = a.poll(50, zmq.POLLOUT)
self.assertEqual(evt, zmq.POLLOUT)
msg = b'hi'
a.send(msg)
evt = b.poll(50)
self.assertEqual(evt, zmq.POLLIN)
msg2 = self.recv(b)
evt = b.poll(50)
self.assertEqual(evt, 0)
self.assertEqual(msg2, msg)
def test_pair(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
# Poll result should contain both sockets
socks = dict(poller.poll())
# Now make sure that both are send ready.
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
# Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
s1.send(b'msg1')
s2.send(b'msg2')
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN)
self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN)
# Make sure that both are in POLLOUT after recv.
s1.recv()
s2.recv()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def test_no_events(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, 0)
self.assertTrue(s1 in poller)
self.assertFalse(s2 in poller)
poller.register(s1, 0)
self.assertFalse(s1 in poller)
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def test_poll(self):
a,b = self.create_bound_pair()
tic = time.time()
evt = a.poll(50)
self.assertEqual(evt, 0)
evt = a.poll(50, zmq.POLLOUT)
self.assertEqual(evt, zmq.POLLOUT)
msg = b'hi'
a.send(msg)
evt = b.poll(50)
self.assertEqual(evt, zmq.POLLIN)
msg2 = self.recv(b)
evt = b.poll(50)
self.assertEqual(evt, 0)
self.assertEqual(msg2, msg)
def test_no_events(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, 0)
self.assertTrue(s1 in poller)
self.assertFalse(s2 in poller)
poller.register(s1, 0)
self.assertFalse(s1 in poller)
def test_pubsub(self):
s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
s2.setsockopt(zmq.SUBSCRIBE, b'')
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN)
# Now make sure that both are send ready.
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(s2 in socks, 0)
# Make sure that s1 stays in POLLOUT after a send.
s1.send(b'msg1')
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
# Make sure that s2 is POLLIN after waiting.
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s2], zmq.POLLIN)
# Make sure that s2 goes into 0 after recv.
s2.recv()
socks = dict(poller.poll())
self.assertEqual(s2 in socks, 0)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def test_poll(self):
a,b = self.create_bound_pair()
tic = time.time()
evt = a.poll(50)
self.assertEqual(evt, 0)
evt = a.poll(50, zmq.POLLOUT)
self.assertEqual(evt, zmq.POLLOUT)
msg = b'hi'
a.send(msg)
evt = b.poll(50)
self.assertEqual(evt, zmq.POLLIN)
msg2 = self.recv(b)
evt = b.poll(50)
self.assertEqual(evt, 0)
self.assertEqual(msg2, msg)
def test_pair(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
# Poll result should contain both sockets
socks = dict(poller.poll())
# Now make sure that both are send ready.
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
# Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
s1.send(b'msg1')
s2.send(b'msg2')
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN)
self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN)
# Make sure that both are in POLLOUT after recv.
s1.recv()
s2.recv()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def test_no_events(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, 0)
self.assertTrue(s1 in poller)
self.assertFalse(s2 in poller)
poller.register(s1, 0)
self.assertFalse(s1 in poller)
def send_safe(self, name, data, *args, **kw):
"""serializes the data with the configured ``serialization_backend``,
waits for the socket to become available, then sends it over
through the provided socket name.
returns ``True`` if the message was sent, or ``False`` if the
socket never became available.
.. note::
you can safely use this function without waiting for a
socket to become ready, as it already does it for you.
raises SocketNotFound when the socket name is wrong.
:param name: the name of the socket where data should be sent through
:param data: the data to be serialized then sent
:param ``*args``: args to be passed to wait_until_ready
:param ``**kw``: kwargs to be passed to wait_until_ready
"""
socket = self.wait_until_ready(name, self.zmq.POLLOUT, *args, **kw)
if not socket:
return False
payload = self.serialization_backend.pack(data)
socket.send(payload)
return True