Java 类java.util.concurrent.Flow 实例源码

项目:Reactive4JavaFlow    文件:SubscriptionArbiterTest.java   
/**
 * Cancels a {@code SubscriptionArbiter} whose current subscription calls
 * {@code request(1)} on that same arbiter from inside its own {@code cancel()}.
 * There are no explicit assertions: the test passes when the calls return
 * without throwing.
 */
@Test
public void cancelRequests() {
    SubscriptionArbiter arbiter = new SubscriptionArbiter();

    Flow.Subscription reentrantUpstream = new Flow.Subscription() {

        @Override
        public void request(long n) {
            // requests are ignored by this stub
        }

        @Override
        public void cancel() {
            // call back into the arbiter from within cancel()
            arbiter.request(1);
        }
    };

    arbiter.arbiterReplace(reentrantUpstream);
    arbiter.cancel();
}
项目:Reactive4JavaFlow    文件:FolyamSwitchFlatMapTest.java   
/**
 * Verifies that a {@code switchFlatMap} mapper throwing an {@link IOException}
 * fails the sequence with that exception type and leaves the source processor
 * without subscribers.
 */
@Test
public void mapperThrows() {
    DirectProcessor<Integer> source = new DirectProcessor<>();

    // A mapper that fails for every item it is given.
    CheckedFunction<Object, Flow.Publisher<Object>> failingMapper = v -> {
        throw new IOException();
    };

    TestConsumer<Object> consumer = source
            .switchFlatMap(failingMapper, 2)
            .test();

    source.onNext(1);

    consumer.assertFailure(IOException.class);

    assertFalse(source.hasSubscribers());
}
项目:Reactive4JavaFlow    文件:ParallelPeek.java   
/**
 * Stores the upstream subscription, runs the parent's {@code onSubscribe}
 * callback and, if that succeeds, passes this instance downstream.
 * <p>
 * When the callback throws, the upstream is cancelled, the downstream is given
 * {@code EmptySubscription.INSTANCE} and the failure is routed to
 * {@code onError}.
 *
 * @param s the upstream subscription
 */
@Override
public void onSubscribe(Flow.Subscription s) {
    this.s = s;

    Throwable callbackFailure = null;
    try {
        parent.onSubscribe.accept(s);
    } catch (Throwable ex) {
        callbackFailure = ex;
    }

    if (callbackFailure == null) {
        actual.onSubscribe(this);
        return;
    }

    // Failure path: same call sequence as before, just outside the catch block.
    FolyamPlugins.handleFatal(callbackFailure);
    s.cancel();
    actual.onSubscribe(EmptySubscription.INSTANCE);
    onError(callbackFailure);
}
项目:Reactive4JavaFlow    文件:FolyamOrderedMergeIterable.java   
/**
 * Copies the publishers from {@code sourcesIterable} into a growable array and
 * delegates to {@code FolyamOrderedMergeArray.subscribe}.
 * <p>
 * A null element or any failure while iterating is reported to the subscriber
 * through {@code EmptySubscription.error} and nothing is subscribed.
 *
 * @param s the downstream subscriber
 */
@SuppressWarnings("unchecked")
@Override
protected void subscribeActual(FolyamSubscriber<? super T> s) {
    Flow.Publisher<? extends T>[] collected = new Flow.Publisher[8];
    int count = 0;
    try {
        for (Flow.Publisher<? extends T> candidate : sourcesIterable) {
            // Double the capacity once the array is full.
            if (count == collected.length) {
                collected = Arrays.copyOf(collected, count * 2);
            }
            Flow.Publisher<? extends T> checked = Objects.requireNonNull(candidate, "a source is null");
            collected[count] = checked;
            count++;
        }
    } catch (Throwable ex) {
        FolyamPlugins.handleFatal(ex);
        EmptySubscription.error(s, ex);
        return;
    }

    FolyamOrderedMergeArray.subscribe(s, collected, count, comparator, prefetch, delayErrors);
}
项目:Reactive4JavaFlow    文件:AsyncProcessorAsPublisherTckTest.java   
/**
 * Creates a {@code LastProcessor} and schedules a task on the IO scheduler
 * that feeds it once a subscriber shows up.
 * <p>
 * The task polls {@code hasSubscribers()} in 1 ms steps and gives up silently
 * if interrupted or if more than 200 ms pass. Otherwise it emits
 * {@code 0 .. elements - 1} and completes the processor.
 *
 * @param elements how many items the feeding task pushes into the processor
 * @return the processor under test
 */
