Java 类io.reactivex.Single 实例源码
项目:RxShell
文件:RootKiller.java
/**
 * Builds a cached {@link Single} that gathers every line read from {@code inputStream}.
 * <p>
 * Reading happens on {@link Schedulers#io()}. Lines are emitted until the line reader returns
 * {@code null} or the emitter is disposed. An {@link IOException} during reading is only logged
 * (when debugging is enabled); the stream is then completed, so the lines gathered so far are
 * still delivered. Any error that reaches the end of the chain is replaced by an empty list.
 *
 * @param inputStream stream to read line by line; this method does not call {@code close()} on it
 * @return a cached single emitting the list of lines that were read
 */
static Single<List<String>> makeMiniHarvester(InputStream inputStream) {
    final ObservableOnSubscribe<String> lineSource = emitter -> {
        final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
        final LineReader lineReader = new LineReader();
        try {
            String current = lineReader.readLine(reader);
            while (current != null && !emitter.isDisposed()) {
                emitter.onNext(current);
                current = lineReader.readLine(reader);
            }
        } catch (IOException e) {
            if (RXSDebug.isDebug()) {
                Timber.tag(TAG).d("MiniHarvester read error: %s", e.getMessage());
            }
        } finally {
            // Always terminate the stream so toList() can emit what was collected.
            emitter.onComplete();
        }
    };
    return Observable.create(lineSource)
            .doOnEach(n -> {
                if (RXSDebug.isDebug()) {
                    Timber.tag(TAG).v("miniHarvesters:doOnEach %s", n);
                }
            })
            .subscribeOn(Schedulers.io())
            .toList()
            .onErrorReturnItem(new ArrayList<>())
            .cache();
}
项目:FireBaseTest
文件:ContentPresenterTest.java
/**
 * When the repository answers with an error, the view model must leave the loading state,
 * show the error message and carry no content.
 */
@Test
public void loadingErrorAtRepository_UpdateViewModelToDisplayError() {
    // Arrange: the repository fails for any content id.
    final RuntimeException failure = new RuntimeException();
    when(contentRepository.getContentItem(anyInt())).thenReturn(Single.error(failure));

    // Precondition: loading indicator shown, no error yet.
    assertTrue(viewModel.isShowLoadingIndicator());
    assertFalse(viewModel.isShowErrorMessage());

    // Act: load and flush the scheduled work.
    contentPresenter.loadContent();
    testScheduler.triggerActions();

    // Assert: error state without any content.
    assertFalse(viewModel.isShowLoadingIndicator());
    assertTrue(viewModel.isShowErrorMessage());
    assertNull(viewModel.getContentDescription());
    assertNull(viewModel.getImageUrl());
}
项目:Phoenix-for-VK
文件:WallsImpl.java
/**
 * Requests a single wall post and converts it to a {@link Post} model.
 * <p>
 * The first post of the response is transformed using an owners bundle resolved through
 * {@code ownersInteractor}; owners already present in the response are passed along to it.
 *
 * @param accountId account on whose behalf the request is made
 * @param ownerId   owner id used to build the post's {@link IdPair}
 * @param postId    post id used to build the post's {@link IdPair}
 * @return single emitting the post; fails with {@link NotFoundException} when the response
 *         contains no posts
 */
@Override
public Single<Post> getById(int accountId, int ownerId, int postId) {
    final IdPair requested = new IdPair(postId, ownerId);
    return networker.vkDefault(accountId)
            .wall()
            .getById(Collections.singleton(requested), true, 5, Constants.MAIN_OWNER_FIELDS)
            .flatMap(response -> {
                if (isEmpty(response.posts)) {
                    throw new NotFoundException();
                }

                final List<Owner> knownOwners = Dto2Model.transformOwners(response.profiles, response.groups);
                final VKApiPost firstPost = response.posts.get(0);
                final VKOwnIds ownerIds = new VKOwnIds().append(firstPost);

                return ownersInteractor
                        .findBaseOwnersDataAsBundle(accountId, ownerIds.getAll(), IOwnersInteractor.MODE_ANY, knownOwners)
                        .map(bundle -> Dto2Model.transform(firstPost, bundle));
            });
}
项目:Phoenix-for-VK
文件:KeysPersistStore.java
/**
 * Looks up the most recently stored key pair for a peer.
 * <p>
 * The query filters by {@code KeyColumns.PEER_ID} and sorts by {@code KeyColumns._ID}
 * descending with a limit of one row.
 *
 * @param accountId account whose keys content URI is queried; also set on the returned pair
 * @param peerId    peer whose key pair is requested
 * @return single emitting an {@link Optional} wrapping the found pair, or wrapping {@code null}
 *         when the query returned no cursor or no rows
 */
