Python zmq 模块，Context() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 zmq.Context()。
def __init__(self, opts=None):
    """Create the PUB and PULL sockets and wrap both in streams on one IOLoop.

    :param opts: Ready-made options. When None, they are obtained from
        ``self.process_config(CONFIG_LOCATION)``.
    """
    self.opts = self.process_config(CONFIG_LOCATION) if opts is None else opts

    make_stream = zmq.eventloop.zmqstream.ZMQStream

    # Publishing side: PUB socket bound on the local TCP port.
    self.ctx = zmq.Context()
    self.pub_socket = self.ctx.socket(zmq.PUB)
    self.pub_socket.bind('tcp://127.0.0.1:2000')
    self.loop = zmq.eventloop.IOLoop.instance()
    self.pub_stream = make_stream(self.pub_socket, self.loop)

    # Listening side: PULL socket over IPC; each received message is handed
    # to self.republish.
    self.pull_socket = self.ctx.socket(zmq.PULL)
    self.pull_socket.bind('ipc:///tmp/reactor.ipc')
    self.pull_stream = make_stream(self.pull_socket, self.loop)
    self.pull_stream.on_recv(self.republish)
def serviceA(context=None):
    """Run worker "A": serve ``Service A`` requests until told to stop.

    A DEALER socket with identity ``A`` is connected to tcp://localhost:5560.
    Each received message is printed (under ``myLock``). A ``Service A``
    message is answered after a short random delay, ``END`` stops the loop,
    and anything else is reported as an identity mix-up and also stops it.

    :param context: zmq context to reuse; ``zmq.Context.instance()`` is used
        when none is given.
    """
    # reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    try:
        # identify worker
        service.setsockopt(zmq.IDENTITY, b'A')
        service.connect("tcp://localhost:5560")
        while True:
            message = service.recv()
            with myLock:
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print("Service A got:")
                print(message)
            # recv() returns bytes, so compare against bytes literals; a str
            # literal would never match on Python 3.
            if message == b"Service A":
                # do some work
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service A did your laundry")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        # Release the socket so a later context termination is not held up.
        service.close()
def frontendClient(context=None):
    """Send random service requests once per second until a reply is missing.

    A REQ socket is connected to tcp://localhost:5559 with a 2 second receive
    timeout. Each round asks for ``Service A`` or ``Service B`` at random and
    prints the reply. The loop ends when the receive fails (reported as a
    timeout) or when an empty reply arrives.

    :param context: zmq context to reuse; ``zmq.Context.instance()`` is used
        when none is given.
    """
    # reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    socket = context.socket(zmq.REQ)
    try:
        socket.connect("tcp://localhost:5559")
        socket.RCVTIMEO = 2000  # we will only wait 2s for a reply
        while True:
            # randomly request either service A or service B
            serviceRequest = random.choice([b'Service A', b'Service B'])
            with myLock:
                print("client wants %s" % serviceRequest)
            socket.send(serviceRequest)
            try:
                reply = socket.recv()
            except Exception:
                # Any receive failure is reported as a timeout, as before;
                # the message is now printed under the shared lock as well.
                with myLock:
                    print("client timed out")
                break
            if not reply:
                break
            with myLock:
                print("Client got reply: ")
                print(reply)
                print("")
            # take a nap
            time.sleep(1)
    finally:
        # A request that never got its reply can no longer be answered, so
        # drop anything pending instead of letting it block context shutdown.
        socket.close(linger=0)
def __init__(self, opts=None):
    """Create the SUB socket stream and load the available actions.

    :param opts: Ready-made options. When None, they are obtained from
        ``self.process_config(CONFIG_LOCATION)``.
    """
    self.opts = self.process_config(CONFIG_LOCATION) if opts is None else opts

    # ZeroMQ setup: SUB socket wrapped in a stream on the shared IOLoop.
    # NOTE(review): no zmq.SUBSCRIBE option is set in this constructor --
    # confirm a subscription is installed elsewhere.
    self.ctx = zmq.Context()
    self.socket = subscriber = self.ctx.socket(zmq.SUB)
    subscriber.connect('tcp://localhost:2000')
    self.loop = zmq.eventloop.IOLoop.instance()
    self.stream = zmq.eventloop.zmqstream.ZMQStream(subscriber, self.loop)
    # NOTE(review): ``act`` is looked up as a plain (non-attribute) name here.
    self.stream.on_recv(act)

    # Load up actions
    actions_dir = '/home/mp/devel/eventdrivetalk/actions'
    self.actions = loader.load_actions(self.opts, actions_dir)