@Override
public Flow.Publisher<Integer> createFlowPublisher(final long elements) {
    final LastProcessor<Integer> processor = new LastProcessor<>();

    SchedulerServices.io().schedule(() -> {
        final long waitStarted = System.currentTimeMillis();
        for (;;) {
            if (processor.hasSubscribers()) {
                break;
            }

            try {
                Thread.sleep(1);
            } catch (InterruptedException ex) {
                return;
            }

            if (System.currentTimeMillis() - waitStarted > 200) {
                return;
            }
        }

        int next = 0;
        while (next < elements) {
            processor.onNext(next);
            next++;
        }
        processor.onComplete();
    });
    return processor;
}
项目:Reactive4JavaFlow    文件:FolyamOnErrorResumeNext.java   
/**
 * Handles an upstream error by switching to the publisher supplied by
 * {@code handler}.
 * <p>
 * Once {@code once} is set, errors are passed straight to the downstream
 * subscriber. If the handler throws or returns null, the downstream receives a
 * {@code CompositeThrowable} holding the original error and the handler failure.
 *
 * @param throwable the error signalled by the current source
 */
@Override
public void onError(Throwable throwable) {
    if (once) {
        actual.onError(throwable);
        return;
    }
    once = true;

    Flow.Publisher<? extends T> fallback;
    try {
        fallback = Objects.requireNonNull(handler.apply(throwable), "The handler returned a null Flow.Publisher");
    } catch (Throwable handlerFailure) {
        actual.onError(new CompositeThrowable(throwable, handlerFailure));
        return;
    }

    // Report the items produced so far to the arbiter before switching sources.
    long producedSoFar = produced;
    if (producedSoFar != 0) {
        arbiterProduced(producedSoFar);
    }
    fallback.subscribe(this);
}
项目:Reactive4JavaFlow    文件:FolyamTimeoutSelectorFallback.java   
/**
 * Stores the upstream subscription, hands it to the arbiter and calls
 * {@code onStart()}. When a {@code firstTimeout} publisher is configured, a
 * {@code TimeoutInnerSubscriber} with index 0 is subscribed to it afterwards,
 * unless the task holder was filled in the meantime.
 *
 * @param subscription the upstream subscription
 */
public final void onSubscribe(Flow.Subscription subscription) {
    upstream = subscription;
    arbiterReplace(subscription);
    if (firstTimeout != null) {
        // The task holder is assigned before onStart() runs.
        // NOTE(review): presumably so that signals arriving during onStart()
        // can already close or replace it - confirm against onNext/cancel.
        AtomicDisposable d = new AtomicDisposable();
        task = d;

        onStart();
        // Only subscribe to the first timeout if the holder is still empty.
        if (d.get() == null) {
            TimeoutInnerSubscriber inner = new TimeoutInnerSubscriber(this, 0);
            // The CAS guards against the holder being set between the check and here.
            if (d.compareAndSet(null, inner)) {
                firstTimeout.subscribe(inner);
            }
        }
    } else {
        onStart();
    }
}
项目:Reactive4JavaFlow    文件:FolyamTimeoutSelectorFallback.java   
/**
 * Closes the current timeout task, advances the item index and, if that
 * succeeds, forwards the item via {@code next(item)} and subscribes a new
 * timeout watcher to the publisher returned by {@code itemTimeoutSelector}.
 *
 * @param item the upstream item
 */
public final void onNext(T item) {
    // Close whatever timeout task is currently registered.
    AutoDisposable d = task;
    if (d != null) {
        d.close();
    }

    long idx = (long)INDEX.getAcquire(this);
    // NOTE(review): Long.MIN_VALUE looks like a terminal/timed-out marker - confirm.
    // The CAS fails if the index was changed concurrently; the item is then dropped here.
    if (idx != Long.MIN_VALUE && INDEX.compareAndSet(this, idx, idx + 1)) {
        next(item);

        Flow.Publisher<?> p;

        try {
            p = Objects.requireNonNull(itemTimeoutSelector.apply(item), "The itemTimeoutSelector returned a null Flow.Publisher");
        } catch (Throwable ex) {
            // A throwing selector or a null result is routed to onError.
            onError(ex);
            return;
        }
        // replace
        // The new watcher carries the incremented index (idx + 1).
        TimeoutInnerSubscriber inner = new TimeoutInnerSubscriber(this, idx + 1);
        // Subscribe only when the replace call reports success.
        if (DisposableHelper.replace(this, TASK, inner)) {
            p.subscribe(inner);
        }
    }
}
项目:Reactive4JavaFlow    文件:FolyamZipLatestArray.java   
/**
 * Subscribes the first {@code n} sources to the subscribers at the same
 * indexes, stopping as soon as {@code cancelled} is observed.
 *
 * @param sources the source publishers
 * @param n the number of leading entries of {@code sources} to subscribe to
 */
