Java 类io.reactivex.internal.util.BackpressureHelper 实例源码
项目:rxjava2-extras
文件:FlowableRepeat.java
/**
 * Emits {@code value} to {@code child} in response to downstream demand.
 *
 * <p>The amount is added to this object's own counter through
 * {@code BackpressureHelper.add(this, n)}. The emission loop is entered only when
 * that call returns 0, and it repeats for as long as {@code addAndGet(-emitted)}
 * stays positive.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
// Presumably a return of 0 means no emission loop is currently running — TODO confirm
// against BackpressureHelper.add.
if (BackpressureHelper.add(this, n) == 0) {
long requested = n;
long emitted = 0;
do {
// Remember the batch size so it can be subtracted from the counter after the inner loop.
emitted = requested;
// Emit until the batch is used up, cancellation is seen, or (when count != -1) counter
// is no longer positive. `requested--` and `counter--` decrement every time they are
// evaluated, including the evaluation that ends the loop.
while (requested-- > 0 && !cancelled && (count == -1 || counter-- > 0)) {
child.onNext(value);
}
// Subtract the batch; a positive remainder becomes the next batch
// (presumably demand added by other request calls in the meantime).
} while ((requested = this.addAndGet(-emitted)) > 0);
// NOTE(review): for count >= 0 this signals completion whenever the loop ends, without
// checking that counter has been exhausted — confirm this is intended.
if (count >= 0 && !cancelled) {
child.onComplete();
}
}
}
}
项目:rxjava2-extras
文件:FlowableRepeatingTransform.java
/**
 * Records downstream demand, then either forwards it to the parent subscription or
 * folds it into {@code deferredRequests}, and finally calls {@code drain()}.
 *
 * @param n the requested amount; when {@code SubscriptionHelper.validate(n)} returns
 *          false the call is only logged
 */
@Override
public void request(long n) {
    debug(this + " request " + n);
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    for (;;) {
        Subscription upstream = parent.get();
        long deferred = deferredRequests.get();
        if (deferred == -1) {
            // parent exists so can request of it
            debug(this + " requesting from parent " + n);
            upstream.request(n);
            break;
        }
        // Not forwarding yet: add n to the deferred total, saturating on overflow.
        long updated = deferred + n;
        if (updated < 0) {
            updated = Long.MAX_VALUE;
        }
        if (deferredRequests.compareAndSet(deferred, updated)) {
            break;
        }
        // The compare-and-set failed; re-read both values and try again.
    }
    drain();
}
项目:reactivejournal
文件:ReactivePlayer.java
/**
 * Records downstream demand and makes sure it gets drained.
 *
 * <p>When {@code options.sameThread()} is true, the demand is added to
 * {@code requested} and {@code drain()} is called on the calling thread if
 * {@code BackpressureHelper.add} returned 0. Otherwise a single daemon thread is
 * started on first use; it calls {@code drain()} repeatedly until that returns
 * true, and this method only adds to {@code requested}.
 *
 * <p>Non-positive amounts are ignored in both modes. Previously only the
 * same-thread branch checked {@code n > 0}; the executor branch would still start
 * the runner and add a zero or negative amount to {@code requested}.
 *
 * @param n the requested amount; values {@code <= 0} are ignored
 */
