Python zmq 模块,Socket() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.Socket()。
def poll(self, timeout=None):
"""Poll the registered 0MQ or native fds for I/O.
Parameters
----------
timeout : float, int
The timeout in milliseconds. If None, no `timeout` (infinite). This
is in milliseconds to be compatible with ``select.poll()``.
Returns
-------
events : list of tuples
The list of events that are ready to be processed.
This is a list of tuples of the form ``(socket, event)``, where the 0MQ Socket
or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
It is common to call ``events = dict(poller.poll())``,
which turns the list of tuples into a mapping of ``socket : event``.
"""
if timeout is None or timeout < 0:
timeout = -1
elif isinstance(timeout, float):
timeout = int(timeout)
return zmq_poll(self.sockets, timeout=timeout)
def main():
context = zmq.Context()
socket = zmq.Socket(context, zmq.SUB)
monitor = socket.get_monitor_socket()
socket.connect(ipc_sub_url)
while True:
status = recv_monitor_message(monitor)
if status['event'] == zmq.EVENT_CONNECTED:
break
elif status['event'] == zmq.EVENT_CONNECT_DELAYED:
pass
print('connected')
socket.subscribe('pupil')
while True:
topic = socket.recv_string()
payload = serializer.loads(socket.recv(), encoding='utf-8')
print(topic, payload)
def __init__(self, ctx, url, topics=(), block_until_connected=True):
self.socket = zmq.Socket(ctx, zmq.SUB)
assert type(topics) != str
if block_until_connected:
# connect node and block until a connecetion has been made
monitor = self.socket.get_monitor_socket()
self.socket.connect(url)
while True:
status = recv_monitor_message(monitor)
if status['event'] == zmq.EVENT_CONNECTED:
break
elif status['event'] == zmq.EVENT_CONNECT_DELAYED:
pass
else:
raise Exception("ZMQ connection failed")
self.socket.disable_monitor()
else:
self.socket.connect(url)
for t in topics:
self.subscribe(t)
def register(self, socket, address, alias=None, handler=None):
assert not self.registered(address), \
'Socket is already registered!'
if not alias:
alias = address
self.socket[alias] = socket
self.socket[address] = socket
self.socket[socket] = socket
self.address[alias] = address
self.address[socket] = address
self.address[address] = address
if handler is not None:
self.poller.register(socket, zmq.POLLIN)
if address.kind in ('SUB', 'SYNC_SUB'):
self.subscribe(socket, handler)
else:
self._set_handler(socket, handler)
def _set_handler(self, socket, handler, update=False):
"""
Set the socket handler(s).
Parameters
----------
socket : zmq.Socket
Socket to set its handler(s).
handler : function(s)
Handler(s) for the socket. This can be a list or a dictionary too.
"""
if update:
try:
self.handler[socket].update(self._curated_handlers(handler))
except KeyError:
self.handler[socket] = self._curated_handlers(handler)
else:
self.handler[socket] = self._curated_handlers(handler)
def _process_single_event(self, socket):
"""
Process a socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
"""
data = socket.recv()
address = self.address[socket]
if address.kind == 'SUB':
self._process_sub_event(socket, address, data)
elif address.kind == 'PULL':
self._process_pull_event(socket, address, data)
elif address.kind == 'REP':
self._process_rep_event(socket, address, data)
else:
self._process_single_event_complex(address, socket, data)
def _process_single_event_complex(self, address, socket, data):
"""
Process a socket's event for complex sockets (channels).
Parameters
----------
address : AgentAddress or AgentChannel
Agent address or channel associated to the socket.
socket : zmq.Socket
Socket that generated the event.
data
Received in the socket.
"""
if address.kind == 'ASYNC_REP':
self._process_async_rep_event(socket, address, data)
elif address.kind == 'PULL_SYNC_PUB':
self._process_sync_pub_event(socket, address.channel, data)
else:
raise NotImplementedError('Unsupported kind %s!' % address.kind)
def _process_pull_event(self, socket, addr, data):
"""
Process a PULL socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
addr : AgentAddress
AgentAddress associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
message = deserialize_message(message=data, serializer=addr.serializer)
handler = self.handler[socket]
if not isinstance(handler, (list, dict, tuple)):
handler = [handler]
for h in handler:
h(self, message)
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def poll(self, timeout=None):
"""Poll the registered 0MQ or native fds for I/O.
Parameters
----------
timeout : float, int
The timeout in milliseconds. If None, no `timeout` (infinite). This
is in milliseconds to be compatible with ``select.poll()``. The
underlying zmq_poll uses microseconds and we convert to that in
this function.
Returns
-------
events : list of tuples
The list of events that are ready to be processed.
This is a list of tuples of the form ``(socket, event)``, where the 0MQ Socket
or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
It is common to call ``events = dict(poller.poll())``,
which turns the list of tuples into a mapping of ``socket : event``.
"""
if timeout is None or timeout < 0:
timeout = -1
elif isinstance(timeout, float):
timeout = int(timeout)
return zmq_poll(self.sockets, timeout=timeout)
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def poll(self, timeout=None):
"""Poll the registered 0MQ or native fds for I/O.
Parameters
----------
timeout : float, int
The timeout in milliseconds. If None, no `timeout` (infinite). This
is in milliseconds to be compatible with ``select.poll()``. The
underlying zmq_poll uses microseconds and we convert to that in
this function.
Returns
-------
events : list of tuples
The list of events that are ready to be processed.
This is a list of tuples of the form ``(socket, event)``, where the 0MQ Socket
or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
It is common to call ``events = dict(poller.poll())``,
which turns the list of tuples into a mapping of ``socket : event``.
"""
if timeout is None or timeout < 0:
timeout = -1
elif isinstance(timeout, float):
timeout = int(timeout)
return zmq_poll(self.sockets, timeout=timeout)
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def poll(self, timeout=None):
"""Poll the registered 0MQ or native fds for I/O.
Parameters
----------
timeout : float, int
The timeout in milliseconds. If None, no `timeout` (infinite). This
is in milliseconds to be compatible with ``select.poll()``. The
underlying zmq_poll uses microseconds and we convert to that in
this function.
Returns
-------
events : list of tuples
The list of events that are ready to be processed.
This is a list of tuples of the form ``(socket, event)``, where the 0MQ Socket
or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
It is common to call ``events = dict(poller.poll())``,
which turns the list of tuples into a mapping of ``socket : event``.
"""
if timeout is None or timeout < 0:
timeout = -1
elif isinstance(timeout, float):
timeout = int(timeout)
return zmq_poll(self.sockets, timeout=timeout)
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def poll(self, timeout=None):
"""Poll the registered 0MQ or native fds for I/O.
Parameters
----------
timeout : float, int
The timeout in milliseconds. If None, no `timeout` (infinite). This
is in milliseconds to be compatible with ``select.poll()``. The
underlying zmq_poll uses microseconds and we convert to that in
this function.
Returns
-------
events : list of tuples
The list of events that are ready to be processed.
This is a list of tuples of the form ``(socket, event)``, where the 0MQ Socket
or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
It is common to call ``events = dict(poller.poll())``,
which turns the list of tuples into a mapping of ``socket : event``.
"""
if timeout is None or timeout < 0:
timeout = -1
elif isinstance(timeout, float):
timeout = int(timeout)
return zmq_poll(self.sockets, timeout=timeout)
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def register(self, queue, handler, flags=zmq.POLLIN):
"""
Register *queue* to be polled on each cycle of the task. Any messages
with the relevant *flags* (defaults to ``POLLIN``) will trigger the
specified *handler* method which is expected to take a single argument
which will be *queue*.
:param zmq.Socket queue:
The queue to poll.
:param handler:
The function or method to call when a message with matching *flags*
arrives in *queue*.
:param int flags:
The flags to match in the queue poller (defaults to ``POLLIN``).
"""
self.poller.register(queue, flags)
self.handlers[queue] = handler
def dump(msg_or_socket):
"""Receives all message parts from socket, printing each frame neatly"""
if isinstance(msg_or_socket, zmq.Socket):
# it's a socket, call on current message
msg = msg_or_socket.recv_multipart()
else:
msg = msg_or_socket
print("----------------------------------------")
for part in msg:
print("[%03d]" % len(part), end=' ')
is_text = True
try:
print(part.decode('ascii'))
except UnicodeDecodeError:
print(r"0x%s" % (binascii.hexlify(part).decode('ascii')))
def __init__(self, interface_or_socket, context=None):
logging.Handler.__init__(self)
if isinstance(interface_or_socket, zmq.Socket):
self.socket = interface_or_socket
self.ctx = self.socket.context
else:
self.ctx = context or zmq.Context()
self.socket = self.ctx.socket(zmq.PUB)
self.socket.bind(interface_or_socket)
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def test_subclass(self):
"""subclasses can assign attributes"""
class S(zmq.Socket):
a = None
def __init__(self, *a, **kw):
self.a=-1
super(S, self).__init__(*a, **kw)
s = S(self.context, zmq.REP)
self.sockets.append(s)
self.assertEqual(s.a, -1)
s.a=1
self.assertEqual(s.a, 1)
a=s.a
self.assertEqual(a, 1)
def test_shadow(self):
p = self.socket(zmq.PUSH)
p.bind("tcp://127.0.0.1:5555")
p2 = zmq.Socket.shadow(p.underlying)
self.assertEqual(p.underlying, p2.underlying)
s = self.socket(zmq.PULL)
s2 = zmq.Socket.shadow(s.underlying)
self.assertNotEqual(s.underlying, p.underlying)
self.assertEqual(s.underlying, s2.underlying)
s2.connect("tcp://127.0.0.1:5555")
sent = b'hi'
p2.send(sent)
rcvd = self.recv(s2)
self.assertEqual(rcvd, sent)
def register(self, socket, flags=POLLIN|POLLOUT):
"""p.register(socket, flags=POLLIN|POLLOUT)
Register a 0MQ socket or native fd for I/O monitoring.
register(s,0) is equivalent to unregister(s).
Parameters
----------
socket : zmq.Socket or native socket
A zmq.Socket or any Python object having a ``fileno()``
method that returns a valid file descriptor.
flags : int
The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
If `flags=0`, socket will be unregistered.
"""
if flags:
if socket in self._map:
idx = self._map[socket]
self.sockets[idx] = (socket, flags)
else:
idx = len(self.sockets)
self.sockets.append((socket, flags))
self._map[socket] = idx
elif socket in self._map:
# uregister sockets registered with no events
self.unregister(socket)
else:
# ignore new sockets with no events
pass
def unregister(self, socket):
"""Remove a 0MQ socket or native fd for I/O monitoring.
Parameters
----------
socket : Socket
The socket instance to stop polling.
"""
idx = self._map.pop(socket)
self.sockets.pop(idx)
# shift indices after deletion
for socket, flags in self.sockets[idx:]:
self._map[socket] -= 1
def __init__(self, context, socket_type, io_loop=None):
super(_AsyncSocket, self).__init__(context, socket_type)
self.io_loop = io_loop or self._default_loop()
self._recv_futures = []
self._send_futures = []
self._state = 0
self._shadow_sock = _zmq.Socket.shadow(self.underlying)
self._init_io_state()
def _socket_class(self, socket_type):
return Socket(self, socket_type, io_loop=self.io_loop)
def __init__(self):
pupil_queue = Queue()
self.pupil_proc = Process(target=pupil_capture.alternate_launch,
args=((pupil_queue), ))
self.pupil_proc.start()
while True:
pupil_msg = pupil_queue.get()
print(pupil_msg)
if 'tcp' in pupil_msg:
self.ipc_sub_url = pupil_msg
if 'EYE_READY' in pupil_msg:
break
context = zmq.Context()
self.socket = zmq.Socket(context, zmq.SUB)
monitor = self.socket.get_monitor_socket()
self.socket.connect(self.ipc_sub_url)
while True:
status = recv_monitor_message(monitor)
if status['event'] == zmq.EVENT_CONNECTED:
break
elif status['event'] == zmq.EVENT_CONNECT_DELAYED:
pass
print('Capturing from pupil on url %s.' % self.ipc_sub_url)
self.socket.subscribe('pupil')
# setup LSL
streams = resolve_byprop('name', LSL_STREAM_NAME, timeout=2.5)
try:
self.inlet = StreamInlet(streams[0])
except IndexError:
raise ValueError('Make sure stream name="%s", is opened first.'
% LSL_STREAM_NAME)
self.running = True
self.samples = []
# LSL and pupil samples are synchronized to local_clock(), which is the
# runtime on this slave, not the host
def thread_loop(self, context, pipe):
poller = zmq.Poller()
ipc_pub = zmq_tools.Msg_Dispatcher(context, self.g_pool.ipc_push_url)
poller.register(pipe, zmq.POLLIN)
remote_socket = None
while True:
items = dict(poller.poll())
if pipe in items:
cmd = pipe.recv_string()
if cmd == 'Exit':
break
elif cmd == 'Bind':
new_url = pipe.recv_string()
if remote_socket:
poller.unregister(remote_socket)
remote_socket.close(linger=0)
try:
remote_socket = context.socket(zmq.REP)
remote_socket.bind(new_url)
except zmq.ZMQError as e:
remote_socket = None
pipe.send_string("Error", flags=zmq.SNDMORE)
pipe.send_string("Could not bind to Socket: {}. Reason: {}".format(new_url, e))
else:
pipe.send_string("Bind OK", flags=zmq.SNDMORE)
# `.last_endpoint` is already of type `bytes`
pipe.send(remote_socket.last_endpoint.replace(b"tcp://", b""))
poller.register(remote_socket, zmq.POLLIN)
if remote_socket in items:
self.on_recv(remote_socket, ipc_pub)
self.thread_pipe = None
def __init__(self, ctx, url):
self.socket = zmq.Socket(ctx, zmq.PUB)
self.socket.connect(url)
def _bind_socket(self, socket, addr=None, transport=None):
"""
Bind a socket using the corresponding transport and address.
Parameters
----------
socket : zmq.Socket
Socket to bind.
addr : str, default is None
The address to bind to.
transport : str, AgentAddressTransport, default is None
Transport protocol.
Returns
-------
addr : str
The address where the socket binded to.
"""
if transport == 'tcp':
host, port = address_to_host_port(addr)
if not port:
uri = 'tcp://%s' % self.host
port = socket.bind_to_random_port(uri)
addr = self.host + ':' + str(port)
else:
socket.bind('tcp://%s' % (addr))
else:
if not addr:
addr = str(unique_identifier())
if transport == 'ipc':
addr = config['IPC_DIR'] / addr
socket.bind('%s://%s' % (transport, addr))
return addr
def _process_async_rep_event(self, socket, channel, data):
"""
Process a ASYNC_REP socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
channel : AgentChannel
AgentChannel associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
message = deserialize_message(message=data,
serializer=channel.serializer)
address_uuid, request_uuid, data, address = message
client_address = address.twin()
if not self.registered(client_address):
self.connect(address)
handler = self.handler[socket]
is_generator = inspect.isgeneratorfunction(handler)
if is_generator:
generator = handler(self, data)
reply = next(generator)
else:
reply = handler(self, data)
self.send(client_address, (address_uuid, request_uuid, reply))
if is_generator:
execute_code_after_yield(generator)
def _process_sync_pub_event(self, socket, channel, data):
"""
Process a SYNC_PUB socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
channel : AgentChannel
AgentChannel associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
message = deserialize_message(message=data,
serializer=channel.serializer)
address_uuid, request_uuid, data = message
handler = self.handler[socket]
is_generator = inspect.isgeneratorfunction(handler)
if is_generator:
generator = handler(self, data)
reply = next(generator)
else:
reply = handler(self, data)
message = (address_uuid, request_uuid, reply)
self._send_channel_sync_pub(channel=channel,
message=message,
topic=address_uuid,
general=False)
if is_generator:
execute_code_after_yield(generator)
def _process_sub_event(self, socket, addr, data):
"""
Process a SUB socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
addr : AgentAddress
AgentAddress associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
handlers = self.handler[socket]
message = self._process_sub_message(addr.serializer, data)
for topic in handlers:
if not data.startswith(topic):
continue
# Call the handler (with or without the topic)
handler = handlers[topic]
nparams = len(inspect.signature(handler).parameters)
if nparams == 2:
handler(self, message)
elif nparams == 3:
handler(self, message, topic)
def get_unique_external_zmq_sockets(self):
"""
Return an iterable containing all the zmq.Socket objects from
`self.socket` which are not internal, without repetition.
Originally, a socket was internal if its alias was one of the
following:
- loopback
- _loopback_safe
- inproc://loopback
- inproc://_loopback_safe
However, since we are storing more than one entry in the `self.socket`
dictionary per zmq.socket (by storing its AgentAddress, for example),
we need a way to simply get all non-internal zmq.socket objects, and
this is precisely what this function does.
"""
reserved = ('loopback', '_loopback_safe', 'inproc://loopback',
'inproc://_loopback_safe')
external_sockets = []
for k, v in self.socket.items():
if isinstance(k, zmq.sugar.socket.Socket):
continue
if isinstance(k, AgentAddress) and k.address in reserved:
continue
if k in reserved:
continue
external_sockets.append(v)
return set(external_sockets)
def recv_messages(zmq_subscriber, timeout_count, message_count):
"""Test utility function.
Subscriber thread that receives and counts ZMQ messages.
Args:
zmq_subscriber (zmq.Socket): ZMQ subscriber socket.
timeout_count (int): No. of failed receives until exit.
message_count (int): No. of messages expected to be received.
Returns:
(int) Number of messages received.
"""
# pylint: disable=E1101
fails = 0 # No. of receives that didn't return a message.
receive_count = 0 # Total number of messages received.
while fails < timeout_count:
try:
_ = zmq_subscriber.recv_string(flags=zmq.NOBLOCK)
fails = 0
receive_count += 1
if receive_count == message_count:
break
except zmq.ZMQError as error:
if error.errno == zmq.EAGAIN:
pass
else:
raise
fails += 1
time.sleep(1e-6)
return receive_count
def __init__(self, *args, **kwargs):
zmq.Socket.__init__(self, *args, **kwargs)
#
# Keep track of which thread this socket was created in
#
self.__dict__['_thread'] = threading.current_thread()
def __init__(self, interface_or_socket, context=None):
logging.Handler.__init__(self)
if isinstance(interface_or_socket, zmq.Socket):
self.socket = interface_or_socket
self.ctx = self.socket.context
else:
self.ctx = context or zmq.Context()
self.socket = self.ctx.socket(zmq.PUB)
self.socket.bind(interface_or_socket)
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def test_subclass(self):
"""subclasses can assign attributes"""
class S(zmq.Socket):
a = None
def __init__(self, *a, **kw):
self.a=-1
super(S, self).__init__(*a, **kw)
s = S(self.context, zmq.REP)
self.sockets.append(s)
self.assertEqual(s.a, -1)
s.a=1
self.assertEqual(s.a, 1)
a=s.a
self.assertEqual(a, 1)
def test_shadow(self):
p = self.socket(zmq.PUSH)
p.bind("tcp://127.0.0.1:5555")
p2 = zmq.Socket.shadow(p.underlying)
self.assertEqual(p.underlying, p2.underlying)
s = self.socket(zmq.PULL)
s2 = zmq.Socket.shadow(s.underlying)
self.assertNotEqual(s.underlying, p.underlying)
self.assertEqual(s.underlying, s2.underlying)
s2.connect("tcp://127.0.0.1:5555")
sent = b'hi'
p2.send(sent)
rcvd = self.recv(s2)
self.assertEqual(rcvd, sent)
def unregister(self, socket):
"""Remove a 0MQ socket or native fd for I/O monitoring.
Parameters
----------
socket : Socket
The socket instance to stop polling.
"""
idx = self._map.pop(socket)
self.sockets.pop(idx)
# shift indices after deletion
for socket, flags in self.sockets[idx:]:
self._map[socket] -= 1
def __init__(self, interface_or_socket, context=None):
logging.Handler.__init__(self)
if isinstance(interface_or_socket, zmq.Socket):
self.socket = interface_or_socket
self.ctx = self.socket.context
else:
self.ctx = context or zmq.Context()
self.socket = self.ctx.socket(zmq.PUB)
self.socket.bind(interface_or_socket)
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def test_subclass(self):
"""subclasses can assign attributes"""
class S(zmq.Socket):
a = None
def __init__(self, *a, **kw):
self.a=-1
super(S, self).__init__(*a, **kw)
s = S(self.context, zmq.REP)
self.sockets.append(s)
self.assertEqual(s.a, -1)
s.a=1
self.assertEqual(s.a, 1)
a=s.a
self.assertEqual(a, 1)
def register(self, socket, flags=POLLIN|POLLOUT):
"""p.register(socket, flags=POLLIN|POLLOUT)
Register a 0MQ socket or native fd for I/O monitoring.
register(s,0) is equivalent to unregister(s).
Parameters
----------
socket : zmq.Socket or native socket
A zmq.Socket or any Python object having a ``fileno()``
method that returns a valid file descriptor.
flags : int
The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
If `flags=0`, socket will be unregistered.
"""
if flags:
if socket in self._map:
idx = self._map[socket]
self.sockets[idx] = (socket, flags)
else:
idx = len(self.sockets)
self.sockets.append((socket, flags))
self._map[socket] = idx
elif socket in self._map:
# uregister sockets registered with no events
self.unregister(socket)
else:
# ignore new sockets with no events
pass
def unregister(self, socket):
"""Remove a 0MQ socket or native fd for I/O monitoring.
Parameters
----------
socket : Socket
The socket instance to stop polling.
"""
idx = self._map.pop(socket)
self.sockets.pop(idx)
# shift indices after deletion
for socket, flags in self.sockets[idx:]:
self._map[socket] -= 1
def __init__(self, interface_or_socket, context=None):
logging.Handler.__init__(self)
if isinstance(interface_or_socket, zmq.Socket):
self.socket = interface_or_socket
self.ctx = self.socket.context
else:
self.ctx = context or zmq.Context()
self.socket = self.ctx.socket(zmq.PUB)
self.socket.bind(interface_or_socket)
def test_subclass(self):
"""subclasses can assign attributes"""
class S(zmq.Socket):
a = None
def __init__(self, *a, **kw):
self.a=-1
super(S, self).__init__(*a, **kw)
s = S(self.context, zmq.REP)
self.sockets.append(s)
self.assertEqual(s.a, -1)
s.a=1
self.assertEqual(s.a, 1)
a=s.a
self.assertEqual(a, 1)