Python zmq 模块,PAIR 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.PAIR。
def main():
    """Run a toy learner that talks to its environment over a ZeroMQ PAIR socket.

    Connects to ``tcp://localhost:5556``, sends the ``hello`` handshake and then
    loops forever: receive a reward, receive an input message, and answer with a
    single bit sent as the text ``'0'`` or ``'1'``.
    """
    # Local imports keep the function self-contained.
    import random

    import zmq

    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string(str('hello'))
    message = '00101110'
    cnt = 0
    try:
        while True:
            reward = socket.recv()  # 1 or 0, or '-1' for None
            print(reward)
            msg_in = socket.recv()
            print(msg_in)
            # think...
            msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
            if cnt % 2 == 0:
                # every other step replays the fixed bit pattern
                msg_out = str(message[cnt % 8])
            # Socket.send() rejects text strings; send_string() encodes them first.
            socket.send_string(msg_out)
            cnt = cnt + 1
    finally:
        # Release the socket and context when the loop is interrupted.
        socket.close(linger=0)
        context.term()
def __init__(self, cmd, port, address=None):
    """Bind a PAIR socket, optionally launch the learner, and await its handshake.

    :param cmd: command line used to start the learner process; the port is
        appended as its last argument. ``None`` skips launching a process.
    :param port: TCP port to bind; ``None`` selects 5556.
    :param address: interface to bind; ``None`` selects ``'*'``.
    :raises ImportError: if the ``zmq`` package cannot be imported.
    :raises ValueError: if ``port`` is outside 1..65535.
    """
    try:
        import zmq
    except ImportError:
        raise ImportError("Must have zeromq for remote learner.")

    bind_address = '*' if address is None else address
    if port is None:
        bind_port = 5556
    else:
        bind_port = port
        if not 1 <= int(port) <= 65535:
            raise ValueError("Invalid port number: %s" % port)

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)
    self.socket.bind("tcp://%s:%s" % (bind_address, bind_port))

    # launch learner
    if cmd is not None:
        learner_argv = (cmd + ' ' + str(bind_port)).split()
        subprocess.Popen(learner_argv)

    # block until the learner announces itself
    greeting = self.socket.recv().decode('utf-8')
    assert greeting == 'hello'  # handshake
# send to learner, and get response;
def test_send_unicode(self):
    """Check that text must go through send_unicode, not plain send.

    Plain ``send`` is expected to raise TypeError for a unicode object;
    ``send_unicode`` is checked with its default encoding (compared against
    utf8 bytes) and with an explicit utf16 round trip.
    """
    left, right = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([left, right])
    text = "ç?§"
    if str is not unicode:
        text = text.decode('utf8')
    # plain send() must reject unicode regardless of the copy flag
    for copy_flag in (False, True):
        self.assertRaises(TypeError, left.send, text, copy=copy_flag)
    left.send_unicode(text)
    raw = right.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)
    left.send_unicode(text, encoding='utf16')
    decoded = right.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue wired to three freshly bound local sockets.

    :param mon_sub: subscription filter applied to the monitor SUB socket.
    :param in_prefix: prefix handed to the device for inbound traffic.
    :param out_prefix: prefix handed to the device for outbound traffic.
    :returns: ``(alice, bob, mon)`` -- two PAIR sockets and the SUB monitor.
    """
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    loopback = 'tcp://127.0.0.1'
    alice_port = alice.bind_to_random_port(loopback)
    bob_port = bob.bind_to_random_port(loopback)
    mon_port = mon.bind_to_random_port(loopback)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    endpoint = "tcp://127.0.0.1:%i"
    self.device.connect_in(endpoint % alice_port)
    self.device.connect_out(endpoint % bob_port)
    self.device.connect_mon(endpoint % mon_port)
    self.device.start()
    time.sleep(.2)
    try:
        # this is currently necessary to ensure no dropped monitor messages
        # see LIBZMQ-248 for more info
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        # nothing was waiting on the monitor; that is fine
        pass
    self.sockets.extend([alice, bob, mon])
    return alice, bob, mon
def test_multisend(self):
    """Verify a Frame keeps its bytes after being sent several times.

    The same frame is sent twice without copying and twice with copying;
    its content is re-checked after every send and after all receives.
    """
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)
    for copy_flag in (False, False, True, True):
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Check that later non-copying receives do not clobber an earlier buffer.

    A zero-filled message is received without copying and its frame deleted;
    the retained bytes and buffer must still read as zeros while further
    messages of varying length are received.
    """
    zeros = b'\0'*64
    sa, sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    for _attempt in range(32):
        # try a few times
        sb.send(zeros, copy=False)
        first = sa.recv(copy=False)
        first_bytes = first.bytes
        held = first.buffer
        # drop the frame itself; only its bytes and buffer remain referenced
        del first
        for step in range(5):
            filler = b'\xff'*(40 + step*10)
            sb.send(filler, copy=False)
            later = sa.recv(copy=False)
            # the buffer type differs by platform; pick the matching conversion
            if view.__name__ == 'buffer':
                snapshot = bytes(held)
            else:
                snapshot = held.tobytes()
            self.assertEqual(snapshot, zeros)
            self.assertEqual(first_bytes, zeros)
            self.assertEqual(later.bytes, filler)