@Override
public Single<Optional<AesKeyPair>> findLastKeyPair(int accountId, int peerId) {
    return Single.create(e -> {
        Uri uri = MessengerContentProvider.getKeysContentUriFor(accountId);
        Cursor cursor = getContext().getContentResolver()
                .query(uri, null, KeyColumns.PEER_ID + " = ?",
                        new String[]{String.valueOf(peerId)}, KeyColumns._ID + " DESC LIMIT 1");

        AesKeyPair pair = null;
        if (nonNull(cursor)) {
            // Close in finally so the cursor is released even when mapping the row throws.
            try {
                if (cursor.moveToNext()) {
                    pair = map(cursor).setAccountId(accountId);
                }
            } finally {
                cursor.close();
            }
        }

        e.onSuccess(Optional.wrap(pair));
    });
}
项目:Phoenix-for-VK
文件:MessagesStore.java
/**
 * Counts the rows in the messages table for a peer whose {@code READ_STATE}, {@code OUT},
 * {@code ATTACH_TO} and {@code DELETED} columns all equal {@code 0}.
 *
 * @param accountId account whose database is read
 * @param peerId    peer whose messages are counted
 * @return single emitting the count, or {@code 0} when the query yields no row
 */
@Override
public Single<Integer> calculateUnreadCount(int accountId, int peerId) {
    return Single.fromCallable(() -> {
        Cursor cursor = DBHelper.getInstance(getContext(), accountId)
                .getReadableDatabase()
                .rawQuery("SELECT COUNT(" + MessageColumns._ID + ") FROM " + MessageColumns.TABLENAME +
                                " WHERE " + MessageColumns.PEER_ID + " = ?" +
                                " AND " + MessageColumns.READ_STATE + " = ?" +
                                " AND " + MessageColumns.OUT + " = ?" +
                                " AND " + MessageColumns.ATTACH_TO + " = ?" +
                                " AND " + MessageColumns.DELETED + " = ?",
                        new String[]{String.valueOf(peerId), "0", "0", "0", "0"});

        // Close in finally so the cursor is released even when reading the row throws.
        try {
            int result = 0;
            if (cursor.moveToNext()) {
                result = cursor.getInt(0);
            }
            return result;
        } finally {
            cursor.close();
        }
    });
}
项目:Phoenix-for-VK
文件:RelationshipInteractor.java
/**
 * Sends the friends "delete" request for a user and maps the response flags to a
 * {@link DeletedCodes} value.
 * <p>
 * Flags are checked in this order: {@code friend_deleted}, {@code in_request_deleted},
 * {@code out_request_deleted}, {@code suggestion_deleted}; the first one set wins.
 *
 * @param accountId account on whose behalf the request is made
 * @param userId    user to delete
 * @return single emitting the matching code; fails with {@link UnepectedResultException} when
 *         none of the flags is set
 */
@Override
public Single<Integer> deleteFriends(int accountId, int userId) {
    return networker.vkDefault(accountId)
            .friends()
            .delete(userId)
            .map(response -> {
                final int code;
                if (response.friend_deleted) {
                    code = DeletedCodes.FRIEND_DELETED;
                } else if (response.in_request_deleted) {
                    code = DeletedCodes.IN_REQUEST_DELETED;
                } else if (response.out_request_deleted) {
                    code = DeletedCodes.OUT_REQUEST_DELETED;
                } else if (response.suggestion_deleted) {
                    code = DeletedCodes.SUGGESTION_DELETED;
                } else {
                    throw new UnepectedResultException();
                }
                return code;
            });
}
项目:Phoenix-for-VK
文件:DatabaseInteractor.java
/**
 * Loads the chairs of a faculty and converts each DTO into a {@link Chair}.
 *
 * @param accoutnId account on whose behalf the request is made
 * @param facultyId faculty whose chairs are requested
 * @param count     passed to the API call as its third argument
 * @param offset    passed to the API call as its second argument
 * @return single emitting the converted chairs; a {@code null} item list is treated as empty
 */
@Override
public Single<List<Chair>> getChairs(int accoutnId, int facultyId, int count, int offset) {
    return networker.vkDefault(accoutnId)
            .database()
            .getChairs(facultyId, offset, count)
            .map(items -> {
                final List<ChairDto> source = Utils.listEmptyIfNull(items.getItems());
                final List<Chair> converted = new ArrayList<>(source.size());
                for (ChairDto chairDto : source) {
                    converted.add(new Chair(chairDto.id, chairDto.title));
                }
                return converted;
            });
}
项目:RxLifeCycle
文件:UntilLifecycleTransformerSingleTest.java
/**
 * A lifecycle event emitted before the delayed value arrives must suppress the value and
 * terminate the observer with a {@link CancellationException}.
 */