void subscribe(Flow.Publisher<? extends T>[] sources, int n) {
    int index = 0;
    // The cancelled flag is re-read before every subscription.
    while (index < n && !cancelled) {
        sources[index].subscribe(subscribers[index]);
        index++;
    }
}
项目:Java-9-Spring-Webflux    文件:DockerXDemoSubscriber.java   
/**
 * Stores the subscription, requests the first {@code bufferSize} items,
 * prints a start message and then pauses the calling thread for 100 ms.
 * <p>
 * If the pause is interrupted, the stack trace is printed and the thread's
 * interrupt flag is restored so that callers can still see the interruption.
 *
 * @param subscription the subscription handed over by the publisher
 */
public void onSubscribe(Flow.Subscription subscription) {
    // count = bufferSize - bufferSize / 2; // re-request once half has been consumed
    (this.subscription = subscription).request(bufferSize);
    System.out.println("开始onSubscribe订阅");
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Thread.sleep clears the interrupt status when it throws; set it again
        // instead of silently swallowing the interruption.
        Thread.currentThread().interrupt();
    }
}
项目:Reactive4JavaFlow    文件:FolyamConcatMapEager.java   
/**
 * Creates the operator by storing its source and configuration; no other work
 * is done here.
 *
 * @param source the upstream sequence
 * @param mapper maps each upstream item to an inner publisher
 * @param maxConcurrency stored as the {@code maxConcurrency} setting
 * @param prefetch stored as the {@code prefetch} setting
 * @param delayError stored as the {@code delayError} setting
 */
public FolyamConcatMapEager(
        Folyam<T> source,
        CheckedFunction<? super T, ? extends Flow.Publisher<? extends R>> mapper,
        int maxConcurrency,
        int prefetch,
        boolean delayError) {
    // upstream and transformation
    this.source = source;
    this.mapper = mapper;
    // tuning parameters
    this.delayError = delayError;
    this.prefetch = prefetch;
    this.maxConcurrency = maxConcurrency;
}
项目:Reactive4JavaFlow    文件:FolyamDoOnSignal.java   
/**
 * Creates the subscriber: all callbacks are passed unchanged to the superclass
 * constructor and the downstream subscriber is stored in {@code actual}.
 *
 * @param actual the downstream subscriber
 * @param onSubscribe callback receiving the {@code Flow.Subscription}
 * @param onNext callback receiving an item
 * @param onAfterNext callback receiving an item
 * @param onError callback receiving a {@code Throwable}
 * @param onComplete callback taking no argument
 * @param onRequest callback receiving a {@code Long}
 * @param onCancel callback taking no argument
 */
protected DoOnSignalSubscriber(FolyamSubscriber<? super T> actual,
                               CheckedConsumer<? super Flow.Subscription> onSubscribe,
                               CheckedConsumer<? super T> onNext,
                               CheckedConsumer<? super T> onAfterNext,
                               CheckedConsumer<? super Throwable> onError,
                               CheckedRunnable onComplete,
                               CheckedConsumer<? super Long> onRequest,
                               CheckedRunnable onCancel) {
    super(onSubscribe, onNext, onAfterNext, onError, onComplete, onRequest, onCancel);
    this.actual = actual;
}
项目:Reactive4JavaFlow    文件:FolyamSynchronousCoarseProfiler.java   
/**
 * Stores the upstream subscription (and its fused view when it is a
 * {@code FusedSubscription}), counts the call, records the start time and
 * then passes this instance downstream.
 *
 * @param s the upstream subscription
 */
@SuppressWarnings("unchecked")
@Override
public void onSubscribe(Flow.Subscription s) {
    this.s = s;

    boolean fusable = s instanceof FusedSubscription;
    if (fusable) {
        this.qs = (FusedSubscription<T>) s;
    }

    calls.onSubscribeCount += 1;
    // Taken immediately before the downstream is notified.
    startTime = System.nanoTime();

    actual.onSubscribe(this);
}
项目:reactive-jax-rs    文件:ServletInputStreamPublisherAdapter.java   
/**
 * Registers a subscriber and hands it a new subscription.
 * <p>
 * The first call starts reading via {@code startReading()}. Each subscriber is
 * stored in the {@code subscribers} map together with its own
 * {@code NoBackpressureSubscription} before {@code onSubscribe} is invoked on it.
 *
 * @param subscriber the subscriber to register; must not be null
 * @throws NullPointerException if {@code subscriber} is null
 */
