public static void main(String[] args) { int numberOfRails = 4; // can query #processors with parallelism() ParallelFlowable .from(Flowable.range(1, 10), numberOfRails) .runOn(Schedulers.computation()) .map(i -> i * i) .filter(i -> i % 3 == 0) .sequential() .subscribe(System.out::println); }
/**
 * Stores the configuration of the ordered merge.
 *
 * @param source the ParallelFlowable whose rails are merged
 * @param comparator the comparator kept for ordering the items
 * @param delayErrors the error-delaying flag
 * @param prefetch the prefetch amount
 */
ParallelOrderedMerge(
        ParallelFlowable<T> source,
        Comparator<? super T> comparator,
        boolean delayErrors,
        int prefetch) {
    // Plain field capture; no validation happens here.
    this.prefetch = prefetch;
    this.delayErrors = delayErrors;
    this.comparator = comparator;
    this.source = source;
}
/**
 * Wraps the upstream ParallelFlowable in a subscribe proxy that routes every subscription
 * through an {@code AutoDisposeParallelFlowable}.
 *
 * @param upstream the ParallelFlowable to wrap
 * @return the proxy whose {@code subscribe} goes through the auto-disposing wrapper
 */
@Override
public ParallelFlowableSubscribeProxy<T> apply(final ParallelFlowable<T> upstream) {
    return new ParallelFlowableSubscribeProxy<T>() {
        @Override
        public void subscribe(Subscriber<? super T>[] subscribers) {
            // scope() is evaluated on each subscribe call, not when the proxy is created.
            final AutoDisposeParallelFlowable<T> scoped =
                    new AutoDisposeParallelFlowable<T>(upstream, scope());
            scoped.subscribe(subscribers);
        }
    };
}
/**
 * Prepares the two benchmarked pipelines. Both run {@code Flowable.range(0, count)} in
 * parallel through a filter that consumes {@code cost} CPU tokens and rejects every item,
 * then merge the rails back with {@code sequential()}. They differ only in the scheduler:
 * the computation scheduler versus one backed by the common ForkJoinPool.
 */
@Setup
public void setup() {
    // Variant 1: rails on the computation scheduler.
    ParallelFlowable<Integer> computationRails = ParallelFlowable
            .from(Flowable.range(0, count))
            .runOn(Schedulers.computation());
    flowable = computationRails
            .filter(v -> {
                Blackhole.consumeCPU(cost);
                return false;
            })
            .sequential();

    // Variant 2: rails on a scheduler wrapping ForkJoinPool.commonPool().
    ParallelFlowable<Integer> forkJoinRails = ParallelFlowable
            .from(Flowable.range(0, count))
            .runOn(Schedulers.from(ForkJoinPool.commonPool()));
    flowableFJ = forkJoinRails
            .filter(v -> {
                Blackhole.consumeCPU(cost);
                return false;
            })
            .sequential();
}
/**
 * Delegates to {@code boxed.parallel()}.
 *
 * @return the ParallelFlowable produced by the boxed instance
 */
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
@CheckReturnValue
@Beta
public ParallelFlowable<T> parallel() {
    final ParallelFlowable<T> rails = boxed.parallel();
    return rails;
}
/**
 * Delegates to {@code boxed.parallel(int)} with the given parallelism.
 *
 * @param parallelism the parallelism value handed to the boxed instance
 * @return the ParallelFlowable produced by the boxed instance
 */
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
@CheckReturnValue
@Beta
public ParallelFlowable<T> parallel(int parallelism) {
    final ParallelFlowable<T> rails = boxed.parallel(parallelism);
    return rails;
}
/**
 * Delegates to {@code boxed.parallel(int, int)} with the given parallelism and prefetch.
 *
 * @param parallelism the parallelism value handed to the boxed instance
 * @param prefetch the prefetch value handed to the boxed instance
 * @return the ParallelFlowable produced by the boxed instance
 */
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
@CheckReturnValue
@Beta
public ParallelFlowable<T> parallel(int parallelism, int prefetch) {
    final ParallelFlowable<T> rails = boxed.parallel(parallelism, prefetch);
    return rails;
}
/**
 * Wraps the given source and creates a fresh {@code RxJavaAssemblyException} at
 * construction time, stored in {@code assembled}.
 *
 * @param source the ParallelFlowable being wrapped
 */
ParallelFlowableOnAssembly(ParallelFlowable<T> source) {
    // The exception object is instantiated here, i.e. when this wrapper is constructed.
    this.assembled = new RxJavaAssemblyException();
    this.source = source;
}
/**
 * Creates a parallel source from {@code Flowable.range(1, 5)} concatenated with a
 * Flowable that signals an {@code IOException}.
 *
 * @return the parallel view of the concatenated sequence
 */
static ParallelFlowable<Integer> createParallelFlowable() {
    Flowable<Integer> values = Flowable.range(1, 5);
    Flowable<Integer> failure = Flowable.<Integer>error(new IOException());
    Flowable<Integer> valuesThenFailure = values.concatWith(failure);
    return valuesThenFailure.parallel();
}
/**
 * Stores the source to be validated and the callback for protocol violations.
 *
 * @param source the ParallelFlowable being wrapped
 * @param onViolation the consumer of {@code ProtocolNonConformanceException}s
 */
ParallelFlowableValidator(
        ParallelFlowable<T> source,
        PlainConsumer<ProtocolNonConformanceException> onViolation) {
    this.onViolation = onViolation;
    this.source = source;
}
/**
 * Creates the operator over the given source of numbers.
 *
 * @param source the ParallelFlowable of {@code Number} values to keep as the upstream
 */
ParallelSumInteger(ParallelFlowable<? extends Number> source) {
    this.source = source;
}
/**
 * Wraps the given ParallelFlowable into a {@code ParallelSumInteger}.
 *
 * @param t the upstream ParallelFlowable
 * @return the new {@code ParallelSumInteger} instance
 */
@Override
public ParallelFlowable<Integer> apply(ParallelFlowable<T> t) {
    final ParallelFlowable<Integer> summed = new ParallelSumInteger<T>(t);
    return summed;
}
/**
 * Creates the operator over the given source of numbers.
 *
 * @param source the ParallelFlowable of {@code Number} values to keep as the upstream
 */
ParallelSumLong(ParallelFlowable<? extends Number> source) {
    this.source = source;
}
/**
 * Wraps the given ParallelFlowable into a {@code ParallelSumLong}.
 *
 * @param t the upstream ParallelFlowable
 * @return the new {@code ParallelSumLong} instance
 */
@Override
public ParallelFlowable<Long> apply(ParallelFlowable<T> t) {
    final ParallelFlowable<Long> summed = new ParallelSumLong<T>(t);
    return summed;
}
/**
 * Creates the operator over the given source of numbers.
 *
 * @param source the ParallelFlowable of {@code Number} values to keep as the upstream
 */
ParallelSumDouble(ParallelFlowable<? extends Number> source) {
    this.source = source;
}
/**
 * Wraps the given ParallelFlowable into a {@code ParallelSumDouble}.
 *
 * @param t the upstream ParallelFlowable
 * @return the new {@code ParallelSumDouble} instance
 */
@Override
public ParallelFlowable<Double> apply(ParallelFlowable<T> t) {
    final ParallelFlowable<Double> summed = new ParallelSumDouble<T>(t);
    return summed;
}
/**
 * Subscribes the {@code subscribers} held by this instance to the given source.
 *
 * @param source the ParallelFlowable to subscribe to
 */
public void subscribe(ParallelFlowable<T> source) {
    source.subscribe(subscribers);
}
/**
 * Checks the protocol validator against a ParallelFlowable source that signals in a
 * deliberately invalid order, and asserts the type of each of the 15 recorded violations.
 */
@SuppressWarnings("unchecked")
@Test
public void parallelFlowable() {
    // Single-rail source whose signal sequence is intentionally out of protocol.
    ParallelFlowable<Integer> misbehaving = new ParallelFlowable<Integer>() {
        @Override
        public void subscribe(Subscriber<? super Integer>[] subscribers) {
            validate(subscribers);
            Subscriber<? super Integer> rail = subscribers[0];
            rail.onComplete();
            rail.onError(null);
            rail.onError(new IOException());
            rail.onNext(null);
            rail.onNext(1);
            rail.onSubscribe(null);
            rail.onSubscribe(new BooleanSubscription());
            rail.onSubscribe(new BooleanSubscription());
            rail.onComplete();
            rail.onNext(2);
        }

        @Override
        public int parallelism() {
            return 1;
        }
    };

    // This test instance is installed as the violation handler
    // (presumably it is what fills the 'errors' list checked below).
    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    SavedHooks savedHooks = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        // Sequences exercised first with the validator enabled; their results are asserted directly.
        Flowable.just(1).publish().autoConnect().test().assertResult(1);
        Flowable.empty().publish().autoConnect().test().assertResult();
        Flowable.error(new IOException()).test().assertFailure(IOException.class);

        ParallelFlowable<Integer> assembled = RxJavaPlugins.onAssembly(misbehaving);
        TestSubscriber<Integer> consumer = new TestSubscriber<Integer>(0);
        assembled.subscribe(new Subscriber[] { consumer });

        Assert.assertEquals(15, errors.size());

        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);
        // The violation at index 4 must carry the IOException as its cause.
        Throwable causeOfFifth = errors.get(4).getCause();
        Assert.assertTrue("" + causeOfFifth, errors.get(4).getCause() instanceof IOException);
        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        // Always undo the global state changed above, even when an assertion fails.
        savedHooks.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
/**
 * Stores the wrapped source together with the scope.
 *
 * @param source the ParallelFlowable being wrapped
 * @param scope the {@code Maybe} kept as the scope
 */
AutoDisposeParallelFlowable(ParallelFlowable<T> source, Maybe<?> scope) {
    this.scope = scope;
    this.source = source;
}
/**
 * Merges the rails of the source ParallelFlowable in an ordered fashion, picking the smallest of
 * the available values from them (as determined by the Comparator); allows delaying any error they
 * may signal and sets the prefetch amount used when requesting from the rails.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param comparator the comparator to use for comparing items;
 *                   it is called with the last known smallest in its first argument
 * @param delayErrors if true, source errors are delayed until all sources terminate in some way
 * @param prefetch the number of items to prefetch from the sources
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T> Flowable<T> orderedMerge(ParallelFlowable<T> source, Comparator<? super T> comparator,
        boolean delayErrors, int prefetch) {
    ObjectHelper.requireNonNull(comparator, "comparator is null");
    // The message names the actual parameter ("source", singular) so a failure is easy to trace.
    ObjectHelper.requireNonNull(source, "source is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    return RxJavaPlugins.onAssembly(new ParallelOrderedMerge<T>(source, comparator, delayErrors, prefetch));
}
/**
 * Ordered merge of the source rails using the natural order of the items, without
 * delaying errors and with {@code Flowable.bufferSize()} as the prefetch amount.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(ParallelFlowable<T> source) {
    final int defaultPrefetch = Flowable.bufferSize();
    return orderedMerge(source, Functions.naturalOrder(), false, defaultPrefetch);
}
/**
 * Ordered merge of the source rails using the natural order of the items, with optional
 * error delaying and {@code Flowable.bufferSize()} as the prefetch amount.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param delayErrors if true, source errors are delayed until all sources terminate in some way
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(ParallelFlowable<T> source,
        boolean delayErrors) {
    final int defaultPrefetch = Flowable.bufferSize();
    return orderedMerge(source, Functions.naturalOrder(), delayErrors, defaultPrefetch);
}
/**
 * Ordered merge of the source rails using the natural order of the items, with optional
 * error delaying and a caller-supplied prefetch amount.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param delayErrors if true, source errors are delayed until all sources terminate in some way
 * @param prefetch the number of items to prefetch from the sources
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(ParallelFlowable<T> source,
        boolean delayErrors, int prefetch) {
    final Flowable<T> merged = orderedMerge(source, Functions.naturalOrder(), delayErrors, prefetch);
    return merged;
}
/**
 * Ordered merge of the source rails using the given Comparator, without delaying errors
 * and with {@code Flowable.bufferSize()} as the prefetch amount.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param comparator the comparator to use for comparing items;
 *                   it is called with the last known smallest in its first argument
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T> Flowable<T> orderedMerge(ParallelFlowable<T> source, Comparator<? super T> comparator) {
    final int defaultPrefetch = Flowable.bufferSize();
    return orderedMerge(source, comparator, false, defaultPrefetch);
}
/**
 * Ordered merge of the source rails using the given Comparator, with optional error
 * delaying and {@code Flowable.bufferSize()} as the prefetch amount.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param comparator the comparator to use for comparing items;
 *                   it is called with the last known smallest in its first argument
 * @param delayErrors if true, source errors are delayed until all sources terminate in some way
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T> Flowable<T> orderedMerge(ParallelFlowable<T> source, Comparator<? super T> comparator,
        boolean delayErrors) {
    final int defaultPrefetch = Flowable.bufferSize();
    return orderedMerge(source, comparator, delayErrors, defaultPrefetch);
}