def run(self):
    """
    Process entry point for live plotting: subscribe to the variable's topic,
    initialise the plot and start the animation loop.
    """
    self.entity_name = current_process().name
    plogger.info("Starting new thread %s", self.entity_name)

    # Subscribe to the pickled variable name on the local publisher port.
    self.context = zmq.Context()
    self.socket = subscriber = self.context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:%d" % self.port)
    subscriber.setsockopt(
        zmq.SUBSCRIBE,
        pickle.dumps(self.var_name, protocol=pickle.HIGHEST_PROTOCOL),
    )
    plogger.info("Subscribed to topic %s on port %d", self.var_name, self.port)

    self.init(**self.init_kwargs)

    # The animation object must stay referenced for as long as the plot is
    # shown, otherwise it is garbage-collected -- do not drop this name.
    # See: http://matplotlib.org/api/animation_api.html
    ani = animation.FuncAnimation(self.fig, self.loop, interval=100)
    self.plt.show()
def notify_msg(self, type, price):
    """Push a ``{'type': ..., 'price': ...}`` JSON notification over ZeroMQ.

    Best-effort: any failure is logged (with traceback) and swallowed so the
    caller is never interrupted.

    :param type: Notification type, sent under the ``type`` key.
    :param price: Price value, sent under the ``price`` key.
    """
    import zmq

    context = None
    socket = None
    try:
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.connect("tcp://%s:%s" % (config.ZMQ_HOST, config.ZMQ_PORT))
        # Give the connection a moment to be established before sending.
        time.sleep(1)
        payload = json.dumps({'type': type, 'price': price})
        logging.info("notify message %s", payload)
        socket.send_string(payload)
    except Exception:
        logging.warning("notify_msg Exception", exc_info=True)
    finally:
        # Release the per-call socket and context. The bounded linger gives
        # the queued message up to a second to leave without letting an
        # unreachable peer block term() forever.
        if socket is not None:
            socket.close(linger=1000)
        if context is not None:
            context.term()
def rec(port):
    """Receive every message on a bound SUB socket and never return.

    Binds ``tcp://*:<port>``, subscribes to all topics, hands each incoming
    message to ``rec_frame`` through a ZMQStream and runs the IOLoop.

    :param port: TCP port to bind the SUB socket to.
    """
    import time

    zmq_ctx = zmq.Context()
    s = zmq_ctx.socket(zmq.SUB)
    s.bind('tcp://*:{port}'.format(port=port))
    s.setsockopt(zmq.SUBSCRIBE, b"")
    stream = ZMQStream(s)
    stream.on_recv_stream(rec_frame)
    ioloop.IOLoop.instance().start()
    # Only reached if the IOLoop is stopped. Keep the "never returns"
    # behaviour, but sleep instead of spinning so no CPU core is pinned.
    while True:
        time.sleep(1)
def main():
    """Toy learner: handshake over a PAIR socket, then answer forever.

    After sending ``hello`` to tcp://localhost:5556, each round receives and
    prints a reward and an input message, then replies with a single
    character chosen from a fixed bit pattern or (on some rounds) a random bit.
    """
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string('hello')
    message = '00101110'
    cnt = 0
    while True:
        reward = socket.recv()  # 1 or 0, or '-1' for None
        print(reward)
        msg_in = socket.recv()
        print(msg_in)
        # think...
        msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
        if cnt % 2 == 0:
            msg_out = str(message[cnt % 8])
        # msg_out is text: send() rejects str on Python 3, so use
        # send_string() exactly like the handshake above.
        socket.send_string(msg_out)
        cnt = cnt + 1
def __init__(self, cmd, port, address=None):
    """Bind a PAIR socket, optionally launch the learner, and wait for its handshake.

    :param cmd: Command line used to start the learner process (the port is
        appended as its last argument); None skips launching.
    :param port: TCP port to bind; None selects 5556.
    :param address: Interface to bind; None selects ``*``.
    :raises ImportError: If zmq cannot be imported.
    :raises ValueError: If ``port`` is outside 1..65535.
    """
    try:
        import zmq
    except ImportError:
        raise ImportError("Must have zeromq for remote learner.")

    address = '*' if address is None else address
    if port is None:
        port = 5556
    elif not 1 <= int(port) <= 65535:
        raise ValueError("Invalid port number: %s" % port)

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)
    self.socket.bind("tcp://%s:%s" % (address, port))

    # launch learner
    if cmd is not None:
        learner_argv = (cmd + ' ' + str(port)).split()
        subprocess.Popen(learner_argv)

    # Block until the learner announces itself.
    handshake_in = self.socket.recv().decode('utf-8')
    assert handshake_in == 'hello'  # handshake
# send to learner, and get response;
def __init__(self):
    """
    Object constructor.
    """
    Command.__init__(self)

    # The ZMQ context (type: Context); not created yet.
    self._zmq_context = None

    # The end points of the Enarksh daemons (type: dict[string,string]).
    self.__end_points = {}