@Test
public void oneEvent() {
    final Single<String> delayed = Single.just("1")
            .delay(1, TimeUnit.MILLISECONDS, testScheduler);
    final TestObserver<String> testObserver = delayed
            .compose(RxLifecycle.<String, String>bind(lifecycle))
            .test();

    // Nothing has happened before time advances.
    testObserver.assertNoValues();
    testObserver.assertNoErrors();

    // Lifecycle fires first, then the delay elapses.
    lifecycle.onNext("stop");
    testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    testObserver.assertNoValues();
    testObserver.assertError(CancellationException.class);
}
项目:Phoenix-for-VK
文件:KeysRamStore.java
/**
 * Returns the in-memory key pairs stored for an account that belong to the given peer.
 *
 * @param accountId key used to look up the stored list in {@code mData}
 * @param peerId    peer id a pair must have to be included
 * @return single emitting the matching pairs; empty when nothing is stored for the account
 */
@Override
public Single<List<AesKeyPair>> getKeys(int accountId, int peerId) {
    return Single.create(emitter -> {
        final List<AesKeyPair> stored = mData.get(accountId);
        final List<AesKeyPair> matches;

        if (Objects.isNull(stored)) {
            matches = new ArrayList<>(0);
        } else {
            matches = new ArrayList<>(1);
            for (AesKeyPair candidate : stored) {
                if (candidate.getPeerId() == peerId) {
                    matches.add(candidate);
                }
            }
        }

        emitter.onSuccess(matches);
    });
}
项目:Android-Allocine-Api
文件:AllocineApi.java
/**
 * Search.
 * <p>
 * First builds the request parameters and derives the {@code sed}/{@code sig} pair from
 * {@link ServiceSecurity}, then calls the search service with that pair, and finally applies
 * this class's {@code retry()} transformer.
 *
 * @param recherche search text; spaces are replaced by {@code +} when building the parameters
 * @param filter    filters, flattened with {@code ServiceSecurity.applatir} for the service call
 * @param count     forwarded as the count parameter
 * @param page      forwarded as the page parameter
 * @return single emitting the search response
 */
public Single<AllocineResponseSmall> searchSmall(final String recherche, final List<String> filter, final int count, final int page) {
    final SingleOnSubscribe<Pair<String, String>> signatureSource = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.Q, "" + recherche.replace(" ", "+"),
                    AllocineService.FILTER, filter,
                    AllocineService.COUNT, "" + count,
                    AllocineService.PAGE, "" + page
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(params, sed)));
        }
    };

    final Function<Pair<String, String>, SingleSource<AllocineResponseSmall>> signedSearch =
            new Function<Pair<String, String>, SingleSource<AllocineResponseSmall>>() {
                @Override
                public SingleSource<AllocineResponseSmall> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.searchSmall(recherche, ServiceSecurity.applatir(filter), count, page,
                            signature.first, signature.second);
                }
            };

    return Single.create(signatureSource)
            .flatMap(signedSearch)
            .compose(this.<AllocineResponseSmall>retry());
}
项目:Phoenix-for-VK
文件:WallsImpl.java
/**
 * Reads wall posts from the local repository and converts them to models.
 *
 * @param accountId  account used for the criteria and for the {@code dbos2models} transformer
 * @param ownerId    wall owner used for the criteria
 * @param wallFilter value applied to the criteria through {@code setMode}
 * @return single emitting the converted posts
 */
@Override
public Single<List<Post>> getCachedWall(int accountId, int ownerId, int wallFilter) {
// The value returned by setMode is what gets used as the criteria.
WallCriteria criteria = new WallCriteria(accountId, ownerId).setMode(wallFilter);
return repositories.wall()
.findDbosByCriteria(criteria)
.compose(dbos2models(accountId));
}
项目:async-sqs
文件:DeleteMessageTaskTest.java
/**
 * When the request sender returns an errored Single, every entry's result subject must report
 * that same exception, and the sender must have been invoked.
 */