def test_timeout(self):
    """Check that the Poller.poll timeout is interpreted as milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # return the wall-clock seconds spent inside poll()
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Send ten messages in each direction and check they arrive in order.

    ``x`` is a payload defined outside this block; message ``i`` is ``i*x``.
    """
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    # queue everything from s1 first, then everything from s2
    for sock in (s1, s2):
        for count in range(10):
            sock.send(count*x)
    # drain s1 completely before s2, matching the send order above
    for sock in (s1, s2):
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
async def test_tcp_pair_socket(event_loop, socket_factory, connect_or_bind):
    """Exchange one multipart message each way between a zmq and an azmq PAIR socket.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.

    :param event_loop: event loop handed to the azmq context.
    :param socket_factory: fixture whose ``create`` builds the blocking peer socket.
    :param connect_or_bind: fixture that connects or binds a socket to an endpoint.
    """
    pair_socket = socket_factory.create(zmq.PAIR)
    connect_or_bind(pair_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # blocking peer: wait for the greeting, then answer it
        assert pair_socket.poll(1000) == zmq.POLLIN
        message = pair_socket.recv_multipart()
        assert message == [b'hello', b'world']
        pair_socket.send_multipart([b'my', b'message'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PAIR)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])
            # bound the wait so a lost reply fails the test instead of hanging
            message = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert message == [b'my', b'message']