# ------------------------------------------------------------------------------------------------------------------
def serviceB(context=None):
    """Run worker "B": serve ``Service B`` requests until told to stop.

    A DEALER socket with identity ``B`` is connected to tcp://localhost:5560.
    Each received message is printed (under ``myLock``). A ``Service B``
    message is answered after a short random delay, ``END`` stops the loop,
    and anything else is reported as an identity mix-up and also stops it.

    :param context: zmq context to reuse; ``zmq.Context.instance()`` is used
        when none is given.
    """
    # reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    try:
        # identify worker
        service.setsockopt(zmq.IDENTITY, b'B')
        service.connect("tcp://localhost:5560")
        while True:
            message = service.recv()
            with myLock:
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print("Service B got:")
                print(message)
            # recv() returns bytes, so compare against bytes literals; a str
            # literal would never match on Python 3.
            if message == b"Service B":
                # do some work
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service B cleaned your room")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        # Release the socket so a later context termination is not held up.
        service.close()
def tearDown(self):
    """Close every tracked socket and terminate every context they used.

    :raises RuntimeError: If a context fails to terminate within 2 seconds.
    """
    pending_contexts = {self.context}
    while self.sockets:
        tracked = self.sockets.pop()
        # in case additional contexts are created
        pending_contexts.add(tracked.context)
        tracked.close(0)

    for ctx in pending_contexts:
        # term() can block while sockets remain open, so run it in a daemon
        # thread and give up after a bounded wait.
        terminator = Thread(target=ctx.term)
        terminator.daemon = True
        terminator.start()
        terminator.join(timeout=2)
        if not terminator.is_alive():
            continue
        # reset Context.instance, so the failure to term doesn't corrupt subsequent tests
        zmq.sugar.context.Context._instance = None
        raise RuntimeError("context could not terminate, open sockets likely remain in test")

    super(BaseZMQTestCase, self).tearDown()
def main():
    """Connect a SUB socket, wait for the connected event, then print 'pupil' messages."""
    context = zmq.Context()
    socket = zmq.Socket(context, zmq.SUB)
    monitor = socket.get_monitor_socket()
    socket.connect(ipc_sub_url)

    # Drain monitor events until the connection is reported as established;
    # every other event (including EVENT_CONNECT_DELAYED) is ignored.
    while recv_monitor_message(monitor)['event'] != zmq.EVENT_CONNECTED:
        pass
    print('connected')

    socket.subscribe('pupil')
    while True:
        # Each message arrives as a topic frame followed by a payload frame.
        topic = socket.recv_string()
        raw_payload = socket.recv()
        payload = serializer.loads(raw_payload, encoding='utf-8')
        print(topic, payload)
def main():
    """Run a QUEUE device between a client-facing and a server-facing socket.

    The frontend is bound on tcp://*:5559 and the backend on tcp://*:5560.
    Errors (and Ctrl-C) are printed, then the sockets and the context are
    released.
    """
    # Pre-bind the names so cleanup works even if setup fails part-way;
    # otherwise the ``finally`` clause would raise NameError and hide the
    # real error.
    context = frontend = backend = None
    try:
        context = zmq.Context(1)
        # Client-facing socket
        frontend = context.socket(zmq.XREP)
        frontend.bind("tcp://*:5559")
        # Server-facing socket
        backend = context.socket(zmq.XREQ)
        backend.bind("tcp://*:5560")
        zmq.device(zmq.QUEUE, frontend, backend)
    except (Exception, KeyboardInterrupt):
        # Ctrl-C is the usual way to stop the device, so report it like any
        # other error instead of letting it escape.
        for val in sys.exc_info():
            print(val)
        print("Desativa a fila")
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def test_reqrep_raw_zmq_outside(nsproxy):
    """
    Request-reply round trip between an agent's raw REP socket and a plain
    ZMQ REQ socket created outside osBrain.
    """
    def rep_handler(agent, message):
        # Echo the request back unchanged.
        return message

    # Agent side: bind a REP socket without serialization
    echo_agent = run_agent('a1')
    echo_agent.set_attr(received=None)
    addr = echo_agent.bind('REP', transport='tcp', handler=rep_handler,
                           serializer='raw')

    # Outside side: raw ZeroMQ REQ socket connected to the agent
    ctx = zmq.Context()
    req = ctx.socket(zmq.REQ)
    endpoint = 'tcp://%s:%s' % (addr.address.host, addr.address.port)
    req.connect(endpoint)

    # The reply must be the request, byte for byte
    payload = b'Hello world'
    req.send(payload)
    assert req.recv() == payload

    req.close()
    ctx.destroy()
def test_pushpull_raw_zmq_outside(nsproxy):
    """
    Push-pull over a channel without serialization: a plain ZMQ PUSH socket
    created outside osBrain sends the message to an agent's PULL socket.
    """
    # Agent side: bind a raw PULL socket handled by set_received
    receiver = run_agent('a1')
    receiver.set_attr(received=None)
    addr = receiver.bind('PULL', transport='tcp', handler=set_received,
                         serializer='raw')

    # Outside side: raw ZeroMQ PUSH socket connected to the agent
    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    endpoint = 'tcp://%s:%s' % (addr.address.host, addr.address.port)
    push.connect(endpoint)

    # The agent's ``received`` attribute must end up holding the payload
    payload = b'Hello world'
    push.send(payload)
    assert wait_agent_attr(receiver, name='received', value=payload)

    push.close()
    ctx.destroy()
