Java 类java.util.concurrent.Flow 实例源码
项目:Reactive4JavaFlow
文件:SubscriptionArbiterTest.java
@Test
public void cancelRequests() {
SubscriptionArbiter sa = new SubscriptionArbiter();
sa.arbiterReplace(new Flow.Subscription() {
@Override
public void request(long n) {
}
@Override
public void cancel() {
sa.request(1);
}
});
sa.cancel();
}
项目:Reactive4JavaFlow
文件:FolyamSwitchFlatMapTest.java
@Test
public void mapperThrows() {
DirectProcessor<Integer> bp = new DirectProcessor<>();
TestConsumer<Object> tc = bp
.switchFlatMap((CheckedFunction<Object, Flow.Publisher<Object>>) v -> {
throw new IOException();
}, 2)
.test();
bp.onNext(1);
tc.assertFailure(IOException.class);
assertFalse(bp.hasSubscribers());
}
项目:Reactive4JavaFlow
文件:ParallelPeek.java
@Override
public void onSubscribe(Flow.Subscription s) {
this.s = s;
try {
parent.onSubscribe.accept(s);
} catch (Throwable ex) {
FolyamPlugins.handleFatal(ex);
s.cancel();
actual.onSubscribe(EmptySubscription.INSTANCE);
onError(ex);
return;
}
actual.onSubscribe(this);
}
项目:Reactive4JavaFlow
文件:FolyamOrderedMergeIterable.java
@SuppressWarnings("unchecked")
@Override
protected void subscribeActual(FolyamSubscriber<? super T> s) {
Flow.Publisher<? extends T>[] array = new Flow.Publisher[8];
int n = 0;
try {
for (Flow.Publisher<? extends T> p : sourcesIterable) {
if (n == array.length) {
array = Arrays.copyOf(array, n << 1);
}
array[n++] = Objects.requireNonNull(p, "a source is null");
}
} catch (Throwable ex) {
FolyamPlugins.handleFatal(ex);
EmptySubscription.error(s, ex);
return;
}
FolyamOrderedMergeArray.subscribe(s, array, n, comparator, prefetch, delayErrors);
}
项目:Reactive4JavaFlow
文件:AsyncProcessorAsPublisherTckTest.java
@Override
public Flow.Publisher<Integer> createFlowPublisher(final long elements) {
final LastProcessor<Integer> pp = new LastProcessor<>();
SchedulerServices.io().schedule(() -> {
long start = System.currentTimeMillis();
while (!pp.hasSubscribers()) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
return;
}
if (System.currentTimeMillis() - start > 200) {
return;
}
}
for (int i = 0; i < elements; i++) {
pp.onNext(i);
}
pp.onComplete();
});
return pp;
}
项目:Reactive4JavaFlow
文件:FolyamOnErrorResumeNext.java
@Override
public void onError(Throwable throwable) {
if (once) {
actual.onError(throwable);
return;
}
once = true;
Flow.Publisher<? extends T> p;
try {
p = Objects.requireNonNull(handler.apply(throwable), "The handler returned a null Flow.Publisher");
} catch (Throwable ex) {
actual.onError(new CompositeThrowable(throwable, ex));
return;
}
long c = produced;
if (c != 0) {
arbiterProduced(c);
}
p.subscribe(this);
}
项目:Reactive4JavaFlow
文件:FolyamTimeoutSelectorFallback.java
public final void onSubscribe(Flow.Subscription subscription) {
upstream = subscription;
arbiterReplace(subscription);
if (firstTimeout != null) {
AtomicDisposable d = new AtomicDisposable();
task = d;
onStart();
if (d.get() == null) {
TimeoutInnerSubscriber inner = new TimeoutInnerSubscriber(this, 0);
if (d.compareAndSet(null, inner)) {
firstTimeout.subscribe(inner);
}
}
} else {
onStart();
}
}
项目:Reactive4JavaFlow
文件:FolyamTimeoutSelectorFallback.java
public final void onNext(T item) {
AutoDisposable d = task;
if (d != null) {
d.close();
}
long idx = (long)INDEX.getAcquire(this);
if (idx != Long.MIN_VALUE && INDEX.compareAndSet(this, idx, idx + 1)) {
next(item);
Flow.Publisher<?> p;
try {
p = Objects.requireNonNull(itemTimeoutSelector.apply(item), "The itemTimeoutSelector returned a null Flow.Publisher");
} catch (Throwable ex) {
onError(ex);
return;
}
// replace
TimeoutInnerSubscriber inner = new TimeoutInnerSubscriber(this, idx + 1);
if (DisposableHelper.replace(this, TASK, inner)) {
p.subscribe(inner);
}
}
}
项目:Reactive4JavaFlow
文件:FolyamZipLatestArray.java
void subscribe(Flow.Publisher<? extends T>[] sources, int n) {
for (int i = 0; i < n; i++) {
if (cancelled) {
return;
}
sources[i].subscribe(subscribers[i]);
}
}
项目:Java-9-Spring-Webflux
文件:DockerXDemoSubscriber.java
public void onSubscribe(Flow.Subscription subscription) {
//count = bufferSize - bufferSize / 2;// 当消费一半的时候重新请求
(this.subscription = subscription).request(bufferSize);
System.out.println("开始onSubscribe订阅");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
项目:Reactive4JavaFlow
文件:FolyamConcatMapEager.java
public FolyamConcatMapEager(Folyam<T> source, CheckedFunction<? super T, ? extends Flow.Publisher<? extends R>> mapper, int maxConcurrency, int prefetch, boolean delayError) {
this.source = source;
this.mapper = mapper;
this.maxConcurrency = maxConcurrency;
this.prefetch = prefetch;
this.delayError = delayError;
}
项目:Reactive4JavaFlow
文件:FolyamDoOnSignal.java
protected DoOnSignalSubscriber(FolyamSubscriber<? super T> actual,
CheckedConsumer<? super Flow.Subscription> onSubscribe,
CheckedConsumer<? super T> onNext,
CheckedConsumer<? super T> onAfterNext,
CheckedConsumer<? super Throwable> onError,
CheckedRunnable onComplete,
CheckedConsumer<? super Long> onRequest,
CheckedRunnable onCancel) {
super(onSubscribe, onNext, onAfterNext, onError, onComplete, onRequest, onCancel);
this.actual = actual;
}
项目:Reactive4JavaFlow
文件:FolyamSynchronousCoarseProfiler.java
@SuppressWarnings("unchecked")
@Override
public void onSubscribe(Flow.Subscription s) {
this.s = s;
if (s instanceof FusedSubscription) {
qs = (FusedSubscription<T>)s;
}
calls.onSubscribeCount++;
startTime = System.nanoTime();
actual.onSubscribe(this);
}
项目:reactive-jax-rs
文件:ServletInputStreamPublisherAdapter.java
/**
* Pushes subscriber to subscribers list.
* @param subscriber
*/
@Override public void subscribe(Flow.Subscriber<? super byte[]> subscriber) {
if (!started) {
startReading();
started = true;
}
NoBackpressureSubscription subscription = new NoBackpressureSubscription(this);
this.subscribers.put(subscriber, subscription);
subscriber.onSubscribe(subscription);
}
项目:Reactive4JavaFlow
文件:ParallelRunOn.java
@Override
public void onSubscribe(Flow.Subscription s) {
this.s = s;
actual.onSubscribe(this);
s.request(prefetch);
}
项目:Reactive4JavaFlow
文件:FolyamFlattenIterable.java
@Override
public final void onSubscribe(Flow.Subscription subscription) {
upstream = subscription;
if (subscription instanceof FusedSubscription) {
FusedSubscription<T> fs = (FusedSubscription<T>) subscription;
int m = fs.requestFusion(ANY);
if (m == SYNC) {
sourceFused = m;
queue = fs;
DONE.setRelease(this, true);
onStart();
return;
}
if (m == ASYNC) {
sourceFused = m;
queue = fs;
onStart();
fs.request(prefetch);
return;
}
}
int pf = prefetch;
if (pf == 1) {
queue = new SpscOneQueue<>();
} else {
queue = new SpscArrayQueue<>(pf);
}
onStart();
subscription.request(pf);
}
项目:Reactive4JavaFlow
文件:EsetlegDoOnSignal.java
protected DoOnSignalConditionalSubscriber(ConditionalSubscriber<? super T> actual,
CheckedConsumer<? super Flow.Subscription> onSubscribe,
CheckedConsumer<? super T> onNext,
CheckedConsumer<? super T> onAfterNext,
CheckedConsumer<? super Throwable> onError,
CheckedRunnable onComplete,
CheckedConsumer<? super Long> onRequest,
CheckedRunnable onCancel) {
super(onSubscribe, onNext, onAfterNext, onError, onComplete, onRequest, onCancel);
this.actual = actual;
}
项目:reactive-streams-tck_playground
文件:ReactiveStreamsFlowBridge.java
/**
* Converts a Flow Publisher into a Reactive Streams Publisher.
* @param <T> the element type
* @param flowPublisher the source Flow Publisher to convert
* @return the equivalent Reactive Streams Publisher
*/
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
Flow.Publisher<? extends T> flowPublisher) {
if (flowPublisher == null) {
throw new NullPointerException("flowPublisher");
}
if (flowPublisher instanceof org.reactivestreams.Publisher) {
return (org.reactivestreams.Publisher<T>)flowPublisher;
}
if (flowPublisher instanceof FlowPublisherFromReactive) {
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
}
return new ReactivePublisherFromFlow<T>(flowPublisher);
}
项目:Reactive4JavaFlow
文件:HalfSerializer.java
public static void onComplete(Flow.Subscriber<?> actual, Object target, VarHandle WIP, VarHandle ERRORS) {
if ((int) WIP.getAndAdd(target, 1) == 0) {
Throwable ex = ExceptionHelper.terminate(target, ERRORS);
if (ex == null) {
actual.onComplete();
} else {
actual.onError(ex);
}
}
}
项目:Reactive4JavaFlow
文件:ConcatPublisherTckTest.java
@Override
public Flow.Publisher<Long> createFlowPublisher(long elements) {
return
Folyam.concat(Folyam.fromArray(
Folyam.fromIterable(iterate(elements / 2)),
Folyam.fromIterable(iterate(elements - elements / 2))
)
)
;
}
项目:Reactive4JavaFlow
文件:ZipWithTckTest.java
@Override
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
return
Folyam.range(0, (int)elements)
.zipWith(Folyam.range((int)elements, (int)elements), (a, b) -> a + b)
;
}
项目:Reactive4JavaFlow
文件:FolyamOnTerminateDetach.java
@Override
public final void request(long n) {
Flow.Subscription s = (Flow.Subscription)UPSTREAM.getAcquire(this);
if (s != null) {
s.request(n);
}
}
项目:Reactive4JavaFlow
文件:FolyamZipArrayTest.java
@Test
public void zipArray() {
Flow.Publisher<Integer>[] sources = new Flow.Publisher[16];
Arrays.fill(sources, Folyam.just(1));
Folyam.zipArray(Arrays::toString, sources)
.test()
.assertResult("[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]");
}
项目:Reactive4JavaFlow
文件:FolyamSubscribeOn.java
public final void onSubscribe(Flow.Subscription subscription) {
if (SubscriptionHelper.replace(this, UPSTREAM, subscription)) {
long n = getAndSet(0L);
if (n != 0L) {
requestUpstream(n, subscription);
}
}
}
项目:Reactive4JavaFlow
文件:FirstProcessor.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (UPSTREAM.compareAndSet(this, null, subscription)) {
subscription.request(Long.MAX_VALUE);
} else {
subscription.cancel();
}
}
项目:Reactive4JavaFlow
文件:FolyamProcessorRefCount.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (UPSTREAM.compareAndSet(this, null, subscription)) {
actual.onSubscribe(subscription);
} else {
subscription.cancel();
}
}
项目:Reactive4JavaFlow
文件:ZipIterableTckTest.java
@SuppressWarnings("unchecked")
@Override
public Flow.Publisher<Long> createFlowPublisher(long elements) {
return
Folyam.zip(Arrays.asList(
Folyam.fromIterable(iterate(elements)),
Folyam.fromIterable(iterate(elements))
),
a -> (Long)a[0] + (Long)a[1]
)
;
}
项目:Reactive4JavaFlow
文件:HalfSerializer.java
public static void onError(Flow.Subscriber<?> actual, Object target, VarHandle WIP, VarHandle ERRORS, Throwable t) {
if (ExceptionHelper.addThrowable(target, ERRORS, t)) {
if ((int) WIP.getAndAdd(target, 1) == 0) {
Throwable ex = ExceptionHelper.terminate(target, ERRORS);
actual.onError(ex);
return;
}
}
FolyamPlugins.onError(t);
}
项目:Reactive4JavaFlow
文件:SubscriptionArbiter.java
@Override
public final void request(long n) {
if (unbounded) {
return;
}
if (getAcquire() == 0 && compareAndSet(0, 1)) {
long r = requested;
long u = r + n;
if (u < 0L || u == Long.MAX_VALUE) {
u = Long.MAX_VALUE;
unbounded = true;
}
requested = u;
Flow.Subscription s = currentSubscription;
if (decrementAndGet() != 0) {
arbiterDrainLoop();
}
if (s != null) {
s.request(n);
}
} else {
SubscriptionHelper.addRequested(this, MISSED_REQUESTED, n);
if (getAndIncrement() != 0) {
return;
}
arbiterDrainLoop();
}
}
项目:Reactive4JavaFlow
文件:EsetlegDoOnSignal.java
public EsetlegDoOnSignal(Esetleg<T> source, CheckedConsumer<? super Flow.Subscription> onSubscribe, CheckedConsumer<? super T> onNext, CheckedConsumer<? super T> onAfterNext, CheckedConsumer<? super Throwable> onError, CheckedRunnable onComplete, CheckedConsumer<? super Long> onRequest, CheckedRunnable onCancel) {
this.source = source;
this.onSubscribe = onSubscribe;
this.onNext = onNext;
this.onAfterNext = onAfterNext;
this.onError = onError;
this.onComplete = onComplete;
this.onRequest = onRequest;
this.onCancel = onCancel;
}
项目:Reactive4JavaFlow
文件:FolyamDoFinally.java
public final void onSubscribe(Flow.Subscription subscription) {
this.upstream = subscription;
if (subscription instanceof FusedSubscription) {
this.qs = (FusedSubscription<T>)subscription;
}
onStart();
}
项目:Reactive4JavaFlow
文件:FolyamTakeWhile.java
@Override
public final void onSubscribe(Flow.Subscription subscription) {
upstream = subscription;
if (subscription instanceof FusedSubscription) {
qs = (FusedSubscription<T>)subscription;
}
onStart();
}
项目:Reactive4JavaFlow
文件:EsetlegDoOnSignal.java
protected DoOnSignalSubscriber(FolyamSubscriber<? super T> actual,
CheckedConsumer<? super Flow.Subscription> onSubscribe,
CheckedConsumer<? super T> onNext,
CheckedConsumer<? super T> onAfterNext,
CheckedConsumer<? super Throwable> onError,
CheckedRunnable onComplete,
CheckedConsumer<? super Long> onRequest,
CheckedRunnable onCancel) {
super(onSubscribe, onNext, onAfterNext, onError, onComplete, onRequest, onCancel);
this.actual = actual;
}
项目:Reactive4JavaFlow
文件:ReduceWithTckTest.java
@Override
public Flow.Publisher<Integer> createFlowPublisher(final long elements) {
return
Folyam.range(1, 1000)
.reduce(() -> 0, (a, b) -> a + b)
.toFolyam()
;
}
项目:Reactive4JavaFlow
文件:FolyamOnErrorResumeNext.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
arbiterReplace(subscription);
if (!once) {
actual.onSubscribe(this);
}
}
项目:Reactive4JavaFlow
文件:FolyamDoOnSignal.java
public FolyamDoOnSignal(Folyam<T> source, CheckedConsumer<? super Flow.Subscription> onSubscribe, CheckedConsumer<? super T> onNext, CheckedConsumer<? super T> onAfterNext, CheckedConsumer<? super Throwable> onError, CheckedRunnable onComplete, CheckedConsumer<? super Long> onRequest, CheckedRunnable onCancel) {
this.source = source;
this.onSubscribe = onSubscribe;
this.onNext = onNext;
this.onAfterNext = onAfterNext;
this.onError = onError;
this.onComplete = onComplete;
this.onRequest = onRequest;
this.onCancel = onCancel;
}
项目:Reactive4JavaFlow
文件:EsetlegDoOnSignal.java
protected AbstractDoOnSignal(CheckedConsumer<? super Flow.Subscription> onSubscribe,
CheckedConsumer<? super T> onNext,
CheckedConsumer<? super T> onAfterNext,
CheckedConsumer<? super Throwable> onError,
CheckedRunnable onComplete,
CheckedConsumer<? super Long> onRequest,
CheckedRunnable onCancel) {
this.onSubscribe = onSubscribe != null ? onSubscribe : e -> { };
this.onNext = onNext != null ? onNext : e -> { };
this.onAfterNext = onAfterNext != null ? onAfterNext : e -> { };
this.onError = onError != null ? onError : e -> { };
this.onComplete = onComplete != null ? onComplete : () -> { };
this.onRequest = onRequest != null ? onRequest : e -> { };
this.onCancel = onCancel != null ? onCancel : () -> { };
}
项目:Reactive4JavaFlow
文件:FolyamSwitchIfEmpty.java
@Override
public void onComplete() {
if (once) {
actual.onComplete();
} else {
once = true;
Flow.Publisher<? extends T> p = this.other;
other = null;
p.subscribe(this);
}
}
项目:Reactive4JavaFlow
文件:FolyamSwitchIfEmpty.java
@Override
public void onComplete() {
if (once) {
actual.onComplete();
} else {
once = true;
Flow.Publisher<? extends T> p = this.other;
other = null;
p.subscribe(this);
}
}
项目:openjdk-jdk10
文件:Http1Request.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
throw new IllegalStateException("already subscribed");
}
this.subscription = subscription;
subscription.request(1);
}