def test_send_unicode(self):
    """Check that text must go through send_unicode, not plain send.

    Plain ``send`` is expected to raise TypeError for a unicode object;
    ``send_unicode`` is checked with its default encoding (compared against
    utf8 bytes) and with an explicit utf16 round trip.
    """
    left, right = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([left, right])
    text = "ç?§"
    if str is not unicode:
        text = text.decode('utf8')
    # plain send() must reject unicode regardless of the copy flag
    for copy_flag in (False, True):
        self.assertRaises(TypeError, left.send, text, copy=copy_flag)
    left.send_unicode(text)
    raw = right.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)
    left.send_unicode(text, encoding='utf16')
    decoded = right.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def test_multisend(self):
    """Verify a Frame keeps its bytes after being sent several times.

    The same frame is sent twice without copying and twice with copying;
    its content is re-checked after every send and after all receives.
    """
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)
    for copy_flag in (False, False, True, True):
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Check that later non-copying receives do not clobber an earlier buffer.

    A zero-filled message is received without copying and its frame deleted;
    the retained bytes and buffer must still read as zeros while further
    messages of varying length are received.
    """
    zeros = b'\0'*64
    sa, sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    for _attempt in range(32):
        # try a few times
        sb.send(zeros, copy=False)
        first = sa.recv(copy=False)
        first_bytes = first.bytes
        held = first.buffer
        # drop the frame itself; only its bytes and buffer remain referenced
        del first
        for step in range(5):
            filler = b'\xff'*(40 + step*10)
            sb.send(filler, copy=False)
            later = sa.recv(copy=False)
            # the buffer type differs by platform; pick the matching conversion
            if view.__name__ == 'buffer':
                snapshot = bytes(held)
            else:
                snapshot = held.tobytes()
            self.assertEqual(snapshot, zeros)
            self.assertEqual(first_bytes, zeros)
            self.assertEqual(later.bytes, filler)
def test_timeout(self):
    """Check that the Poller.poll timeout is interpreted as milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # return the wall-clock seconds spent inside poll()
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Send ten messages in each direction and check they arrive in order.

    ``x`` is a payload defined outside this block; message ``i`` is ``i*x``.
    """
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    # queue everything from s1 first, then everything from s2
    for sock in (s1, s2):
        for count in range(10):
            sock.send(count*x)
    # drain s1 completely before s2, matching the send order above
    for sock in (s1, s2):
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
def test_send_unicode(self):
    """Check that text must go through send_unicode, not plain send.

    Plain ``send`` is expected to raise TypeError for a unicode object;
    ``send_unicode`` is checked with its default encoding (compared against
    utf8 bytes) and with an explicit utf16 round trip.
    """
    left, right = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([left, right])
    text = "ç?§"
    if str is not unicode:
        text = text.decode('utf8')
    # plain send() must reject unicode regardless of the copy flag
    for copy_flag in (False, True):
        self.assertRaises(TypeError, left.send, text, copy=copy_flag)
    left.send_unicode(text)
    raw = right.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)
    left.send_unicode(text, encoding='utf16')
    decoded = right.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue wired to three freshly bound local sockets.

    :param mon_sub: subscription filter applied to the monitor SUB socket.
    :param in_prefix: prefix handed to the device for inbound traffic.
    :param out_prefix: prefix handed to the device for outbound traffic.
    :returns: ``(alice, bob, mon)`` -- two PAIR sockets and the SUB monitor.
    """
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    loopback = 'tcp://127.0.0.1'
    alice_port = alice.bind_to_random_port(loopback)
    bob_port = bob.bind_to_random_port(loopback)
    mon_port = mon.bind_to_random_port(loopback)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    endpoint = "tcp://127.0.0.1:%i"
    self.device.connect_in(endpoint % alice_port)
    self.device.connect_out(endpoint % bob_port)
    self.device.connect_mon(endpoint % mon_port)
    self.device.start()
    time.sleep(.2)
    try:
        # this is currently necessary to ensure no dropped monitor messages
        # see LIBZMQ-248 for more info
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        # nothing was waiting on the monitor; that is fine
        pass
    self.sockets.extend([alice, bob, mon])
    return alice, bob, mon