def connect(self):
    """Create a REQ socket connected to ``self.endpoint`` and register it for polling.

    :raises RuntimeError: If the context or the socket evaluates as falsy.
    """
    ctx = zmq.Context()
    self.context = ctx
    if not ctx:
        raise RuntimeError('Failed to create ZMQ context!')

    sock = ctx.socket(zmq.REQ)
    self.socket = sock
    if not sock:
        raise RuntimeError('Failed to create ZMQ socket!')
    sock.connect(self.endpoint)

    # Poll the socket for incoming data.
    self.poller = zmq.Poller()
    self.poller.register(sock, zmq.POLLIN)
    self.is_connected = True
def connect(self):
    """Create a PULL socket bound to ``self.endpoint`` and register it for polling.

    :raises RuntimeError: If the context or the socket evaluates as falsy.
    """
    ctx = zmq.Context()
    self.context = ctx
    if not ctx:
        raise RuntimeError('Failed to create ZMQ context!')

    sock = ctx.socket(zmq.PULL)
    self.socket = sock
    if not sock:
        raise RuntimeError('Failed to create ZMQ socket!')
    sock.bind(self.endpoint)

    # Poll the socket for incoming data.
    self.poller = zmq.Poller()
    self.poller.register(sock, zmq.POLLIN)
    self.is_connected = True
def connect(self):
    """Create a REP socket bound to ``self.endpoint`` and register it for polling.

    :raises RuntimeError: If the context or the socket evaluates as falsy.
    """
    ctx = zmq.Context()
    self.context = ctx
    if not ctx:
        raise RuntimeError('Failed to create ZMQ context!')

    sock = ctx.socket(zmq.REP)
    self.socket = sock
    if not sock:
        raise RuntimeError('Failed to create ZMQ socket!')
    sock.bind(self.endpoint)

    # Poll the socket for incoming data.
    self.poller = zmq.Poller()
    self.poller.register(sock, zmq.POLLIN)
    self.is_connected = True
def __init__(self, repAddress, pubAddress):
    """Constructor: bind the REP and PUB sockets and prepare the worker thread.

    :param repAddress: Endpoint the REP socket is bound to.
    :param pubAddress: Endpoint the PUB socket is bound to.
    """
    super(RpcServer, self).__init__()

    # Registry of functions, initially empty.
    # NOTE(review): presumably keyed by function name -- confirm at the
    # registration site.
    self.__functions = {}

    # zmq sockets, both created from one context
    self.__context = zmq.Context()
    self.__socketREP = self.__context.socket(zmq.REP)  # REP socket
    self.__socketREP.bind(repAddress)
    self.__socketPUB = self.__context.socket(zmq.PUB)  # PUB socket
    self.__socketPUB.bind(pubAddress)

    # Worker thread state: the thread targets self.run and is not started here
    self.__active = False
    self.__thread = threading.Thread(target=self.run)
#----------------------------------------------------------------------
def __init__(self, reqAddress, subAddress):
    """Constructor: remember both addresses and create the (unconnected) sockets.

    :param reqAddress: Address stored for the REQ socket.
    :param subAddress: Address stored for the SUB socket.
    """
    super(RpcClient, self).__init__()

    # zmq state: the addresses are only stored here, nothing is connected yet
    self.__reqAddress = reqAddress
    self.__subAddress = subAddress
    self.__context = zmq.Context()
    self.__socketREQ = self.__context.socket(zmq.REQ)  # REQ socket
    self.__socketSUB = self.__context.socket(zmq.SUB)  # SUB socket

    # Worker thread state: the thread targets self.run and is not started here
    self.__active = False
    self.__thread = threading.Thread(target=self.run)
#----------------------------------------------------------------------
def router_main(_, pidx, args):
    """Proxy messages from a TCP PULL socket to an IPC PUSH socket until interrupted.

    :param _: Unused.
    :param pidx: Index passed to ``get_logger`` for this process.
    :param args: Unused here.
    """
    logger = get_logger('examples.zmqserver.extra', pidx)

    zctx = zmq.Context()
    zctx.linger = 0
    pull_side = zctx.socket(zmq.PULL)
    pull_side.bind('tcp://*:5000')
    push_side = zctx.socket(zmq.PUSH)
    push_side.bind('ipc://example-events')

    try:
        logger.info('router proxy started')
        zmq.proxy(pull_side, push_side)
    except KeyboardInterrupt:
        # Interrupt is the normal way to stop; nothing to report.
        pass
    except BaseException:
        logger.exception('unexpected error')
    finally:
        logger.info('router proxy terminated')
        pull_side.close()
        push_side.close()
        zctx.term()
