一尘不染

SQL Server服务代理

sql

当前,我们正在使用服务代理来回发送消息,这很好。但是我们想使用RELATED_CONVERSATION_GROUP对这些消息进行分组。我们想使用我们自己的数据库持久化uuid作为数据库中的RELATED_CONVERSATION_GROUP
= @uuid,但是即使每次每次接收队列时conversion_group_id都不同时,我们都使用相同的uuid。

你们知道我创建经纪人或接听电话的方式有什么问题吗,我在下面提供了经纪人创建代码和接听电话代码。谢谢

下面是代码“ Service Broker创建代码”

CREATE PROCEDURE dbo.OnDataInserted

@EntityType NVARCHAR(100),
@MessageID BIGINT,
@uuid uniqueidentifier,
@message_body nvarchar(max)
AS

BEGIN

SET NOCOUNT ON;

 DECLARE @conversation UNIQUEIDENTIFIER

BEGIN DIALOG CONVERSATION @conversation
FROM SERVICE DataInsertSndService
TO SERVICE 'DataInsertRcvService'
ON CONTRACT DataInsertContract
WITH RELATED_CONVERSATION_GROUP = @uuid;

SEND ON CONVERSATION @conversation
MESSAGE TYPE DataInserted
(CAST(@message_body))

下面是代码“接收代码”

WHILE 0 < @@TRANCOUNT ROLLBACK; SET NOCOUNT ON

BEGIN TRANSACTION;

DECLARE 
@cID as uniqueidentifier, 
@conversationHandle as uniqueidentifier,
@conversationGroupId as uniqueidentifier,
@tempConversationGroupId as uniqueidentifier,
@message_body VARBINARY(MAX)

RAISERROR ('Awaiting Message ...', 16, 1) WITH NOWAIT

;WAITFOR (RECEIVE TOP (1) 
@cID = Substring(CAST(message_body as nvarchar(max)),4,36), 
@conversationHandle = [conversation_handle],
@conversationGroupId = [conversation_group_id],
@message_body = message_body
FROM DataInsertRcvQueue)

RAISERROR ('Message Received', 16, 1) WITH NOWAIT
Select @tempConversationGroupId = conversationGroupID from ConversationGroupMapper where cID = @cID; 
declare @temp as nvarchar(max);
Set @temp = CAST(@tempConversationGroupId as nvarchar(max));
if @temp  <> ''
BEGIN
    MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId;

RAISERROR ('Moved to Existing Conversation Group' , 16, 1) WITH NOWAIT
END
    else
BEGIN
insert into ConversationGroupMapper values (@cID,@conversationGroupId);

RAISERROR ('New Conversation Group' , 16, 1) WITH NOWAIT
END

WAITFOR DELAY '000:00:10'

COMMIT

RAISERROR ('Committed' , 16, 1) WITH NOWAIT

细化

我们的情况是,我们需要循环接收来自此Service
Broker队列的项目,阻止WAITFOR,然后将它们通过不可靠的网络传递给另一个系统。从队列接收的项目将发送到与该远程系统的许多连接之一。如果该项目未成功交付给另一个系统,则应回退该单个项目的事务,并将该项目返回到队列。我们在成功传递后提交事务,从而解锁消息序列,以供后续循环迭代拾取。

相关项目序列中的延迟不应影响无关序列的传递。单个项目一经可用就立即发送到队列中,并立即转发。项目应该以单文件转发,尽管即使在序列内的交付顺序也不是严格重要的。

从一次接收一条消息的循环中,从打开的连接列表中选择一个新的或现有的TcpClient,然后通过异步IO回调链传递消息和打开的连接,直到传输完成。然后,我们完成了数据库事务处理,在该事务处理中,我们从Service
Broker队列中收到了项目。

在这种情况下,如何使用Service Broker和对话组来提供帮助?


阅读 159

收藏
2021-05-05

共1个答案

一尘不染

会话组是 本地的
仅用于锁定的概念:专门用于锁定:相关的对话属于一个组,因此当您在一个对话中处理消息时,另一个线程无法处理相关的消息。没有有关两个端点交换的对话组的信息,因此在您的示例中,所有发起者端点最终都属于一个对话组,但是目标端点每个都是一个不同的对话组(每个组只有一个对话)。系统之所以会这样,是因为对话组旨在解决诸如旅行预订服务之类的问题:当其收到“预订旅行”的消息时,必须预订航班,旅馆和汽车租金。它必须发送三则消息,其中每一项都发送给每个服务(“航班”,“酒店”,“汽车”
),然后响应将异步返回。当它们返回时,处理过程必须确保它们不会被单独的线程同时处理,每个线程都将尝试更新“行程”记录状态。在消息传递中,此问题被称为“消息关联问题”。

但是,对话组通常仅出于性能原因而部署在SSB中:它们允许更大的RECEIVE结果。可以使用将目标端点一起移动到一个组中,MOVE CONVERSATION但实际上有一个简单得多的技巧:反转对话的方向。让您的 目的地 开始对话(分组),然后
目的地 发起的对话中发送其“更新”。

一些注意事项:

  • 不要使用BEGIN / SEND / END的一劳永逸模式。您将来将无法诊断任何问题,请参阅《火灾与遗忘:有益于军方,但不适用于Service Broker对话》
  • 绝对不要在生产代码中使用WITH CLEANUP。它旨在用于诸如灾难恢复之类的管理后备操作。如果滥用它,您将拒绝SSB任何机会来正确跟踪邮件以进行正确的重试传递(如果邮件由于任何原因在目标上弹回,它将永远丢失)。
  • SSB不能保证整个对话的顺序,只能保证一次对话。对于每个INSERT事件,开始新的对话并不能保证按目标保留插入操作的顺序。
2021-05-05