一尘不染

编写一个Rx“ RetryAfter”扩展方法

c#

IntroToRx书中,作者建议为I
/ O写一个“智能”重试,在一段时间后重试I / O请求(如网络请求)。

这是确切的段落:

添加到您自己的库中的一种有用的扩展方法可能是“ Back Off and Retry”方法。与我合作的团队发现,这种功能在执行I /
O(尤其是网络请求)时非常有用。概念是尝试,并在失败时等待给定的时间,然后重试。您使用的此方法的版本可能会考虑要重试的Exception的类型以及重试的最大次数。您甚至可能希望延长等待时间,以减少后续每次重试时的积极性。

不幸的是,我不知道如何编写此方法。:(


阅读 265

收藏
2020-05-19

共1个答案

一尘不染

延迟重试实现的关键是可观察到的延迟。延迟的可观察对象在有人订阅之前不会执行其工厂。并且它将为每个订阅调用工厂,使其非常适合我们的重试方案。

假设我们有一个触发网络请求的方法。

public IObservable<WebResponse> SomeApiMethod() { ... }

出于这个小片段的目的,让我们将延期定义为 source

var source = Observable.Defer(() => SomeApiMethod());

每当有人订阅源时,它将调用SomeApiMethod并启动新的Web请求。失败时重试的天真的方法是使用内置的Retry运算符。

source.Retry(4)

但这对API来说并不是很好,这也不是您要的。我们需要在每次尝试之间延迟请求的启动。一种方法是使用延迟订阅

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)

这是不理想的,因为即使在第一个请求上也会增加延迟,让我们修复一下。

int attempt = 0;
Observable.Defer(() => { 
   return ((++attempt == 1)  ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)

虽然只是暂停一秒钟并不是一个很好的重试方法,所以让我们将该常量更改为一个函数,该函数可以接收重试计数并返回适当的延迟。指数补偿很容易实现。

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))

现在我们差不多完成了,我们只需要添加一种指定应针对哪些异常重试的方法。让我们添加一个给定异常的函数,该函数返回重试是否有意义,我们将其称为retryOnError。

现在,我们需要编写一些吓人的代码,但请耐心等待。

Observable.Defer(() => {
    return ((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<T>(t.Item3))

所有这些尖括号都在封送一个例外,我们不应为此重试.Retry()。我们已将内部可观察对象设为IObservable<Tuple<bool, WebResponse, Exception>>第一个布尔值,指示我们是否有响应或异常。如果retryOnError指示我们应针对特定异常进行重试,则内部可观察对象将抛出,并由重试获取。SelectMany只是解开我们的元组,并使结果可观察到IObservable<WebRequest>

请参阅我的要点,以获取完整版本最终版本的测试。有了这个运算符,我们可以很简洁地编写重试代码

Observable.Defer(() => SomApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )
2020-05-19