@Test
public void testTaskError() {
// Stub the sender to fail for any request.
when(requestSenderMock.sendRequest(any())).thenReturn(Single.error(EXCEPTION));
task.run(QUEUE_URL, ENTRY_MAP);
// Each entry must surface the stubbed exception.
ENTRY_MAP.values().forEach((entry) -> entry.getResultSubject().test().assertError(EXCEPTION));
verify(requestSenderMock).sendRequest(any());
}
项目:Phoenix-for-VK
文件:CommentsInteractor.java
/**
 * Sends a comment and returns the stored result.
 * <p>
 * Attachment tokens are collected from the cached draft (when the intent carries a draft
 * message id) followed by tokens created from the intent's models. After sending, the comment
 * is fetched and stored by id; if a draft id is present the draft is then deleted from the
 * cache before the comment is emitted.
 *
 * @param accountId account on whose behalf the comment is sent
 * @param commented target being commented on
 * @param intent    comment data, including the optional draft id and attachment models
 * @return single emitting the sent comment
 */
@Override
public Single<Comment> send(int accountId, Commented commented, final CommentIntent intent) {
    final Single<List<IAttachmentToken>> cachedAttachments;
    if (isNull(intent.getDraftMessageId())) {
        cachedAttachments = Single.just(emptyList());
    } else {
        cachedAttachments = getCachedAttachmentsToken(accountId, intent.getDraftMessageId());
    }

    return cachedAttachments.flatMap(cachedTokens -> {
        final List<IAttachmentToken> tokens = new ArrayList<>();
        if (nonNull(cachedTokens)) {
            tokens.addAll(cachedTokens);
        }
        if (nonEmpty(intent.getModels())) {
            tokens.addAll(Model2Dto.createTokens(intent.getModels()));
        }

        return sendComment(accountId, commented, intent, tokens)
                .flatMap(id -> getCommentByIdAndStore(accountId, commented, id, true))
                .flatMap(comment -> {
                    if (nonNull(intent.getDraftMessageId())) {
                        // Remove the draft first, then hand the sent comment downstream.
                        return cache.comments()
                                .deleteByDbid(accountId, intent.getDraftMessageId())
                                .andThen(Single.just(comment));
                    }
                    return Single.just(comment);
                });
    });
}
项目:Phoenix-for-VK
文件:IFriendsService.java
/**
 * Declares a form-encoded POST to the {@code friends.search} endpoint. Each argument is sent
 * as the form field named in its {@code @Field} annotation.
 * NOTE(review): presumably a Retrofit service method whose nullable boxed arguments are
 * omitted from the request when {@code null} — confirm against the Retrofit setup.
 *
 * @return single emitting the wrapped list of matching users
 */
@FormUrlEncoded
@POST("friends.search")
Single<BaseResponse<Items<VKApiUser>>> search(@Field("user_id") int userId,
@Field("q") String query,
@Field("fields") String fields,
@Field("name_case") String nameCase,
@Field("offset") Integer offset,
@Field("count") Integer count);
项目:Phoenix-for-VK
文件:DocsInteractor.java
/**
 * Requests a single document by id and converts the first returned DTO to a {@link Document}.
 *
 * @param accountId account on whose behalf the request is made
 * @param ownerId   owner id used to build the document's {@link IdPair}
 * @param docId     document id used to build the document's {@link IdPair}
 * @return single emitting the document; fails with {@link NotFoundException} when the
 *         response list is empty
 */
@Override
public Single<Document> findById(int accountId, int ownerId, int docId) {
    return networker.vkDefault(accountId)
            .docs()
            .getById(Collections.singletonList(new IdPair(docId, ownerId)))
            .map(dtos -> {
                if (!dtos.isEmpty()) {
                    return Dto2Model.transform(dtos.get(0));
                }
                throw new NotFoundException();
            });
}
项目:FCM-for-Mojo
文件:DiscussWhitelistActivity.java
/**
 * Fetches the discuss whitelist and the discusses info together, then lets the whitelist state
 * generate its per-discuss states from the fetched list.
 *
 * @return single emitting the whitelist state after {@code generateStates} was applied
 */
@Override
public Single<? extends WhitelistState> startFetchWhitelistState() {
    final BiFunction<DiscussWhitelistState, List<Discuss>, DiscussWhitelistState> applyDiscusses =
            new BiFunction<DiscussWhitelistState, List<Discuss>, DiscussWhitelistState>() {
                @Override
                public DiscussWhitelistState apply(DiscussWhitelistState whitelist, List<Discuss> discusses) throws Exception {
                    whitelist.generateStates(discusses);
                    return whitelist;
                }
            };

    return Single.zip(FFMService.getDiscussWhitelist(), OpenQQService.getDiscussesInfo(), applyDiscusses);
}
项目:Phoenix-for-VK
文件:LogsStore.java
/**
 * Loads all log events of the given type, ordered by {@code LogColumns._ID} descending.
 *
 * @param type value matched against {@code LogColumns.TYPE}
 * @return single emitting the mapped events
 */
