@Test public void shouldBeNecessaryToSubscribetoStreamAfterSplitting() { final double[] averages = {0, 0}; Observable<Integer> numbers = Observable.just(22, 22, 99, 22, 101, 22); Function<Integer, Integer> keySelector = integer -> integer % 2; Observable<GroupedObservable<Integer, Integer>> split = numbers.groupBy(keySelector); split.subscribe( group -> { Observable<Double> convertToDouble = group.map(integer -> (double) integer); Function<Double, Double> insertIntoAveragesArray = aDouble -> averages[group.getKey()] = aDouble; convertToDouble.reduce((t1, t2) -> t1+t2).map(insertIntoAveragesArray).subscribe(); } ); assertThat(averages[0]).isEqualTo(0); assertThat(averages[1]).isEqualTo(0); }
private void doSomeWork() { Observable.range(0, 8).groupBy(new Function<Integer, String>() { @Override public String apply(@NonNull Integer integer) throws Exception { return integer % 2 == 0 ? "偶数" : "奇数"; } }).subscribe(new Consumer<GroupedObservable<String, Integer>>() { @Override public void accept(@NonNull GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception { String key = stringIntegerGroupedObservable.getKey(); Log.i(TAG, "accept: key=" + key); if (key.equals("偶数")) { stringIntegerGroupedObservable.subscribe(getObserver(key)); } else { stringIntegerGroupedObservable.subscribe(getObserver(key)); } } }); }
@Test public void completeCompletesInner() { Observable<Message> messages = Observable.just(new Message("Bob", "Hello")); final AtomicInteger seen = new AtomicInteger(); WindowIfChanged.create(messages, userSelector) .switchMap( new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() { @Override public Observable<Notification<String>> apply( GroupedObservable<String, Message> group) { final int count = seen.incrementAndGet(); return group.map(new Function<Message, String>() { @Override public String apply(Message message) throws Exception { return count + " " + message; } }).materialize(); } }) .test() .assertValues( // Notification.createOnNext("1 Bob Hello"), // Notification.<String>createOnComplete()) // .assertComplete(); }
@Test public void errorCompletesInner() { RuntimeException error = new RuntimeException("boom!"); Observable<Message> messages = Observable.just( // Notification.createOnNext(new Message("Bob", "Hello")), Notification.createOnError(error) ).dematerialize(); final AtomicInteger seen = new AtomicInteger(); WindowIfChanged.create(messages, userSelector) .switchMap( new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() { @Override public Observable<Notification<String>> apply( GroupedObservable<String, Message> group) { final int count = seen.incrementAndGet(); return group.map(new Function<Message, String>() { @Override public String apply(Message message) throws Exception { return count + " " + message; } }).materialize(); } }) .test() .assertValues( // Notification.createOnNext("1 Bob Hello"), // Notification.<String>createOnComplete()) // .assertError(error); }
public static void main(String[] args) { Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon"); Observable<GroupedObservable<Integer, String>> byLengths = source.groupBy(s -> s.length()); byLengths.flatMapSingle(grp -> grp.reduce("", (x, y) -> x.equals("") ? y : x + ", " + y) .map(s -> grp.getKey() + ": " + s) ).subscribe(System.out::println); }
public static void main(String[] args) { Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon"); Observable<GroupedObservable<Integer, String>> byLengths = source.groupBy(s -> s.length()); byLengths.flatMapSingle(grp -> grp.toList()) .subscribe(System.out::println); }
@Test public void testBasicGroupByObservable() throws InterruptedException { Observable<GroupedObservable<String, Integer>> grouped = Observable.range(1, 100) .groupBy(integer -> { if (integer % 2 == 0) return "Even"; else return "Odd"; }); grouped.subscribe(g -> g.subscribe(x -> System.out.println ("g:" + g.getKey() + ", value:" + x))); Thread.sleep(4000); }
@Test public void splits() { Observable<Message> messages = Observable.just( // new Message("Bob", "Hello"), // new Message("Bob", "World"), // new Message("Alice", "Hey"), // new Message("Bob", "What's"), // new Message("Bob", "Up?"), // new Message("Eve", "Hey") // ); final AtomicInteger seen = new AtomicInteger(); WindowIfChanged.create(messages, userSelector) .switchMap( new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() { @Override public Observable<Notification<String>> apply( GroupedObservable<String, Message> group) { final int count = seen.incrementAndGet(); return group.map(new Function<Message, String>() { @Override public String apply(Message message) throws Exception { return count + " " + message; } }).materialize(); } }) .test() .assertValues( // Notification.createOnNext("1 Bob Hello"), // Notification.createOnNext("1 Bob World"), // Notification.<String>createOnComplete(), // Notification.createOnNext("2 Alice Hey"), // Notification.<String>createOnComplete(), // Notification.createOnNext("3 Bob What's"), // Notification.createOnNext("3 Bob Up?"), // Notification.<String>createOnComplete(), // Notification.createOnNext("4 Eve Hey"), // Notification.<String>createOnComplete()); // }
public static void main(String[] args) { Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon"); Observable<GroupedObservable<Integer, String>> byLengths = source.groupBy(s -> s.length()); }
@Override protected void subscribeActual(Observer<? super GroupedObservable<K, T>> observer) { upstream.subscribe(new WindowIfChangedObserver<>(keySelector, observer)); }
WindowIfChangedObserver(Function<? super T, ? extends K> keySelector, Observer<? super GroupedObservable<K, T>> observer) { this.keySelector = keySelector; this.observer = observer; }
public static <T, K> Observable<GroupedObservable<K, T>> create(Observable<T> upstream, Function<? super T, ? extends K> keySelector) { return new WindowIfChangedObservable<>(upstream, keySelector); }
@CheckReturnValue @SchedulerSupport("none") public <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector) { return boxed.groupBy(keySelector); }
@CheckReturnValue @SchedulerSupport("none") public <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector, boolean delayError) { return boxed.groupBy(keySelector, delayError); }
@CheckReturnValue @SchedulerSupport("none") public <K, V> Observable<GroupedObservable<K, V>> groupBy(Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector) { return boxed.groupBy(keySelector, valueSelector); }
@CheckReturnValue @SchedulerSupport("none") public <K, V> Observable<GroupedObservable<K, V>> groupBy(Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, boolean delayError) { return boxed.groupBy(keySelector, valueSelector, delayError); }
@CheckReturnValue @SchedulerSupport("none") public <K, V> Observable<GroupedObservable<K, V>> groupBy(Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, boolean delayError, int bufferSize) { return boxed.groupBy(keySelector, valueSelector, delayError, bufferSize); }