@Override public void subscribe(Flow.Subscriber<? super byte[]> subscriber) {
  if (subscriber == null) {
    // Reject null before any side effect: previously reading was started and
    // the map was touched before the null was finally dereferenced.
    throw new NullPointerException("subscriber");
  }

  if (!started) {
    startReading();
    started = true;
  }

  NoBackpressureSubscription subscription = new NoBackpressureSubscription(this);

  this.subscribers.put(subscriber, subscription);

  subscriber.onSubscribe(subscription);
}
项目:Reactive4JavaFlow    文件:ParallelRunOn.java   
/**
 * Stores the upstream subscription, passes this instance downstream and then
 * requests {@code prefetch} items from the upstream.
 *
 * @param s the upstream subscription
 */
@Override
public void onSubscribe(Flow.Subscription s) {
    this.s = s;

    // The downstream is notified before anything is requested upstream.
    actual.onSubscribe(this);

    s.request(prefetch);
}
项目:Reactive4JavaFlow    文件:FolyamFlattenIterable.java   
/**
 * Stores the upstream subscription and sets up the item queue.
 * <p>
 * If the upstream is a {@code FusedSubscription}, fusion is requested with
 * {@code ANY}; on {@code SYNC} or {@code ASYNC} the subscription itself is used
 * as the queue. Otherwise a new queue sized by {@code prefetch} is created and
 * {@code prefetch} items are requested.
 *
 * @param subscription the upstream subscription
 */
@Override
public final void onSubscribe(Flow.Subscription subscription) {
    upstream = subscription;
    if (subscription instanceof FusedSubscription) {
        FusedSubscription<T> fs = (FusedSubscription<T>) subscription;
        int m = fs.requestFusion(ANY);
        if (m == SYNC) {
            // SYNC mode: DONE is set right away and nothing is requested.
            sourceFused = m;
            queue = fs;
            DONE.setRelease(this, true);
            onStart();
            return;
        }
        if (m == ASYNC) {
            // ASYNC mode: the fused subscription is the queue, but items
            // still have to be requested.
            sourceFused = m;
            queue = fs;
            onStart();
            fs.request(prefetch);
            return;
        }
    }

    // No fusion established: allocate a queue of our own.
    int pf = prefetch;
    if (pf == 1) {
        queue = new SpscOneQueue<>();
    } else {
        queue = new SpscArrayQueue<>(pf);
    }

    onStart();

    subscription.request(pf);
}
项目:Reactive4JavaFlow    文件:EsetlegDoOnSignal.java   
/**
 * Creates the conditional subscriber: all callbacks are passed unchanged to
 * the superclass constructor and the downstream conditional subscriber is
 * stored in {@code actual}.
 *
 * @param actual the downstream conditional subscriber
 * @param onSubscribe callback receiving the {@code Flow.Subscription}
 * @param onNext callback receiving an item
 * @param onAfterNext callback receiving an item
 * @param onError callback receiving a {@code Throwable}
 * @param onComplete callback taking no argument
 * @param onRequest callback receiving a {@code Long}
 * @param onCancel callback taking no argument
 */
