/**
 * Splits the integers 1..100 into an "Even" and an "Odd" group and prints every value together
 * with the key of the group that emitted it.
 *
 * <p>No scheduler is applied anywhere in the chain, so the pipeline runs synchronously inside
 * the {@code subscribe} call and all output has been printed by the time it returns. The test
 * therefore does not sleep afterwards. The {@code throws} clause is retained only so the method
 * signature stays unchanged.
 */
@Test
public void testBasicGroupByFlowable() throws InterruptedException {
    Flowable<GroupedFlowable<String, Integer>> groupedFlowable =
            Flowable.range(1, 100).groupBy(integer -> integer % 2 == 0 ? "Even" : "Odd");

    // Subscribe to each group as soon as it is emitted so its items are consumed immediately.
    groupedFlowable.subscribe(
            g -> g.subscribe(x -> System.out.println("g:" + g.getKey() + ", value:" + x)));
}
/**
 * Groups the integers 1..100 under the keys "Even" and "Odd", collects each group into a list
 * and prints the "Even" list followed by the "Odd" list.
 *
 * <p>Every group is converted to a list and subscribed from inside the grouping pipeline (via
 * {@code flatMapSingle}) rather than being stored as an unsubscribed {@code Single} and consumed
 * only after the outer subscription has finished. Late subscription depends on the groups
 * buffering all of their items in the meantime, which the {@code groupBy} documentation warns
 * against: groups should be subscribed as soon as possible.
 */
@Test
public void testBasicGroupByFlowableReduceIntoMultiMap() {
    Flowable<GroupedFlowable<String, Integer>> groupedFlowable =
            Flowable.range(1, 100).groupBy(integer -> integer % 2 == 0 ? "Even" : "Odd");

    Map<String, List<Integer>> result = new HashMap<>();

    // The chain has no scheduler, so the map is only touched from the calling thread and a plain
    // HashMap is sufficient. blockingSubscribe() returns once both groups have completed.
    groupedFlowable
            .flatMapSingle(g -> g.toList().doOnSuccess(values -> result.put(g.getKey(), values)))
            .blockingSubscribe();

    System.out.println(result.get("Even"));
    System.out.println(result.get("Odd"));
}
/**
 * Handles one group of sending tasks: batches the group's tasks and hands every batch to the
 * consumer obtained from {@code SenderConsumerBridge.toConsumer(sender)}.
 *
 * <p>Batching uses {@code buffer} with a time span of {@code messageTimeoutNanos} nanoseconds
 * and a count of {@code bufferedMaxMessages}, timed on {@code scheduler}; the subscription
 * itself is also placed on {@code scheduler} via {@code subscribeOn}. The value returned by
 * {@code subscribe} is not retained.
 *
 * @param group the group whose tasks are to be batched and forwarded
 * @throws Exception declared by the overridden method; nothing in this body throws it explicitly
 */
@Override
public void accept(GroupedFlowable<MessageKey, SendingTask<M>> group) throws Exception {
    group
            .buffer(messageTimeoutNanos, TimeUnit.NANOSECONDS, scheduler, bufferedMaxMessages)
            .subscribeOn(scheduler)
            .subscribe(SenderConsumerBridge.toConsumer(sender));
}
/**
 * Groups the items of this source by the key computed with {@code keySelector}.
 *
 * <p>Pure pass-through: the argument is forwarded unchanged to {@code boxed.groupBy}, and the
 * resulting flowable of groups is returned as is.
 *
 * @param keySelector function applied to each item to obtain its grouping key
 * @param <K> the key type
 * @return the result of {@code boxed.groupBy(keySelector)}
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? extends K> keySelector) {
    Flowable<GroupedFlowable<K, T>> grouped = boxed.groupBy(keySelector);
    return grouped;
}
/**
 * Groups the items of this source by the key computed with {@code keySelector}, passing the
 * {@code delayError} flag through to the underlying operator.
 *
 * <p>Pure pass-through: both arguments are forwarded unchanged to {@code boxed.groupBy}.
 *
 * @param keySelector function applied to each item to obtain its grouping key
 * @param delayError error-delay flag, interpreted by {@code boxed.groupBy}
 * @param <K> the key type
 * @return the result of {@code boxed.groupBy(keySelector, delayError)}
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K> Flowable<GroupedFlowable<K, T>> groupBy(
        Function<? super T, ? extends K> keySelector,
        boolean delayError) {
    Flowable<GroupedFlowable<K, T>> grouped = boxed.groupBy(keySelector, delayError);
    return grouped;
}
/**
 * Groups the items of this source by the key computed with {@code keySelector}, with each
 * group carrying the values produced by {@code valueSelector}.
 *
 * <p>Pure pass-through: both selectors are forwarded unchanged to {@code boxed.groupBy}.
 *
 * @param keySelector function applied to each item to obtain its grouping key
 * @param valueSelector function applied to each item to obtain the value placed in its group
 * @param <K> the key type
 * @param <V> the value type of the groups
 * @return the result of {@code boxed.groupBy(keySelector, valueSelector)}
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K, V> Flowable<GroupedFlowable<K, V>> groupBy(
        Function<? super T, ? extends K> keySelector,
        Function<? super T, ? extends V> valueSelector) {
    Flowable<GroupedFlowable<K, V>> grouped = boxed.groupBy(keySelector, valueSelector);
    return grouped;
}
/**
 * Groups the items of this source by the key computed with {@code keySelector}, with each
 * group carrying the values produced by {@code valueSelector}, and passes the
 * {@code delayError} flag through to the underlying operator.
 *
 * <p>Pure pass-through: all arguments are forwarded unchanged to {@code boxed.groupBy}.
 *
 * @param keySelector function applied to each item to obtain its grouping key
 * @param valueSelector function applied to each item to obtain the value placed in its group
 * @param delayError error-delay flag, interpreted by {@code boxed.groupBy}
 * @param <K> the key type
 * @param <V> the value type of the groups
 * @return the result of {@code boxed.groupBy(keySelector, valueSelector, delayError)}
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K, V> Flowable<GroupedFlowable<K, V>> groupBy(
        Function<? super T, ? extends K> keySelector,
        Function<? super T, ? extends V> valueSelector,
        boolean delayError) {
    Flowable<GroupedFlowable<K, V>> grouped =
            boxed.groupBy(keySelector, valueSelector, delayError);
    return grouped;
}
/**
 * Groups the items of this source by the key computed with {@code keySelector}, with each
 * group carrying the values produced by {@code valueSelector}, and passes the
 * {@code delayError} flag and {@code bufferSize} hint through to the underlying operator.
 *
 * <p>Pure pass-through: all arguments are forwarded unchanged to {@code boxed.groupBy}.
 *
 * @param keySelector function applied to each item to obtain its grouping key
 * @param valueSelector function applied to each item to obtain the value placed in its group
 * @param delayError error-delay flag, interpreted by {@code boxed.groupBy}
 * @param bufferSize buffer size, interpreted by {@code boxed.groupBy}
 * @param <K> the key type
 * @param <V> the value type of the groups
 * @return the result of
 *     {@code boxed.groupBy(keySelector, valueSelector, delayError, bufferSize)}
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K, V> Flowable<GroupedFlowable<K, V>> groupBy(
        Function<? super T, ? extends K> keySelector,
        Function<? super T, ? extends V> valueSelector,
        boolean delayError,
        int bufferSize) {
    Flowable<GroupedFlowable<K, V>> grouped =
            boxed.groupBy(keySelector, valueSelector, delayError, bufferSize);
    return grouped;
}