def reset(self):
    """Return to READY: rebuild both bound sockets and zero all bookkeeping."""
    self.status = READY

    zctx = zmq.Context()

    # Outgoing PUSH socket.
    self._socket1 = zctx.socket(zmq.PUSH)
    self._socket1.bind(self._address1)
    self._socket1.set_hwm(32)

    # Incoming PULL socket; receives time out after 1 ms.
    self._socket2 = zctx.socket(zmq.PULL)
    self._socket2.set_hwm(32)
    self._socket2.RCVTIMEO = 1
    self._socket2.bind(self._address2)

    # Drain flags.
    self._prev_drained = False
    self._sub_drained = False

    # Send/receive counters for both connections, and the retry counter.
    self._conn1_send_count = 0
    self._conn1_recv_count = {}
    self._conn2_send_count = {}
    self._conn2_recv_count = 0
    self._retry_count = 0
def reset(self):
    """Return to READY: connect the PULL socket and send a one-off SYNC packet."""
    self.status = READY

    zctx = zmq.Context()
    self._socket = zctx.socket(zmq.PULL)
    self._socket.RCVTIMEO = 1
    handshake = zctx.socket(zmq.PUSH)

    # Wait until both port numbers have been filled in.
    while self._ports['conn1'] is None or self._ports['sync_conn1'] is None:
        sleep(0.01)

    # Handshake with main process
    data_endpoint = self._address + ':' + str(self._ports['conn1'])
    sync_endpoint = self._address + ':' + str(self._ports['sync_conn1'])
    self._socket.connect(data_endpoint)
    handshake.connect(sync_endpoint)
    handshake.send(msgpack.dumps(b'SYNC'))
    handshake.close()

    self._num_recv = 0
    self._drained = False
def test_pub(self):
    """Publish log messages. bind() to PUB socket."""
    # pylint: disable=E1101
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    try:
        pub.bind('tcp://*:{}'.format(self.sub_port))
    except zmq.ZMQError as error:
        print(error)
    # Short pause before publishing.
    time.sleep(0.1)

    send_count = self.send_count
    for i in range(send_count):
        pub.send_string('hi there {}'.format(i))
        time.sleep(1e-5)
    sys.stdout.flush()

    # Wait for the watcher thread to exit.
    # Thread.isAlive() was removed in Python 3.9; is_alive() works on all
    # supported versions.
    while self.watcher.is_alive():
        self.watcher.join(timeout=1e-5)

    pub.close()
    context.term()
def test_pub(self):
    """Publish log messages. connect() to PUB socket."""
    # pylint: disable=E1101
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    try:
        _address = 'tcp://{}:{}'.format(self.sub_host, self.sub_port)
        pub.connect(_address)
    except zmq.ZMQError as error:
        print('ERROR:', error)
    # Short pause before publishing.
    time.sleep(0.1)

    send_count = self.send_count
    for i in range(send_count):
        pub.send_string('hi there {}'.format(i))
        time.sleep(1e-5)

    # Wait for the watcher thread to exit.
    # Thread.isAlive() was removed in Python 3.9; is_alive() works on all
    # supported versions.
    while self.watcher.is_alive():
        self.watcher.join(timeout=1e-5)

    pub.close()
    context.term()
def to(cls, channel, host='127.0.0.1',
       port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
       level=logging.NOTSET):
    """Build a handler whose PUB socket is already connected to a subscriber.

    Args:
        channel (string): Logging channel name. This is used to build a
            ZMQ topic.
        host (string): Hostname / ip address of the subscriber to publish
            to.
        port (int, string): Port on which to publish messages.
        level (int): Logging level

    Returns:
        A new ``cls`` instance created with the channel, the connected
        publisher socket and the level.
    """
    publisher = zmq.Context().socket(zmq.PUB)
    publisher.connect('tcp://{}:{}'.format(host, port))
    # This sleep hopefully fixes the silent joiner problem.
    time.sleep(0.1)
    return cls(channel, publisher, level=level)
def __init__(self):
    """Start the server in the background and connect both REQ sockets."""
    # if not exist server, spawn server, try except around
    zctx = zmq.Context()

    # try to start server in background
    os.system("justdb serve &")

    self.main_socket = zctx.socket(zmq.REQ)
    self.main_socket.connect("tcp://localhost:5555")
    # print("Connecting to write server")

    self.freeze_socket = zctx.socket(zmq.REQ)
    self.freeze_socket.connect("tcp://localhost:6666")
def create_server():
    """Bind the two REP sockets and answer every request with an empty reply.

    Exits the process if either bind fails with a ZMQ error.
    """
    zctx = zmq.Context()
    try:
        main_socket = zctx.socket(zmq.REP)
        main_socket.bind("tcp://*:5555")
        freeze_socket = zctx.socket(zmq.REP)
        freeze_socket.bind("tcp://*:6666")
    except zmq.error.ZMQError:
        print("JustDB already running, this is no error.")
        sys.exit()

    print("Successfully started \033[92mjustdb\033[0m")

    # Serve the two sockets strictly in turn: one request on the main
    # socket, then one on the freeze socket.
    while True:  # pragma: no cover
        for rep in (main_socket, freeze_socket):
            rep.recv()
            rep.send(b"")