@Override
public Single<List<LogEvent>> getAll(int type) {
    return Single.fromCallable(() -> {
        Cursor cursor = helper().getReadableDatabase().query(LogColumns.TABLENAME, PROJECTION, LogColumns.TYPE + " = ?",
                new String[]{String.valueOf(type)}, null, null, LogColumns._ID + " DESC");

        // Close in finally so the cursor is released even when mapping a row throws.
        try {
            List<LogEvent> data = new ArrayList<>(cursor.getCount());
            while (cursor.moveToNext()) {
                data.add(map(cursor));
            }
            return data;
        } finally {
            cursor.close();
        }
    });
}
项目:wayf-cloud
文件:UserRouting.java
/**
 * Resolves the user making the request from its authorization header.
 * <p>
 * The header is parsed into a token and authenticated immediately, while this method runs;
 * the returned single then reads the full user by the authenticated user's id.
 *
 * @param routingContext request context the authorization header is read from
 * @return single emitting the user read through {@code userFacade}
 * @throws ServiceException with status 401, thrown synchronously, when the header is absent
 */
public Single<User> getCurrentUser(RoutingContext routingContext) {
    final String header = RequestReader.getHeaderValue(routingContext, RequestReader.AUTHORIZATION_HEADER);
    if (header == null) {
        throw new ServiceException(HttpStatus.SC_UNAUTHORIZED, "No authorization token provided");
    }

    final AuthorizationToken token = authorizationTokenFactory.fromAuthorizationHeader(header);
    final User authenticated = (User) authenticationFacade.authenticate(token).getAuthenticatable();

    return Single.just(authenticated)
            .flatMap(user -> userFacade.read(user.getId()));
}
项目:black-mirror
文件:LocationDataSource.java
/**
 * Returns the time zone for the given location.
 * <p>
 * The location is first geocoded; the latitude and longitude of the first result are then
 * passed to the time-zone API.
 *
 * @param location Location - city, country, village.
 */
@Override
public Single<TimeZone> getTimeZoneByLocationName(String location) {
    final Function<CoordResponse, SingleSource<? extends TimeZone>> toTimeZone =
            new Function<CoordResponse, SingleSource<? extends TimeZone>>() {
                @Override
                public SingleSource<? extends TimeZone> apply(@NonNull CoordResponse coordResponse) throws Exception {
                    // Only the first geocoding result is used.
                    final String latitude = coordResponse.results.get(0).geometry.location.lat.toString();
                    final String longitude = coordResponse.results.get(0).geometry.location.lng.toString();
                    return timeZoneDbApi.getTimeZone(latitude, longitude, TIME_ZONE_DB_API_KEY);
                }
            };

    return googleGeoApi.getCoordForLocation(location, GOOGLE_GEO_API_KEY)
            .flatMap(toTimeZone);
}
项目:Hubs
文件:HubManager.java
/**
 * Reads a hub's configuration as a string and parses it with {@code Hub.fromLua}.
 *
 * @param hubId id of the hub whose configuration is read
 * @return single emitting the parsed hub; any exception raised while reading, parsing or
 *         emitting is passed to {@code tryOnError}
 */
@Override
@NonNull
public Single<Hub> readConfig(@NonNull String hubId) {
    return Single.create(emitter -> {
        try {
            final Hub parsed = Hub.fromLua(readConfigToString(hubId));
            emitter.onSuccess(parsed);
        } catch (Exception error) {
            emitter.tryOnError(error);
        }
    });
}
项目:GreenfieldTemplate
文件:MockBBCService.java
/**
 * Emits the mock playlist response after the configured delay, delivering it on the Android
 * main thread.
 * <p>
 * The Espresso idling counter is incremented when the single is subscribed and decremented in
 * {@code doFinally}, so every subscription contributes exactly one increment and one
 * decrement. Incrementing at call time instead would leave the counter unbalanced when the
 * returned single is never subscribed or is subscribed more than once.
 *
 * @return single emitting {@code mockResponse} after {@code delay} milliseconds
 */