def test_multisend(self):
    """Verify a Frame keeps its bytes after being sent several times.

    The same frame is sent twice without copying and twice with copying;
    its content is re-checked after every send and after all receives.
    """
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)
    for copy_flag in (False, False, True, True):
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Check that later non-copying receives do not clobber an earlier buffer.

    A zero-filled message is received without copying and its frame deleted;
    the retained bytes and buffer must still read as zeros while further
    messages of varying length are received.
    """
    zeros = b'\0'*64
    sa, sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    for _attempt in range(32):
        # try a few times
        sb.send(zeros, copy=False)
        first = sa.recv(copy=False)
        first_bytes = first.bytes
        held = first.buffer
        # drop the frame itself; only its bytes and buffer remain referenced
        del first
        for step in range(5):
            filler = b'\xff'*(40 + step*10)
            sb.send(filler, copy=False)
            later = sa.recv(copy=False)
            # the buffer type differs by platform; pick the matching conversion
            if view.__name__ == 'buffer':
                snapshot = bytes(held)
            else:
                snapshot = held.tobytes()
            self.assertEqual(snapshot, zeros)
            self.assertEqual(first_bytes, zeros)
            self.assertEqual(later.bytes, filler)
def test_timeout(self):
    """Check that the Poller.poll timeout is interpreted as milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # return the wall-clock seconds spent inside poll()
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Send ten messages in each direction and check they arrive in order.

    ``x`` is a payload defined outside this block; message ``i`` is ``i*x``.
    """
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    # queue everything from s1 first, then everything from s2
    for sock in (s1, s2):
        for count in range(10):
            sock.send(count*x)
    # drain s1 completely before s2, matching the send order above
    for sock in (s1, s2):
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
def test_send_unicode(self):
    """Check that text must go through send_unicode, not plain send.

    Plain ``send`` is expected to raise TypeError for a unicode object;
    ``send_unicode`` is checked with its default encoding (compared against
    utf8 bytes) and with an explicit utf16 round trip.
    """
    left, right = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([left, right])
    text = "ç?§"
    if str is not unicode:
        text = text.decode('utf8')
    # plain send() must reject unicode regardless of the copy flag
    for copy_flag in (False, True):
        self.assertRaises(TypeError, left.send, text, copy=copy_flag)
    left.send_unicode(text)
    raw = right.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)
    left.send_unicode(text, encoding='utf16')
    decoded = right.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue wired to three freshly bound local sockets.

    :param mon_sub: subscription filter applied to the monitor SUB socket.
    :param in_prefix: prefix handed to the device for inbound traffic.
    :param out_prefix: prefix handed to the device for outbound traffic.
    :returns: ``(alice, bob, mon)`` -- two PAIR sockets and the SUB monitor.
    """
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    loopback = 'tcp://127.0.0.1'
    alice_port = alice.bind_to_random_port(loopback)
    bob_port = bob.bind_to_random_port(loopback)
    mon_port = mon.bind_to_random_port(loopback)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    endpoint = "tcp://127.0.0.1:%i"
    self.device.connect_in(endpoint % alice_port)
    self.device.connect_out(endpoint % bob_port)
    self.device.connect_mon(endpoint % mon_port)
    self.device.start()
    time.sleep(.2)
    try:
        # this is currently necessary to ensure no dropped monitor messages
        # see LIBZMQ-248 for more info
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        # nothing was waiting on the monitor; that is fine
        pass
    self.sockets.extend([alice, bob, mon])
    return alice, bob, mon