def create_socket(port):
    """
    Create a zmq SUB socket bound to ``tcp://*:<port>`` and subscribed to
    every topic. Returns ``(socket, context)``; exits with status 1 when the
    bind fails.
    """
    zctx = zmq.Context()
    subscriber = zctx.socket(zmq.SUB)

    endpoint = "tcp://*:%s" % port
    try:
        subscriber.bind(endpoint)
    except zmq.error.ZMQError:
        print("Address already in use")
        sys.exit(1)

    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    print("Start node-masternode Subscribe")
    return subscriber, zctx
def __init__(self, addr="*", port="8080", logger=None):
    """Store the logger, create the zmq context and derive the full address.

    :param addr: Address part passed to ``Address``.
    :param port: Port part passed to ``Address``.
    :param logger: Logger stored on the instance.
    """
    self.logger = logger

    # zmq context owned by this object
    self.context = zmq.Context()

    full_address = Address(addr, port)
    self.complete_address = full_address.complete_address
    self.sync_address = ''

    # Socket used with the following node
    self.list_communication_channel = None

    # This part is just for test
    # if port == '5555':
    #     self.sync_address = Address(addr, '5562').complete_address
    # elif port == '5556':
    #     self.sync_address = Address(addr, '5563').complete_address
    # elif port == '5557':
    #     self.sync_address = Address(addr, '5564').complete_address
def forward(self, data):
    """Send ``data`` on the channel to the next node, handling send failures.

    :param data: Message handed to the channel's ``send``.
    :raises zmq.Again: Re-raised unchanged when the send reports EAGAIN.
    """
    try:
        # self.logger.debug('sending message')
        self.list_communication_channel.send(data)
        # self.logger.debug('ok with the message')
    except zmq.NotDone:
        # time.sleep(TRY_TIMEOUT)
        self.logger.debug('my recipient is dead, not done')
        self.list_communication_channel.close()
    except zmq.Again:
        self.logger.debug('my recipient is dead')
        # self.list_communication_channel.close()
        # Bare raise keeps the original exception and its traceback instead
        # of constructing a fresh zmq.Again.
        raise
    except zmq.ZMQError as a:
        # Any other ZMQ error: log it and start over with a fresh context.
        self.logger.debug("Error in message forward " + a.strerror)
        self.context.destroy()
        self.context = zmq.Context()
def send_int_message(self, msg=b'ALIVE', timeout=TRACKER_INFINITE_TIMEOUT):
    """Send ``msg`` on the channel with tracking and wait for it to complete.

    :param msg: Message to send.
    :param timeout: Passed to the tracker's ``wait``.
    """
    channel = self.list_communication_channel
    try:
        self.logger.debug('sending message to {}'.format(self.sync_address))
        # Tracked, zero-copy send; wait() blocks until done or timeout.
        channel.send(msg, track=True, copy=False).wait(timeout)
        # self.logger.debug('ok with the message')
    except zmq.NotDone:
        self.logger.debug('Something went wrong with that message')
        time.sleep(TRY_TIMEOUT)
        # self.logger.debug('Sleep finished')
        # self.list_communication_channel.close()
    except zmq.ZMQError as a:
        # Log, rebuild the context and regenerate the client-side channel.
        self.logger.debug(a.strerror)
        self.context.destroy()
        self.context = zmq.Context()
        self.generate_internal_channel_client_side()
# used when it's the first time to sync
def flash(self):
    """Rebuild per-process state when running under a different pid.

    When the stored pid differs from the current process id, the pid is
    updated, both zmq sockets are recreated and the main context is rebuilt,
    loading parameters from the configured file when ``main_param`` is set.
    """
    if self.pid != str(os.getpid()):
        # reset process pid
        self.pid = str(os.getpid())
        # update zmq sockets
        # (sockets could not be shared between different processes)
        self.zmq_socket = zmq.Context().socket(zmq.REQ)
        self.zmq_file_socket = zmq.Context().socket(zmq.DEALER)
        # update context
        ctx = main_context(self.main_file, self.main_folder)
        if self.main_param is not None:
            main_config_path = os.path.join(self.main_folder, self.main_param)
            # Use a context manager so the file handle is closed promptly.
            with open(main_config_path, 'r') as config_file:
                # NOTE(review): yaml.load without an explicit Loader can
                # construct arbitrary objects; consider yaml.safe_load if
                # the file may come from an untrusted source.
                params = yaml.load(config_file)
            ctx.params = params
        self.context = ctx
def prepare():
    """Load the config, wire up the logging socket and publish module globals.

    Sets the module-level ``tee``, ``input_files_dir`` and
    ``result_files_dir`` names and returns the ``Config`` object.
    """
    global tee
    global input_files_dir
    global result_files_dir

    config = Config()

    # PUSH socket towards the log server; ``tee`` sends one string through it.
    log_push = zmq.Context().socket(zmq.PUSH)
    log_push.connect(config.server_log['external_url'])
    tee = log_push.send_string
    atexit.register(close_sockets, [log_push])

    files_cfg = config.server_files
    input_files_dir = os.path.expanduser(files_cfg['input_files_dir'])
    result_files_dir = os.path.expanduser(files_cfg['result_files_dir'])

    tee('Started service files with pid {}'.format(os.getpid()))
    return config
