/**
 * Emits {@code value} to {@code child} once per unit of demand and, for a
 * finite repeat ({@code count >= 0}), completes the child once {@code counter}
 * has run out.
 *
 * <p>Only the caller for which {@code BackpressureHelper.add(this, n)} returns
 * 0 runs the emission loop; other callers merely add their demand, which the
 * running loop picks up through {@code addAndGet(-emitted)}.
 *
 * <p>NOTE(review): a request made after completion re-enters this path and
 * would signal {@code onComplete} again; confirm callers never request after
 * the terminal event.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    if (BackpressureHelper.add(this, n) != 0) {
        return;
    }
    long requested = n;
    long emitted;
    boolean exhausted;
    do {
        // The whole batch is accounted as consumed, even if the inner loop
        // stops early because of cancellation or an exhausted counter.
        emitted = requested;
        while (requested-- > 0 && !cancelled && (count == -1 || counter-- > 0)) {
            child.onNext(value);
        }
        // Decide on completion while this thread still owns the drain loop:
        // a finite repeat is done only when its counter has run out, not
        // merely because the current demand has been satisfied.
        exhausted = count >= 0 && counter <= 0;
    } while ((requested = this.addAndGet(-emitted)) > 0);
    if (exhausted && !cancelled) {
        child.onComplete();
    }
}
@Override public void request(long n) { debug(this + " request " + n); if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(requested, n); while (true) { Subscription p = parent.get(); long d = deferredRequests.get(); if (d == -1) { // parent exists so can request of it debug(this + " requesting from parent " + n); p.request(n); break; } else { long d2 = d + n; if (d2 < 0) { d2 = Long.MAX_VALUE; } if (deferredRequests.compareAndSet(d, d2)) { break; } } } drain(); } }
/**
 * Records downstream demand and makes sure it gets drained.
 *
 * <p>When {@code options.sameThread()} is false, a single daemon runner thread
 * is started lazily on first use; it calls {@code drain()} repeatedly, yielding
 * in between, until {@code drain()} returns {@code true}. Otherwise draining
 * happens on the calling thread, and only when {@code BackpressureHelper.add}
 * returns 0.
 *
 * <p>NOTE(review): the lazy initialisation of {@code executorService} is not
 * guarded; confirm that {@code request} is never called concurrently before
 * the runner has been created.
 *
 * @param n the additional demand; non-positive values are ignored in both
 *          modes, so they can neither corrupt {@code requested} nor start the
 *          runner thread
 */
@Override
public void request(long n) {
    if (n <= 0) {
        return;
    }
    if (!options.sameThread()) {
        if (executorService == null) {
            executorService = Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("Subscription Runner [" + System.currentTimeMillis() + "]");
                return thread;
            });
            executorService.submit(() -> {
                while (true) {
                    if (drain()) {
                        return;
                    }
                    Thread.yield();
                }
            });
        }
        BackpressureHelper.add(requested, n);
    } else {
        if (BackpressureHelper.add(requested, n) == 0) {
            drain();
        }
    }
}
/**
 * Adds {@code n} to this subscription's demand and, when
 * {@code BackpressureHelper.add} returns 0, starts emitting: an unbounded
 * request ({@code Long.MAX_VALUE}) takes {@code fastPath()}, anything else
 * takes {@code slowPath(n)}.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    if (BackpressureHelper.add(this, n) != 0) {
        // a non-zero result means this call does not start emission
        return;
    }
    if (n != Long.MAX_VALUE) {
        slowPath(n);
    } else {
        fastPath();
    }
}
/**
 * Adds {@code n} to {@code requested} and runs {@code drain()}.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
/**
 * Adds {@code n} to {@code requested} and, if a subscription is currently held
 * in {@code s}, forwards the same amount to it.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    Subscription upstream = s.get();
    if (upstream == null) {
        // nothing to forward to yet; the demand stays recorded in 'requested'
        return;
    }
    upstream.request(n);
}
/**
 * Requests {@code BackpressureHelper.multiplyCap(n, keep)} items from
 * {@code s} for a downstream request of {@code n}.
 *
 * @param n the downstream demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    s.request(BackpressureHelper.multiplyCap(n, keep));
}
/**
 * Adds {@code n} to this subscription's demand and, when
 * {@code BackpressureHelper.add} returns 0, starts emitting: an unbounded
 * request ({@code Long.MAX_VALUE}) takes {@code fastpath()}, anything else
 * takes {@code slowpath(n)}.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    if (BackpressureHelper.add(this, n) != 0) {
        // a non-zero result means this call does not start emission
        return;
    }
    if (n != Long.MAX_VALUE) {
        slowpath(n);
    } else {
        fastpath();
    }
}
/**
 * In {@code Mode.BEFORE}, adds a validated {@code n} to
 * {@code requestedWindows} and runs {@code drain()}. In every case, including
 * other modes and rejected amounts, {@code n} is then passed on unchanged to
 * {@code s}.
 *
 * @param n the requested amount
 */