protected DoOnSignalConditionalSubscriber(ConditionalSubscriber<? super T> actual,
                               CheckedConsumer<? super Flow.Subscription> onSubscribe,
                               CheckedConsumer<? super T> onNext,
                               CheckedConsumer<? super T> onAfterNext,
                               CheckedConsumer<? super Throwable> onError,
                               CheckedRunnable onComplete,
                               CheckedConsumer<? super Long> onRequest,
                               CheckedRunnable onCancel) {
    super(onSubscribe, onNext, onAfterNext, onError, onComplete, onRequest, onCancel);
    this.actual = actual;
}
项目:reactive-streams-tck_playground    文件:ReactiveStreamsFlowBridge.java   
/**
 * Adapts a {@code Flow.Publisher} to the Reactive Streams {@code Publisher} type.
 * <p>
 * An argument that already implements the Reactive Streams interface is
 * returned as-is, a {@code FlowPublisherFromReactive} is unwrapped to the
 * publisher it holds, and anything else is wrapped in a new
 * {@code ReactivePublisherFromFlow}.
 *
 * @param <T> the element type
 * @param flowPublisher the source Flow Publisher to convert
 * @return the equivalent Reactive Streams Publisher
 * @throws NullPointerException if {@code flowPublisher} is null
 */
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
        Flow.Publisher<? extends T> flowPublisher) {
    if (flowPublisher == null) {
        throw new NullPointerException("flowPublisher");
    }

    final org.reactivestreams.Publisher<T> converted;
    if (flowPublisher instanceof org.reactivestreams.Publisher) {
        // Implements both interfaces: a cast is enough.
        converted = (org.reactivestreams.Publisher<T>) flowPublisher;
    } else if (flowPublisher instanceof FlowPublisherFromReactive) {
        // A bridge created in the other direction: hand back what it wraps.
        FlowPublisherFromReactive<T> bridge = (FlowPublisherFromReactive<T>) flowPublisher;
        converted = (org.reactivestreams.Publisher<T>) (bridge.reactiveStreams);
    } else {
        converted = new ReactivePublisherFromFlow<T>(flowPublisher);
    }
    return converted;
}
项目:Reactive4JavaFlow    文件:HalfSerializer.java   
/**
 * Signals termination to {@code actual} if this call moves the work-in-progress
 * counter away from zero; otherwise only the counter is incremented.
 * <p>
 * When {@code ExceptionHelper.terminate} yields a throwable, it is delivered
 * through {@code onError}; otherwise {@code onComplete} is called.
 *
 * @param actual the subscriber to signal
 * @param target the object holding the counter and error fields
 * @param WIP handle to the work-in-progress counter on {@code target}
 * @param ERRORS handle to the error field on {@code target}
 */
public static void onComplete(Flow.Subscriber<?> actual, Object target, VarHandle WIP, VarHandle ERRORS) {
    if ((int) WIP.getAndAdd(target, 1) != 0) {
        return;
    }

    Throwable pending = ExceptionHelper.terminate(target, ERRORS);
    if (pending != null) {
        actual.onError(pending);
    } else {
        actual.onComplete();
    }
}
项目:Reactive4JavaFlow    文件:ConcatPublisherTckTest.java   
/**
 * Builds the publisher under test: a concatenation of two iterable-backed
 * sources whose sizes add up to {@code elements}.
 *
 * @param elements the total number of items requested by the TCK
 * @return the concatenated publisher
 */
@Override
public Flow.Publisher<Long> createFlowPublisher(long elements) {
    // Split so that the second part absorbs the remainder of an odd count.
    long firstPart = elements / 2;
    long secondPart = elements - elements / 2;

    return Folyam.concat(
            Folyam.fromArray(
                    Folyam.fromIterable(iterate(firstPart)),
                    Folyam.fromIterable(iterate(secondPart))));
}
项目:Reactive4JavaFlow    文件:ZipWithTckTest.java   
/**
 * Builds the publisher under test: two integer ranges of {@code elements}
 * items each, zipped together by summing the paired values.
 *
 * @param elements the number of items requested by the TCK (narrowed to int)
 * @return the zipped publisher
 */
@Override
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
    int count = (int) elements;

    return Folyam.range(0, count)
            .zipWith(Folyam.range(count, count), (left, right) -> left + right);
}
项目:Reactive4JavaFlow    文件:FolyamOnTerminateDetach.java   
/**
 * Forwards the request to the upstream subscription currently held in
 * {@code UPSTREAM}; does nothing when that reference is null.
 *
 * @param n the amount to request
 */
@Override
public final void request(long n) {
    Flow.Subscription current = (Flow.Subscription) UPSTREAM.getAcquire(this);
    if (current == null) {
        return;
    }
    current.request(n);
}
项目:Reactive4JavaFlow    文件:FolyamZipArrayTest.java   
/**
 * Zips 16 copies of {@code Folyam.just(1)} with {@code Arrays::toString} and
 * expects a single result listing sixteen ones.
 */
@Test
public void zipArray() {
    Flow.Publisher<Integer>[] inputs = new Flow.Publisher[16];
    // Every slot refers to the same single-item source.
    Arrays.fill(inputs, Folyam.just(1));

    Folyam.zipArray(Arrays::toString, inputs)
            .test()
            .assertResult("[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]");
}
项目:Reactive4JavaFlow    文件:FolyamSubscribeOn.java   
/**
 * Installs the upstream subscription via {@code SubscriptionHelper.replace}
 * and, on success, forwards any amount accumulated in this object's counter
 * to {@code requestUpstream}.
 *
 * @param subscription the upstream subscription
 */
