@Test public void cancelRequests() { SubscriptionArbiter sa = new SubscriptionArbiter(); sa.arbiterReplace(new Flow.Subscription() { @Override public void request(long n) { } @Override public void cancel() { sa.request(1); } }); sa.cancel(); }
@Test public void mapperThrows() { DirectProcessor<Integer> bp = new DirectProcessor<>(); TestConsumer<Object> tc = bp .switchFlatMap((CheckedFunction<Object, Flow.Publisher<Object>>) v -> { throw new IOException(); }, 2) .test(); bp.onNext(1); tc.assertFailure(IOException.class); assertFalse(bp.hasSubscribers()); }
@Override public void onSubscribe(Flow.Subscription s) { this.s = s; try { parent.onSubscribe.accept(s); } catch (Throwable ex) { FolyamPlugins.handleFatal(ex); s.cancel(); actual.onSubscribe(EmptySubscription.INSTANCE); onError(ex); return; } actual.onSubscribe(this); }
@SuppressWarnings("unchecked") @Override protected void subscribeActual(FolyamSubscriber<? super T> s) { Flow.Publisher<? extends T>[] array = new Flow.Publisher[8]; int n = 0; try { for (Flow.Publisher<? extends T> p : sourcesIterable) { if (n == array.length) { array = Arrays.copyOf(array, n << 1); } array[n++] = Objects.requireNonNull(p, "a source is null"); } } catch (Throwable ex) { FolyamPlugins.handleFatal(ex); EmptySubscription.error(s, ex); return; } FolyamOrderedMergeArray.subscribe(s, array, n, comparator, prefetch, delayErrors); }
@Override public Flow.Publisher<Integer> createFlowPublisher(final long elements) { final LastProcessor<Integer> pp = new LastProcessor<>(); SchedulerServices.io().schedule(() -> { long start = System.currentTimeMillis(); while (!pp.hasSubscribers()) { try { Thread.sleep(1); } catch (InterruptedException ex) { return; } if (System.currentTimeMillis() - start > 200) { return; } } for (int i = 0; i < elements; i++) { pp.onNext(i); } pp.onComplete(); }); return pp; }
@Override public void onError(Throwable throwable) { if (once) { actual.onError(throwable); return; } once = true; Flow.Publisher<? extends T> p; try { p = Objects.requireNonNull(handler.apply(throwable), "The handler returned a null Flow.Publisher"); } catch (Throwable ex) { actual.onError(new CompositeThrowable(throwable, ex)); return; } long c = produced; if (c != 0) { arbiterProduced(c); } p.subscribe(this); }
public final void onSubscribe(Flow.Subscription subscription) { upstream = subscription; arbiterReplace(subscription); if (firstTimeout != null) { AtomicDisposable d = new AtomicDisposable(); task = d; onStart(); if (d.get() == null) { TimeoutInnerSubscriber inner = new TimeoutInnerSubscriber(this, 0); if (d.compareAndSet(null, inner)) { firstTimeout.subscribe(inner); } } } else { onStart(); } }
public final void onNext(T item) { AutoDisposable d = task; if (d != null) { d.close(); } long idx = (long)INDEX.getAcquire(this); if (idx != Long.MIN_VALUE && INDEX.compareAndSet(this, idx, idx + 1)) { next(item); Flow.Publisher<?> p; try { p = Objects.requireNonNull(itemTimeoutSelector.apply(item), "The itemTimeoutSelector returned a null Flow.Publisher"); } catch (Throwable ex) { onError(ex); return; } // replace TimeoutInnerSubscriber inner = new TimeoutInnerSubscriber(this, idx + 1); if (DisposableHelper.replace(this, TASK, inner)) { p.subscribe(inner); } } }
void subscribe(Flow.Publisher<? extends T>[] sources, int n) { for (int i = 0; i < n; i++) { if (cancelled) { return; } sources[i].subscribe(subscribers[i]); } }
public void onSubscribe(Flow.Subscription subscription) { //count = bufferSize - bufferSize / 2;// 当消费一半的时候重新请求 (this.subscription = subscription).request(bufferSize); System.out.println("开始onSubscribe订阅"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } }
public FolyamConcatMapEager(Folyam<T> source, CheckedFunction<? super T, ? extends Flow.Publisher<? extends R>> mapper, int maxConcurrency, int prefetch, boolean delayError) { this.source = source; this.mapper = mapper; this.maxConcurrency = maxConcurrency; this.prefetch = prefetch; this.delayError = delayError; }
protected DoOnSignalSubscriber(FolyamSubscriber<? super T> actual, CheckedConsumer<? super Flow.Subscription> onSubscribe, CheckedConsumer<? super T> onNext, CheckedConsumer<? super T> onAfterNext, CheckedConsumer<? super Throwable> onError, CheckedRunnable onComplete, CheckedConsumer<? super Long> onRequest, CheckedRunnable onCancel) { super(onSubscribe, onNext, onAfterNext, onError, onComplete, onRequest, onCancel); this.actual = actual; }
@SuppressWarnings("unchecked") @Override public void onSubscribe(Flow.Subscription s) { this.s = s; if (s instanceof FusedSubscription) { qs = (FusedSubscription<T>)s; } calls.onSubscribeCount++; startTime = System.nanoTime(); actual.onSubscribe(this); }
/** * Pushes subscriber to subscribers list. * @param subscriber */ @Override public void subscribe(Flow.Subscriber<? super byte[]> subscriber) { if (!started) { startReading(); started = true; } NoBackpressureSubscription subscription = new NoBackpressureSubscription(this); this.subscribers.put(subscriber, subscription); subscriber.onSubscribe(subscription); }
@Override public void onSubscribe(Flow.Subscription s) { this.s = s; actual.onSubscribe(this); s.request(prefetch); }
@Override public final void onSubscribe(Flow.Subscription subscription) { upstream = subscription; if (subscription instanceof FusedSubscription) { FusedSubscription<T> fs = (FusedSubscription<T>) subscription; int m = fs.requestFusion(ANY); if (m == SYNC) { sourceFused = m; queue = fs; DONE.setRelease(this, true); onStart(); return; } if (m == ASYNC) { sourceFused = m; queue = fs; onStart(); fs.request(prefetch); return; } } int pf = prefetch; if (pf == 1) { queue = new SpscOneQueue<>(); } else { queue = new SpscArrayQueue<>(pf); } onStart(); subscription.request(pf); }
protected DoOnSignalConditionalSubscriber(ConditionalSubscriber<? super T> actual, CheckedConsumer<? super Flow.Subscription> onSubscribe, CheckedConsumer<? super T> onNext, CheckedConsumer<? super T> onAfterNext, CheckedConsumer<? super Throwable> onError, CheckedRunnable onComplete, CheckedConsumer<? super Long> onRequest, CheckedRunnable onCancel) { super(onSubscribe, onNext, onAfterNext, onError, onComplete, onRequest, onCancel); this.actual = actual; }
/** * Converts a Flow Publisher into a Reactive Streams Publisher. * @param <T> the element type * @param flowPublisher the source Flow Publisher to convert * @return the equivalent Reactive Streams Publisher */ @SuppressWarnings("unchecked") public static <T> org.reactivestreams.Publisher<T> toReactiveStreams( Flow.Publisher<? extends T> flowPublisher) { if (flowPublisher == null) { throw new NullPointerException("flowPublisher"); } if (flowPublisher instanceof org.reactivestreams.Publisher) { return (org.reactivestreams.Publisher<T>)flowPublisher; } if (flowPublisher instanceof FlowPublisherFromReactive) { return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams); } return new ReactivePublisherFromFlow<T>(flowPublisher); }
public static void onComplete(Flow.Subscriber<?> actual, Object target, VarHandle WIP, VarHandle ERRORS) { if ((int) WIP.getAndAdd(target, 1) == 0) { Throwable ex = ExceptionHelper.terminate(target, ERRORS); if (ex == null) { actual.onComplete(); } else { actual.onError(ex); } } }
@Override public Flow.Publisher<Long> createFlowPublisher(long elements) { return Folyam.concat(Folyam.fromArray( Folyam.fromIterable(iterate(elements / 2)), Folyam.fromIterable(iterate(elements - elements / 2)) ) ) ; }
@Override public Flow.Publisher<Integer> createFlowPublisher(long elements) { return Folyam.range(0, (int)elements) .zipWith(Folyam.range((int)elements, (int)elements), (a, b) -> a + b) ; }
@Override public final void request(long n) { Flow.Subscription s = (Flow.Subscription)UPSTREAM.getAcquire(this); if (s != null) { s.request(n); } }
@Test public void zipArray() { Flow.Publisher<Integer>[] sources = new Flow.Publisher[16]; Arrays.fill(sources, Folyam.just(1)); Folyam.zipArray(Arrays::toString, sources) .test() .assertResult("[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]"); }
public final void onSubscribe(Flow.Subscription subscription) { if (SubscriptionHelper.replace(this, UPSTREAM, subscription)) { long n = getAndSet(0L); if (n != 0L) { requestUpstream(n, subscription); } } }
@Override public void onSubscribe(Flow.Subscription subscription) { if (UPSTREAM.compareAndSet(this, null, subscription)) { subscription.request(Long.MAX_VALUE); } else { subscription.cancel(); } }
@Override public void onSubscribe(Flow.Subscription subscription) { if (UPSTREAM.compareAndSet(this, null, subscription)) { actual.onSubscribe(subscription); } else { subscription.cancel(); } }
@SuppressWarnings("unchecked") @Override public Flow.Publisher<Long> createFlowPublisher(long elements) { return Folyam.zip(Arrays.asList( Folyam.fromIterable(iterate(elements)), Folyam.fromIterable(iterate(elements)) ), a -> (Long)a[0] + (Long)a[1] ) ; }
public static void onError(Flow.Subscriber<?> actual, Object target, VarHandle WIP, VarHandle ERRORS, Throwable t) { if (ExceptionHelper.addThrowable(target, ERRORS, t)) { if ((int) WIP.getAndAdd(target, 1) == 0) { Throwable ex = ExceptionHelper.terminate(target, ERRORS); actual.onError(ex); return; } } FolyamPlugins.onError(t); }
@Override public final void request(long n) { if (unbounded) { return; } if (getAcquire() == 0 && compareAndSet(0, 1)) { long r = requested; long u = r + n; if (u < 0L || u == Long.MAX_VALUE) { u = Long.MAX_VALUE; unbounded = true; } requested = u; Flow.Subscription s = currentSubscription; if (decrementAndGet() != 0) { arbiterDrainLoop(); } if (s != null) { s.request(n); } } else { SubscriptionHelper.addRequested(this, MISSED_REQUESTED, n); if (getAndIncrement() != 0) { return; } arbiterDrainLoop(); } }
public EsetlegDoOnSignal(Esetleg<T> source, CheckedConsumer<? super Flow.Subscription> onSubscribe, CheckedConsumer<? super T> onNext, CheckedConsumer<? super T> onAfterNext, CheckedConsumer<? super Throwable> onError, CheckedRunnable onComplete, CheckedConsumer<? super Long> onRequest, CheckedRunnable onCancel) { this.source = source; this.onSubscribe = onSubscribe; this.onNext = onNext; this.onAfterNext = onAfterNext; this.onError = onError; this.onComplete = onComplete; this.onRequest = onRequest; this.onCancel = onCancel; }
public final void onSubscribe(Flow.Subscription subscription) { this.upstream = subscription; if (subscription instanceof FusedSubscription) { this.qs = (FusedSubscription<T>)subscription; } onStart(); }
@Override public final void onSubscribe(Flow.Subscription subscription) { upstream = subscription; if (subscription instanceof FusedSubscription) { qs = (FusedSubscription<T>)subscription; } onStart(); }
@Override public Flow.Publisher<Integer> createFlowPublisher(final long elements) { return Folyam.range(1, 1000) .reduce(() -> 0, (a, b) -> a + b) .toFolyam() ; }
@Override public void onSubscribe(Flow.Subscription subscription) { arbiterReplace(subscription); if (!once) { actual.onSubscribe(this); } }
public FolyamDoOnSignal(Folyam<T> source, CheckedConsumer<? super Flow.Subscription> onSubscribe, CheckedConsumer<? super T> onNext, CheckedConsumer<? super T> onAfterNext, CheckedConsumer<? super Throwable> onError, CheckedRunnable onComplete, CheckedConsumer<? super Long> onRequest, CheckedRunnable onCancel) { this.source = source; this.onSubscribe = onSubscribe; this.onNext = onNext; this.onAfterNext = onAfterNext; this.onError = onError; this.onComplete = onComplete; this.onRequest = onRequest; this.onCancel = onCancel; }
protected AbstractDoOnSignal(CheckedConsumer<? super Flow.Subscription> onSubscribe, CheckedConsumer<? super T> onNext, CheckedConsumer<? super T> onAfterNext, CheckedConsumer<? super Throwable> onError, CheckedRunnable onComplete, CheckedConsumer<? super Long> onRequest, CheckedRunnable onCancel) { this.onSubscribe = onSubscribe != null ? onSubscribe : e -> { }; this.onNext = onNext != null ? onNext : e -> { }; this.onAfterNext = onAfterNext != null ? onAfterNext : e -> { }; this.onError = onError != null ? onError : e -> { }; this.onComplete = onComplete != null ? onComplete : () -> { }; this.onRequest = onRequest != null ? onRequest : e -> { }; this.onCancel = onCancel != null ? onCancel : () -> { }; }
@Override public void onComplete() { if (once) { actual.onComplete(); } else { once = true; Flow.Publisher<? extends T> p = this.other; other = null; p.subscribe(this); } }
@Override public void onSubscribe(Flow.Subscription subscription) { if (this.subscription != null) { throw new IllegalStateException("already subscribed"); } this.subscription = subscription; subscription.request(1); }