def test_multisend(self):
    """Verify a Frame keeps its bytes after being sent several times.

    The same frame is sent twice without copying and twice with copying;
    its content is re-checked after every send and after all receives.
    """
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)
    for copy_flag in (False, False, True, True):
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_timeout(self):
    """Check that the Poller.poll timeout is interpreted as milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # return the wall-clock seconds spent inside poll()
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Send ten messages in each direction and check they arrive in order.

    ``x`` is a payload defined outside this block; message ``i`` is ``i*x``.
    """
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    # queue everything from s1 first, then everything from s2
    for sock in (s1, s2):
        for count in range(10):
            sock.send(count*x)
    # drain s1 completely before s2, matching the send order above
    for sock in (s1, s2):
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
def test_send_unicode(self):
    """Check that text must go through send_unicode, not plain send.

    Plain ``send`` is expected to raise TypeError for a unicode object;
    ``send_unicode`` is checked with its default encoding (compared against
    utf8 bytes) and with an explicit utf16 round trip.
    """
    left, right = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([left, right])
    text = "ç?§"
    if str is not unicode:
        text = text.decode('utf8')
    # plain send() must reject unicode regardless of the copy flag
    for copy_flag in (False, True):
        self.assertRaises(TypeError, left.send, text, copy=copy_flag)
    left.send_unicode(text)
    raw = right.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)
    left.send_unicode(text, encoding='utf16')
    decoded = right.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def test_multisend(self):
    """Verify a Frame keeps its bytes after being sent several times.

    The same frame is sent twice without copying and twice with copying;
    its content is re-checked after every send and after all receives.
    """
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)
    for copy_flag in (False, False, True, True):
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Check that later non-copying receives do not clobber an earlier buffer.

    A zero-filled message is received without copying and its frame deleted;
    the retained bytes and buffer must still read as zeros while further
    messages of varying length are received.
    """
    zeros = b'\0'*64
    sa, sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    for _attempt in range(32):
        # try a few times
        sb.send(zeros, copy=False)
        first = sa.recv(copy=False)
        first_bytes = first.bytes
        held = first.buffer
        # drop the frame itself; only its bytes and buffer remain referenced
        del first
        for step in range(5):
            filler = b'\xff'*(40 + step*10)
            sb.send(filler, copy=False)
            later = sa.recv(copy=False)
            # the buffer type differs by platform; pick the matching conversion
            if view.__name__ == 'buffer':
                snapshot = bytes(held)
            else:
                snapshot = held.tobytes()
            self.assertEqual(snapshot, zeros)
            self.assertEqual(first_bytes, zeros)
            self.assertEqual(later.bytes, filler)
def test_timeout(self):
    """Check that the Poller.poll timeout is interpreted as milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # return the wall-clock seconds spent inside poll()
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Send ten messages in each direction and check they arrive in order.

    ``x`` is a payload defined outside this block; message ``i`` is ``i*x``.
    """
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    # queue everything from s1 first, then everything from s2
    for sock in (s1, s2):
        for count in range(10):
            sock.send(count*x)
    # drain s1 completely before s2, matching the send order above
    for sock in (s1, s2):
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
def test_send_unicode(self):
    """Check that text must go through send_unicode, not plain send.

    Plain ``send`` is expected to raise TypeError for a unicode object;
    ``send_unicode`` is checked with its default encoding (compared against
    utf8 bytes) and with an explicit utf16 round trip.
    """
    left, right = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([left, right])
    text = "ç?§"
    if str is not unicode:
        text = text.decode('utf8')
    # plain send() must reject unicode regardless of the copy flag
    for copy_flag in (False, True):
        self.assertRaises(TypeError, left.send, text, copy=copy_flag)
    left.send_unicode(text)
    raw = right.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)
    left.send_unicode(text, encoding='utf16')
    decoded = right.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue wired to three freshly bound local sockets.

    :param mon_sub: subscription filter applied to the monitor SUB socket.
    :param in_prefix: prefix handed to the device for inbound traffic.
    :param out_prefix: prefix handed to the device for outbound traffic.
    :returns: ``(alice, bob, mon)`` -- two PAIR sockets and the SUB monitor.
    """
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    loopback = 'tcp://127.0.0.1'
    alice_port = alice.bind_to_random_port(loopback)
    bob_port = bob.bind_to_random_port(loopback)
    mon_port = mon.bind_to_random_port(loopback)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    endpoint = "tcp://127.0.0.1:%i"
    self.device.connect_in(endpoint % alice_port)
    self.device.connect_out(endpoint % bob_port)
    self.device.connect_mon(endpoint % mon_port)
    self.device.start()
    time.sleep(.2)
    try:
        # this is currently necessary to ensure no dropped monitor messages
        # see LIBZMQ-248 for more info
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        # nothing was waiting on the monitor; that is fine
        pass
    self.sockets.extend([alice, bob, mon])
    return alice, bob, mon
