/**
 * Inflates the main layout, attaches a linear layout manager and a new {@code Adapter} to the
 * RecyclerView, and then calls {@code loadWithRetroJsoup()}.
 *
 * @param savedInstanceState state handed to {@code super.onCreate}
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    recyclerView = (RecyclerView) findViewById(R.id.recyclerview);
    recyclerView.setLayoutManager(new LinearLayoutManager(getBaseContext()));
    adapter = new Adapter();
    recyclerView.setAdapter(adapter);

    loadWithRetroJsoup();

    // NOTE(review): this zip is only assembled, never subscribed, so it performs no work.
    Observable.zip(
            Observable.just(""),
            Observable.just("&"),
            new BiFunction<String, String, String>() {
                @Override
                public String apply(@NonNull String s, @NonNull String s2) throws Exception {
                    // RxJava 2 does not allow operator functions to return null; combine the
                    // two inputs instead so subscribing would not fail with an NPE.
                    return s + s2;
                }
            });
}
/** Reducing true, false, true with an equality reducer is expected to produce {@code false}. */
@Test
public void reduceWithMultipleItemsThenReturnFunctionResult() {
    final BiFunction<Boolean, Boolean, Boolean> bothEqual =
            new BiFunction<Boolean, Boolean, Boolean>() {
                @Override
                public Boolean apply(@NonNull Boolean left, @NonNull Boolean right) {
                    return left.equals(right);
                }
            };

    boolean reduced = new Collector<Boolean>(configuration)
            .and(true)
            .and(false)
            .and(true)
            .reduce(bothEqual)
            .call();

    assertFalse(reduced);
}
/**
 * Appends a follow-up step that runs once this step has completed. When this step emits an
 * absent value (no new actionable item), {@code func} is not invoked and the absent value is
 * passed along instead.
 *
 * @param func produces the next step; it receives the value and the actionable item emitted by
 *     this step.
 * @param <TNewValueType> the value type returned by the next step.
 * @param <TNewActionableItem> the actionable item type returned by the next step.
 * @return a {@link Step} to chain more calls to.
 */