def main():
    """Interactive console: send typed commands and print each response.

    Stops once a response whose first word is ``finished`` is received.
    """
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://%s:%s" % (config.LISTEN_ON_IP, config.LISTEN_ON_PORT))
    while True:
        command = input("Command: ")
        socket.send(command.encode(config.CODEC))
        response = socket.recv().decode(config.CODEC)
        print(" ... %s" % response)
        words = shlex.split(response.lower())
        # An empty (or whitespace-only) response has no status word;
        # indexing it would raise IndexError, so just prompt again.
        if not words:
            continue
        status = words[0]
        if status == "finished":
            print("Finished status received from robot")
            break
def zmq_streamer():
    """Run a STREAMER device between a bound PUSH and a bound PULL socket.

    Errors are printed, then the sockets and the context are released.
    """
    # Pre-bind the names so cleanup works even if setup fails part-way;
    # otherwise the ``finally`` clause would raise NameError and hide the
    # real error.
    context = frontend = backend = None
    try:
        context = zmq.Context()
        # Socket facing clients
        frontend = context.socket(zmq.PUSH)
        frontend.bind("tcp://*:%s" % (zmq_queue_port_push))
        # Socket facing services
        backend = context.socket(zmq.PULL)
        backend.bind("tcp://*:%s" % (zmq_queue_port_pull))
        zmq.device(zmq.STREAMER, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
    """Validate the data directory and set up the socket, Redis and logging.

    :param data_dir: Existing directory holding the data files.
    :param redis_url: URL handed to ``redis.from_url``.
    :param loglevel: Level applied to this worker's child logger.
    :raises Exception: If ``data_dir`` is not an existing directory.
    """
    # isdir() is already False for paths that do not exist.
    if not os.path.isdir(data_dir):
        raise Exception("Datadir %s is not a valid directory" % data_dir)

    # Identity of this worker.
    self.worker_id = binascii.hexlify(os.urandom(8))
    self.node_name = socket.gethostname()
    self.data_dir = data_dir
    self.data_files = set()

    # ROUTER socket carrying the worker id, polled for both directions.
    zctx = zmq.Context()
    self.socket = zctx.socket(zmq.ROUTER)
    self.socket.setsockopt(zmq.LINGER, 500)
    self.socket.identity = self.worker_id
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)

    self.redis_server = redis.from_url(redis_url)
    # Keep a dict of timestamps when you last spoke to controllers
    self.controllers = {}
    self.check_controllers()
    self.last_wrm = 0
    self.start_time = time.time()

    self.logger = bqueryd.logger.getChild('worker ' + self.worker_id)
    self.logger.setLevel(loglevel)
    self.msg_count = 0

    # NOTE(review): term_signal is *called* here, so its return value is what
    # gets installed as the SIGTERM handler -- confirm that is intended.
    signal.signal(signal.SIGTERM, self.term_signal())
def __init__(self, address=None, timeout=120, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO, retries=3):
    """Pick the controller(s) to talk to and open the socket.

    :param address: Controller address; when falsy, the controllers are read
        from the Redis set and shuffled.
    :param timeout: Stored on the instance.
    :param redis_url: URL handed to ``redis.from_url``.
    :param loglevel: Level applied to the ``rpc`` child logger.
    :param retries: Stored on the instance.
    :raises Exception: If no address is given and the Redis set is empty.
    """
    self.logger = bqueryd.logger.getChild('rpc')
    self.logger.setLevel(loglevel)
    self.context = zmq.Context()
    self.redis_url = redis_url
    redis_server = redis.from_url(redis_url)
    self.retries = retries
    self.timeout = timeout
    self.identity = binascii.hexlify(os.urandom(8))

    if address:
        candidates = [address]
    else:
        # Bind to a random controller
        candidates = list(redis_server.smembers(bqueryd.REDIS_SET_KEY))
        if not candidates:
            raise Exception('No Controllers found in Redis set: ' + bqueryd.REDIS_SET_KEY)
        random.shuffle(candidates)

    self.controllers = candidates
    self.connect_socket()
def __init__(self, push, pull, redis_conf):
    """Connect to the Redis cache and open the PUSH and PULL sockets.

    :param push: Endpoint the PUSH socket connects to.
    :param pull: Endpoint the PULL socket binds to.
    :param redis_conf: ``host:port:db`` string describing the Redis cache.
    """
    super(MinerClient, self).__init__()

    print("Connecting to Redis cache {} ...".format(redis_conf))
    host, port, db = redis_conf.split(":")
    self.redis = redis.StrictRedis(host=host, port=int(port), db=int(db))
    # Create the transaction counter only if it does not exist yet.
    self.redis.setnx('transaction', 0)

    # NOTE: Expiration times for pending/processed tasks in seconds.
    self.transaction_expiration = 60 * 60
    self.result_expiration = 60 * 10

    zctx = zmq.Context()

    print("Connecting to push socket '{}' ...".format(push))
    self.push = zctx.socket(zmq.PUSH)
    self.push.connect(push)

    print("Binding to pull socket '{}' ...".format(pull))
    self.pull = zctx.socket(zmq.PULL)
    self.pull.bind(pull)