@Override
public void request(long n) {
    if (n <= 0) {
        // Reject invalid demand up front so both modes behave alike. RxJava's capped add
        // turns a negative sum into Long.MAX_VALUE, so letting a negative n through could
        // switch the counter to "unbounded".
        return;
    }

    if (options.sameThread()) {
        if (BackpressureHelper.add(requested, n) == 0) {
            drain();
        }
        return;
    }

    if (executorService == null) {
        // NOTE(review): this lazy initialisation is not synchronised; concurrent first
        // requests could create two executors — confirm request() is called serially.
        executorService = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("Subscription Runner [" + System.currentTimeMillis() + "]");
            return thread;
        });
        executorService.submit(() -> {
            // Keep draining until drain() reports true, yielding between attempts.
            while (true) {
                if (drain()) {
                    return;
                }
                Thread.yield();
            }
        });
    }
    BackpressureHelper.add(requested, n);
}
项目:RxJava2Extensions
文件:FlowableCharSequence.java
/**
 * Adds {@code n} to this object's own counter and, when
 * {@code BackpressureHelper.add} returns 0, starts emission.
 *
 * @param n the requested amount; {@code Long.MAX_VALUE} selects {@code fastPath()},
 *          any other accepted value is passed to {@code slowPath(n)}
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    if (BackpressureHelper.add(this, n) != 0) {
        // add(...) did not return 0, so this call does not start emission.
        return;
    }
    if (n != Long.MAX_VALUE) {
        slowPath(n);
    } else {
        fastPath();
    }
}
项目:RxJava2Extensions
文件:FlowableSplit.java
/**
 * Adds {@code n} to {@code requested} and calls {@code drain()}.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
项目:RxJava2Extensions
文件:FlowableSwitchIfEmptyMany.java
/**
 * Adds {@code n} to {@code requested} and forwards the same amount to the
 * subscription currently held in {@code s}, if there is one.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    Subscription current = s.get();
    if (current == null) {
        // Nothing to forward to; the demand stays recorded in requested.
        return;
    }
    current.request(n);
}
项目:RxJava2Extensions
文件:FlowableOnBackpressureTimeout.java
/**
 * Accumulates demand in {@code requested}, then invokes {@code drain()}.
 *
 * @param n the requested amount; the call is a no-op when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    boolean accepted = SubscriptionHelper.validate(n);
    if (!accepted) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
项目:RxJava2Extensions
文件:FlowableEvery.java
/**
 * Requests {@code BackpressureHelper.multiplyCap(n, keep)} items from {@code s}.
 *
 * @param n the requested amount; nothing is requested from {@code s} when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    // Scale the demand by keep before passing it on.
    s.request(BackpressureHelper.multiplyCap(n, keep));
}
项目:RxJava2Extensions
文件:FlowableIntervalBackpressure.java
/**
 * Records {@code n} in {@code requested} and calls {@code drain()}.
 *
 * @param n the requested amount; ignored when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        // Rejected by validate: leave the counter untouched and do not drain.
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
项目:RxJava2Extensions
文件:FlowableSwitchIfEmptyManyArray.java
/**
 * Records {@code n} in {@code requested}; if {@code s} currently holds a
 * subscription, the same amount is requested from it.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    Subscription active = s.get();
    if (active != null) {
        active.request(n);
    }
}
项目:RxJava2Extensions
文件:FlowableRepeatCallable.java
/**
 * Adds {@code n} to this object's own counter; when
 * {@code BackpressureHelper.add} returns 0, emission starts through
 * {@code fastpath()} for {@code Long.MAX_VALUE} or {@code slowpath(n)} otherwise.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    if (BackpressureHelper.add(this, n) != 0) {
        return;
    }
    if (n != Long.MAX_VALUE) {
        slowpath(n);
    } else {
        fastpath();
    }
}
项目:RxJava2Extensions
文件:FlowableRepeatCallable.java
/**
 * Accumulates demand on this object's own counter and, if
 * {@code BackpressureHelper.add} returned 0, picks the emission routine.
 *
 * @param n the requested amount; {@code Long.MAX_VALUE} leads to {@code fastpath()},
 *          every other accepted value to {@code slowpath(n)}
 */
@Override
public void request(long n) {
    boolean startEmitting = SubscriptionHelper.validate(n)
            && BackpressureHelper.add(this, n) == 0;
    if (!startEmitting) {
        return;
    }
    boolean unbounded = n == Long.MAX_VALUE;
    if (unbounded) {
        fastpath();
    } else {
        slowpath(n);
    }
}
项目:RxJava2Extensions
文件:FlowableRepeatScalar.java
/**
 * Adds {@code n} to this object's own counter; a return of 0 from
 * {@code BackpressureHelper.add} triggers {@code fastpath()} when {@code n} is
 * {@code Long.MAX_VALUE} and {@code slowpath(n)} otherwise.
 *
 * @param n the requested amount; ignored when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    if (BackpressureHelper.add(this, n) != 0) {
        // add(...) did not return 0, so no emission is started here.
        return;
    }
    if (n != Long.MAX_VALUE) {
        slowpath(n);
    } else {
        fastpath();
    }
}
项目:RxJava2Extensions
文件:FlowableRepeatScalar.java
/**
 * Registers demand on this object's own counter and starts emitting when
 * {@code BackpressureHelper.add} returns 0.
 *
 * @param n the requested amount; {@code Long.MAX_VALUE} selects {@code fastpath()},
 *          anything else accepted by {@code SubscriptionHelper.validate} selects
 *          {@code slowpath(n)}
 */