public final void onSubscribe(Flow.Subscription subscription) {
    if (!SubscriptionHelper.replace(this, UPSTREAM, subscription)) {
        return;
    }

    // Atomically take the accumulated amount and reset the counter to zero.
    long pending = getAndSet(0L);
    if (pending != 0L) {
        requestUpstream(pending, subscription);
    }
}
项目:Reactive4JavaFlow    文件:FirstProcessor.java   
/**
 * Accepts the first subscription and requests {@code Long.MAX_VALUE} from it;
 * any subscription arriving while one is already set is cancelled.
 *
 * @param subscription the offered subscription
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (!UPSTREAM.compareAndSet(this, null, subscription)) {
        // The slot is already occupied.
        subscription.cancel();
        return;
    }
    subscription.request(Long.MAX_VALUE);
}
项目:Reactive4JavaFlow    文件:FolyamProcessorRefCount.java   
/**
 * Accepts the first subscription and passes it on to {@code actual}; any
 * subscription arriving while one is already set is cancelled.
 *
 * @param subscription the offered subscription
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (!UPSTREAM.compareAndSet(this, null, subscription)) {
        // The slot is already occupied.
        subscription.cancel();
        return;
    }
    actual.onSubscribe(subscription);
}
项目:Reactive4JavaFlow    文件:ZipIterableTckTest.java   
/**
 * Builds the publisher under test: two iterable-backed sources of
 * {@code elements} items each, zipped by adding the paired {@code Long} values.
 *
 * @param elements the number of items requested by the TCK
 * @return the zipped publisher
 */
@SuppressWarnings("unchecked")
@Override
public Flow.Publisher<Long> createFlowPublisher(long elements) {
    return Folyam.zip(
            Arrays.asList(
                    Folyam.fromIterable(iterate(elements)),
                    Folyam.fromIterable(iterate(elements))),
            row -> (Long) row[0] + (Long) row[1]);
}
项目:Reactive4JavaFlow    文件:HalfSerializer.java   
/**
 * Records {@code t} in the error field and, if this call moves the
 * work-in-progress counter away from zero, delivers the terminated error to
 * {@code actual}.
 * <p>
 * In every other case (the error could not be added, or the counter was not
 * zero) {@code t} is handed to {@code FolyamPlugins.onError}.
 *
 * @param actual the subscriber to signal
 * @param target the object holding the counter and error fields
 * @param WIP handle to the work-in-progress counter on {@code target}
 * @param ERRORS handle to the error field on {@code target}
 * @param t the error to signal
 */
public static void onError(Flow.Subscriber<?> actual, Object target, VarHandle WIP, VarHandle ERRORS, Throwable t) {
    // Short-circuit: the counter is only touched when the error was recorded.
    if (ExceptionHelper.addThrowable(target, ERRORS, t)
            && (int) WIP.getAndAdd(target, 1) == 0) {
        Throwable terminal = ExceptionHelper.terminate(target, ERRORS);
        actual.onError(terminal);
        return;
    }
    FolyamPlugins.onError(t);
}
项目:Reactive4JavaFlow    文件:SubscriptionArbiter.java   
/**
 * Adds {@code n} to the outstanding demand and forwards it to the current
 * subscription, or records it as a missed request when this object's counter
 * is not zero.
 *
 * @param n the amount to request
 */
@Override
public final void request(long n) {
    // Once unbounded mode has been entered, further requests are ignored.
    if (unbounded) {
        return;
    }
    // Fast path: taken only when the counter is 0 and this call wins the 0 -> 1 CAS.
    if (getAcquire() == 0 && compareAndSet(0, 1)) {
        long r = requested;
        long u = r + n;
        // Overflow (negative sum) or reaching Long.MAX_VALUE switches to unbounded mode.
        if (u < 0L || u == Long.MAX_VALUE) {
            u = Long.MAX_VALUE;
            unbounded = true;
        }
        requested = u;
        // Read the current subscription before the counter is decremented.
        Flow.Subscription s = currentSubscription;
        // A non-zero result means the counter was incremented in the meantime: run the drain loop.
        if (decrementAndGet() != 0) {
            arbiterDrainLoop();
        }
        if (s != null) {
            s.request(n);
        }
    } else {
        // Slow path: accumulate the amount into the missed-requested field.
        SubscriptionHelper.addRequested(this, MISSED_REQUESTED, n);
        // Only the caller that moves the counter from 0 runs the drain loop.
        if (getAndIncrement() != 0) {
            return;
        }
        arbiterDrainLoop();
    }
}
项目:Reactive4JavaFlow    文件:EsetlegDoOnSignal.java   
/**
 * Creates the operator by storing its source and the seven signal callbacks;
 * no other work is done here.
 *
 * @param source the upstream {@code Esetleg}
 * @param onSubscribe callback receiving the {@code Flow.Subscription}
 * @param onNext callback receiving an item
 * @param onAfterNext callback receiving an item
 * @param onError callback receiving a {@code Throwable}
 * @param onComplete callback taking no argument
 * @param onRequest callback receiving a {@code Long}
 * @param onCancel callback taking no argument
 */