def brute_zmq(host, port=5555, user=None, password=None, db=0):
    """Report whether a ZeroMQ publisher at ``host:port`` delivers a message.

    Subscribes to all topics and waits up to one second for any message.

    :param host: Host to connect to.
    :param port: TCP port to connect to.
    :param user: Unused.
    :param password: Unused.
    :param db: Unused.
    :return: True if a message arrived in time, False otherwise.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not linger on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec
        # Connect
        socket.connect("tcp://%s:%s" % (host, port))
        # Try to receive
        try:
            socket.recv()
            return True
        except Exception:
            return False
    finally:
        # Release both the socket and its context; previously the context
        # was never terminated, leaking one per call.
        socket.close()
        context.term()
def handle_zmq(host, port=5555, extra_config=None):
    """Report whether a ZeroMQ publisher at ``host:port`` delivers a message.

    Subscribes to all topics and waits up to one second for any message.

    :param host: Host to connect to.
    :param port: TCP port to connect to.
    :param extra_config: Unused.
    :return: True if a message arrived in time, False otherwise.
    """
    # log.debug(" * Connection to ZeroMQ: %s : %s" % (host, port))
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not linger on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec
        # Connect
        socket.connect("tcp://%s:%s" % (host, port))
        # Try to receive
        try:
            socket.recv()
            return True
        except Exception:
            return False
    finally:
        # Release both the socket and its context; previously the context
        # was never terminated, leaking one per call.
        socket.close()
        context.term()
async def test_tcp_req_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq REQ socket gets its answer from a plain zmq REP peer over TCP.

    Declared ``async`` because the body awaits and uses ``async with``.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: expect the question, send the answer.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REQ)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await asyncio.wait_for(
                socket.send_multipart([b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'your', b'answer']
async def test_tcp_rep_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq REP socket answers a plain zmq REQ peer over TCP.

    Declared ``async`` because the body awaits and uses ``async with``.
    """
    req_socket = socket_factory.create(zmq.REQ)
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: send the question, expect the answer.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REP)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([b'your', b'answer']),
                1,
            )
async def test_tcp_dealer_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq DEALER socket talks to a plain zmq REP peer over TCP.

    Declared ``async`` because the body awaits and uses ``async with``.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: expect the question, send the answer.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.DEALER)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # The DEALER side sends and receives an explicit empty first frame.
            await asyncio.wait_for(
                socket.send_multipart([b'', b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'', b'your', b'answer']
async def test_tcp_router_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq ROUTER socket answers an identified plain zmq REQ peer over TCP.

    Declared ``async`` because the body awaits and uses ``async with``.
    """
    req_socket = socket_factory.create(zmq.REQ)
    req_socket.identity = b'abcd'
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: send the question, expect the answer.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.ROUTER)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            # First frame is the peer identity, followed by the empty
            # frame and the payload.
            identity = frames.pop(0)
            assert identity == req_socket.identity
            assert frames == [b'', b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([identity, b'', b'your', b'answer']),
                1,
            )
async def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq XPUB socket sees subscribe/unsubscribe frames and delivers by topic.

    Declared ``async`` because the body awaits and uses ``async with``.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: only the message on topic ``a`` must arrive.
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XPUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # Subscription notification for topic ``a``.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\1a']
            # Keep publishing until the background thread has its message.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])
            # Closing the subscriber must produce the unsubscribe frame.
            sub_socket.close()
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\0a']
async def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq SUB socket subscribes to a plain zmq XPUB peer and gets its message.

    Declared ``async`` because the body awaits and uses ``async with``.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])
        if connect_or_bind == 'connect':
            # In this mode the unsubscribe frame is expected as well.
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.SUB)
            await socket.subscribe(b'a')
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_xsub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq XSUB socket subscribes by sending the raw subscribe frame.

    Declared ``async`` because the body awaits and uses ``async with``.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])
        if connect_or_bind == 'connect':
            # In this mode the unsubscribe frame is expected as well.
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XSUB)
            # Subscribe frame for topic ``a``.
            await socket.send_multipart([b'\x01a'])
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_push_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq PUSH socket delivers a message to a plain zmq PULL peer over TCP.

    Declared ``async`` because the body awaits and uses ``async with``.
    """
    pull_socket = socket_factory.create(zmq.PULL)
    connect_or_bind(pull_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: the message must arrive within one second.
        assert pull_socket.poll(1000) == zmq.POLLIN
        message = pull_socket.recv_multipart()
        assert message == [b'hello', b'world']

    with run_in_background(run) as event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUSH)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])
            # Keep the context open until the peer has received the message.
            while not event.is_set():
                await asyncio.sleep(0.1)