public <TNewValueType, TNewActionableItem extends ActionableItem>
        Step<TNewValueType, TNewActionableItem> onStep(
                final BiFunction<T, A, Step<TNewValueType, TNewActionableItem>> func) {
    final Function<
                    Optional<Data<T, A>>,
                    Observable<Optional<Data<TNewValueType, TNewActionableItem>>>>
            chainNext =
                    new Function<
                            Optional<Data<T, A>>,
                            Observable<Optional<Data<TNewValueType, TNewActionableItem>>>>() {
                        @Override
                        public Observable<Optional<Data<TNewValueType, TNewActionableItem>>> apply(
                                Optional<Data<T, A>> previous) throws Exception {
                            // Nothing to act on: forward an absent value without calling func.
                            if (!previous.isPresent()) {
                                return Observable.just(
                                        Optional.<Data<TNewValueType, TNewActionableItem>>absent());
                            }
                            Data<T, A> payload = previous.get();
                            return func.apply(payload.value, payload.actionableItem).asObservable();
                        }
                    };

    return new Step<>(asObservable().flatMap(chainNext).singleOrError());
}
private void scanWith(){ getObservable() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .scanWith(new Callable<Integer>() {//提供初始值的函数 @Override public Integer call() throws Exception { return 2; } }, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception { return integer + integer2; } }).subscribe(getObserver()); }
@Override public Single<Metadata> generateDefault() { final int[] zMinMax = getMaxMin(tileFilenames(directory)); return FilesystemUtil .getTiles(new File(directory, String.valueOf(zMinMax[1])).getAbsolutePath(), 2) .map(FilesystemUtil::toZxy).reduce(UNDEFINED_ZXY, new BiFunction<int[], int[], int[]>() { @Override public int[] apply(int[] aa, int[] bb) throws Exception { return aa == UNDEFINED_ZXY ? (bb == UNDEFINED_ZXY ? UNDEFINED_ZXY : bb) : new int[] {Math.max(aa[0], bb[0]), Math.max(aa[1], bb[1]), Math.max(aa[2], bb[2])}; } }).map(zxy -> { if (zxy == UNDEFINED_ZXY) { return new Metadata.Builder().build(); } // TODO should be able to translate tile coordinates to // bounds shortly! return new Metadata.Builder().setMinZoom(zMinMax[0]).setMaxZoom(zMinMax[1]).build(); }).toObservable().singleOrError(); }
/**
 * Creates the function that folds a {@code Result} into the current {@code UIModel}: a loading
 * result yields a loading state, a successful one yields a success state built from the reduced
 * bundle, and anything else yields an error state carrying the result's throwable.
 */
@NonNull
private BiFunction<UIModel<S>, Result<?>, UIModel<S>> reducer() {
    return (current, outcome) -> {
        final String event = outcome.getEvent();
        final S bundle = current.getBundle();

        if (outcome.isLoading()) {
            return loadingState(create(event, bundle));
        }
        if (outcome.isSuccessful()) {
            return successState(
                    create(event, stateReducer().reduce(outcome.getBundle(), event, bundle)));
        }
        return errorState(outcome.getThrowable(), create(event, bundle));
    };
}
/**
 * Wraps each emitted list in an {@code Update} marked as reloaded, then scans consecutive
 * updates so that each later one carries the diff against its predecessor.
 *
 * @param list source of list snapshots
 * @param detectMoves stored in {@code _detectMoves}
 */
DifferentialFlowableList(Flowable<List<T>> list, boolean detectMoves) {
    _detectMoves = detectMoves;

    final Function<List<T>, Update<T>> asReload = new Function<List<T>, Update<T>>() {
        @Override
        public Update<T> apply(List<T> snapshot) {
            return new Update<>(snapshot, Change.reloaded());
        }
    };

    final BiFunction<Update<T>, Update<T>, Update<T>> diffAgainstPrevious =
            new BiFunction<Update<T>, Update<T>, Update<T>>() {
                @Override
                public Update<T> apply(Update<T> previous, Update<T> next) {
                    if (previous != null) {
                        List<Change> changes = computeDiff(previous.list, next.list);
                        // Remember the list the diff was computed for.
                        _previousList = next.list;
                        return new Update<>(next.list, changes);
                    }
                    return next;
                }
            };

    _diffTransform = list.map(asReload).scan(diffAgainstPrevious);
}
/**
 * Returns a transformer that pairs every source item with the statistics accumulated so far.
 * The statistics are updated with {@code function.apply(item)} for each item; the seed pair is
 * skipped so only pairs for real items are emitted.
 *
 * @param function extracts the number fed into the statistics for an item
 */
public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
        final Function<? super T, ? extends R> function) {
    return new FlowableTransformer<T, Pair<T, Statistics>>() {
        @Override
        public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
            final BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>> accumulate =
                    new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
                        @Override
                        public Pair<T, Statistics> apply(Pair<T, Statistics> previous, T item)
                                throws Exception {
                            return Pair.create(item, previous.b().add(function.apply(item)));
                        }
                    };

            // The seed has no item; skip(1) drops it from the output.
            return source
                    .scan(Pair.create((T) null, Statistics.create()), accumulate)
                    .skip(1);
        }
    };
}
/**
 * Pairs each upstream error with an attempt index from {@code range(1, count + 1)}. For
 * connect/timeout errors with attempts remaining it emits after a delay that grows with the
 * index; otherwise it fails with the original error.
 */
@Override
public Observable<?> apply(Observable<? extends Throwable> flowable) throws Exception {
    final BiFunction<Throwable, Integer, Wrapper> pairWithAttempt =
            new BiFunction<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper apply(Throwable error, Integer attempt) throws Exception {
                    return new Wrapper(error, attempt);
                }
            };

    return flowable
            .zipWith(Observable.range(1, count + 1), pairWithAttempt)
            .flatMap(wrapper -> {
                final boolean transientFailure =
                        wrapper.throwable instanceof ConnectException
                                || wrapper.throwable instanceof SocketTimeoutException
                                || wrapper.throwable instanceof TimeoutException;
                final boolean attemptsLeft = wrapper.index < count + 1;

                if (transientFailure && attemptsLeft) {
                    return Observable.timer(
                            delay + (wrapper.index - 1) * delay, TimeUnit.MILLISECONDS);
                }
                return Observable.error(wrapper.throwable);
            });
}
/**
 * Inserts five objects, queries the table back and checks that the queried list matches the
 * inserted one element by element.
 */
@Test
public void testElementsList() throws Exception {
    List<TestObject> elements = new ArrayList<>();
    elements.add(new TestObject(1, 9.5, "a"));
    elements.add(new TestObject(2, 6.7, "ab"));
    elements.add(new TestObject(3, 8.2, "abc"));
    elements.add(new TestObject(4, 3.4, "abcd"));
    elements.add(new TestObject(5, 6.5, "abcde"));
    SQLite.get().insert(TestTable.TABLE, elements);

    Observable.zip(RxSQLite.get().query(TestTable.TABLE), Observable.just(elements),
            new BiFunction<List<TestObject>, List<TestObject>, Object>() {
                @Override
                public Object apply(List<TestObject> testElements,
                        List<TestObject> savedElements) throws Exception {
                    assertEquals(testElements.size(), savedElements.size());
                    // Compare the elements themselves (previously the loop only re-checked the
                    // sizes). NOTE(review): relies on TestObject.equals and on the query
                    // returning rows in insertion order - confirm.
                    for (int i = 0; i < testElements.size(); i++) {
                        assertEquals(savedElements.get(i), testElements.get(i));
                    }
                    // RxJava 2 forbids null results from the zipper.
                    return testElements;
                }
            })
            .test()
            // Assertion failures inside the zipper are delivered as onError; surface them.
            .assertNoErrors();
}
public static void practice1(){ final Api api = RetrofitProvider.get().create(Api.class); Observable<UserBaseInfoResponse> observable1 = api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io()); Observable<UserExtraInfoResponse> observable2 = api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() { @Override public UserInfo apply(UserBaseInfoResponse baseInfo, UserExtraInfoResponse extraInfo) throws Exception { return new UserInfo(baseInfo, extraInfo); } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<UserInfo>() { @Override public void accept(UserInfo userInfo) throws Exception { //do something; } }); }
/**
 * Checks that the SQL {@code avg} over {@code PRIMITIVE_SHORT} equals the sum of the inserted
 * values divided by their count.
 */
@Test
public void avgFunction() {
    final double count = 8;

    final Function<SimpleAllValuesMutable, Integer> toShortValue =
            new Function<SimpleAllValuesMutable, Integer>() {
                @Override
                public Integer apply(SimpleAllValuesMutable row) {
                    return (int) row.primitiveShort;
                }
            };
    final BiFunction<Integer, Integer, Integer> add =
            new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer left, Integer right) {
                    return left + right;
                }
            };

    final Integer sum = Observable.fromIterable(insertSimpleAllValues((int) count))
            .map(toShortValue)
            .reduce(add)
            .blockingGet();

    final Double value = Select
            .column(avg(SIMPLE_ALL_VALUES_MUTABLE.PRIMITIVE_SHORT))
            .from(SIMPLE_ALL_VALUES_MUTABLE)
            .takeFirst()
            .execute();

    assertThat(value).isEqualTo(sum.doubleValue() / count);
}
/**
 * Zips the title and description string arrays into {@code StudyContent} items and stores each
 * one through the given DAO.
 *
 * @param studyContentDao DAO that receives every zipped item via {@code insertOrReplace}
 */
private void loadStringArray(final StudyContentDao studyContentDao) {
    final String[] titles = getResources().getStringArray(R.array.study_ui_title);
    final String[] descriptions = getResources().getStringArray(R.array.study_ui_description);

    Observable<String> titleStream = Observable.fromArray(titles);
    Observable<String> descriptionStream = Observable.fromArray(descriptions);

    final BiFunction<String, String, StudyContent> toStudyContent =
            new BiFunction<String, String, StudyContent>() {
                @Override
                public StudyContent apply(@io.reactivex.annotations.NonNull String title,
                        @io.reactivex.annotations.NonNull String description) throws Exception {
                    StudyContent content = new StudyContent();
                    content.setTitle(title);
                    content.setDescription(description);
                    // The type is set from the title as well.
                    content.setType(title);
                    return content;
                }
            };

    final Consumer<StudyContent> persist = new Consumer<StudyContent>() {
        @Override
        public void accept(@io.reactivex.annotations.NonNull StudyContent studyContent)
                throws Exception {
            studyContentDao.insertOrReplace(studyContent);
        }
    };

    Observable.zip(titleStream, descriptionStream, toStudyContent).subscribe(persist);
}
private void findUsersWhoLovesBoth() { // here we are using zip operator to combine both request Observable.zip(getCricketFansObservable(), getFootballFansObservable(), new BiFunction<List<User>, List<User>, List<User>>() { @Override public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception { List<User> userWhoLovesBoth = filterUserWhoLovesBoth(cricketFans, footballFans); return userWhoLovesBoth; } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List<User>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<User> users) { // do anything with user who loves both Log.d(TAG, "userList size : " + users.size()); for (User user : users) { Log.d(TAG, "user : " + user.toString()); } } @Override public void onError(Throwable e) { Utils.logError(TAG, e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
private void doSomeWork() { getObservable() // Run on a background thread .subscribeOn(Schedulers.io()) // Be notified on the main thread .observeOn(AndroidSchedulers.mainThread()) .scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer int1, Integer int2) throws Exception { return int1 + int2; } }) .subscribe(getObserver()); }
/** Reduces the source integers to their sum and subscribes the observer to the result. */
private void doSomeWork() {
    final BiFunction<Integer, Integer, Integer> sum =
            new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer accumulated, Integer next) {
                    return accumulated + next;
                }
            };

    getObservable().reduce(sum).subscribe(getObserver());
}
/** Sums 1, 2, 3, 4 on top of the seed value 50 and subscribes the observer to the result. */
private void doSomeWork() {
    final BiFunction<Integer, Integer, Integer> sum =
            new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer accumulated, Integer next) {
                    return accumulated + next;
                }
            };

    Flowable<Integer> numbers = Flowable.just(1, 2, 3, 4);
    numbers.reduce(50, sum).subscribe(getObserver());
}
private void doSomeWork() { Observable.zip(getCricketFansObservable(), getFootballFansObservable(), new BiFunction<List<User>, List<User>, List<User>>() { @Override public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception { return Utils.filterUserWhoLovesBoth(cricketFans, footballFans); } }) // Run on a background thread .subscribeOn(Schedulers.io()) // Be notified on the main thread .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); }
/**
 * Fetches the discuss whitelist together with the discuss info list and lets the whitelist
 * state generate its states from that list before it is emitted.
 */
@Override
public Single<? extends WhitelistState> startFetchWhitelistState() {
    final BiFunction<DiscussWhitelistState, List<Discuss>, DiscussWhitelistState> fillStates =
            new BiFunction<DiscussWhitelistState, List<Discuss>, DiscussWhitelistState>() {
                @Override
                public DiscussWhitelistState apply(DiscussWhitelistState whitelist,
                        List<Discuss> discusses) throws Exception {
                    whitelist.generateStates(discusses);
                    return whitelist;
                }
            };

    return Single.zip(
            FFMService.getDiscussWhitelist(), OpenQQService.getDiscussesInfo(), fillStates);
}
/**
 * Fetches the group whitelist together with the basic group info list and lets the whitelist
 * state generate its states from that list before it is emitted.
 */
@Override
public Single<? extends WhitelistState> startFetchWhitelistState() {
    final BiFunction<GroupWhitelistState, List<Group>, GroupWhitelistState> fillStates =
            new BiFunction<GroupWhitelistState, List<Group>, GroupWhitelistState>() {
                @Override
                public GroupWhitelistState apply(GroupWhitelistState whitelist,
                        List<Group> groupList) throws Exception {
                    whitelist.generateStates(groupList);
                    return whitelist;
                }
            };

    return Single.zip(
            FFMService.getGroupWhitelist(), OpenQQService.getGroupsBasicInfo(), fillStates);
}
/**
 * Zips {@code onEvent(theClass)} with the observable returned from posting an
 * {@code AskedEvent} for that class, and emits only the event object.
 *
 * @param theClass the class passed to {@code onEvent} and {@code AskedEvent}
 * @return an observable of the objects coming from {@code onEvent(theClass)}
 */
public <CLASS> Observable<CLASS> get(Class<CLASS> theClass) {
    final BiFunction<CLASS, Object, CLASS> keepEvent = new BiFunction<CLASS, Object, CLASS>() {
        @Override
        public CLASS apply(@NonNull CLASS wanted, @NonNull Object ignored) throws Exception {
            // The second value only drives the zip; its content is not used.
            return wanted;
        }
    };

    return Observable.zip(
            onEvent(theClass),
            postAsObservable(new AskedEvent(theClass)),
            keepEvent);
}
/**
 * Zips {@code getMe} with {@code chapter}, logs the "name" field together with the array
 * length, and emits a new empty {@code JSONObject}.
 */
@Override
public Observable<JSONObject> createFinalFlowObservable() {
    final BiFunction<JSONObject, JSONArray, JSONObject> logAndFinish =
            new BiFunction<JSONObject, JSONArray, JSONObject>() {
                @Override
                public JSONObject apply(@NonNull JSONObject me, @NonNull JSONArray chapters)
                        throws Exception {
                    final String userName = me.getString("name");
                    Log.d("NONO", "TestGroupTask---" + userName + ":" + chapters.length());
                    return new JSONObject();
                }
            };

    return Observable.zip(getMe, chapter, logAndFinish);
}
@Override public Observable<?> apply(@NonNull Observable<? extends Throwable> observable) throws Exception { return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() { @Override public Wrapper apply(@NonNull Throwable throwable, @NonNull Integer integer) throws Exception { return new Wrapper(throwable, integer); } }).flatMap(new Function<Wrapper, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Wrapper wrapper) throws Exception { if (wrapper.index > 1) HttpLog.i("重试次数:" + (wrapper.index)); int errCode = 0; if (wrapper.throwable instanceof ApiException) { ApiException exception = (ApiException) wrapper.throwable; errCode = exception.getCode(); } if ((wrapper.throwable instanceof ConnectException || wrapper.throwable instanceof SocketTimeoutException || errCode == ApiException.ERROR.NETWORD_ERROR || errCode == ApiException.ERROR.TIMEOUT_ERROR || wrapper.throwable instanceof SocketTimeoutException || wrapper.throwable instanceof TimeoutException) && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS); } return Observable.error(wrapper.throwable); } }); }
/**
 * Declares lambdas that return {@code null} for {@code Function} and {@code BiFunction}.
 *
 * <p>NOTE(review): the {@code // BUG: Diagnostic contains:} comments look like test-harness
 * expectation markers that must stay directly above the line they refer to - confirm before
 * moving or rewording them.
 */