def test_multisend(self):
    """Verify a Frame keeps its bytes after being sent several times.

    The same frame is sent twice without copying and twice with copying;
    its content is re-checked after every send and after all receives.
    """
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)
    for copy_flag in (False, False, True, True):
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Check that later non-copying receives do not clobber an earlier buffer.

    A zero-filled message is received without copying and its frame deleted;
    the retained bytes and buffer must still read as zeros while further
    messages of varying length are received.
    """
    zeros = b'\0'*64
    sa, sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    for _attempt in range(32):
        # try a few times
        sb.send(zeros, copy=False)
        first = sa.recv(copy=False)
        first_bytes = first.bytes
        held = first.buffer
        # drop the frame itself; only its bytes and buffer remain referenced
        del first
        for step in range(5):
            filler = b'\xff'*(40 + step*10)
            sb.send(filler, copy=False)
            later = sa.recv(copy=False)
            # the buffer type differs by platform; pick the matching conversion
            if view.__name__ == 'buffer':
                snapshot = bytes(held)
            else:
                snapshot = held.tobytes()
            self.assertEqual(snapshot, zeros)
            self.assertEqual(first_bytes, zeros)
            self.assertEqual(later.bytes, filler)
def test_timeout(self):
    """Check that the Poller.poll timeout is interpreted as milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # return the wall-clock seconds spent inside poll()
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Send ten messages in each direction and check they arrive in order.

    ``x`` is a payload defined outside this block; message ``i`` is ``i*x``.
    """
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    # queue everything from s1 first, then everything from s2
    for sock in (s1, s2):
        for count in range(10):
            sock.send(count*x)
    # drain s1 completely before s2, matching the send order above
    for sock in (s1, s2):
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
def test_send_unicode(self):
    """Check that text must go through send_unicode, not plain send.

    Plain ``send`` is expected to raise TypeError for a unicode object;
    ``send_unicode`` is checked with its default encoding (compared against
    utf8 bytes) and with an explicit utf16 round trip.
    """
    left, right = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([left, right])
    text = "ç?§"
    if str is not unicode:
        text = text.decode('utf8')
    # plain send() must reject unicode regardless of the copy flag
    for copy_flag in (False, True):
        self.assertRaises(TypeError, left.send, text, copy=copy_flag)
    left.send_unicode(text)
    raw = right.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)
    left.send_unicode(text, encoding='utf16')
    decoded = right.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue wired to three freshly bound local sockets.

    :param mon_sub: subscription filter applied to the monitor SUB socket.
    :param in_prefix: prefix handed to the device for inbound traffic.
    :param out_prefix: prefix handed to the device for outbound traffic.
    :returns: ``(alice, bob, mon)`` -- two PAIR sockets and the SUB monitor.
    """
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    loopback = 'tcp://127.0.0.1'
    alice_port = alice.bind_to_random_port(loopback)
    bob_port = bob.bind_to_random_port(loopback)
    mon_port = mon.bind_to_random_port(loopback)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    endpoint = "tcp://127.0.0.1:%i"
    self.device.connect_in(endpoint % alice_port)
    self.device.connect_out(endpoint % bob_port)
    self.device.connect_mon(endpoint % mon_port)
    self.device.start()
    time.sleep(.2)
    try:
        # this is currently necessary to ensure no dropped monitor messages
        # see LIBZMQ-248 for more info
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        # nothing was waiting on the monitor; that is fine
        pass
    self.sockets.extend([alice, bob, mon])
    return alice, bob, mon
