Python zmq 模块,REQ 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.REQ。
def frontendClient(context=None):
#reuse context if it exists, otherwise make a new one
context = context or zmq.Context.instance()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5559")
socket.RCVTIMEO = 2000 #we will only wait 2s for a reply
while True:
#randomly request either service A or service B
serviceRequest = random.choice([b'Service A',b'Service B'])
with myLock:
print "client wants %s" % serviceRequest
socket.send(serviceRequest)
try:
reply = socket.recv()
except Exception as e:
print "client timed out"
break
if not reply:
break
with myLock:
print "Client got reply: "
print reply
print
#take a nap
time.sleep(1)
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def connect(self):
self.context = zmq.Context()
if not self.context:
raise RuntimeError('Failed to create ZMQ context!')
self.socket = self.context.socket(zmq.REQ)
if not self.socket:
raise RuntimeError('Failed to create ZMQ socket!')
self.socket.connect(self.endpoint)
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
self.is_connected = True
def __init__(self, reqAddress, subAddress):
"""Constructor"""
super(RpcClient, self).__init__()
# zmq????
self.__reqAddress = reqAddress
self.__subAddress = subAddress
self.__context = zmq.Context()
self.__socketREQ = self.__context.socket(zmq.REQ) # ????socket
self.__socketSUB = self.__context.socket(zmq.SUB) # ????socket
# ???????????????????
self.__active = False # ????????
self.__thread = threading.Thread(target=self.run) # ????????
#----------------------------------------------------------------------
def __init__(self):
# if not exist server, spawn server, try except around
context = zmq.Context()
# try to start server in background
os.system("justdb serve &")
main_socket = context.socket(zmq.REQ)
main_socket.connect("tcp://localhost:5555")
# print("Connecting to write server")
freeze_socket = context.socket(zmq.REQ)
freeze_socket.connect("tcp://localhost:6666")
self.main_socket = main_socket
self.freeze_socket = freeze_socket
def flash(self):
if self.pid != str(os.getpid()):
# reset process pid
self.pid = str(os.getpid())
# update zmq sockets
# (couldnt share socket in differenet process)
self.zmq_socket = zmq.Context().socket(zmq.REQ)
self.zmq_file_socket = zmq.Context().socket(zmq.DEALER)
# update context
ctx = main_context(self.main_file, self.main_folder)
if self.main_param is not None:
main_config_path = os.path.join(self.main_folder, self.main_param)
params = yaml.load(open(main_config_path, 'r'))
ctx.params = params
self.context = ctx
def main():
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://%s:%s" % (config.LISTEN_ON_IP, config.LISTEN_ON_PORT))
while True:
command = input("Command: ")
socket.send(command.encode(config.CODEC))
response = socket.recv().decode(config.CODEC)
print(" ... %s" % response)
words = shlex.split(response.lower())
status = words[0]
if len(words) > 1:
info = words[1:]
if status == "finished":
print("Finished status received from robot")
break
def handle_in(self):
self.msg_count_in += 1
data = self.socket.recv_multipart()
binary, sender = None, None # initialise outside for edge cases
if len(data) == 3:
if data[1] == '': # This is a RPC call from a zmq.REQ socket
sender, _blank, msg_buf = data
self.handle_rpc(sender, msg_factory(msg_buf))
return
sender, msg_buf, binary = data
elif len(data) == 2: # This is an internode call from another zmq.ROUTER, a Controller or Worker
sender, msg_buf = data
msg = msg_factory(msg_buf)
if binary:
msg['data'] = binary
if sender in self.others:
self.handle_peer(sender, msg)
else:
self.handle_worker(sender, msg)
def test_tcp_req_socket(event_loop, socket_factory, connect_or_bind):
rep_socket = socket_factory.create(zmq.REP)
connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
frames = rep_socket.recv_multipart()
assert frames == [b'my', b'question']
rep_socket.send_multipart([b'your', b'answer'])
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.REQ)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
await asyncio.wait_for(
socket.send_multipart([b'my', b'question']),
1,
)
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'your', b'answer']
def test_tcp_rep_socket(event_loop, socket_factory, connect_or_bind):
req_socket = socket_factory.create(zmq.REQ)
connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
req_socket.send_multipart([b'my', b'question'])
frames = req_socket.recv_multipart()
assert frames == [b'your', b'answer']
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.REP)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'my', b'question']
await asyncio.wait_for(
socket.send_multipart([b'your', b'answer']),
1,
)
def test_tcp_router_socket(event_loop, socket_factory, connect_or_bind):
req_socket = socket_factory.create(zmq.REQ)
req_socket.identity = b'abcd'
connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
req_socket.send_multipart([b'my', b'question'])
frames = req_socket.recv_multipart()
assert frames == [b'your', b'answer']
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.ROUTER)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
identity = frames.pop(0)
assert identity == req_socket.identity
assert frames == [b'', b'my', b'question']
await asyncio.wait_for(
socket.send_multipart([identity, b'', b'your', b'answer']),
1,
)
def test_tcp_big_messages(event_loop, socket_factory, connect_or_bind):
rep_socket = socket_factory.create(zmq.REP)
connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
frames = rep_socket.recv_multipart()
assert frames == [b'1' * 500, b'2' * 100000]
rep_socket.send_multipart([b'3' * 500, b'4' * 100000])
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.REQ)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
await asyncio.wait_for(
socket.send_multipart([b'1' * 500, b'2' * 100000]),
1,
)
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'3' * 500, b'4' * 100000]
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def __init__(self, server_ip, server_port, task_id='', debug=False):
if debug:
l.setLevel(logging.DEBUG)
l.debug("Hydra Analyser initiated...")
self.server_ip = server_ip
self.port = server_port
self.task_id = task_id
self.data = {} # This is where all received data will be stored
self.context = zmq.Context.instance()
self.poller = zmq.Poller()
self.req_msg = hdaemon_pb2.CommandMessage()
self.resp_msg = hdaemon_pb2.ResponseMessage()
l.debug("Connecting to server at [%s:%s]", self.server_ip, self.port)
self.socket = self.context.socket(zmq.REQ)
self.socket.connect("tcp://%s:%s" % (self.server_ip, self.port))
l.debug("Connected...")
def test_app_communication(self):
tapp = 'testapp2'
# clean up any previous app by this name
self.rt.delete_app(tapp)
self.rt.create_hydra_app(name=tapp, app_path='hydra.selftest.agents.Test',
app_args='5598 0',
cpus=0.01, mem=32)
taskip = self.rt.find_ip_uniqueapp(tapp)
tasks = self.rt.get_app_tasks(tapp)
self.assertTrue(len(tasks) == 1)
self.assertTrue(len(tasks[0].ports) == 1)
taskport = str(tasks[0].ports[0])
pprint('task is launched at ip=' + taskip + ":" + taskport)
# now send a message to this app to find out how it's doing
zctx = zmq.Context()
zsocket = zctx.socket(zmq.REQ)
zsocket.connect("tcp://%s:%s" % (taskip, taskport))
zsocket.send_string('ping')
message = zsocket.recv().decode("utf-8")
# stop and clean up
self.rt.delete_app(tapp)
self.assertEqual(message, 'pong')
def __init__(self, worker_id, outside_ros=False):
self.worker_id = worker_id
self.outside_ros = outside_ros
if self.outside_ros:
rospy.logwarn('Controller is using ZMQ to get work')
self.context = Context()
self.socket = self.context.socket(REQ)
self.socket.connect('tcp://127.0.0.1:33589')
else:
rospy.logwarn('Controller is using ROS to get work')
self.services = {'get': {'name': '/work/get', 'type': GetWork},
'update': {'name': '/work/update', 'type': UpdateWorkStatus}}
for service_name, service in self.services.items():
rospy.loginfo("Controller is waiting service {}...".format(service['name']))
rospy.wait_for_service(service['name'])
service['call'] = rospy.ServiceProxy(service['name'], service['type'])
def __init__(self, url, pattern=ZmqfPattern.MPUP):
'''
'''
protocol, host, port, uri = zmqf_utils.parse_url(url)
self.context = zmq.Context()
self.pattern = pattern
if self.pattern == ZmqfPattern.MPBS:
self._socket = self.context.socket(zmq.PUB) # @UndefinedVariable
self._socket.connect('%s://%s:%s'% (protocol, host, port))
time.sleep(0.25)
elif self.pattern == ZmqfPattern.MPUP:
self._socket = self.context.socket(zmq.PUSH) # @UndefinedVariable
self._socket.connect('%s://%s:%s'% (protocol, host, port))
elif self.pattern == ZmqfPattern.MRER:
self._socket = self.context.socket(zmq.REQ) # @UndefinedVariable
self._socket.connect('%s://%s:%s'% (protocol, host, port))
def __init__(self, host=None, req_port=None, use_security=False):
if host is None:
host = env.get_master_host()
context = zmq.Context()
self._socket = context.socket(zmq.REQ)
self._auth = None
if use_security:
self._auth = Authenticator.instance(
env.get_server_public_key_dir())
self._auth.set_client_key(self._socket, env.get_client_secret_key_path(),
env.get_server_public_key_path())
if req_port is None:
req_port = env.get_req_port()
self._socket.connect(
'tcp://{host}:{port}'.format(host=host, port=req_port))
def __init__(self, name, actor_context = None, endpoints = None):
"""
Create a client
Keyword arguments:
name - Name of the timer
actor_context - ZMQ context of the actor process
endpoints - A list of endpoint strings
"""
self.name = name
self.endpoints = None
self.context = actor_context
self.client_socket = None
if not (endpoints == None):
self.endpoints = endpoints
self.context = zmq.Context()
self.client_socket = self.context.socket(zmq.REQ)
for endpoint in self.endpoints:
self.client_socket.connect(endpoint)
def worker_thread(_url, context, i):
master = context.socket(zmq.REQ)
master.identity = ("Worker-%d" % i).encode('ascii')
master.connect(_url)
# [performance, status]
master.send_multipart([i.to_bytes(1, 'little'), b"", b'READY'])
print("[%s] I'm ready..." % (master.identity.decode('ascii')))
while True:
[client_addr, empty, request] = master.recv_multipart()
assert empty == b""
print("[%s] Processing task... %s / %s" % (master.identity.decode('ascii'),
client_addr.decode('ascii'),
request.decode('ascii')))
time.sleep(randrange(1, 10))
print("[%s] finish task... %s / %s" % (master.identity.decode('ascii'),
client_addr.decode('ascii'),
request.decode('ascii')))
master.send_multipart([i.to_bytes(1, 'little'), b"", client_addr, b"", b"FINISH"])
def ensure_and_bind(self, socket_name, socket_type, address, polling_mechanism):
"""Ensure that a socket exists, that is *binded* to the given address
and that is registered with the given polling mechanism.
This method is a handy replacement for calling
``.get_or_create()``, ``.bind()`` and then ``.engage()``.
returns the socket itself.
:param socket_name: the socket name
:param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
:param address: a valid zeromq address (i.e: inproc://whatevs)
:param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``
"""
self.get_or_create(socket_name, socket_type, polling_mechanism)
socket = self.bind(socket_name, address, polling_mechanism)
self.engage()
return socket
def get_or_create(self, name, socket_type, polling_mechanism):
"""ensure that a socket exists and is registered with a given
polling_mechanism (POLLIN, POLLOUT or both)
returns the socket itself.
:param name: the socket name
:param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
:param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``)
"""
if name not in self.sockets:
self.create(name, socket_type)
socket = self.get_by_name(name)
self.register_socket(socket, polling_mechanism)
return socket
def zmq_request(self, msg_type, msg_content, timeout=__DEFAULT_REQUEST_TIMEOUT):
# new socket to talk to server
self.__socket = zmq.Context().socket(zmq.REQ)
self.__socket.connect("tcp://localhost:" + ZMQPort.RQ)
# init poller and register to socket that web can poll socket to check is it has messages
poller = zmq.Poller()
poller.register(self.__socket, zmq.POLLIN)
send_flatbuf_msg(self.__socket, msg_type, msg_content)
reqs = 0
while reqs * self.__POLL_INTERVAL <= timeout:
socks = dict(poller.poll(self.__POLL_INTERVAL))
if self.__socket in socks and socks[self.__socket] == zmq.POLLIN:
msg = self.__socket.recv()
msgObj = TransMsg.GetRootAsTransMsg(msg, 0)
return msgObj.Content()
reqs = reqs + 1
return False
def __zmq_init(self):
"""
Initializes ZMQ.
"""
config = Config.get()
self.__zmq_context = zmq.Context()
# Create socket for communicating with the controller.
self.__zmq_controller = self.__zmq_context.socket(zmq.REQ)
self.__zmq_controller.connect(config.get_controller_lockstep_end_point())
# ----------------------------------------------------------------------------------------------------------------------
def __zmq_init(self):
"""
Initializes ZMQ.
"""
config = Config.get()
self.__zmq_context = zmq.Context()
# Create socket for communicating with the controller.
self.__zmq_controller = self.__zmq_context.socket(zmq.REQ)
self.__zmq_controller.connect(config.get_controller_lockstep_end_point())
# ----------------------------------------------------------------------------------------------------------------------