@Override
public Single<PlaylistResponse> fetchSongs() {
    return Single.just(mockResponse)
            .delay(delay, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe(disposable -> EspressoIdlingResource.getInstance().increment())
            .doFinally(() -> EspressoIdlingResource.getInstance().decrement());
}
项目:async-sqs
文件:DeleteMessageTaskTest.java
/**
 * When the request sender succeeds with {@code ERROR_RESULT}, every entry's result subject
 * must still terminate with an exception, and the sender must have been invoked.
 */
@Test
public void testTaskIndividualError() {
// The call itself succeeds; the stubbed result is ERROR_RESULT.
when(requestSenderMock.sendRequest(any())).thenReturn(Single.just(ERROR_RESULT));
task.run(QUEUE_URL, ENTRY_MAP);
ENTRY_MAP.values().forEach((entry) -> entry.getResultSubject().test().assertError(Exception.class));
verify(requestSenderMock).sendRequest(any());
}
项目:Phoenix-for-VK
文件:PhotosApi.java
/**
 * Calls the photos service "restore" method using a user token.
 *
 * @param ownerId owner id forwarded to the service
 * @param photoId photo id forwarded to the service
 * @return single emitting {@code true} when the extracted response equals {@code 1}
 */
@Override
public Single<Boolean> restore(Integer ownerId, int photoId) {
    return provideService(IPhotosService.class, TokenType.USER)
            .flatMap(service -> {
                return service.restore(ownerId, photoId)
                        .map(extractResponseWithErrorHandling())
                        .map(code -> code == 1);
            });
}
项目:REDAndroid
文件:UserSearchSearchPresenterTest.java
/**
 * Loading an announcement whose list is empty must toggle the progress indicator on and off
 * and show the empty-announcements state.
 */
@Test
public void loadAnnouncementsEmpty() {
    // Arrange: an announcement with an empty status and no entries.
    final Announcement announcement = new Announcement();
    final Announcement.Response response = new Announcement.Response();
    response.announcements = new ArrayList<>();
    announcement.response = response;
    announcement.status = "";
    stubDataManagerGetAnnouncements(Single.just(announcement));

    // Act
    mAnnouncementPresenter.loadAnnouncements();

    // Assert
    verify(mMockAnnouncementMvpView).showProgress(true);
    verify(mMockAnnouncementMvpView).showProgress(false);
    verify(mMockAnnouncementMvpView).showAnnouncementsEmpty();
}
项目:reactive-grpc
文件:ServerCalls.java
/**
 * Adapts a unary request / streaming response call: the request is wrapped in a
 * {@link Single}, handed to {@code delegate}, and the resulting {@link Flowable} is subscribed
 * with a handler built around {@code responseObserver}.
 * <p>
 * Any {@link Throwable} raised while setting this up is converted with {@code prepareError}
 * and reported through {@code responseObserver.onError}.
 *
 * @param request          the single incoming request message
 * @param responseObserver observer for outgoing messages; cast to {@link ServerCallStreamObserver}
 * @param delegate         maps the request single to the response stream; must not return null
 */
public static <TRequest, TResponse> void oneToMany(
        TRequest request, StreamObserver<TResponse> responseObserver,
        Function<Single<TRequest>, Flowable<TResponse>> delegate) {
    try {
        final Flowable<TResponse> responses =
                Preconditions.checkNotNull(delegate.apply(Single.just(request)));
        final ServerCallStreamObserver<TResponse> serverObserver =
                (ServerCallStreamObserver<TResponse>) responseObserver;
        responses.subscribe(new ReactivePublisherBackpressureOnReadyHandler<TResponse>(serverObserver));
    } catch (Throwable failure) {
        responseObserver.onError(prepareError(failure));
    }
}
项目:black-mirror
文件:NewsDataSource.java
/**
 * Fetches the TVN24 RSS feed and extracts the news list from its channel.
 *
 * @return the news model for the TVN24 channel
 */
@Override
public Single<List<News>> getTvnNews() {
    final Function<Tvn24News, List<News>> channelNews = new Function<Tvn24News, List<News>>() {
        @Override
        public List<News> apply(Tvn24News feed) throws Exception {
            return feed.channel.news;
        }
    };
    return tvn24Rss.getNews().map(channelNews);
}
项目:Phoenix-for-VK
文件:IVideoService.java
/**
 * Declares a form-encoded POST to the {@code video.get} endpoint. Each argument is sent as
 * the form field named in its {@code @Field} annotation.
 * NOTE(review): presumably a Retrofit service method whose {@code null} arguments are omitted
 * from the request — confirm against the Retrofit setup.
 *
 * @return single emitting the wrapped list of videos
 */
@FormUrlEncoded
@POST("video.get")
Single<BaseResponse<Items<VKApiVideo>>> get(@Field("owner_id") Integer ownerId,
@Field("videos") String videos,
@Field("album_id") Integer albumId,
@Field("count") Integer count,
@Field("offset") Integer offset,
@Field("extended") Integer extended);
项目:dztools
文件:StopLoss.java
/**
 * Determines the stop-loss price to use when submitting an order.
 * <p>
 * The stop-loss distance is read from {@code brokerBuyData} on subscription. When
 * {@code isDistanceForNoSL} accepts it, the platform's "no SL" price is emitted; otherwise the
 * price is computed by {@code forSubmitWithRealDistance}.
 *
 * @param instrument    instrument the order is for
 * @param brokerBuyData order data providing the stop-loss distance
 * @return single emitting the stop-loss price
 */
public Single<Double> forSubmit(final Instrument instrument,
                                final BrokerBuyData brokerBuyData) {
    return Single
            .fromCallable(brokerBuyData::slDistance)
            .flatMap(slDistance -> {
                if (isDistanceForNoSL(slDistance)) {
                    return Single.just(StrategyUtil.platformSettings.noSLPrice());
                }
                return forSubmitWithRealDistance(instrument, brokerBuyData);
            });
}
项目:Phoenix-for-VK
文件:AudioApi.java
/**
 * Calls the audio service "setBroadcast" method.
 *
 * @param audio     audio to broadcast, sent as {@code ownerId_id}; {@code null} sends no audio
 *                  string
 * @param targetIds target ids, joined with commas for the request
 * @return single emitting the ids extracted from the response
 */
@Override
public Single<int[]> setBroadcast(IdPair audio, Collection<Integer> targetIds) {
    final String audioId;
    if (Objects.isNull(audio)) {
        audioId = null;
    } else {
        audioId = audio.ownerId + "_" + audio.id;
    }

    return provideService(IAudioService.class)
            .flatMap(service -> {
                return service.setBroadcast(audioId, join(targetIds, ","))
                        .map(extractResponseWithErrorHandling());
            });
}
项目:pact-workshop-android
文件:PresenterTest.java
/**
 * When the repository returns the fake response, starting the presenter must put the view
 * into the loaded state carrying that response's animals.
 */
@Test
public void should_show_loaded_when_fetch_succeeds() {
// given
when(repository.fetchResponse(any())).thenReturn(Single.just(FakeService.RESPONSE));
// when
presenter.onStart();
// then
verify(view).setViewState(ViewState.Loaded.create(FakeService.RESPONSE.getAnimals()));
}
项目:Phoenix-for-VK
文件:WallApi.java
/**
 * Calls the wall service "delete" method using a user token.
 *
 * @param ownerId owner id forwarded to the service
 * @param postId  post id forwarded to the service
 * @return single emitting {@code true} when the extracted response equals {@code 1}
 */
@Override
public Single<Boolean> delete(Integer ownerId, int postId) {
    return provideService(IWallService.class, TokenType.USER)
            .flatMap(service -> {
                return service.delete(ownerId, postId)
                        .map(extractResponseWithErrorHandling())
                        .map(code -> code == 1);
            });
}
项目:RxLifeCycle
文件:UntilLifecycleTransformerSingleTest.java
/**
 * Without any lifecycle event, the delayed value must be delivered once the delay elapses and
 * the observer must complete.
 */
@Test
public void noEvent() {
    final Single<String> delayed = Single.just("1")
            .delay(1, TimeUnit.MILLISECONDS, testScheduler);
    final TestObserver<String> testObserver = delayed
            .compose(RxLifecycle.<String, String>bind(lifecycle))
            .test();

    // Nothing is emitted before the virtual clock moves.
    testObserver.assertNoValues();

    testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    testObserver.assertValue("1");
    testObserver.assertComplete();
}
项目:Phoenix-for-VK
文件:VkRetrofitProvider.java
/**
 * Lazily builds a Retrofit wrapper backed by a custom HTTP client for the given account and
 * token. The client is created with the currently active proxy at subscription time.
 *
 * @param accountId account the HTTP client is created for
 * @param token     token the HTTP client is created with
 * @return single emitting the wrapper created by {@code createDefaultVkApiRetrofit}
 */
@Override
public Single<RetrofitWrapper> provideCustomRetrofit(int accountId, String token) {
    return Single.fromCallable(() -> createDefaultVkApiRetrofit(
            clientFactory.createCustomVkHttpClient(accountId, token, VKGSON, proxyManager.getActiveProxy())));
}
项目:dztools
文件:BrokerBuy.java
/**
 * Opens a trade for the given buy data.
 * <p>
 * On subscription the instrument is resolved from the asset name, an order is obtained from
 * {@code submitParamsRunner}, and the order is processed into a result value. Any error along
 * the way is replaced by {@code ZorroReturnValues.BROKER_BUY_FAIL}.
 *
 * @param brokerBuyData data describing the trade to open
 * @return single emitting the processing result, or the failure value on error
 */
public Single<Integer> openTrade(final BrokerBuyData brokerBuyData) {
    final Integer failureValue = ZorroReturnValues.BROKER_BUY_FAIL.getValue();
    return Single
            .defer(() -> tradeUtility.instrumentForTrading(brokerBuyData.assetName()))
            .flatMap(instrument -> submitParamsRunner.get(instrument, brokerBuyData))
            .flatMap(order -> processOrderAndGetResult(order, brokerBuyData))
            .onErrorReturnItem(failureValue);
}
项目:AndroidSensors
文件:GPSSensorGatheringTest.java
/**
 * Subscribes to a newly created gatherer's record stream on a new thread, keeps the items
 * that arrive within 10 seconds, and counts them.
 *
 * @param gathererCreator factory for the gatherer whose stream is recorded
 * @return single emitting the number of items received during the 10-second window
 */
private Single<Long> gatherDuring10Sec(GathererCreator gathererCreator) {
return gathererCreator.create().recordStream()
.subscribeOn(Schedulers.newThread())
.take(10, TimeUnit.SECONDS)
.toObservable()
.count();
}
项目:NovelReader
文件:ReviewDetailPresenter.java
/**
 * Reloads a review's detail together with its best comments and a page of regular comments.
 * <p>
 * The three remote requests are combined through {@code RxUtils.toCommentDetail}, run on the
 * IO scheduler and delivered on the Android main thread. On success the view is refreshed and
 * completed; on failure the view shows an error and the throwable is logged.
 *
 * @param detailId id of the review
 * @param start    forwarded to the comments request
 * @param limit    forwarded to the comments request
 */
@Override
public void refreshReviewDetail(String detailId, int start, int limit) {
    final Single<ReviewDetailBean> detailSingle =
            RemoteRepository.getInstance().getReviewDetail(detailId);
    final Single<List<CommentBean>> bestCommentsSingle =
            RemoteRepository.getInstance().getBestComments(detailId);
    final Single<List<CommentBean>> commentsSingle =
            RemoteRepository.getInstance().getDetailBookComments(detailId, start, limit);

    final Disposable subscription = RxUtils
            .toCommentDetail(detailSingle, bestCommentsSingle, commentsSingle)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    bean -> {
                        mView.finishRefresh(bean.getDetail(), bean.getBestComments(), bean.getComments());
                        mView.complete();
                    },
                    error -> {
                        mView.showError();
                        LogUtils.e(error);
                    });

    addDisposable(subscription);
}
项目:Phoenix-for-VK
文件:IGroupsApi.java
/**
 * Declares the "join" operation for a group. {@code @CheckResult} flags callers that ignore
 * the returned single.
 * NOTE(review): presumably maps to the VK groups.join API, with {@code notSure} as its
 * optional "not sure" flag — confirm against the implementation.
 *
 * @param groupId group to join
 * @param notSure optional flag forwarded with the request
 * @return single emitting whether the operation succeeded
 */