static void testAnnoatedThirdParty() {
  // BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull return
  Function<String, Object> f1 = (x) -> null; // io.reactivex.(Bi)Function is annotated
  Function<String, Object> f2 =
      (x) -> {
        // BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull return
        return null;
      };
  // BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull return
  BiFunction<String, String, Object> f3 = (x, y) -> null;
}
/**
 * Folds all the items held by this {@link Collector} with the given reducer.
 *
 * @param reducer the function used to combine the items
 * @return the reduced items mapped through {@code toChain()}, or a {@link Chain} created with a
 *     {@code null} item when there are no items
 */
public Chain<T> reduce(BiFunction<T, T, T> reducer) {
    if (!items.isEmpty()) {
        return Observable.fromIterable(items)
                .reduce(reducer)
                .map(toChain())
                .blockingGet();
    }
    return new Chain<>(null, configuration);
}
/** With a single {@code true} item, the reduced result is expected to be that item. */
@Test
public void reduceWithOneItemThenReturnThisItemInResult() {
    final BiFunction<Boolean, Boolean, Boolean> bothEqual =
            new BiFunction<Boolean, Boolean, Boolean>() {
                @Override
                public Boolean apply(@NonNull Boolean left, @NonNull Boolean right) {
                    return left.equals(right);
                }
            };

    boolean reduced = new Collector<Boolean>(configuration)
            .and(true)
            .reduce(bothEqual)
            .call();

    assertTrue(reduced);
}
/** A reducer that throws is expected to make {@code reduce} throw the same exception. */
@Test(expected = UnsupportedOperationException.class)
public void reduceWithCrashingFunctionThenThrowException() {
    final BiFunction<Boolean, Boolean, Boolean> crashing =
            new BiFunction<Boolean, Boolean, Boolean>() {
                @Override
                public Boolean apply(@NonNull Boolean left, @NonNull Boolean right) {
                    throw new UnsupportedOperationException();
                }
            };

    new Collector<Boolean>(configuration)
            .and(true)
            .and(false)
            .and(true)
            .reduce(crashing);
}
/**
 * Maps the first lifecycle event through {@code correspondingEvents} and combines it with every
 * later event, emitting whether the later event equals the mapped one. Errors are replaced via
 * {@code Functions.RESUME_FUNCTION} and the result is filtered with
 * {@code Functions.SHOULD_COMPLETE}.
 */
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(
        final Observable<R> lifecycle, final Function<R, R> correspondingEvents) {
    final Observable<R> targetEvent = lifecycle.take(1).map(correspondingEvents);
    final Observable<R> laterEvents = lifecycle.skip(1);

    final BiFunction<R, R, Boolean> reachedTarget = new BiFunction<R, R, Boolean>() {
        @Override
        public Boolean apply(R target, R current) throws Exception {
            return current.equals(target);
        }
    };

    return Observable.combineLatest(targetEvent, laterEvents, reachedTarget)
            .onErrorReturn(Functions.RESUME_FUNCTION)
            .filter(Functions.SHOULD_COMPLETE);
}
/**
 * When the first step emits a present value, the chained step is expected to run and its value
 * to be the single emission, followed by completion without errors.
 */
