一尘不染

如何查看我的反应式扩展查询正在做什么?

c#

我正在编写一个包含许多运算符的复杂的Reactive Extensions查询。我怎么看发生了什么事?

我问并回答了这个问题,因为它的确相当普遍,并且可能具有很好的通用性。


阅读 229

收藏
2020-05-19

共1个答案

一尘不染

您可以在开发Rx运算符时自由地将此函数附加到Rx运算符以查看发生了什么:

    public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
    {
        opName = opName ?? "IObservable";
        Console.WriteLine("{0}: Observable obtained on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId);

        return Observable.Create<T>(obs =>
        {
            Console.WriteLine("{0}: Subscribed to on Thread: {1}",
                              opName,
                              Thread.CurrentThread.ManagedThreadId);

            try
            {
                var subscription = source
                    .Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
                                                opName,
                                                x,
                                                Thread.CurrentThread.ManagedThreadId),
                        ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
                                                 opName,
                                                 ex,
                                                 Thread.CurrentThread.ManagedThreadId),
                        () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
                                                 opName,
                                                 Thread.CurrentThread.ManagedThreadId)
                    )
                    .Subscribe(obs);
                return new CompositeDisposable(
                    subscription,
                    Disposable.Create(() => Console.WriteLine(
                          "{0}: Cleaned up on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId)));
            }
            finally
            {
                Console.WriteLine("{0}: Subscription completed.", opName);
            }
        });
    }

这是一个示例用法,显示了细微的行为差异Range

Observable.Range(0, 1).Spy("Range").Subscribe();

给出输出:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7

但是这个:

Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();

给出输出:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Subscription completed.
Range: Cleaned up on Thread: 7

指出不同?

显然,您可以更改此设置以写入日志或调试,或者使用预处理程序指令在Release版本等上进行精益传递订阅…

您可以Spy在整个运营商链中进行申请。例如:

Observable.Range(0,3).Spy("Range")
          .Scan((acc, i) => acc + i).Spy("Scan").Subscribe();

给出输出:

Range: Observable obtained on Thread: 7
Scan: Observable obtained on Thread: 7
Scan: Subscribed to on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Scan: Subscription completed.
Range: OnNext(1) on Thread: 7
Scan: OnNext(1) on Thread: 7
Range: OnNext(2) on Thread: 7
Scan: OnNext(3) on Thread: 7
Range: OnCompleted() on Thread: 7
Scan: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7
Scan: Cleaned up on Thread: 7

我相信您可以找到各种方法来满足您的目的。

2020-05-19