@CheckResult
Single<Boolean> join(int groupId, Integer notSure);
项目:dztools
文件:TickTimeProviderTest.java
/**
 * Starts stubbing {@code tickTimeRepositoryMock.get()} so callers can chain the desired
 * answer onto the returned stubbing.
 */
private OngoingStubbing<Single<Long>> stubGetLatestFromRepository() {
    final Single<Long> recordedCall = tickTimeRepositoryMock.get();
    return when(recordedCall);
}
项目:vertx-musicstore
文件:ArtistHandler.java
/**
 * Looks up an artist by id and returns it as a JSON object with {@code id} and {@code name}.
 * <p>
 * The query result is streamed row by row; the name is taken from the first column. The
 * returned single fails unless the query yields exactly one row.
 *
 * @param sqlConnection connection the query runs on
 * @param artistId      id bound as the query parameter and echoed in the result
 * @return single emitting the artist JSON
 */
private Single<JsonObject> findArtist(SQLConnection sqlConnection, Long artistId) {
    final JsonArray params = new JsonArray().add(artistId);
    return sqlConnection.rxQueryStreamWithParams(findArtistById, params)
            .flatMapObservable(SQLRowStream::toObservable)
            .map(row -> {
                final JsonObject artist = new JsonObject();
                artist.put("id", artistId);
                artist.put("name", row.getString(0));
                return artist;
            })
            .singleOrError();
}
项目:android-mvp-architecture
文件:AppDataManager.java
/**
 * Performs the server login call by delegating to the API helper.
 *
 * @param request login request forwarded unchanged
 * @return the single returned by {@code mApiHelper}
 */
@Override
public Single<LoginResponse> doServerLoginApiCall(LoginRequest.ServerLoginRequest request) {
    return mApiHelper.doServerLoginApiCall(request);
}