@Test
public void onStep_withASuccessFullFirstAction_shouldProperlyChainTheNextStep() {
    Object returnValue = new Object();
    final Object secondReturnValue = new Object();
    TestObserver<Optional<Step.Data<Object, ActionableItem>>> testSubscriber =
            new TestObserver<>();

    final BiFunction<Object, ActionableItem, Step<Object, ActionableItem>> nextStep =
            new BiFunction<Object, ActionableItem, Step<Object, ActionableItem>>() {
                @Override
                public Step<Object, ActionableItem> apply(Object value, ActionableItem item) {
                    return Step.from(
                            Observable.just(new Step.Data<>(secondReturnValue, item))
                                    .singleOrError());
                }
            };

    final ActionableItem firstItem = new ActionableItem() {
        @NonNull
        @Override
        public Observable<InteractorEvent> lifecycle() {
            return interactorLifecycleSubject;
        }
    };

    interactorLifecycleSubject.onNext(InteractorEvent.ACTIVE);
    step.onStep(nextStep).asObservable().subscribe(testSubscriber);

    // Emit a present value from the first step, then complete it.
    returnValueSubject.onNext(
            Optional.of(new Step.Data<Object, ActionableItem>(returnValue, firstItem)));
    returnValueSubject.onComplete();

    testSubscriber.assertValueCount(1);
    assertThat(testSubscriber.values().get(0).get().getValue()).isEqualTo(secondReturnValue);
    testSubscriber.assertComplete();
    testSubscriber.assertNoErrors();
}
/**
 * When the first step emits an absent value, the chain is expected to emit a single absent
 * value and complete without errors.
 */