public EsetlegDoOnSignal(
        Esetleg<T> source,
        CheckedConsumer<? super Flow.Subscription> onSubscribe,
        CheckedConsumer<? super T> onNext,
        CheckedConsumer<? super T> onAfterNext,
        CheckedConsumer<? super Throwable> onError,
        CheckedRunnable onComplete,
        CheckedConsumer<? super Long> onRequest,
        CheckedRunnable onCancel) {
    this.source = source;

    // callbacks taking the subscription, an item or an error
    this.onSubscribe = onSubscribe;
    this.onNext = onNext;
    this.onAfterNext = onAfterNext;
    this.onError = onError;

    // remaining callbacks
    this.onComplete = onComplete;
    this.onRequest = onRequest;
    this.onCancel = onCancel;
}
项目:Reactive4JavaFlow    文件:FolyamDoFinally.java   
/**
 * Stores the upstream subscription, also keeps it as {@code qs} when it is a
 * {@code FusedSubscription}, and then calls {@code onStart()}.
 *
 * @param subscription the upstream subscription
 */
public final void onSubscribe(Flow.Subscription subscription) {
    this.upstream = subscription;

    boolean fusable = subscription instanceof FusedSubscription;
    if (fusable) {
        this.qs = (FusedSubscription<T>) subscription;
    }

    onStart();
}
项目:Reactive4JavaFlow    文件:FolyamTakeWhile.java   
/**
 * Stores the upstream subscription, also keeps it as {@code qs} when it is a
 * {@code FusedSubscription}, and then calls {@code onStart()}.
 *
 * @param subscription the upstream subscription
 */
@Override
public final void onSubscribe(Flow.Subscription subscription) {
    this.upstream = subscription;

    boolean fusable = subscription instanceof FusedSubscription;
    if (fusable) {
        this.qs = (FusedSubscription<T>) subscription;
    }

    onStart();
}
项目:Reactive4JavaFlow    文件:EsetlegDoOnSignal.java   
/**
 * Creates the subscriber: all callbacks are passed unchanged to the superclass
 * constructor and the downstream subscriber is stored in {@code actual}.
 *
 * @param actual the downstream subscriber
 * @param onSubscribe callback receiving the {@code Flow.Subscription}
 * @param onNext callback receiving an item
 * @param onAfterNext callback receiving an item
 * @param onError callback receiving a {@code Throwable}
 * @param onComplete callback taking no argument
 * @param onRequest callback receiving a {@code Long}
 * @param onCancel callback taking no argument
 */
protected DoOnSignalSubscriber(FolyamSubscriber<? super T> actual,
                               CheckedConsumer<? super Flow.Subscription> onSubscribe,
                               CheckedConsumer<? super T> onNext,
                               CheckedConsumer<? super T> onAfterNext,
                               CheckedConsumer<? super Throwable> onError,
                               CheckedRunnable onComplete,
                               CheckedConsumer<? super Long> onRequest,
                               CheckedRunnable onCancel) {
    super(onSubscribe, onNext, onAfterNext, onError, onComplete, onRequest, onCancel);
    this.actual = actual;
}
项目:Reactive4JavaFlow    文件:ReduceWithTckTest.java   
/**
 * Builds the publisher under test: the sum of a fixed range
 * ({@code Folyam.range(1, 1000)}), reduced from a seed of 0 and converted back
 * with {@code toFolyam()}.
 *
 * @param elements not used by this implementation
 * @return the publisher produced by the reduction
 */
