Java 类io.reactivex.internal.fuseable.SimplePlainQueue 实例源码

项目:RxJava2Extensions    文件:FlowableExpand.java   
void drainQueue() {
    if (wip.getAndIncrement() == 0) {
        do {
            SimplePlainQueue<Publisher<? extends T>> q = queue;
            if (isCancelled()) {
                q.clear();
            } else {
                if (!active) {
                    if (q.isEmpty()) {
                        setSubscription(SubscriptionHelper.CANCELLED);
                        super.cancel();
                        Throwable ex = errors.terminate();
                        if (ex == null) {
                            actual.onComplete();
                        } else {
                            actual.onError(ex);
                        }
                    } else {
                        Publisher<? extends T> p = q.poll();
                        long c = produced;
                        if (c != 0L) {
                            produced = 0L;
                            produced(c);
                        }
                        active = true;
                        p.subscribe(this);
                    }
                }
            }
        } while (wip.decrementAndGet() != 0);
    }
}
项目:RxJava2Extensions    文件:FlowableValve.java   
void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    SimplePlainQueue<T> q = queue;
    Subscriber<? super T> a = actual;
    AtomicThrowable error = this.error;

    for (;;) {
        for (;;) {
            if (cancelled) {
                q.clear();
                return;
            }

            if (error.get() != null) {
                Throwable ex = error.terminate();
                q.clear();
                SubscriptionHelper.cancel(s);
                SubscriptionHelper.cancel(other);
                a.onError(ex);
                return;
            }

            if (!gate) {
                break;
            }

            boolean d = done;
            T v = q.poll();
            boolean empty = v == null;

            if (d && empty) {
                SubscriptionHelper.cancel(other);
                a.onComplete();
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}