def test_multisend(self):
    """Verify a Frame keeps its bytes after being sent several times.

    The same frame is sent twice without copying and twice with copying;
    its content is re-checked after every send and after all receives.
    """
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)
    for copy_flag in (False, False, True, True):
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_timeout(self):
    """Check that the Poller.poll timeout is interpreted as milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # return the wall-clock seconds spent inside poll()
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Send ten messages in each direction and check they arrive in order.

    ``x`` is a payload defined outside this block; message ``i`` is ``i*x``.
    """
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    # queue everything from s1 first, then everything from s2
    for sock in (s1, s2):
        for count in range(10):
            sock.send(count*x)
    # drain s1 completely before s2, matching the send order above
    for sock in (s1, s2):
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
def ensure_and_bind(self, socket_name, socket_type, address, polling_mechanism):
    """Make sure a socket exists, is *bound* to the given address and is
    registered with the given polling mechanism.

    Shorthand for calling ``.get_or_create()``, ``.bind()`` and then
    ``.engage()`` in sequence.

    :param socket_name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :param address: a valid zeromq address (i.e: inproc://whatevs)
    :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``
    :returns: the value returned by ``.bind()`` for this socket
    """
    self.get_or_create(socket_name, socket_type, polling_mechanism)
    bound_socket = self.bind(socket_name, address, polling_mechanism)
    self.engage()
    return bound_socket
def get_or_create(self, name, socket_type, polling_mechanism):
    """Return the named socket, creating it first when it is not known yet,
    and register it with the given polling mechanism.

    :param name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``)
    :returns: the socket looked up by ``name``
    """
    already_known = name in self.sockets
    if not already_known:
        self.create(name, socket_type)

    found = self.get_by_name(name)
    # registration happens on every call, not only on creation
    self.register_socket(found, polling_mechanism)
    return found
def test_recv_spawned_before_send_is_non_blocking(self):
    """Spawn a receiver greenthread first, then send, and check delivery."""
    req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    eventlet.sleep()
    received = {'res': None}
    finished = eventlet.Event()

    def receiver():
        # runs in its own greenthread; signals once the message is stored
        received['res'] = rep.recv()
        finished.send('done')

    eventlet.spawn(receiver)
    req.send(b'test')
    finished.wait()
    self.assertEqual(received['res'], b'test')
def main():
    """Run a toy learner that talks to its environment over a ZeroMQ PAIR socket.

    Connects to ``tcp://localhost:5556``, sends the ``hello`` handshake and then
    loops forever: receive a reward, receive an input message, and answer with a
    single bit sent as the text ``'0'`` or ``'1'``.
    """
    import random

    import zmq

    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string(str('hello'))
    message = '00101110'
    cnt = 0
    try:
        while True:
            reward = socket.recv()  # 1 or 0, or '-1' for None
            print(reward)
            msg_in = socket.recv()
            print(msg_in)
            # think...
            msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
            if cnt % 2 == 0:
                # every other step replays the fixed bit pattern
                msg_out = str(message[cnt % 8])
            # Socket.send() rejects text strings; send_string() encodes them first.
            socket.send_string(msg_out)
            cnt = cnt + 1
    finally:
        # Release the socket and context when the loop is interrupted.
        socket.close(linger=0)
        context.term()
def __init__(self, cmd, port):
    """Bind a PAIR socket, launch the learner process, and await its handshake.

    :param cmd: command line used to start the learner process; the port is
        appended as its last argument. ``None`` skips launching a process.
    :param port: TCP port to bind; ``None`` selects 5556.
    :raises ImportError: if the ``zmq`` package cannot be imported.
    """
    try:
        import zmq
    except ImportError:
        raise ImportError("Must have zeromq for remote learner.")
    self.port = port if port is not None else 5556
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)
    # Bind the resolved port: the raw argument may be None.
    self.socket.bind("tcp://*:%s" % self.port)
    # launch learner
    if cmd is not None:
        subprocess.Popen((cmd + ' ' + str(self.port)).split())
    # recv() yields bytes; decode before comparing with the text handshake.
    handshake_in = self.socket.recv().decode('utf-8')
    assert handshake_in == 'hello'  # handshake
