# ROUTER import time import random import zmq context = zmq.Context.instance() client = context.socket(zmq.ROUTER) client.setsockopt(zmq.TCP_KEEPALIVE,1) client.bind("tcp://127.0.0.1:99999") for _ in range(100): ident = random.choice([b'A', b'A', b'B']) work = b"This is the workload" client.send_multipart([ident, work]) time.sleep(0.5) #CLIENT import zmq context = zmq.Context.instance() worker = context.socket(zmq.DEALER) worker.setsockopt(zmq.IDENTITY, b'A') worker.connect("tcp://127.0.0.1:99999") while True: request = worker.recv() print(request)
我的目标是让多个客户端从 ROUTER 接收数据,由身份(A,B)确定。无论哪个先开始,它都可以正常工作。但是,一旦客户端停止并重新启动,它就不能再重新连接到 ROUTER 或接收数据。这种模式不是为了解决这个问题而设计的,还是有让客户端再次重新连接 ROUTER 的选项?
要解决这个问题,你可以尝试以下方法:
下面是修改后的代码示例:
# 服务器端 import time import random import zmq context = zmq.Context.instance() server = context.socket(zmq.REP) server.setsockopt(zmq.TCP_KEEPALIVE, context = zmq.Context.instance() server = context.socket(zmq.REP 1) server.bind("tcp://127.0.0.1:99999") for _ in range(100): ident = random.choice([ ident = b'A', b'A', b'B']) work = b"This is the workload" server.recv() # 接收客户端的请求 server.send_multipart([ident, work]) time.sleep( server.send_multipart([ 0.5) # 客户端 import zmq context = zmq.Context.instance() client = context.socket(zmq.REQ) client.connect( context = zmq.Context.instance() client = context.socket(zmq context "tcp://127.0.0.1:99999") while True: client.send(b"") # 发送请求到服务器端 reply = client.recv() # 接收服务器端的回复 print(reply)
请注意,REQ-REP模式中,客户端发送一个空的请求(client.send(b"")),然后等待服务器端的回复(client.recv())。这样客户端可以在断开连接后重新连接到服务器端,并接收数据。
client.send(b"")
client.recv()
下面是使用心跳机制的示例代码:
# 服务器端 import time import random import zmq context = zmq.Context.instance() server = context.socket(zmq.ROUTER) server.setsockopt(zmq.TCP_KEEPALIVE, context = zmq.Context.instance() server = context.socket(zmq.ROUTER) server.setsockopt(zmq.TCP_KEEPALIVE context = zmq.Context.instance() server = context.socket(z 1) server.bind("tcp://127.0.0.1:99999") clients = {} # 存储客户端的身份和最后接收心跳消息的时间 while True: poller = zmq.Poller() poller.register(server, zmq.POLLIN) socks = poller = zmq.Poller() poller.register(server dict(poller.poll(timeout=1000)) # 等待1秒钟,检查是否有消息到达 if server in socks and socks[server] == zmq.POLLIN: [ident, _, msg] = server.recv_multipart() clients[ident] = time.time() [ident, _, msg] = server.recv_multipart() clients[ident # 更新客户端的心跳时间 # 处理消息 work = b"This is