@Override
public void request(long n) {
    boolean startEmitting = SubscriptionHelper.validate(n)
            && BackpressureHelper.add(this, n) == 0;
    if (!startEmitting) {
        return;
    }
    if (n == Long.MAX_VALUE) {
        fastpath();
        return;
    }
    slowpath(n);
}
项目:RxJava2Extensions
文件:BasicMergeSubscription.java
/**
 * Adds the requested amount to {@code requested} and runs {@code drain()}.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
项目:RxJava2Extensions
文件:FlowableCoalesce.java
/**
 * Stores additional demand in {@code requested} and then calls {@code drain()}.
 *
 * @param n the requested amount; the call has no effect when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    boolean accepted = SubscriptionHelper.validate(n);
    if (!accepted) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
项目:RxJava2Extensions
文件:FlowableWindowPredicate.java
/**
 * Forwards {@code n} to {@code s}. In {@code Mode.BEFORE}, an amount accepted by
 * {@code SubscriptionHelper.validate} is first added to {@code requestedWindows},
 * followed by a {@code drain()} call.
 *
 * @param n the requested amount; it is passed to {@code s.request} unconditionally
 */
@Override
public void request(long n) {
    if (mode == Mode.BEFORE) {
        // validate is consulted only in BEFORE mode, as in the combined condition.
        if (SubscriptionHelper.validate(n)) {
            BackpressureHelper.add(requestedWindows, n);
            drain();
        }
    }
    // The upstream request is made regardless of mode or validation result.
    s.request(n);
}
项目:rxjava2-extras
文件:FlowableStringSplitSimple.java
/**
 * Adds {@code n} to this object's own counter, issues the initial request to
 * {@code parent} on the first accepted call, and then calls {@code drain()}.
 *
 * <p>On the first call (the one that flips {@code once} from false to true),
 * {@code Long.MAX_VALUE} is passed straight to {@code parent} and
 * {@code unbounded} is set; any other amount results in {@code parent.request(1)}.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(this, n);
    boolean firstRequest = once.compareAndSet(false, true);
    if (firstRequest) {
        if (n != Long.MAX_VALUE) {
            parent.request(1);
        } else {
            parent.request(Long.MAX_VALUE);
            unbounded = true;
        }
    }
    drain();
}
项目:rxjava2-extras
文件:FlowableMinRequest.java
/**
 * Adds {@code n} to {@code requested} and invokes {@code drain()}.
 *
 * @param n the requested amount; ignored when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
项目:rxjava2-extras
文件:FlowableMaxRequest.java
/**
 * Adds {@code n} to {@code requested} and then calls {@code requestMore()}.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    requestMore();
}
项目:rxjava2-extras
文件:FlowableMatch.java
/**
 * Records the demand in {@code requested}, then calls {@code drain()}.
 *
 * @param n the requested amount; the call is a no-op when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    boolean accepted = SubscriptionHelper.validate(n);
    if (!accepted) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
项目:rxjava2-extras
文件:FlowableOnBackpressureBufferToFile.java
/**
 * Adds {@code n} to {@code requested}, requests the same amount from
 * {@code parent}, and calls {@code scheduleDrain()}.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    // Pass the demand upstream before scheduling the drain, in that order.
    parent.request(n);
    scheduleDrain();
}
项目:rxjava2-extras
文件:FlowableOnBackpressureBufferToFile.java
/**
 * Adds {@code n} to {@code requested} and calls {@code scheduleDrain()}.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    scheduleDrain();
}
项目:rxjava2-extras
文件:FlowableRepeatingTransform.java
/**
 * Records downstream demand, updates the {@code requests} state with a
 * compare-and-set retry loop, and then calls {@code drain()}.
 *
 * <p>While the current state has no parent, {@code n} is added to its deferred
 * total (saturating at {@code Long.MAX_VALUE}). Once a parent is present, the net
 * amount {@code n + deferred - unreconciled} is computed: a positive net is
 * requested from the parent, and a negative net is carried over as the new
 * unreconciled value.
 *
 * @param n the requested amount; when {@code SubscriptionHelper.validate(n)} returns
 *          false the call is only logged
 */