@Test
public void onStep_withAnUnsuccessfulFirstAction_shouldTerminateTheWholeChain() {
    TestObserver<Optional<Step.Data<Object, ActionableItem>>> testSubscriber =
            new TestObserver<>();
    final Object secondReturnValue = new Object();

    final BiFunction<Object, ActionableItem, Step<Object, ActionableItem>> nextStep =
            new BiFunction<Object, ActionableItem, Step<Object, ActionableItem>>() {
                @Override
                public Step<Object, ActionableItem> apply(Object value, ActionableItem item) {
                    return Step.from(
                            Observable.just(new Step.Data<>(secondReturnValue, item))
                                    .singleOrError());
                }
            };

    interactorLifecycleSubject.onNext(InteractorEvent.ACTIVE);
    step.onStep(nextStep).asObservable().subscribe(testSubscriber);

    // Emit an absent value from the first step, then complete it.
    returnValueSubject.onNext(Optional.<Step.Data<Object, ActionableItem>>absent());
    returnValueSubject.onComplete();

    testSubscriber.assertValueCount(1);
    assertThat(testSubscriber.values().get(0).isPresent()).isFalse();
    testSubscriber.assertComplete();
    testSubscriber.assertNoErrors();
}
/** Subscribes the observer to a running sum of the source that starts from the value 10. */
private void scanWithInitValue() {
    final BiFunction<Integer, Integer, Integer> adder =
            new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(@NonNull Integer accumulated, @NonNull Integer next)
                        throws Exception {
                    return accumulated + next;
                }
            };

    getObservable()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .scan(10, adder)
            .subscribe(getObserver());
}
/**
 * The Reduce operator applies a function that takes the data emitted by the Observable and the
 * function's previous result as the arguments of the next computation, and outputs the final
 * result. It is very similar to the scan operator covered earlier, except that scan emits the
 * result of every computation while reduce only emits the final result.
 */
