小能豆

如何在 ZMQ 中重新建立 ROUTER/DEALER 连接?

python

# ROUTER
import time
import random
import zmq

context = zmq.Context.instance()
client = context.socket(zmq.ROUTER)
client.setsockopt(zmq.TCP_KEEPALIVE,1)
client.bind("tcp://127.0.0.1:99999")

for _ in range(100):
    ident = random.choice([b'A', b'A', b'B'])           
    work = b"This is the workload"
    client.send_multipart([ident, work])

    time.sleep(0.5)
#CLIENT
import zmq

context =  zmq.Context.instance()
worker = context.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, b'A')
worker.connect("tcp://127.0.0.1:99999")

while True:
    request = worker.recv()
    print(request)

我的目标是让多个客户端从 ROUTER 接收数据,由身份(A,B)确定。无论哪个先开始,它都可以正常工作。但是,一旦客户端停止并重新启动,它就不能再重新连接到 ROUTER 或接收数据。这种模式不是为了解决这个问题而设计的,还是有让客户端再次重新连接 ROUTER 的选项?


阅读 150

收藏
2023-06-19

共1个答案

小能豆

要解决这个问题,你可以尝试以下方法:

  1. 使用REQ-REP模式:将服务器端的ROUTER替换为REP套接字,将客户端的DEALER替换为REQ套接字。这种模式下,客户端可以断开连接并重新连接到服务器端,而服务器端可以接受新的连接并回复客户端的请求。

下面是修改后的代码示例:

# 服务器端
import time
import random
import zmq

context = zmq.Context.instance()
server = context.socket(zmq.REP)
server.setsockopt(zmq.TCP_KEEPALIVE, 

context = zmq.Context.instance()
server = context.socket(zmq.REP
1)
server.bind("tcp://127.0.0.1:99999")

for _ in range(100):
    ident = random.choice([
    ident =
b'A', b'A', b'B'])           
    work = b"This is the workload"
    server.recv()  # 接收客户端的请求
    server.send_multipart([ident, work])

    time.sleep(
    server.send_multipart([
0.5)

# 客户端
import zmq

context =  zmq.Context.instance()
client = context.socket(zmq.REQ)
client.connect(

context =  zmq.Context.instance()
client = context.socket(zmq


context
"tcp://127.0.0.1:99999")

while True:
    client.send(b"")  # 发送请求到服务器端
    reply = client.recv()  # 接收服务器端的回复
    print(reply)

请注意,REQ-REP模式中,客户端发送一个空的请求(client.send(b"")),然后等待服务器端的回复(client.recv())。这样客户端可以在断开连接后重新连接到服务器端,并接收数据。

  1. 使用心跳机制:在客户端中实现一个心跳机制,定期发送心跳消息给服务器端,以保持连接的活跃状态。服务器端可以检测到客户端的心跳消息,如果在一定时间内没有收到心跳消息,则认为客户端已断开连接,可以采取相应的处理。

下面是使用心跳机制的示例代码:

# 服务器端
import time
import random
import zmq

context = zmq.Context.instance()
server = context.socket(zmq.ROUTER)
server.setsockopt(zmq.TCP_KEEPALIVE, 

context = zmq.Context.instance()
server = context.socket(zmq.ROUTER)
server.setsockopt(zmq.TCP_KEEPALIVE


context = zmq.Context.instance()
server = context.socket(z
1)
server.bind("tcp://127.0.0.1:99999")

clients = {}  # 存储客户端的身份和最后接收心跳消息的时间

while True:
    poller = zmq.Poller()
    poller.register(server, zmq.POLLIN)

    socks = 
    poller = zmq.Poller()
    poller.register(server
dict(poller.poll(timeout=1000))  # 等待1秒钟,检查是否有消息到达

    if server in socks and socks[server] == zmq.POLLIN:
        [ident, _, msg] = server.recv_multipart()
        clients[ident] = time.time()  
        [ident, _, msg] = server.recv_multipart()
        clients[ident
# 更新客户端的心跳时间

        # 处理消息
        work = b"This is
2023-06-19