Java 类io.reactivex.internal.fuseable.SimplePlainQueue 实例源码
项目:RxJava2Extensions
文件:FlowableExpand.java
/**
 * Runs the serialized drain loop over the queue of pending publishers.
 *
 * <p>Only the caller that moves {@code wip} away from zero executes the loop;
 * every other caller merely bumps the counter and returns. The loop body is
 * repeated until the counter has been decremented back to zero.
 *
 * <p>On each pass:
 * <ul>
 *   <li>if the operator reports itself cancelled, the queue is cleared;</li>
 *   <li>otherwise, if no source is currently marked active and the queue holds
 *       a publisher, that publisher is polled, any accumulated {@code produced}
 *       count is reset and reported through {@code produced(long)}, the
 *       {@code active} flag is set and this instance subscribes to it;</li>
 *   <li>otherwise, if no source is active and the queue is empty, the
 *       subscription is replaced with the cancelled marker, {@code super.cancel()}
 *       is invoked and the downstream receives either {@code onError} (when
 *       {@code errors.terminate()} yields a throwable) or {@code onComplete}.</li>
 * </ul>
 */
void drainQueue() {
    // Somebody else is already draining; the increment tells them to loop again.
    if (wip.getAndIncrement() != 0) {
        return;
    }

    do {
        SimplePlainQueue<Publisher<? extends T>> pending = queue;

        if (isCancelled()) {
            pending.clear();
        } else if (!active) {
            if (!pending.isEmpty()) {
                Publisher<? extends T> next = pending.poll();

                // Report and reset the emission count before switching sources.
                long emitted = produced;
                if (emitted != 0L) {
                    produced = 0L;
                    produced(emitted);
                }

                // The flag is raised before subscribing, as in the original ordering.
                active = true;
                next.subscribe(this);
            } else {
                // Nothing active and nothing queued: terminate the downstream.
                setSubscription(SubscriptionHelper.CANCELLED);
                super.cancel();

                Throwable failure = errors.terminate();
                if (failure != null) {
                    actual.onError(failure);
                } else {
                    actual.onComplete();
                }
            }
        }
    } while (wip.decrementAndGet() != 0);
}
项目:RxJava2Extensions
文件:FlowableValve.java
/**
 * Serialized drain loop that forwards queued items downstream while the
 * {@code gate} flag is set.
 *
 * <p>Only the caller that moves this counter away from zero runs the loop;
 * other callers just increment and return. After each inner pass the number
 * of observed calls is subtracted from the counter and the loop repeats until
 * the result is zero.
 *
 * <p>Inside the inner loop, in this order:
 * <ol>
 *   <li>if {@code cancelled} is set, the queue is cleared and the method returns;</li>
 *   <li>if an error is stored, it is terminated, the queue is cleared, both
 *       {@code s} and {@code other} are cancelled and the error is sent to the
 *       downstream via {@code onError};</li>
 *   <li>if {@code gate} is not set, the inner loop is left without polling;</li>
 *   <li>otherwise {@code done} is read, then one item is polled: an empty queue
 *       with {@code done} set cancels {@code other} and completes the downstream,
 *       an empty queue without {@code done} leaves the inner loop, and a
 *       non-null item is delivered through {@code onNext}.</li>
 * </ol>
 */
void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    SimplePlainQueue<T> buffer = queue;
    Subscriber<? super T> downstream = actual;
    AtomicThrowable failures = this.error;

    int missed = 1;
    do {
        while (true) {
            if (cancelled) {
                buffer.clear();
                return;
            }

            if (failures.get() != null) {
                Throwable failure = failures.terminate();
                buffer.clear();
                SubscriptionHelper.cancel(s);
                SubscriptionHelper.cancel(other);
                downstream.onError(failure);
                return;
            }

            if (!gate) {
                break;
            }

            // The done flag is sampled before polling; keep this order.
            boolean finished = done;
            T item = buffer.poll();

            if (item == null) {
                if (finished) {
                    SubscriptionHelper.cancel(other);
                    downstream.onComplete();
                    return;
                }
                break;
            }

            downstream.onNext(item);
        }

        // Subtract the calls accounted for so far; a non-zero remainder means
        // more drain requests arrived in the meantime.
        missed = addAndGet(-missed);
    } while (missed != 0);
}