private void doSomeWork() {
    final BiFunction<Integer, Integer, Integer> total =
            new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer soFar, Integer value) {
                    return soFar + value;
                }
            };

    getObservable()
            .reduce(total)
            .subscribe(getObserver());
}
/**
 * Combines the latest values of the "A" and "B" string sources into {@code "<a>-<b>"} strings
 * and subscribes the observer to them.
 */
private void doSomeWork() {
    final String[] aStrings = {"A1", "A2", "A3", "A4"};
    final String[] bStrings = {"B1", "B2", "B3"};

    final Observable<String> aSource = Observable.fromArray(aStrings);
    final Observable<String> bSource = Observable.fromArray(bStrings);

    final BiFunction<String, String, String> joinWithDash =
            new BiFunction<String, String, String>() {
                @Override
                public String apply(@NonNull String left, @NonNull String right)
                        throws Exception {
                    return left + "-" + right;
                }
            };

    Observable.combineLatest(aSource, bSource, joinWithDash).subscribe(getObserver());
}
@Override public void onViewCreated(View view, @Nullable Bundle savedInstanceState) { super.onViewCreated(view, savedInstanceState); mRepoListView = view.findViewById(R.id.repo_list_view); mObserverLog = view.findViewById(R.id.observer_log); mObserverLog.setMovementMethod(new ScrollingMovementMethod()); //Taking two Observable streams for two users on Github. Observable.zip(getReposForUser("google"), getReposForUser("fakher-hakim"), new BiFunction<RepositoryResponse, RepositoryResponse, List<RepositoryResponse>>() { @Override public List<RepositoryResponse> apply(@NonNull RepositoryResponse googleRepositoryResponse, @NonNull RepositoryResponse fakherRepositoryResponse) throws Exception { //The result of zipping the two streams. List<RepositoryResponse> zipResult = new ArrayList<>(); //If both repositories have the same language we add them to the result stream. if (googleRepositoryResponse.language .equals(fakherRepositoryResponse.language)) { zipResult.add(fakherRepositoryResponse); zipResult.add(googleRepositoryResponse); } //Return the new stream with result data. return zipResult; } }) //Subscribe the Network call in io Thread. .subscribeOn(Schedulers.io()) //Subscribe the Observer in MainThread so it can updates the UI with the result. .observeOn(AndroidSchedulers.mainThread()) //Choose the subscribed Observer for items emitted by this observable. .subscribe(mListBaseObserver); }
/**
 * Accumulator computing a running average of {@link Integer} values.
 * It works on a {@link Tuple} laid out as: 0: element count, 1: sum of the elements,
 * 2: average.
 *
 * @return a function that takes the previous tuple and the next value and returns a new tuple
 *     holding, in this order, the updated count, the updated sum and the resulting average.
 */
public static BiFunction<Tuple, Integer, Tuple> average() {
    return (tuple, rating) -> {
        final long previousCount = tuple.nth(0);
        final double previousSum = tuple.nth(1);

        final long count = previousCount + 1;
        final double sum = previousSum + rating;
        return Tuple.tuple(count, sum, sum / count);
    };
}
/**
 * Searches users whose login begins with {@code query} on disk and zips that list with the
 * single user fetched remotely for the same query, returning the de-duplicated union.
 *
 * <p>A failed remote request is replaced by an empty {@code User}, which the {@code getId() != 0}
 * filter then drops.
 *
 * @param query login prefix for the disk query, also formatted into the remote URL
 * @return the merged, duplicate-free list of users
 */
private Flowable<List<User>> search(String query) {
    return dataUseCase.<User>queryDisk(realm -> realm.where(User.class).beginsWith(User.LOGIN, query))
            .zipWith(dataUseCase.<User>getObject(new GetRequest.Builder(User.class, false)
                            .url(String.format(USER, query)).build())
                            .onErrorReturnItem(new User())
                            .filter(user -> user.getId() != 0)
                            .map(user -> user != null ? Collections.singletonList(user) : Collections.emptyList()),
                    (BiFunction<List<User>, List<User>, List<User>>) (users, singleton) -> {
                        // Merge into a fresh set instead of calling users.addAll(...): the
                        // upstream list is not ours to mutate and may be unmodifiable.
                        HashSet<User> merged = new HashSet<>(users);
                        merged.addAll(singleton);
                        return new ArrayList<>(merged);
                    });
}
@Test public void should_sum() { // given Observable<Integer> input = cold("a-b-c-d", of("a", 1, "b", 2, "c", 3, "d", 4)); // when Observable<Integer> output = input.scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer first, Integer second) { return first + second; } }); // then expectObservable(output).toBe("A-B-C-D", of("A", 1, "B", 3, "C", 6, "D", 10)); }