/**
 * Processes the queue of pending publishers, one at a time.
 *
 * <p>Only the caller that moves {@code wip} from zero runs the loop; every
 * other caller just bumps the counter and returns. The loop repeats until
 * {@code wip.decrementAndGet()} reaches zero, so each call that arrived
 * while draining gets one more pass.
 *
 * <p>Each pass does exactly one of the following:
 * <ul>
 *   <li>if cancelled, clears the queue;</li>
 *   <li>if a source is currently {@code active}, does nothing;</li>
 *   <li>if the queue is empty, terminates downstream with
 *       {@code onComplete} or, when {@code errors.terminate()} yields a
 *       throwable, with {@code onError};</li>
 *   <li>otherwise polls the next publisher, reports the accumulated
 *       {@code produced} count, marks itself active and subscribes.</li>
 * </ul>
 */
void drainQueue() {
    if (wip.getAndIncrement() != 0) {
        // Another caller is already inside the loop; it will see the increment.
        return;
    }

    do {
        SimplePlainQueue<Publisher<? extends T>> pending = queue;

        if (isCancelled()) {
            pending.clear();
            continue;
        }

        if (active) {
            // A source is still running; nothing to do on this pass.
            continue;
        }

        if (pending.isEmpty()) {
            // No active source and nothing queued: finish the sequence.
            setSubscription(SubscriptionHelper.CANCELLED);
            super.cancel();

            Throwable failure = errors.terminate();
            if (failure != null) {
                actual.onError(failure);
            } else {
                actual.onComplete();
            }
            continue;
        }

        Publisher<? extends T> next = pending.poll();

        // Hand the emission count gathered so far to produced(long) and reset it
        // before switching to the next source.
        long emitted = produced;
        if (emitted != 0L) {
            produced = 0L;
            produced(emitted);
        }

        active = true;
        next.subscribe(this);
    } while (wip.decrementAndGet() != 0);
}
/**
 * Serialized emission loop: moves queued items to the downstream subscriber
 * for as long as {@code gate} is true.
 *
 * <p>Only the caller that moves this object's counter from zero
 * ({@code getAndIncrement() == 0}) runs the loop. Calls that arrive in the
 * meantime are accounted for through {@code missed}, and the outer loop
 * repeats until {@code addAndGet(-missed)} returns zero.
 *
 * <p>Before every item the loop checks, in this order:
 * <ol>
 *   <li>{@code cancelled} - clears the queue and returns;</li>
 *   <li>a recorded error - terminates it, clears the queue, cancels both
 *       {@code s} and {@code other}, signals {@code onError} and returns;</li>
 *   <li>{@code gate} closed - stops emitting for this pass;</li>
 *   <li>{@code done} with an empty queue - cancels {@code other}, signals
 *       {@code onComplete} and returns.</li>
 * </ol>
 */
void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    SimplePlainQueue<T> buffer = queue;
    Subscriber<? super T> downstream = actual;
    AtomicThrowable failures = this.error;

    do {
        while (true) {
            if (cancelled) {
                buffer.clear();
                return;
            }

            if (failures.get() != null) {
                Throwable cause = failures.terminate();
                buffer.clear();
                SubscriptionHelper.cancel(s);
                SubscriptionHelper.cancel(other);
                downstream.onError(cause);
                return;
            }

            if (!gate) {
                break;
            }

            // Read the done flag BEFORE polling, as the original does.
            boolean finished = done;
            T item = buffer.poll();

            if (item == null) {
                if (finished) {
                    SubscriptionHelper.cancel(other);
                    downstream.onComplete();
                    return;
                }
                // Queue is empty but more items may still arrive.
                break;
            }

            downstream.onNext(item);
        }
    } while ((missed = addAndGet(-missed)) != 0);
}