# send to learner, and get response;
def zcreate_pipe(ctx, hwm=1000):
    """Create two PAIR sockets joined over a randomly named inproc endpoint.

    :param ctx: context passed to ``zsocket.ZSocket`` for both sockets.
    :param hwm: high-water mark applied to both sockets.
    :returns: ``(frontend, backend)``; frontend is bound, backend is connected.
    :raises zmq.ZMQError: if binding fails for any reason other than the
        endpoint name already being in use.
    """
    backend = zsocket.ZSocket(ctx, zmq.PAIR)
    frontend = zsocket.ZSocket(ctx, zmq.PAIR)
    backend.set_hwm(hwm)
    frontend.set_hwm(hwm)
    # close immediately on shutdown
    backend.setsockopt(zmq.LINGER, 0)
    frontend.setsockopt(zmq.LINGER, 0)

    while True:
        # Two 16-bit random parts; the trailing newline is kept from the
        # original endpoint format.
        endpoint = "inproc://zactor-%04x-%04x\n" % (
            random.randrange(0x10000), random.randrange(0x10000))
        try:
            frontend.bind(endpoint)
        except zmq.ZMQError as err:
            # Only a name collision is worth retrying; anything else would
            # otherwise loop forever.
            if err.errno != zmq.EADDRINUSE:
                raise
        else:
            break
    backend.connect(endpoint)
    return (frontend, backend)
def start(self):
    """Launch the authentication thread and wait for it to report started.

    :raises RuntimeError: on Python 2.7+ when the thread has not signalled
        startup within 10 seconds.
    """
    # create a socket to communicate with auth thread.
    self.pipe = self.context.socket(zmq.PAIR)
    self.pipe.linger = 1
    self.pipe.bind(self.pipe_endpoint)

    zap = MultiZapAuthenticator(
        self.context, encoding=self.encoding, log=self.log)
    self.thread = AuthenticationThread(
        self.context,
        self.pipe_endpoint,
        encoding=self.encoding,
        log=self.log,
        authenticator=zap,
    )
    self.thread.start()

    # Event.wait:Changed in version 2.7: Previously, the method always returned None.
    came_up = self.thread.started.wait(timeout=10)
    if sys.version_info >= (2, 7) and not came_up:
        raise RuntimeError("Authenticator thread failed to start")
def main():
    """Run a toy learner that talks to its environment over a ZeroMQ PAIR socket.

    Connects to ``tcp://localhost:5556``, sends the ``hello`` handshake and then
    loops forever: receive a reward, receive an input message, and answer with a
    single bit sent as the text ``'0'`` or ``'1'``.
    """
    import random

    import zmq

    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string(str('hello'))
    message = '00101110'
    cnt = 0
    try:
        while True:
            reward = socket.recv()  # 1 or 0, or '-1' for None
            print(reward)
            msg_in = socket.recv()
            print(msg_in)
            # think...
            msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
            if cnt % 2 == 0:
                # every other step replays the fixed bit pattern
                msg_out = str(message[cnt % 8])
            # Socket.send() rejects text strings; send_string() encodes them first.
            socket.send_string(msg_out)
            cnt = cnt + 1
    finally:
        # Release the socket and context when the loop is interrupted.
        socket.close(linger=0)
        context.term()
def __init__(self, cmd, port):
    """Bind a PAIR socket, launch the learner process, and await its handshake.

    :param cmd: command line used to start the learner process; the port is
        appended as its last argument. ``None`` skips launching a process.
    :param port: TCP port to bind; ``None`` selects 5556.
    :raises ImportError: if the ``zmq`` package cannot be imported.
    """
    try:
        import zmq
    except ImportError:
        raise ImportError("Must have zeromq for remote learner.")
    self.port = port if port is not None else 5556
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)
    # Bind the resolved port: the raw argument may be None.
    self.socket.bind("tcp://*:%s" % self.port)
    # launch learner
    if cmd is not None:
        subprocess.Popen((cmd + ' ' + str(self.port)).split())
    # recv() yields bytes; decode before comparing with the text handshake.
    handshake_in = self.socket.recv().decode('utf-8')
    assert handshake_in == 'hello'  # handshake
# send to learner, and get response;