当前,我们正在使用服务代理来回发送消息,这很好。但是我们想使用RELATED_CONVERSATION_GROUP对这些消息进行分组。我们想使用我们自己的数据库持久化uuid作为数据库中的RELATED_CONVERSATION_GROUP = @uuid,但是即使每次每次接收队列时conversion_group_id都不同时,我们都使用相同的uuid。
你们知道我创建经纪人或接听电话的方式有什么问题吗,我在下面提供了经纪人创建代码和接听电话代码。谢谢
下面是代码“ Service Broker创建代码”
CREATE PROCEDURE dbo.OnDataInserted @EntityType NVARCHAR(100), @MessageID BIGINT, @uuid uniqueidentifier, @message_body nvarchar(max) AS BEGIN SET NOCOUNT ON; DECLARE @conversation UNIQUEIDENTIFIER BEGIN DIALOG CONVERSATION @conversation FROM SERVICE DataInsertSndService TO SERVICE 'DataInsertRcvService' ON CONTRACT DataInsertContract WITH RELATED_CONVERSATION_GROUP = @uuid; SEND ON CONVERSATION @conversation MESSAGE TYPE DataInserted (CAST(@message_body))
下面是代码“接收代码”
WHILE 0 < @@TRANCOUNT ROLLBACK; SET NOCOUNT ON BEGIN TRANSACTION; DECLARE @cID as uniqueidentifier, @conversationHandle as uniqueidentifier, @conversationGroupId as uniqueidentifier, @tempConversationGroupId as uniqueidentifier, @message_body VARBINARY(MAX) RAISERROR ('Awaiting Message ...', 16, 1) WITH NOWAIT ;WAITFOR (RECEIVE TOP (1) @cID = Substring(CAST(message_body as nvarchar(max)),4,36), @conversationHandle = [conversation_handle], @conversationGroupId = [conversation_group_id], @message_body = message_body FROM DataInsertRcvQueue) RAISERROR ('Message Received', 16, 1) WITH NOWAIT Select @tempConversationGroupId = conversationGroupID from ConversationGroupMapper where cID = @cID; declare @temp as nvarchar(max); Set @temp = CAST(@tempConversationGroupId as nvarchar(max)); if @temp <> '' BEGIN MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId; RAISERROR ('Moved to Existing Conversation Group' , 16, 1) WITH NOWAIT END else BEGIN insert into ConversationGroupMapper values (@cID,@conversationGroupId); RAISERROR ('New Conversation Group' , 16, 1) WITH NOWAIT END WAITFOR DELAY '000:00:10' COMMIT RAISERROR ('Committed' , 16, 1) WITH NOWAIT
细化
我们的情况是,我们需要循环接收来自此Service Broker队列的项目,阻止WAITFOR,然后将它们通过不可靠的网络传递给另一个系统。从队列接收的项目将发送到与该远程系统的许多连接之一。如果该项目未成功交付给另一个系统,则应回退该单个项目的事务,并将该项目返回到队列。我们在成功传递后提交事务,从而解锁消息序列,以供后续循环迭代拾取。
相关项目序列中的延迟不应影响无关序列的传递。单个项目一经可用就立即发送到队列中,并立即转发。项目应该以单文件转发,尽管即使在序列内的交付顺序也不是严格重要的。
从一次接收一条消息的循环中,从打开的连接列表中选择一个新的或现有的TcpClient,然后通过异步IO回调链传递消息和打开的连接,直到传输完成。然后,我们完成了数据库事务处理,在该事务处理中,我们从Service Broker队列中收到了项目。
在这种情况下,如何使用Service Broker和对话组来提供帮助?
会话组是 本地的 仅用于锁定的概念:专门用于锁定:相关的对话属于一个组,因此当您在一个对话中处理消息时,另一个线程无法处理相关的消息。没有有关两个端点交换的对话组的信息,因此在您的示例中,所有发起者端点最终都属于一个对话组,但是目标端点每个都是一个不同的对话组(每个组只有一个对话)。系统之所以会这样,是因为对话组旨在解决诸如旅行预订服务之类的问题:当其收到“预订旅行”的消息时,必须预订航班,旅馆和汽车租金。它必须发送三则消息,其中每一项都发送给每个服务(“航班”,“酒店”,“汽车” ),然后响应将异步返回。当它们返回时,处理过程必须确保它们不会被单独的线程同时处理,每个线程都将尝试更新“行程”记录状态。在消息传递中,此问题被称为“消息关联问题”。
但是,对话组通常仅出于性能原因而部署在SSB中:它们允许更大的RECEIVE结果。可以使用将目标端点一起移动到一个组中,MOVE CONVERSATION但实际上有一个简单得多的技巧:反转对话的方向。让您的 目的地 开始对话(分组),然后 源 在 目的地 发起的对话中发送其“更新”。
MOVE CONVERSATION
一些注意事项: