一尘不染

如何要求RabbitMQ在Spring异步MessageListener用例中发生业务异常时重试

java

我有一个Spring AMQP消息监听器正在运行。

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}

如您所见,在处理过程中可能会出现异常。我想重试,因为Catch块中有特定错误。我无法通过onMessage中的异常。如何告诉RabbitMQ有异常并重试?


阅读 360

收藏
2020-12-03

共1个答案

一尘不染

由于onMessage()不允许抛出已检查的异常,因此可以将异常包装在中RuntimeException并重新抛出。

try {
    testService.process(message);
} catch (BusinessException e) {
    throw new RuntimeException(e);
}

但是请注意,这可能导致消息无限期地重新发送。这是这样的:

RabbitMQ支持拒绝消息并要求代理重新排队。这显示在这里。但是RabbitMQ本身没有重试策略的机制,例如设置最大重试次数,延迟等。

使用Spring
AMQP时,“拒绝时重新排队”是默认选项。SimpleMessageListenerContainer如果有未处理的异常,Spring
将默认执行此操作。因此,在您的情况下,您只需要重新引发捕获的异常即可。但是请注意,如果您无法处理消息并且总是抛出异常,则它将无限期地重新发送,并导致无限循环。

您可以通过引发AmqpRejectAndDontRequeueException异常来覆盖每个消息的此行为,在这种情况下,不会重新排队该消息。

您也可以SimpleMessageListenerContainer通过设置完全关闭“拒绝时重新排队”行为

container.setDefaultRequeueRejected(false)

如果在RabbitMQ中设置了一条消息,则该消息被拒绝但不重新排队将丢失或转移到DLQ。

如果您需要具有最大尝试次数,延迟次数等的重试策略,最简单的方法是设置一个弹簧“无状态”
RetryOperationsInterceptor,它将在线程内进行所有重试(使用Thread.sleep()),而不会拒绝每次重试的消息(因此无需为每个重试返回RabbitMQ重试)。重试用尽时,默认情况下将记录一条警告,并且该消息将被使用。如果要发送到DLQ,则需要一个RepublishMessageRecovererMessageRecoverer不接受消息而无需重新排队的自定义(在后一种情况下,您还应该在队列上设置
RabbitMQ DLQ)。默认消息恢复程序的示例:

container.setAdviceChain(new Advice[] {
        org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                .stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2, 5000)
                .build()
});

显然,这样做的缺点是您将在重试的整个过程中占用线程。您还可以选择使用“有状态”
RetryOperationsInterceptor,该状态将在每次重试时将消息发送回RabbitMQ,但是延迟仍将Thread.sleep()在应用程序内实现,此外,设置有状态拦截器会更加复杂。

因此,如果您希望在不占用任何延迟的情况下重试,Thread则需要在RabbitMQ队列上使用TTL进行更复杂的自定义解决方案。如果您不希望出现指数退缩(因此每次重试不会增加延迟),则要简单一些。要实现这种解决方案,您基本上可以在RabbitMQ上创建另一个带有参数的队列:"x-message- ttl": <delay time in milliseconds>"x-dead-letter-exchange":"<name of the original queue>"。然后在您设置的主队列上"x-dead-letter-exchange":"<name of the queue with the TTL>"。因此,现在当您拒绝但不重新排队时,RabbitMQ会将其重定向到第二个队列。TTL过期时,它将被重定向到原始队列,然后重新传递到应用程序。因此,现在您需要一个重试拦截器,该拦截器在每次失败后拒绝发送给RabbitMQ的消息,并跟踪重试计数。为了避免需要在应用程序中保留状态(因为如果您的应用程序是集群的,则需要复制状态),则可以根据x-deathRabbitMQ设置的标头计算重试计数。在此处查看有关此标头的更多信息。因此,在这一点上,实现自定义拦截器比通过这种行为自定义Spring有状态拦截器要容易得多。

另请参阅Spring AMQP参考中有关重试的部分

2020-12-03