/**
 * Creates a tracing consumer backed by the supplied callbacks.
 *
 * <p>The four callbacks are bundled into a {@link LambdaObserver} that is stored in
 * {@code lambdaObserver}; {@code operationName} and {@code tracer} are handed to the
 * superclass constructor.
 *
 * @param onNext callback for emitted items; must not be null
 * @param onError callback for the error signal; must not be null
 * @param onComplete callback for the completion signal; must not be null
 * @param onSubscribe callback receiving the {@link Disposable}; must not be null
 * @param operationName forwarded to the superclass as-is (not validated here)
 * @param tracer forwarded to the superclass; must not be null
 */
public TracingConsumer(Consumer<? super T> onNext,
                       Consumer<? super Throwable> onError,
                       Action onComplete,
                       Consumer<? super Disposable> onSubscribe,
                       String operationName,
                       Tracer tracer) {
    // Validate the tracer inside the super(...) argument list: a check placed after
    // super(...) only runs once the superclass has already received the (possibly null)
    // tracer, which is too late to fail fast with the intended message.
    super(operationName, requireNonNull(tracer, "tracer can not be null"));

    requireNonNull(onNext, "onNext can not be null");
    requireNonNull(onError, "onError can not be null");
    requireNonNull(onComplete, "onComplete can not be null");
    requireNonNull(onSubscribe, "onSubscribe can not be null");

    lambdaObserver = new LambdaObserver<>(onNext, onError, onComplete, onSubscribe);
}
/**
 * Builds a {@link LambdaObserver} from the four callbacks and passes it through
 * {@code handleObserver}, returning whatever that method returns.
 *
 * @param onNext callback for emitted items
 * @param onError callback for the error signal
 * @param onComplete callback for the completion signal
 * @param onSubscribe callback receiving the {@link Disposable}
 * @param <T> type of the observed items
 * @return the observer produced by {@code handleObserver}
 * @see Observable#subscribe(Consumer, Consumer, Action, Consumer)
 */
public <T> Observer<? super T> wrap(
        Consumer<? super T> onNext,
        Consumer<? super Throwable> onError,
        Action onComplete,
        Consumer<? super Disposable> onSubscribe) {
    final LambdaObserver<? super T> callbackObserver =
            new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    return handleObserver(callbackObserver);
}
/**
 * Adapts {@code observer} into a {@link LambdaObserver} whose four callbacks forward to the
 * matching {@code observer} methods, then passes that adapter through {@code handleObserver}.
 *
 * @param observer target that receives the forwarded signals
 * @param <T> type of the observed items
 * @return the observer produced by {@code handleObserver}
 * @see Observable#subscribe(Observer)
 */
public <T> Observer<? super T> wrap(final Observer<? super T> observer) {
    // Forwards each item to observer.onNext.
    final Consumer<T> forwardNext = new Consumer<T>() {
        @Override
        public void accept(T item) throws Exception {
            observer.onNext(item);
        }
    };

    // Forwards the error signal to observer.onError.
    final Consumer<Throwable> forwardError = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable error) throws Exception {
            observer.onError(error);
        }
    };

    // Forwards the completion signal to observer.onComplete.
    final Action forwardComplete = new Action() {
        @Override
        public void run() throws Exception {
            observer.onComplete();
        }
    };

    // Forwards the subscription handle to observer.onSubscribe.
    final Consumer<Disposable> forwardSubscribe = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable subscription) throws Exception {
            observer.onSubscribe(subscription);
        }
    };

    final LambdaObserver<? super T> adapter =
            new LambdaObserver<T>(forwardNext, forwardError, forwardComplete, forwardSubscribe);
    return handleObserver(adapter);
}
/**
 * Hands {@code disposableObserver} to {@code delegateDisposable} (defined elsewhere) and then
 * returns that same instance to the caller.
 *
 * @param disposableObserver observer to register and return
 * @param <T> type of the observed items
 * @return the {@code disposableObserver} argument itself
 */
<T> Observer<? super T> handleObserver(LambdaObserver<? super T> disposableObserver) {
    final Observer<? super T> result = disposableObserver;
    delegateDisposable(disposableObserver);
    return result;
}