Java 类io.reactivex.functions.BiFunction 实例源码
项目:RxRetroJsoup
文件:MainActivity.java
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
recyclerView = (RecyclerView) findViewById(R.id.recyclerview);
recyclerView.setLayoutManager(new LinearLayoutManager(getBaseContext()));
adapter = new Adapter();
recyclerView.setAdapter(adapter);
loadWithRetroJsoup();
Observable.zip(
Observable.just(""),
Observable.just("&"),
new BiFunction<String, String, String>(){
@Override
public String apply(@NonNull String s, @NonNull String s2) throws Exception {
return null;
}
}
);
}
项目:J-Chain
文件:CollectorTest.java
@Test
public void reduceWithMultipleItemsThenReturnFunctionResult() {
boolean result = new Collector<Boolean>(configuration)
.and(true)
.and(false)
.and(true)
.reduce(new BiFunction<Boolean, Boolean, Boolean>() {
@Override
public Boolean apply(@NonNull Boolean itemOne, @NonNull Boolean itemTwo) {
return itemOne.equals(itemTwo);
}
})
.call();
assertFalse(result);
}
项目:RIBs
文件:Step.java
/**
* Chains another step to be performed after this step completes. If the previous step results in
* an error and does not emit a new actionable item, future chained onStep calls will not be
* called.
*
* @param func to return the next step when this current step completes. This function will
* receive the result of the previous step and the next actionable item to take an action on.
* @param <TNewValueType> the value type returned by the next step.
* @param <TNewActionableItem> the actionable item type returned by the next step.
* @return a {@link Step} to chain more calls to.
*/
public <TNewValueType, TNewActionableItem extends ActionableItem>
Step<TNewValueType, TNewActionableItem> onStep(
final BiFunction<T, A, Step<TNewValueType, TNewActionableItem>> func) {
return new Step<>(
asObservable()
.flatMap(
new Function<
Optional<Data<T, A>>,
Observable<Optional<Data<TNewValueType, TNewActionableItem>>>>() {
@Override
public Observable<Optional<Data<TNewValueType, TNewActionableItem>>> apply(
Optional<Data<T, A>> dataOptional) throws Exception {
if (dataOptional.isPresent()) {
Data<T, A> data = dataOptional.get();
return func.apply(data.value, data.actionableItem).asObservable();
} else {
return Observable.just(
Optional.<Data<TNewValueType, TNewActionableItem>>absent());
}
}
})
.singleOrError());
}
项目:RxJava2-Android-Sample
文件:ScanExampleActivity.java
private void scanWith(){
getObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.scanWith(new Callable<Integer>() {//提供初始值的函数
@Override
public Integer call() throws Exception {
return 2;
}
}, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(getObserver());
}
项目:vt-support
文件:StorageImpl.java
@Override
public Single<Metadata> generateDefault() {
final int[] zMinMax = getMaxMin(tileFilenames(directory));
return FilesystemUtil
.getTiles(new File(directory, String.valueOf(zMinMax[1])).getAbsolutePath(), 2)
.map(FilesystemUtil::toZxy).reduce(UNDEFINED_ZXY, new BiFunction<int[], int[], int[]>() {
@Override
public int[] apply(int[] aa, int[] bb) throws Exception {
return aa == UNDEFINED_ZXY ? (bb == UNDEFINED_ZXY ? UNDEFINED_ZXY : bb)
: new int[] {Math.max(aa[0], bb[0]), Math.max(aa[1], bb[1]),
Math.max(aa[2], bb[2])};
}
}).map(zxy -> {
if (zxy == UNDEFINED_ZXY) {
return new Metadata.Builder().build();
}
// TODO should be able to translate tile coordinates to
// bounds shortly!
return new Metadata.Builder().setMinZoom(zMinMax[0]).setMaxZoom(zMinMax[1]).build();
}).toObservable().singleOrError();
}
项目:RxRedux
文件:BaseViewModel.java
@NonNull
private BiFunction<UIModel<S>, Result<?>, UIModel<S>> reducer() {
return (currentUIModel, result) -> {
String event = result.getEvent();
S bundle = currentUIModel.getBundle();
if (result.isLoading()) {
currentUIModel = loadingState(create(event, bundle));
} else if (result.isSuccessful()) {
currentUIModel = successState(create(event,
stateReducer().reduce(result.getBundle(), event, bundle)));
} else {
currentUIModel = errorState(result.getThrowable(), create(event, bundle));
}
return currentUIModel;
};
}
项目:rxtools
文件:DifferentialFlowableList.java
DifferentialFlowableList(Flowable<List<T>> list, boolean detectMoves)
{
_detectMoves = detectMoves;
_diffTransform = list
.map(new Function<List<T>, Update<T>>() {
@Override
public Update<T> apply(List<T> ts) {
return new Update<>(ts, Change.reloaded());
}
})
.scan(new BiFunction<Update<T>, Update<T>, Update<T>>() {
@Override
public Update<T> apply(Update<T> previous, Update<T> next) {
if (previous == null) {
return next;
}
List<Change> changes = computeDiff(previous.list, next.list);
_previousList = next.list;
return new Update<>(next.list, changes);
}
});
}
项目:rxjava2-extras
文件:Transformers.java
public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
final Function<? super T, ? extends R> function) {
return new FlowableTransformer<T, Pair<T, Statistics>>() {
@Override
public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
return source.scan(Pair.create((T) null, Statistics.create()),
new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
@Override
public Pair<T, Statistics> apply(Pair<T, Statistics> pair, T t) throws Exception {
return Pair.create(t, pair.b().add(function.apply(t)));
}
}).skip(1);
}
};
}
项目:GankGirl
文件:RetryWhenNetworkException.java
@Override
public Observable<?> apply(Observable<? extends Throwable> flowable) throws Exception {
return flowable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
@Override
public Wrapper apply(Throwable throwable, Integer integer) throws Exception {
return new Wrapper(throwable, integer);
}
}).flatMap(wrapper -> {
if ((wrapper.throwable instanceof ConnectException
|| wrapper.throwable instanceof SocketTimeoutException
|| wrapper.throwable instanceof TimeoutException)
&& wrapper.index < count + 1) {
return Observable.timer(delay + (wrapper.index - 1) * delay, TimeUnit.MILLISECONDS);
}
return Observable.error(wrapper.throwable);
} );
}
项目:SQLite
文件:RxSQLiteTest.java
@Test
public void testElementsList() throws Exception {
List<TestObject> elements = new ArrayList<>();
elements.add(new TestObject(1, 9.5, "a"));
elements.add(new TestObject(2, 6.7, "ab"));
elements.add(new TestObject(3, 8.2, "abc"));
elements.add(new TestObject(4, 3.4, "abcd"));
elements.add(new TestObject(5, 6.5, "abcde"));
SQLite.get().insert(TestTable.TABLE, elements);
Observable.zip(RxSQLite.get().query(TestTable.TABLE),
Observable.just(elements), new BiFunction<List<TestObject>, List<TestObject>, Object>() {
@Override
public Object apply(List<TestObject> testElements, List<TestObject> savedElements) throws Exception {
assertEquals(testElements.size(), savedElements.size());
for (int i = 0; i < testElements.size(); i++) {
assertEquals(testElements.size(), savedElements.size());
}
return null;
}
})
.test();
}
项目:RxJava2Demo
文件:ChapterFour.java
public static void practice1(){
final Api api = RetrofitProvider.get().create(Api.class);
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});
}
项目:sqlitemagic
文件:SynchronousColumnQueryTest.java
@Test
public void avgFunction() {
final double count = 8;
final Integer sum = Observable.fromIterable(insertSimpleAllValues((int) count))
.map(new Function<SimpleAllValuesMutable, Integer>() {
@Override
public Integer apply(SimpleAllValuesMutable v) {
return (int) v.primitiveShort;
}
})
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer v1, Integer v2) {
return v1 + v2;
}
})
.blockingGet();
final Double value = Select
.column(avg(SIMPLE_ALL_VALUES_MUTABLE.PRIMITIVE_SHORT))
.from(SIMPLE_ALL_VALUES_MUTABLE)
.takeFirst()
.execute();
assertThat(value).isEqualTo(sum.doubleValue() / count);
}
项目:beautifullife
文件:App.java
private void loadStringArray(final StudyContentDao studyContentDao) {
String[] titleList = getResources().getStringArray(R.array.study_ui_title);
String[] descList = getResources().getStringArray(R.array.study_ui_description);
Observable<String> observableTitle = Observable.fromArray(titleList);
Observable<String> observableDescList = Observable.fromArray(descList);
Observable.zip(observableTitle, observableDescList, new BiFunction<String, String, StudyContent>() {
@Override
public StudyContent apply(@io.reactivex.annotations.NonNull String s, @io.reactivex.annotations.NonNull String s2) throws Exception {
StudyContent studyItemModel = new StudyContent();
studyItemModel.setTitle(s);
studyItemModel.setDescription(s2);
studyItemModel.setType(s);
return studyItemModel;
}
}).subscribe(new Consumer<StudyContent>() {
@Override
public void accept(@io.reactivex.annotations.NonNull StudyContent studyContent) throws Exception {
studyContentDao.insertOrReplace(studyContent);
}
});
}
项目:GitHub
文件:NetworkingActivity.java
private void findUsersWhoLovesBoth() {
// here we are using zip operator to combine both request
Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
List<User> userWhoLovesBoth =
filterUserWhoLovesBoth(cricketFans, footballFans);
return userWhoLovesBoth;
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<User>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<User> users) {
// do anything with user who loves both
Log.d(TAG, "userList size : " + users.size());
for (User user : users) {
Log.d(TAG, "user : " + user.toString());
}
}
@Override
public void onError(Throwable e) {
Utils.logError(TAG, e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
项目:GitHub
文件:ScanExampleActivity.java
private void doSomeWork() {
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer int1, Integer int2) throws Exception {
return int1 + int2;
}
})
.subscribe(getObserver());
}
项目:GitHub
文件:ReduceExampleActivity.java
private void doSomeWork() {
getObservable()
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
})
.subscribe(getObserver());
}
项目:GitHub
文件:FlowableExampleActivity.java
private void doSomeWork() {
Flowable<Integer> observable = Flowable.just(1, 2, 3, 4);
observable.reduce(50, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
}).subscribe(getObserver());
}
项目:GitHub
文件:ZipExampleActivity.java
private void doSomeWork() {
Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
return Utils.filterUserWhoLovesBoth(cricketFans, footballFans);
}
})
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
项目:FCM-for-Mojo
文件:DiscussWhitelistActivity.java
@Override
public Single<? extends WhitelistState> startFetchWhitelistState() {
return Single.zip(FFMService.getDiscussWhitelist(), OpenQQService.getDiscussesInfo(),
new BiFunction<DiscussWhitelistState, List<Discuss>, DiscussWhitelistState>() {
@Override
public DiscussWhitelistState apply(DiscussWhitelistState state, List<Discuss> groups) throws Exception {
state.generateStates(groups);
return state;
}
});
}
项目:FCM-for-Mojo
文件:GroupWhitelistActivity.java
@Override
public Single<? extends WhitelistState> startFetchWhitelistState() {
return Single.zip(FFMService.getGroupWhitelist(), OpenQQService.getGroupsBasicInfo(),
new BiFunction<GroupWhitelistState, List<Group>, GroupWhitelistState>() {
@Override
public GroupWhitelistState apply(GroupWhitelistState state, List<Group> groups) throws Exception {
state.generateStates(groups);
return state;
}
});
}
项目:Rx_java2_soussidev
文件:RxBus_java.java
public <CLASS> Observable<CLASS> get(Class<CLASS> theClass) {
return Observable.zip(
onEvent(theClass),
postAsObservable(new AskedEvent(theClass)),
new BiFunction<CLASS, Object, CLASS>() {
@Override
public CLASS apply(@NonNull CLASS neededObject, @NonNull Object _useless) throws Exception {
return neededObject;
}
});
}
项目:Dalaran
文件:TestGroupTask.java
@Override
public Observable<JSONObject> createFinalFlowObservable() {
return Observable.zip(getMe, chapter, new BiFunction<JSONObject, JSONArray, JSONObject>() {
@Override
public JSONObject apply(@NonNull JSONObject jsonObject, @NonNull JSONArray jsonArray) throws Exception {
String name = jsonObject.getString("name");
Log.d("NONO", "TestGroupTask---" + name + ":" + jsonArray.length());
return new JSONObject();
}
});
}
项目:RxEasyHttp
文件:RetryExceptionFunc.java
@Override
public Observable<?> apply(@NonNull Observable<? extends Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
@Override
public Wrapper apply(@NonNull Throwable throwable, @NonNull Integer integer) throws Exception {
return new Wrapper(throwable, integer);
}
}).flatMap(new Function<Wrapper, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Wrapper wrapper) throws Exception {
if (wrapper.index > 1)
HttpLog.i("重试次数:" + (wrapper.index));
int errCode = 0;
if (wrapper.throwable instanceof ApiException) {
ApiException exception = (ApiException) wrapper.throwable;
errCode = exception.getCode();
}
if ((wrapper.throwable instanceof ConnectException
|| wrapper.throwable instanceof SocketTimeoutException
|| errCode == ApiException.ERROR.NETWORD_ERROR
|| errCode == ApiException.ERROR.TIMEOUT_ERROR
|| wrapper.throwable instanceof SocketTimeoutException
|| wrapper.throwable instanceof TimeoutException)
&& wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
}
return Observable.error(wrapper.throwable);
}
});
}
项目:NullAway
文件:NullAwayJava8PositiveCases.java
static void testAnnoatedThirdParty() {
// BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull return
Function<String, Object> f1 = (x) -> null; // io.reactivex.(Bi)Function is anotated
Function<String, Object> f2 =
(x) -> {
// BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull
return null;
};
// BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull return
BiFunction<String, String, Object> f3 = (x, y) -> null;
}
项目:J-Chain
文件:Collector.java
/**
* reduce all the items in this {@link Collector}
*
* @param reducer the reducer invoke
* @return the result of the reducer invoke
*/
public Chain<T> reduce(BiFunction<T, T, T> reducer) {
if (items.isEmpty()) {
return new Chain<>(null, configuration);
}
return Observable.fromIterable(items)
.reduce(reducer)
.map(toChain())
.blockingGet();
}
项目:J-Chain
文件:CollectorTest.java
@Test
public void reduceWithOneItemThenReturnThisItemInResult() {
boolean result = new Collector<Boolean>(configuration)
.and(true)
.reduce(new BiFunction<Boolean, Boolean, Boolean>() {
@Override
public Boolean apply(@NonNull Boolean itemOne, @NonNull Boolean itemTwo) {
return itemOne.equals(itemTwo);
}
})
.call();
assertTrue(result);
}
项目:J-Chain
文件:CollectorTest.java
@Test(expected = UnsupportedOperationException.class)
public void reduceWithCrashingFunctionThenThrowException() {
new Collector<Boolean>(configuration)
.and(true)
.and(false)
.and(true)
.reduce(new BiFunction<Boolean, Boolean, Boolean>() {
@Override
public Boolean apply(@NonNull Boolean itemOne, @NonNull Boolean itemTwo) {
throw new UnsupportedOperationException();
}
});
}
项目:RxLifeCycle
文件:RxLifecycle.java
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
final Function<R, R> correspondingEvents) {
return Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new BiFunction<R, R, Boolean>() {
@Override
public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(Functions.RESUME_FUNCTION)
.filter(Functions.SHOULD_COMPLETE);
}
项目:RIBs
文件:StepTest.java
@Test
public void onStep_withASuccessFullFirstAction_shouldProperlyChainTheNextStep() {
Object returnValue = new Object();
final Object secondReturnValue = new Object();
TestObserver<Optional<Step.Data<Object, ActionableItem>>> testSubscriber = new TestObserver<>();
interactorLifecycleSubject.onNext(InteractorEvent.ACTIVE);
step.onStep(
new BiFunction<Object, ActionableItem, Step<Object, ActionableItem>>() {
@Override
public Step<Object, ActionableItem> apply(Object o, ActionableItem actionableItem) {
return Step.from(
Observable.just(new Step.Data<>(secondReturnValue, actionableItem))
.singleOrError());
}
})
.asObservable()
.subscribe(testSubscriber);
returnValueSubject.onNext(
Optional.of(
new Step.Data<Object, ActionableItem>(
returnValue,
new ActionableItem() {
@NonNull
@Override
public Observable<InteractorEvent> lifecycle() {
return interactorLifecycleSubject;
}
})));
returnValueSubject.onComplete();
testSubscriber.assertValueCount(1);
assertThat(testSubscriber.values().get(0).get().getValue()).isEqualTo(secondReturnValue);
testSubscriber.assertComplete();
testSubscriber.assertNoErrors();
}
项目:RIBs
文件:StepTest.java
@Test
public void onStep_withAnUnsuccessfulFirstAction_shouldTerminateTheWholeChain() {
TestObserver<Optional<Step.Data<Object, ActionableItem>>> testSubscriber = new TestObserver<>();
final Object secondReturnValue = new Object();
interactorLifecycleSubject.onNext(InteractorEvent.ACTIVE);
step.onStep(
new BiFunction<Object, ActionableItem, Step<Object, ActionableItem>>() {
@Override
public Step<Object, ActionableItem> apply(Object o, ActionableItem actionableItem) {
return Step.from(
Observable.just(new Step.Data<>(secondReturnValue, actionableItem))
.singleOrError());
}
})
.asObservable()
.subscribe(testSubscriber);
returnValueSubject.onNext(Optional.<Step.Data<Object, ActionableItem>>absent());
returnValueSubject.onComplete();
testSubscriber.assertValueCount(1);
assertThat(testSubscriber.values().get(0).isPresent()).isFalse();
testSubscriber.assertComplete();
testSubscriber.assertNoErrors();
}
项目:RxJava2-Android-Sample
文件:NetworkingActivity.java
private void findUsersWhoLovesBoth() {
// here we are using zip operator to combine both request
Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
List<User> userWhoLovesBoth =
filterUserWhoLovesBoth(cricketFans, footballFans);
return userWhoLovesBoth;
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<User>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<User> users) {
// do anything with user who loves both
Log.d(TAG, "userList size : " + users.size());
for (User user : users) {
Log.d(TAG, "user : " + user.toString());
}
}
@Override
public void onError(Throwable e) {
Utils.logError(TAG, e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
项目:RxJava2-Android-Sample
文件:ScanExampleActivity.java
private void doSomeWork() {
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer int1, Integer int2) throws Exception {
return int1 + int2;
}
})
.subscribe(getObserver());
}
项目:RxJava2-Android-Sample
文件:ScanExampleActivity.java
private void scanWithInitValue(){
getObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.scan(10, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(getObserver());
}
项目:RxJava2-Android-Sample
文件:ReduceExampleActivity.java
/**
* Reduce操作符应用一个函数接收Observable发射的数据和函数的计算结果作为下次计算的参数,
* 输出最后的结果。跟前面我们了解过的scan操作符很类似,只是scan会输出每次计算的结果,而reduce只会输出最后的结果。
*/
private void doSomeWork() {
getObservable()
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
})
.subscribe(getObserver());
}
项目:RxJava2-Android-Sample
文件:CombineLatestExampleActivity.java
private void doSomeWork() {
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable<String> aObservable = Observable.fromArray(aStrings);
final Observable<String> bObservable = Observable.fromArray(bStrings);
Observable.combineLatest(aObservable, bObservable, new BiFunction<String, String, String>() {
@Override
public String apply(@NonNull String s, @NonNull String s2) throws Exception {
return s + "-" + s2;
}
}).subscribe(getObserver());
}
项目:RxJava2-Android-Sample
文件:ZipExampleActivity.java
private void doSomeWork() {
Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
return Utils.filterUserWhoLovesBoth(cricketFans, footballFans);
}
})
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
项目:Rx-Android-Samples
文件:ZipOperatorFragment.java
@Override
public void onViewCreated(View view, @Nullable Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);
mRepoListView = view.findViewById(R.id.repo_list_view);
mObserverLog = view.findViewById(R.id.observer_log);
mObserverLog.setMovementMethod(new ScrollingMovementMethod());
//Taking two Observable streams for two users on Github.
Observable.zip(getReposForUser("google"), getReposForUser("fakher-hakim"),
new BiFunction<RepositoryResponse, RepositoryResponse, List<RepositoryResponse>>() {
@Override
public List<RepositoryResponse> apply(@NonNull RepositoryResponse googleRepositoryResponse,
@NonNull RepositoryResponse fakherRepositoryResponse) throws Exception {
//The result of zipping the two streams.
List<RepositoryResponse> zipResult = new ArrayList<>();
//If both repositories have the same language we add them to the result stream.
if (googleRepositoryResponse.language
.equals(fakherRepositoryResponse.language)) {
zipResult.add(fakherRepositoryResponse);
zipResult.add(googleRepositoryResponse);
}
//Return the new stream with result data.
return zipResult;
}
})
//Subscribe the Network call in io Thread.
.subscribeOn(Schedulers.io())
//Subscribe the Observer in MainThread so it can updates the UI with the result.
.observeOn(AndroidSchedulers.mainThread())
//Choose the subscribed Observer for items emitted by this observable.
.subscribe(mListBaseObserver);
}
项目:fluid
文件:ReviewGlobalRating.java
/**
* Function computing an average of {@link Integer}.
* It returns a {@link Tuple} structured as follows: 0: element-count, 1: sum of the element, 2: average
* @return a tuple containing in this order the number of element in the series, the sum of the element and the
* average.
*/
public static BiFunction<Tuple, Integer, Tuple> average() {
return (tuple, rating) -> {
long count = tuple.nth(0);
double sum = tuple.nth(1);
count = count + 1;
sum = sum + rating;
double avg = sum / count;
return Tuple.tuple(count, sum, avg);
};
}
项目:RxRedux
文件:UserListVM.java
private Flowable<List<User>> search(String query) {
return dataUseCase.<User>queryDisk(realm -> realm.where(User.class).beginsWith(User.LOGIN, query))
.zipWith(dataUseCase.<User>getObject(new GetRequest.Builder(User.class, false)
.url(String.format(USER, query)).build())
.onErrorReturnItem(new User()).filter(user -> user.getId() != 0)
.map(user -> user != null ? Collections.singletonList(user)
: Collections.emptyList()),
(BiFunction<List<User>, List<User>, List<User>>) (users, singleton) -> {
users.addAll(singleton);
return new ArrayList<>(new HashSet<>(users));
});
}
项目:MarbleTest4J
文件:DemoTest.java
@Test
public void should_sum() {
// given
Observable<Integer> input = cold("a-b-c-d", of("a", 1, "b", 2, "c", 3, "d", 4));
// when
Observable<Integer> output = input.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer first, Integer second) {
return first + second;
}
});
// then
expectObservable(output).toBe("A-B-C-D", of("A", 1, "B", 3, "C", 6, "D", 10));
}