@Override
public void request(long n) {
    debug(this + " request " + n);
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    for (;;) {
        Requests<T> current = requests.get();
        if (current.parent == null) {
            // No parent yet: accumulate the demand for later.
            long deferred = current.deferred + n;
            if (deferred < 0) {
                deferred = Long.MAX_VALUE;
            }
            Requests<T> next =
                    new Requests<T>(current.parent, current.unreconciled, deferred, current.child);
            if (requests.compareAndSet(current, next)) {
                break;
            }
        } else {
            long net = n + current.deferred - current.unreconciled;
            long stillUnreconciled = Math.max(0, -net);
            Requests<T> next =
                    new Requests<T>(current.parent, stillUnreconciled, 0, current.child);
            if (requests.compareAndSet(current, next)) {
                // Request from the parent only after the state swap succeeded.
                if (net > 0) {
                    current.parent.request(net);
                }
                break;
            }
        }
        // The compare-and-set failed; re-read the state and retry.
    }
    drain();
}
项目:rxjava2-extras
文件:FlowableStateMachine.java
/**
 * Adds {@code n} to {@code requested} and calls {@code drain()}.
 *
 * @param n the requested amount; ignored when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
项目:rxjava2-extras
文件:FlowableCollectWhile.java
/**
 * Adds {@code n} to {@code requested}, requests the same amount from
 * {@code parent}, and calls {@code drain()}.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    // Upstream request comes before the drain, in that order.
    parent.request(n);
    drain();
}
项目:akarnokd-misc
文件:SingleFlatMapIterableFlowable.java
/**
 * Stores the demand in {@code requested} and calls {@code drain()}.
 *
 * @param n the requested amount; the call has no effect when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    boolean accepted = SubscriptionHelper.validate(n);
    if (!accepted) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
项目:akarnokd-misc
文件:ResourceFlowableIterable.java
/**
 * Marks this object as cancelled; only the call that finds {@code cancelled} still
 * false proceeds. That call adds 1 to this object's own counter and, when
 * {@code BackpressureHelper.add} returns 0, calls
 * {@code releaseRest(items, release)}.
 */
@Override
public void cancel() {
    if (cancelled) {
        return;
    }
    cancelled = true;
    if (BackpressureHelper.add(this, 1) != 0) {
        // add(...) did not return 0, so the release is not performed from here.
        return;
    }
    releaseRest(items, release);
}
项目:akarnokd-misc
文件:ResourceFlowableArray.java
/**
 * Marks this object as cancelled; only the call that finds {@code cancelled} still
 * false proceeds. That call adds 1 to this object's own counter and, when
 * {@code BackpressureHelper.add} returns 0, passes every element of {@code items}
 * from position {@code index} onward to {@code releaseItem} together with
 * {@code release}.
 */
@Override
public void cancel() {
    if (cancelled) {
        return;
    }
    cancelled = true;
    if (BackpressureHelper.add(this, 1) != 0) {
        return;
    }
    T[] remaining = items;
    int end = remaining.length;
    Consumer<? super T> releaser = release;
    int position = index;
    while (position < end) {
        releaseItem(remaining[position], releaser);
        position++;
    }
}
项目:akarnokd-misc
文件:ResourceFlowableObserveOn.java
/**
 * Adds {@code n} to {@code requested} and calls {@code schedule()}.
 *
 * @param n the requested amount; nothing happens when
 *          {@code SubscriptionHelper.validate(n)} returns false
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    schedule();
}
项目:rxjava2-extras
文件:Burst.java
/**
 * Hands the subscriber a {@code Subscription} that replays a copy of
 * {@code items} according to demand and then signals {@code error} (when it is
 * non-null) or completion once the copy is empty.
 *
 * <p>Emission stops as soon as {@code cancel()} has been observed. The loop
 * condition and the terminal signal both check the cancelled flag, so a
 * {@code cancel()} issued from inside {@code onNext} prevents further
 * {@code onNext}, {@code onError} and {@code onComplete} calls from this loop.
 *
 * @param subscriber the downstream subscriber that receives the items
 */
@Override
protected void subscribeActual(final Subscriber<? super T> subscriber) {
    subscriber.onSubscribe(new Subscription() {

        // Private copy of the items, so each subscription replays from the start.
        final Queue<T> q = new ConcurrentLinkedQueue<T>(items);
        final AtomicLong requested = new AtomicLong();
        volatile boolean cancelled;

        @Override
        public void request(long n) {
            if (cancelled) {
                // required by reactive-streams-jvm 3.6
                return;
            }
            if (SubscriptionHelper.validate(n)) {
                // just for testing, don't care about perf
                // so no attempt made to reduce volatile reads
                if (BackpressureHelper.add(requested, n) == 0) {
                    if (q.isEmpty()) {
                        return;
                    }
                    // Re-check cancelled on every iteration so a cancel() issued from
                    // within onNext stops the emission immediately.
                    while (!cancelled && !q.isEmpty() && requested.get() > 0) {
                        T item = q.poll();
                        requested.decrementAndGet();
                        subscriber.onNext(item);
                    }
                    // No terminal event after cancellation.
                    if (!cancelled && q.isEmpty()) {
                        if (error != null) {
                            subscriber.onError(error);
                        } else {
                            subscriber.onComplete();
                        }
                    }
                }
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    });
}