Python zmq 模块,PULL 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.PULL。
def __init__(self, opts=None):
if opts is None:
self.opts = self.process_config(CONFIG_LOCATION)
else:
self.opts = opts
self.ctx = zmq.Context()
self.pub_socket = self.ctx.socket(zmq.PUB)
self.pub_socket.bind('tcp://127.0.0.1:2000')
self.loop = zmq.eventloop.IOLoop.instance()
self.pub_stream = zmq.eventloop.zmqstream.ZMQStream(self.pub_socket, self.loop)
# Now create PULL socket over IPC to listen to reactor
self.pull_socket = self.ctx.socket(zmq.PULL)
self.pull_socket.bind('ipc:///tmp/reactor.ipc')
self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(self.pull_socket, self.loop)
self.pull_stream.on_recv(self.republish)
def receive_message(self, event, event_data, listener_data):
"""
Receives a messages from another processes.
:param * event: Not used.
:param * event_data: Not used.
:param * listener_data: Not used.
"""
del event, event_data, listener_data
# Make a poller for all incoming sockets.
poller = zmq.Poller()
for socket in self.__end_points.values():
if socket.type in [zmq.PULL, zmq.REP]:
poller.register(socket, zmq.POLLIN)
# Wait for socket is ready for reading.
socks = dict(poller.poll())
for name, socket in self.__end_points.items():
if socket in socks:
self._receive_message(name, socket)
# ------------------------------------------------------------------------------------------------------------------
def __register_sockets(self):
"""
Registers ZMQ sockets for communication with other processes in Enarksh.
"""
config = Config.get()
# Register socket for receiving asynchronous incoming messages.
self.__message_controller.register_end_point('pull', zmq.PULL, config.get_spawner_pull_end_point())
# Register socket for sending asynchronous messages to the controller.
self.__message_controller.register_end_point('controller', zmq.PUSH, config.get_controller_pull_end_point())
# Register socket for sending asynchronous messages to the logger.
self.__message_controller.register_end_point('logger', zmq.PUSH, config.get_logger_pull_end_point())
# ------------------------------------------------------------------------------------------------------------------
def __register_sockets(self):
"""
Registers ZMQ sockets for communication with other processes in Enarksh.
"""
config = Config.get()
# Register socket for receiving asynchronous incoming messages.
self.message_controller.register_end_point('pull', zmq.PULL, config.get_controller_pull_end_point())
# Create socket for lockstep incoming messages.
self.message_controller.register_end_point('lockstep', zmq.REP, config.get_controller_lockstep_end_point())
# Create socket for sending asynchronous messages to the spanner.
self.message_controller.register_end_point('spawner', zmq.PUSH, config.get_spawner_pull_end_point())
# Create socket for sending asynchronous messages to the logger.
self.message_controller.register_end_point('logger', zmq.PUSH, config.get_logger_pull_end_point())
# ------------------------------------------------------------------------------------------------------------------
def register_end_point(self, name, socket_type, end_point):
"""
Registers an end point.
:param str name: The name of the end point.
:param int socket_type: The socket type, one of
- zmq.PULL for asynchronous incoming messages
- zmq.REP for lockstep incoming messages
- zmq.PUSH for asynchronous outgoing messages
:param str end_point: The end point.
"""
socket = self.__zmq_context.socket(socket_type)
self.__end_points[name] = socket
if socket_type in [zmq.PULL, zmq.REP]:
socket.bind(end_point)
elif socket_type == zmq.PUSH:
socket.connect(end_point)
else:
raise ValueError("Unknown socket type {0}".format(socket_type))
# ------------------------------------------------------------------------------------------------------------------
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
# Travis can't handle how much memory PyPy uses on this test
def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
def test_poll(self):
@gen.coroutine
def test():
a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
f = b.poll(timeout=0)
self.assertEqual(f.result(), 0)
f = b.poll(timeout=1)
assert not f.done()
evt = yield f
self.assertEqual(evt, 0)
f = b.poll(timeout=1000)
assert not f.done()
yield a.send_multipart([b'hi', b'there'])
evt = yield f
self.assertEqual(evt, zmq.POLLIN)
recvd = yield b.recv_multipart()
self.assertEqual(recvd, [b'hi', b'there'])
self.loop.run_sync(test)
def test_poll(self):
@asyncio.coroutine
def test():
a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
f = b.poll(timeout=0)
yield from asyncio.sleep(0)
self.assertEqual(f.result(), 0)
f = b.poll(timeout=1)
assert not f.done()
evt = yield from f
self.assertEqual(evt, 0)
f = b.poll(timeout=1000)
assert not f.done()
yield from a.send_multipart([b'hi', b'there'])
evt = yield from f
self.assertEqual(evt, zmq.POLLIN)
recvd = yield from b.recv_multipart()
self.assertEqual(recvd, [b'hi', b'there'])
self.loop.run_until_complete(test())
def _process_single_event(self, socket):
"""
Process a socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
"""
data = socket.recv()
address = self.address[socket]
if address.kind == 'SUB':
self._process_sub_event(socket, address, data)
elif address.kind == 'PULL':
self._process_pull_event(socket, address, data)
elif address.kind == 'REP':
self._process_rep_event(socket, address, data)
else:
self._process_single_event_complex(address, socket, data)
def _process_pull_event(self, socket, addr, data):
"""
Process a PULL socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
addr : AgentAddress
AgentAddress associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
message = deserialize_message(message=data, serializer=addr.serializer)
handler = self.handler[socket]
if not isinstance(handler, (list, dict, tuple)):
handler = [handler]
for h in handler:
h(self, message)
def test_agentchannel_async_rep():
"""
Test basic ASYNC_REP AgentChannel operations: initialization, equivalence
and basic methods.
"""
receiver = AgentAddress('ipc', 'addr0', 'PULL', 'server', 'pickle')
channel = AgentChannel('ASYNC_REP', receiver=receiver, sender=None)
# Equivalence
assert channel == AgentChannel('ASYNC_REP', receiver=receiver, sender=None)
assert not channel == 'foo'
assert channel != 'foo'
# Basic methods
assert channel.twin() == AgentChannel('ASYNC_REQ', sender=receiver.twin(),
receiver=None)
# Other attributes
assert hasattr(channel, 'uuid')
assert channel.transport == 'ipc'
assert channel.serializer == 'pickle'
def test_agentchannel_sync_pub():
"""
Test basic SYNC_PUB AgentChannel operations: initialization, equivalence
and basic methods.
"""
sender = AgentAddress('ipc', 'addr0', 'PUB', 'server', 'pickle')
receiver = AgentAddress('ipc', 'addr0', 'PULL', 'server', 'pickle')
channel = AgentChannel('SYNC_PUB', sender=sender, receiver=receiver)
# Equivalence
assert channel == AgentChannel('SYNC_PUB', sender=sender,
receiver=receiver)
assert not channel == 'foo'
assert channel != 'foo'
# Basic methods
assert channel.twin() == AgentChannel('SYNC_SUB', sender=receiver.twin(),
receiver=sender.twin())
# Other attributes
assert hasattr(channel, 'uuid')
assert channel.transport == 'ipc'
assert channel.serializer == 'pickle'
def connect(self):
self.context = zmq.Context()
if not self.context:
raise RuntimeError('Failed to create ZMQ context!')
self.socket = self.context.socket(zmq.PULL)
if not self.socket:
raise RuntimeError('Failed to create ZMQ socket!')
self.socket.bind(self.endpoint)
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
self.is_connected = True
def router_main(_, pidx, args):
log = get_logger('examples.zmqserver.extra', pidx)
ctx = zmq.Context()
ctx.linger = 0
in_sock = ctx.socket(zmq.PULL)
in_sock.bind('tcp://*:5000')
out_sock = ctx.socket(zmq.PUSH)
out_sock.bind('ipc://example-events')
try:
log.info('router proxy started')
zmq.proxy(in_sock, out_sock)
except KeyboardInterrupt:
pass
except:
log.exception('unexpected error')
finally:
log.info('router proxy terminated')
in_sock.close()
out_sock.close()
ctx.term()
def worker_main(loop, pidx, args):
log = get_logger('examples.zmqserver.worker', pidx)
router = await aiozmq.create_zmq_stream(
zmq.PULL,
connect='ipc://example-events')
async def process_incoming(router):
while True:
try:
data = await router.read()
except aiozmq.ZmqStreamClosed:
break
log.info(data)
task = loop.create_task(process_incoming(router))
log.info('started')
yield
router.close()
await task
log.info('terminated')
def reset(self):
self.status = READY
context = zmq.Context()
self._socket1 = context.socket(zmq.PUSH)
self._socket1.bind(self._address1)
self._socket1.set_hwm(32)
self._socket2 = context.socket(zmq.PULL)
self._socket2.set_hwm(32)
self._socket2.RCVTIMEO = 1
self._socket2.bind(self._address2)
self._prev_drained = False
self._sub_drained = False
self._conn1_send_count = 0
self._conn1_recv_count = {}
self._conn2_send_count = {}
self._conn2_recv_count = 0
self._retry_count = 0
def reset(self):
self.status = READY
context = zmq.Context()
self._socket = context.socket(zmq.PULL)
self._socket.RCVTIMEO = 1
sync_socket = context.socket(zmq.PUSH)
while self._ports['conn1'] is None or self._ports['sync_conn1'] is None:
sleep(0.01)
# Handshake with main process
self._socket.connect(self._address + ':' + str(self._ports['conn1']))
sync_socket.connect(self._address + ':' + str(self._ports['sync_conn1']))
packet = msgpack.dumps(b'SYNC')
sync_socket.send(packet)
sync_socket.close()
self._num_recv = 0
self._drained = False
def zmq_streamer():
try:
context = zmq.Context()
# Socket facing clients
frontend = context.socket(zmq.PUSH)
frontend.bind("tcp://*:%s" % (zmq_queue_port_push))
# Socket facing services
backend = context.socket(zmq.PULL)
backend.bind("tcp://*:%s" % (zmq_queue_port_pull))
zmq.device(zmq.STREAMER, frontend, backend)
except Exception as e:
print(e)
print("bringing down zmq device")
finally:
frontend.close()
backend.close()
context.term()
def generator_from_zmq_pull(context, host):
socket = context.socket(zmq.PULL)
# TODO: Configure socket with clean properties to avoid message overload.
if host.endswith('/'):
host = host[:-1]
print_item("+", "Binding ZMQ pull socket : " + colorama.Fore.CYAN + "{0}".format(host) + colorama.Style.RESET_ALL)
socket.bind(host)
while True:
try:
message = socket.recv(flags=zmq.NOBLOCK)
except zmq.Again as e:
message = None
if message is None:
yield None # NOTE: We have to make the generator non blocking.
else:
task = json.loads(message)
yield task
def __init__(self, push, pull, redis_conf):
super(MinerClient, self).__init__()
print("Connecting to Redis cache {} ...".format(redis_conf))
redis_host, redis_port, redis_db = redis_conf.split(":")
self.redis = redis.StrictRedis(host=redis_host, port=int(redis_port), db=int(redis_db))
self.redis.setnx('transaction', 0)
# NOTE: Expiration times for pending/processed tasks in seconds.
self.transaction_expiration = 60 * 60
self.result_expiration = 60 * 10
context = zmq.Context()
print("Connecting to push socket '{}' ...".format(push))
self.push = context.socket(zmq.PUSH)
self.push.connect(push)
print("Binding to pull socket '{}' ...".format(pull))
self.pull = context.socket(zmq.PULL)
self.pull.bind(pull)
def test_tcp_push_socket(event_loop, socket_factory, connect_or_bind):
pull_socket = socket_factory.create(zmq.PULL)
connect_or_bind(pull_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
assert pull_socket.poll(1000) == zmq.POLLIN
message = pull_socket.recv_multipart()
assert message == [b'hello', b'world']
with run_in_background(run) as event:
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.PUSH)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
await socket.send_multipart([b'hello', b'world'])
while not event.is_set():
await asyncio.sleep(0.1)
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket, zstr
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
a = zsocket.new(ctx, zmq.PUSH)
zsocket.bind(a, "inproc://a")
ctx2 = self.Context.shadow_pyczmq(ctx)
b = ctx2.socket(zmq.PULL)
b.connect("inproc://a")
zstr.send(a, b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
b.close()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket, zstr
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
a = zsocket.new(ctx, zmq.PUSH)
zsocket.bind(a, "inproc://a")
ctx2 = self.Context.shadow_pyczmq(ctx)
b = ctx2.socket(zmq.PULL)
b.connect("inproc://a")
zstr.send(a, b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
b.close()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket, zstr
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
a = zsocket.new(ctx, zmq.PUSH)
zsocket.bind(a, "inproc://a")
ctx2 = self.Context.shadow_pyczmq(ctx)
b = ctx2.socket(zmq.PULL)
b.connect("inproc://a")
zstr.send(a, b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
b.close()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket, zstr
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
a = zsocket.new(ctx, zmq.PUSH)
zsocket.bind(a, "inproc://a")
ctx2 = self.Context.shadow_pyczmq(ctx)
b = ctx2.socket(zmq.PULL)
b.connect("inproc://a")
zstr.send(a, b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
b.close()
def execute_command_streamer():
from oldspeak.console.parsers.streamer import parser
args = parser.parse_args(get_sub_parser_argv())
bootstrap_conf_with_gevent(args)
device = Device(zmq.STREAMER, zmq.PULL, zmq.PUSH)
device.bind_in(args.pull)
device.bind_out(args.push)
if args.pull_hwm:
device.setsockopt_in(zmq.RCVHWM, args.pull_hwm)
if args.push_hwm:
device.setsockopt_out(zmq.SNDHWM, args.push_hwm)
print "oldspeak streamer started"
print "date", datetime.utcnow().isoformat()
print "pull", (getattr(args, 'pull'))
print "push", (getattr(args, 'push'))
device.start()
def consumer():
consumer_id = random.randrange(1,10005)
print "I am consumer #%s" % (consumer_id)
context = zmq.Context()
# recieve work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://127.0.0.1:5557")
# send work
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.connect("tcp://127.0.0.1:5558")
while True:
work = consumer_receiver.recv_json()
data = work['num']
result = { 'consumer' : consumer_id, 'num' : data}
if data%2 == 0:
consumer_sender.send_json(result)
def __init__(self, name, send_qsize=0, mode='ipc'):
self._name = name
self._conn_info = None
self._context_lock = threading.Lock()
self._context = zmq.Context()
self._tosock = self._context.socket(zmq.ROUTER)
self._frsock = self._context.socket(zmq.PULL)
self._tosock.set_hwm(10)
self._frsock.set_hwm(10)
self._dispatcher = CallbackManager()
self._send_queue = queue.Queue(maxsize=send_qsize)
self._rcv_thread = None
self._snd_thread = None
self._mode = mode
assert mode in ('ipc', 'tcp')
def _setup_ipc(self):
'''
Subscribe to the pub IPC
and publish the messages
on the right transport.
'''
self.ctx = zmq.Context()
log.debug('Setting up the publisher puller')
self.sub = self.ctx.socket(zmq.PULL)
self.sub.bind(PUB_IPC_URL)
try:
self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
# zmq 2
except AttributeError:
# zmq 3
self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
def main():
try:
context = zmq.Context(1)
# Socket facing clients
frontend = context.socket(zmq.PULL)
frontend.bind("tcp://*:5559")
# Socket facing services
backend = context.socket(zmq.PUSH)
backend.bind("tcp://*:5560")
zmq.device(zmq.STREAMER, frontend, backend)
except Exception, e:
print e
print "bringing down zmq device"
finally:
pass
frontend.close()
backend.close()
context.term()
def init_recv_socket(self):
logger.info("Initalizing receive socket")
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PULL)
logger.info("Initalized receive socket")
while not self.isInterruptionRequested():
try:
time.sleep(0.1)
logger.info("Trying to get a connection to gnuradio...")
self.socket.connect("tcp://{0}:{1}".format(self.ip, self.gr_port))
logger.info("Got connection")
break
except (ConnectionRefusedError, ConnectionResetError):
continue
except Exception as e:
logger.error("Unexpected error", str(e))
def test_send_1k_push_pull(self):
down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
eventlet.sleep()
done = eventlet.Event()
def tx():
tx_i = 0
while tx_i <= 1000:
tx_i += 1
down.send(str(tx_i).encode())
def rx():
while True:
rx_i = up.recv()
if rx_i == b"1000":
done.send(0)
break
eventlet.spawn(tx)
eventlet.spawn(rx)
final_i = done.wait()
self.assertEqual(final_i, 0)
def __init__(self, opts=None):
if opts is None:
self.opts = self.process_config(CONFIG_LOCATION)
else:
self.opts = opts
return
# General setup of ZeroMQ
self.ctx = zmq.Context()
self.loop = zmq.eventloop.IOLoop.instance()
# Begin setup of PULL socket
self.pull_socket = self.ctx.socket(zmq.PULL)
self.pull_socket.bind('tcp://127.0.0.1:2001')
self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(self.pull_socket, self.loop)
self.pull_stream.on_recv(self.stream_decode)
# Begin setup of PUSH socket for IPC to publisher
self.push_socket = self.ctx.socket(zmq.PUSH)
self.push_socket.connect('ipc:///tmp/reactor.ipc')
self.push_stream = zmq.eventloop.zmqstream.ZMQStream(self.push_socket, self.loop)
self.actions = loader.load_actions(self.opts, '/home/mp/devel/eventdriventalk/actions')
self.registers = loader.load_registers(self.opts, '/home/mp/devel/eventdriventalk/registers')
self.rules = loader.load_registers(self.opts, '/home/mp/devel/eventdriventalk/rules')
def msg_server(self):
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:%s"%config.ZMQ_PORT)
logging.info("zmq msg_server start...")
while not self.is_terminated:
# Wait for next request from client
message = socket.recv()
logging.info("new pull message: %s", message)
self.process_message(message)
time.sleep (1) # Do some 'work'
def main(self):
"""
The main of the logger.
"""
config = Config.get()
# Startup logger.
self.__startup()
# Register our socket for asynchronous incoming messages.
self.__message_controller.register_end_point('pull', zmq.PULL, config.get_logger_pull_end_point())
# Register supported message types
self.__message_controller.register_message_type(HaltMessage.MESSAGE_TYPE)
self.__message_controller.register_message_type(LogFileMessage.MESSAGE_TYPE)
# Register message received event handlers.
self.__message_controller.register_listener(HaltMessage.MESSAGE_TYPE, HaltMessageEventHandler.handle)
self.__message_controller.register_listener(LogFileMessage.MESSAGE_TYPE, LogFileMessageEventHandler.handle)
# Register other event handlers.
self.__event_controller.event_queue_empty.register_listener(self.__message_controller.receive_message)
# Run the event loop.
self.__event_controller.loop()
# Shutdown logger.
self.__shutdown()
# ------------------------------------------------------------------------------------------------------------------
def test_context_manager(self):
url = 'inproc://a'
with self.Context() as ctx:
with ctx.socket(zmq.PUSH) as a:
a.bind(url)
with ctx.socket(zmq.PULL) as b:
b.connect(url)
msg = b'hi'
a.send(msg)
rcvd = self.recv(b)
self.assertEqual(rcvd, msg)
self.assertEqual(b.closed, True)
self.assertEqual(a.closed, True)
self.assertEqual(ctx.closed, True)
def test_identity(self):
s = self.context.socket(zmq.PULL)
self.sockets.append(s)
ident = b'identity\0\0'
s.identity = ident
self.assertEqual(s.get(zmq.IDENTITY), ident)
def test_shadow(self):
p = self.socket(zmq.PUSH)
p.bind("tcp://127.0.0.1:5555")
p2 = zmq.Socket.shadow(p.underlying)
self.assertEqual(p.underlying, p2.underlying)
s = self.socket(zmq.PULL)
s2 = zmq.Socket.shadow(s.underlying)
self.assertNotEqual(s.underlying, p.underlying)
self.assertEqual(s.underlying, s2.underlying)
s2.connect("tcp://127.0.0.1:5555")
sent = b'hi'
p2.send(sent)
rcvd = self.recv(s2)
self.assertEqual(rcvd, sent)