@Override
public void request(long n) {
    // Validation is attempted only in BEFORE mode (short-circuit preserved).
    boolean countWindows = mode == Mode.BEFORE && SubscriptionHelper.validate(n);
    if (countWindows) {
        BackpressureHelper.add(requestedWindows, n);
        drain();
    }
    s.request(n);
}
/**
 * Adds {@code n} to this subscription's demand, issues the initial request to
 * {@code parent} the first time it is called, and runs {@code drain()}.
 *
 * <p>The first call (guarded by {@code once}) requests
 * {@code Long.MAX_VALUE} from the parent and sets {@code unbounded} when
 * {@code n} is {@code Long.MAX_VALUE}; otherwise it requests a single item.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(this, n);
    boolean firstRequest = once.compareAndSet(false, true);
    if (firstRequest) {
        if (n != Long.MAX_VALUE) {
            parent.request(1);
        } else {
            parent.request(Long.MAX_VALUE);
            unbounded = true;
        }
    }
    drain();
}
/**
 * Adds {@code n} to {@code requested} and calls {@code requestMore()}.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    requestMore();
}
/**
 * Adds {@code n} to {@code requested}, forwards the same amount to
 * {@code parent}, and then calls {@code scheduleDrain()}.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    parent.request(n);
    scheduleDrain();
}
/**
 * Adds {@code n} to {@code requested} and calls {@code scheduleDrain()}.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    scheduleDrain();
}
/**
 * Records downstream demand in {@code requested}, then atomically updates the
 * {@code requests} state with a compare-and-set retry loop and finally runs
 * {@code drain()}.
 *
 * <p>While the current state has no parent, the demand is accumulated in the
 * state's {@code deferred} value (capped at {@code Long.MAX_VALUE} on
 * overflow). Once a parent is present, the demand plus any deferred amount is
 * offset against {@code unreconciled}; only a positive remainder is requested
 * from the parent.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are only logged via {@code debug}
 */
@Override
public void request(long n) {
    debug(this + " request " + n);
    if (SubscriptionHelper.validate(n)) {
        BackpressureHelper.add(requested, n);
        while (true) {
            // Work on a snapshot; a failed CAS below means it went stale and we retry.
            Requests<T> r = requests.get();
            Requests<T> r2;
            if (r.parent == null) {
                // No parent yet: defer the demand, capping on overflow.
                long d = r.deferred + n;
                if (d < 0) {
                    d = Long.MAX_VALUE;
                }
                // presumably the constructor takes (parent, unreconciled, deferred, child) — confirm
                r2 = new Requests<T>(r.parent, r.unreconciled, d, r.child);
                if (requests.compareAndSet(r, r2)) {
                    break;
                }
            } else {
                // x > 0: amount still to be requested from the parent;
                // x < 0: -x stays behind as the new unreconciled amount.
                long x = n + r.deferred - r.unreconciled;
                long u = Math.max(0, -x);
                r2 = new Requests<T>(r.parent, u, 0, r.child);
                if (requests.compareAndSet(r, r2)) {
                    // Request from the parent only after the state update has won.
                    if (x > 0) {
                        r.parent.request(x);
                    }
                    break;
                }
            }
        }
        drain();
    }
}
/**
 * Adds {@code n} to {@code requested}, forwards the same amount to
 * {@code parent}, and then runs {@code drain()}.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    parent.request(n);
    drain();
}
/**
 * Marks this subscription as cancelled and, when
 * {@code BackpressureHelper.add(this, 1)} returns 0, hands the remaining
 * items to {@code releaseRest}. Calls made after the flag is already set do
 * nothing.
 */
@Override
public void cancel() {
    if (cancelled) {
        return;
    }
    cancelled = true;
    // A non-zero result skips the release on this thread.
    // NOTE(review): presumably an emission loop is active then and does the clean-up — confirm.
    if (BackpressureHelper.add(this, 1) != 0) {
        return;
    }
    releaseRest(items, release);
}
/**
 * Marks this subscription as cancelled and, when
 * {@code BackpressureHelper.add(this, 1)} returns 0, passes every item from
 * position {@code index} to the end of {@code items} to {@code releaseItem}.
 * Calls made after the flag is already set do nothing.
 */
@Override
public void cancel() {
    if (cancelled) {
        return;
    }
    cancelled = true;
    // A non-zero result skips the release on this thread.
    // NOTE(review): presumably an emission loop is active then and does the clean-up — confirm.
    if (BackpressureHelper.add(this, 1) != 0) {
        return;
    }
    T[] remaining = items;
    int end = remaining.length;
    Consumer<? super T> releaser = release;
    int position = index;
    while (position < end) {
        releaseItem(remaining[position], releaser);
        position++;
    }
}
/**
 * Adds {@code n} to {@code requested} and calls {@code schedule()}.
 *
 * @param n the additional demand; requests rejected by
 *          {@code SubscriptionHelper.validate} are ignored here
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    schedule();
}
@Override protected void subscribeActual(final Subscriber<? super T> subscriber) { subscriber.onSubscribe(new Subscription() { final Queue<T> q = new ConcurrentLinkedQueue<T>(items); final AtomicLong requested = new AtomicLong(); volatile boolean cancelled; @Override public void request(long n) { if (cancelled) { // required by reactive-streams-jvm 3.6 return; } if (SubscriptionHelper.validate(n)) { // just for testing, don't care about perf // so no attempt made to reduce volatile reads if (BackpressureHelper.add(requested, n) == 0) { if (q.isEmpty()) { return; } while (!q.isEmpty() && requested.get() > 0) { T item = q.poll(); requested.decrementAndGet(); subscriber.onNext(item); } if (q.isEmpty()) { if (error != null) { subscriber.onError(error); } else { subscriber.onComplete(); } } } } } @Override public void cancel() { cancelled = true; } }); }