@Override
public Flow.Publisher<Integer> createFlowPublisher(final long elements) {
    return Folyam.range(1, 1000)
            .reduce(() -> 0, (sum, value) -> sum + value)
            .toFolyam();
}
项目:Reactive4JavaFlow    文件:FolyamOnErrorResumeNext.java   
/**
 * Hands the new subscription to the arbiter and, unless {@code once} is
 * already set, passes this instance to the downstream subscriber.
 *
 * @param subscription the subscription of the source currently subscribed to
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    arbiterReplace(subscription);
    if (once) {
        // The downstream is not notified again once the flag is set.
        return;
    }
    actual.onSubscribe(this);
}
项目:Reactive4JavaFlow    文件:FolyamDoOnSignal.java   
/**
 * Creates the operator by storing its source and the seven signal callbacks;
 * no other work is done here.
 *
 * @param source the upstream {@code Folyam}
 * @param onSubscribe callback receiving the {@code Flow.Subscription}
 * @param onNext callback receiving an item
 * @param onAfterNext callback receiving an item
 * @param onError callback receiving a {@code Throwable}
 * @param onComplete callback taking no argument
 * @param onRequest callback receiving a {@code Long}
 * @param onCancel callback taking no argument
 */
public FolyamDoOnSignal(
        Folyam<T> source,
        CheckedConsumer<? super Flow.Subscription> onSubscribe,
        CheckedConsumer<? super T> onNext,
        CheckedConsumer<? super T> onAfterNext,
        CheckedConsumer<? super Throwable> onError,
        CheckedRunnable onComplete,
        CheckedConsumer<? super Long> onRequest,
        CheckedRunnable onCancel) {
    this.source = source;

    // callbacks taking the subscription, an item or an error
    this.onSubscribe = onSubscribe;
    this.onNext = onNext;
    this.onAfterNext = onAfterNext;
    this.onError = onError;

    // remaining callbacks
    this.onComplete = onComplete;
    this.onRequest = onRequest;
    this.onCancel = onCancel;
}
项目:Reactive4JavaFlow    文件:EsetlegDoOnSignal.java   
/**
 * Stores the seven signal callbacks, substituting an empty lambda for every
 * callback that is passed as null so the fields are never null.
 *
 * @param onSubscribe callback receiving the {@code Flow.Subscription}, or null
 * @param onNext callback receiving an item, or null
 * @param onAfterNext callback receiving an item, or null
 * @param onError callback receiving a {@code Throwable}, or null
 * @param onComplete callback taking no argument, or null
 * @param onRequest callback receiving a {@code Long}, or null
 * @param onCancel callback taking no argument, or null
 */
protected AbstractDoOnSignal(CheckedConsumer<? super Flow.Subscription> onSubscribe,
                             CheckedConsumer<? super T> onNext,
                             CheckedConsumer<? super T> onAfterNext,
                             CheckedConsumer<? super Throwable> onError,
                             CheckedRunnable onComplete,
                             CheckedConsumer<? super Long> onRequest,
                             CheckedRunnable onCancel) {
    this.onSubscribe = onSubscribe == null ? e -> { } : onSubscribe;
    this.onNext = onNext == null ? e -> { } : onNext;
    this.onAfterNext = onAfterNext == null ? e -> { } : onAfterNext;
    this.onError = onError == null ? e -> { } : onError;
    this.onComplete = onComplete == null ? () -> { } : onComplete;
    this.onRequest = onRequest == null ? e -> { } : onRequest;
    this.onCancel = onCancel == null ? () -> { } : onCancel;
}
项目:Reactive4JavaFlow    文件:FolyamSwitchIfEmpty.java   
/**
 * Completes the downstream when {@code once} is already set; otherwise sets
 * the flag, clears the {@code other} field and subscribes this instance to
 * the publisher it held.
 */
@Override
public void onComplete() {
    if (once) {
        actual.onComplete();
        return;
    }

    once = true;
    // Take the fallback out of the field before subscribing to it.
    Flow.Publisher<? extends T> fallback = this.other;
    this.other = null;
    fallback.subscribe(this);
}
项目:Reactive4JavaFlow    文件:FolyamSwitchIfEmpty.java   
/**
 * Completes the downstream when {@code once} is already set; otherwise sets
 * the flag, clears the {@code other} field and subscribes this instance to
 * the publisher it held.
 */
@Override
public void onComplete() {
    if (once) {
        actual.onComplete();
        return;
    }

    once = true;
    // Take the fallback out of the field before subscribing to it.
    Flow.Publisher<? extends T> fallback = this.other;
    this.other = null;
    fallback.subscribe(this);
}
项目:openjdk-jdk10    文件:Http1Request.java   
/**
 * Stores the subscription and requests the first item.
 *
 * @param subscription the subscription to store
 * @throws IllegalStateException if a subscription has already been stored
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    boolean alreadySubscribed = this.subscription != null;
    if (alreadySubscribed) {
        throw new IllegalStateException("already subscribed");
    }

    this.subscription = subscription;
    // Items are pulled one at a time, starting with a single request.
    subscription.request(1);
}