/** * 添加线程管理并订阅 * @param ob * @param subscriber * @param cacheKey 缓存kay * @param event Activity 生命周期 * @param lifecycleSubject * @param isSave 是否缓存 * @param forceRefresh 是否强制刷新 */ public void toSubscribe(Observable ob, final ProgressSubscriber subscriber, String cacheKey, final ActivityLifeCycleEvent event, final PublishSubject<ActivityLifeCycleEvent> lifecycleSubject, boolean isSave, boolean forceRefresh) { //数据预处理 Observable.Transformer<HttpResult<Object>, Object> result = RxHelper.handleResult(event,lifecycleSubject); Observable observable = ob.compose(result) .doOnSubscribe(new Action0() { @Override public void call() { //显示Dialog和一些其他操作 subscriber.showProgressDialog(); } }); RetrofitCache.load(cacheKey,observable,isSave,forceRefresh).subscribe(subscriber); }
/** * 利用Observable.takeUntil()停止网络请求 * * @param event * @param lifecycleSubject * @param <T> * @return */ @NonNull public <T> Observable.Transformer<T, T> bindUntilEvent(@NonNull final ActivityLifeCycleEvent event, final PublishSubject<ActivityLifeCycleEvent> lifecycleSubject) { return new Observable.Transformer<T, T>() { @Override public Observable<T> call(Observable<T> sourceObservable) { Observable<ActivityLifeCycleEvent> compareLifecycleObservable = lifecycleSubject.takeFirst(new Func1<ActivityLifeCycleEvent, Boolean>() { @Override public Boolean call(ActivityLifeCycleEvent activityLifeCycleEvent) { return activityLifeCycleEvent.equals(event); } }); return sourceObservable.takeUntil(compareLifecycleObservable); } }; }
@Test public void testErrorsDoNotEmit() { final PublishSubject<Integer> source = PublishSubject.create(); final Observable<Integer> result = source .materialize() .compose(Transformers.values()); final TestSubscriber<Integer> resultTest = new TestSubscriber<>(); result.subscribe(resultTest); // source.onNext(1); resultTest.assertValues(1); // An error in the source stream should not emit values. source.onError(new Throwable()); resultTest.assertValues(1); }
@Test public void basic() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); subject.onNext(0); o.takeNext(); subject.onNext(1); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); subject.onCompleted(); o.assertOnCompleted(); }
@Test public void completion() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); // First emits immediately subject.onNext(0); o.takeNext(); subject.onNext(1); subject.onCompleted(); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); o.assertOnCompleted(); }
@Test public void unsubscription() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); Subscription sub = subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); // First emits immediately subject.onNext(0); o.takeNext(); subject.onNext(1); o.assertNoMoreEvents(); sub.unsubscribe(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.assertNoMoreEvents(); }
@Test public void overDelay_shouldEmitImmediately() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); // First emits immediately subject.onNext(0); o.takeNext(); scheduler.advanceTimeBy(2, TimeUnit.SECONDS); subject.onNext(1); o.takeNext(); }
@Test public void should_fail_on_second_subscription() { // given PublishSubject<Integer> subject = PublishSubject.create(); Observable<Integer> limitedObservable = SubscriptionLimiter.limitSubscriptions(1, subject); TestSubscriber<Integer> subscriber = new TestSubscriber<>(); TestSubscriber<Integer> subscriber2 = new TestSubscriber<>(); limitedObservable.subscribe(subscriber); // when limitedObservable.subscribe(subscriber2); subject.onNext(123); // then assertThat(subscriber2.getOnNextEvents()).isEmpty(); assertThat(subscriber2.getOnErrorEvents()).hasSize(1); }
@Test public void refresh() throws Exception { final Reddit reddit = new Reddit(); PublishSubject<Reddit> subject = PublishSubject.create(); Mockito.doReturn(subject.asObservable().toList()) .when(mRepository) .getReddits(Mockito.anyString()); mViewModel.refresh(); Mockito.verify(mRepository).getReddits("test"); Assert.assertThat(mViewModel.errorText.get(), IsNull.nullValue()); Assert.assertThat(mViewModel.isLoading.get(), Is.is(true)); subject.onNext(reddit); subject.onCompleted(); Assert.assertThat(mViewModel.isLoading.get(), Is.is(false)); Assert.assertThat(mViewModel.reddits, IsCollectionContaining.hasItems(reddit)); }
@Test public void searchQueryChange() throws Exception { final Subreddit subreddit = new Subreddit(); PublishSubject<Subreddit> subject = PublishSubject.create(); Mockito.doReturn(subject.asObservable().toList()) .when(mRepository) .searchSubreddits(Mockito.anyString()); mViewModel.subscribeOnSearchQueryChange(); mViewModel.mSearchQuery.onNext("test"); Mockito.verify(mRepository).searchSubreddits("test"); Assert.assertThat(mViewModel.isLoading.get(), Is.is(true)); subject.onNext(subreddit); subject.onCompleted(); Assert.assertThat(mViewModel.isLoading.get(), Is.is(false)); Assert.assertThat(mViewModel.subreddits, IsCollectionContaining.hasItems(subreddit)); }
@Override public Observable<GattObserveData> observeIndication( final BluetoothGattCharacteristic characteristicToIndication) { return Observable.merge(indicationSubject = PublishSubject.create(), Observable.create((Observable.OnSubscribe<GattObserveData>) subscriber -> { if (isConnected()) { if (characteristicToIndication != null) { bluetoothGatt.setCharacteristicNotification(characteristicToIndication, true); BluetoothGattDescriptor indicationDescriptor = characteristicToIndication.getDescriptor(BluetoothGatts.CLIENT_CHARACTERISTIC_CONFIG); indicationDescriptor.setValue(BluetoothGattDescriptor.ENABLE_INDICATION_VALUE); bluetoothGatt.writeDescriptor(indicationDescriptor); } else { subscriber.onError(UUID_NOT_FOUND); } } else { subscriber.onError(GATT_NOT_CONNECTED); } })).doOnSubscribe(() -> currentIndicationCharacteristic = characteristicToIndication); }
@Test public void testPagingCapabilities() { PublishSubject<Object> view = PublishSubject.create(); BehaviorSubject<Integer> nextPageRequests = BehaviorSubject.create(); final TestObserver<Delivery<Object, String>> testObserver = new TestObserver<>(); nextPageRequests .concatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer targetPage) { return targetPage <= requestedPageCount ? Observable.<Integer>never() : Observable.range(requestedPageCount, targetPage - requestedPageCount); } }) .doOnNext(new Action1<Integer>() { @Override public void call(Integer it) { requestedPageCount = it + 1; } }) .startWith(Observable.range(0, requestedPageCount)) .concatMap(new Func1<Integer, Observable<String>>() { @Override public Observable<String> call(final Integer page) { return requestPage(page, PAGE_SIZE); } }) .compose(new DeliverReplay<Object, String>(view)) .subscribe(testObserver); ArrayList<Delivery<Object, String>> onNext = new ArrayList<>(); testObserver.assertReceivedOnNext(onNext); view.onNext(999); addOnNext(onNext, 999, 0, 1, 2); testObserver.assertReceivedOnNext(onNext); nextPageRequests.onNext(2); addOnNext(onNext, 999, 3, 4, 5); testObserver.assertReceivedOnNext(onNext); view.onNext(null); assertEquals(0, testObserver.getOnCompletedEvents().size()); testObserver.assertReceivedOnNext(onNext); nextPageRequests.onNext(3); assertEquals(0, testObserver.getOnCompletedEvents().size()); testObserver.assertReceivedOnNext(onNext); view.onNext(9999); addOnNext(onNext, 9999, 0, 1, 2, 3, 4, 5, 6, 7, 8); assertEquals(0, testObserver.getOnCompletedEvents().size()); testObserver.assertReceivedOnNext(onNext); }
@Test public void testActivityObservableBindLifecycle() throws Exception { final Observable<Object> observable = PublishSubject.create().asObservable(); TestSubscriber<Object> testSubscriber = TestSubscriber.create(); ActivityController controller = Robolectric.buildActivity(TestActivity.class).create().start(); NaviActivity activity = (NaviActivity) controller.get(); this.presenter.bind(activity); this.presenter.bindLifecycle(observable, testSubscriber); controller.create(); assertFalse(testSubscriber.isUnsubscribed()); controller.start(); assertFalse(testSubscriber.isUnsubscribed()); controller.resume(); assertFalse(testSubscriber.isUnsubscribed()); controller.pause(); assertFalse(testSubscriber.isUnsubscribed()); controller.stop(); assertFalse(testSubscriber.isUnsubscribed()); controller.destroy(); testSubscriber.assertCompleted(); testSubscriber.assertUnsubscribed(); }
protected void initObservable() { subject = PublishSubject.create(); subscribe = subject.onBackpressureBuffer() .observeOn(EventThread.getScheduler(observeThread)) .subscribeOn(EventThread.getScheduler(subscribeThread)) .subscribe(event -> { try { if (valid) { handleEvent(event); } } catch (InvocationTargetException e) { throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberEvent.this, e); } }); }
/** * @param <T> * @return */ public static <T> Observable.Transformer<HttpResult<T>, T> handleResult(final ActivityLifeCycleEvent event, final PublishSubject<ActivityLifeCycleEvent> lifecycleSubject) { return new Observable.Transformer<HttpResult<T>, T>() { @Override public Observable<T> call(Observable<HttpResult<T>> tObservable) { Observable<ActivityLifeCycleEvent> compareLifecycleObservable = lifecycleSubject.takeFirst(new Func1<ActivityLifeCycleEvent, Boolean>() { @Override public Boolean call(ActivityLifeCycleEvent activityLifeCycleEvent) { return activityLifeCycleEvent.equals(event); } }); return tObservable.flatMap(new Func1<HttpResult<T>, Observable<T>>() { @Override public Observable<T> call(HttpResult<T> result) { if (result.getCount() != 0) { return createData(result.getSubjects()); } else { return Observable.error(new ApiException(result.getCount())); } } }).takeUntil(compareLifecycleObservable).subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()); } }; }
/** * 用SerializedSubject包装PublishSubject,序列化 */ private RxBus() { //private final PublishSubject<Object> _bus = PublishSubject.create(); // If multiple threads are going to emit events to this // then it must be made thread-safe like this instead _bus = new SerializedSubject<>(PublishSubject.create()); }
public PublishSubject<Boolean> begin() { if(publishSubject == null){ publishSubject = PublishSubject.create(); } if (Build.VERSION.SDK_INT < 23){ publishSubject.onError(new FPerException(SYSTEM_API_ERROR)); }else { initManager(); confirmFinger(); startListening(null); } return publishSubject; }
public void onNext(T1 args) { try { int id; Subject<T2, T2> subj = PublishSubject.create(); Observer<T2> subjSerial = new SerializedObserver(subj); synchronized (ResultManager.this.guard) { ResultManager resultManager = ResultManager.this; id = resultManager.leftIds; resultManager.leftIds = id + 1; ResultManager.this.leftMap.put(Integer.valueOf(id), subjSerial); } Observable<T2> window = Observable.create(new WindowObservableFunc(subj, ResultManager.this.cancel)); Observable<D1> duration = (Observable) OnSubscribeGroupJoin.this.leftDuration.call(args); Subscriber<D1> d1 = new LeftDurationObserver(id); ResultManager.this.group.add(d1); duration.unsafeSubscribe(d1); R result = OnSubscribeGroupJoin.this.resultSelector.call(args, window); synchronized (ResultManager.this.guard) { List<T2> rightMapValues = new ArrayList(ResultManager.this.rightMap.values()); } ResultManager.this.subscriber.onNext(result); for (T2 t2 : rightMapValues) { subjSerial.onNext(t2); } } catch (Throwable t) { Exceptions.throwOrReport(t, this); } }
private MockWalker(Context context) { mContext = context.getApplicationContext(); mChanges = PublishSubject.create(); mClient = new GoogleApiClient.Builder(mContext) .addApi(LocationServices.API) .addConnectionCallbacks(this) .build(); mClient.connect(); }
private void createCounterEmitter() { mCounterEmitter = PublishSubject.create(); mCounterEmitter.subscribe(new Observer<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { mCounterDisplay.setText(String.valueOf(integer)); } }); }
/** * Sign out from both Google and Facebook and disable auto sign in for Smart Lock Password. * Require a FragmentActivity * * @param activity the activity * @return a PublishSubject<RxStatus> */ public PublishSubject<RxStatus> signOut(FragmentActivity activity) { mStatusObserver = PublishSubject.create(); Observable<RxStatus> rxGoogleSignOut = new RxGoogleAuth.Builder(activity).build().signOut(); Observable<RxStatus> rxFacebookSignOut = new RxFacebookAuth.Builder(activity).build().signOut(); Observable<RxStatus> rxSmartLockDisableAutoSignin = new RxSmartLockPasswords.Builder(activity).build().disableAutoSignIn(); Observable.merge(rxGoogleSignOut, rxFacebookSignOut, rxSmartLockDisableAutoSignin) .subscribe(new Action1<RxStatus>() { @Override public void call(RxStatus rxStatus) { mStatusObserver.onNext(rxStatus); mStatusObserver.onCompleted(); } }); return mStatusObserver; }
/** * Facebook sign out */ public void signOut(PublishSubject<RxStatus> statusSubject) { LoginManager.getInstance().logOut(); // delete current user deleteCurrentUser(); statusSubject.onNext(new RxStatus( CommonStatusCodes.SUCCESS, getString(R.string.status_success_log_out_message) )); statusSubject.onCompleted(); }
@NonNull public UploadManager build() { if (uploadService == null) { throw new IllegalArgumentException("Must provide a valid upload service"); } if (uploadDataStore == null) { throw new IllegalArgumentException("Must provide a valid upload data store"); } if (uploadErrorAdapter == null) { throw new IllegalArgumentException("Must provide a valid upload error adapter"); } final Subject<Job, Job> jobSubject = PublishSubject.<Job>create().toSerialized(); final Subject<Status, Status> statusSubject = PublishSubject.<Status>create().toSerialized(); final Uploader uploader = Uploader.create(uploadService); final UploadInteractor uploadInteractor = UploadInteractorImpl.create(uploader, uploadDataStore, uploadErrorAdapter); return new UploadManager(uploadInteractor, uploadErrorAdapter, jobSubject, statusSubject, deleteRecordOnComplete); }
public PublishSubject<Boolean> begin() { if (publishSubject == null) { publishSubject = PublishSubject.create(); } if (Build.VERSION.SDK_INT < 23) { publishSubject.onError(new FPerException(SYSTEM_API_ERROR)); } else { initManager(); if (confirmFinger()) { startListening(null); } } return publishSubject; }
@NonNull public static Observable<Boolean> requestWithRationale(final Dialog rationaleDialog, final Activity activity, final PermissionGroup permissions) { final PublishSubject<Void> rationaleSubject = PublishSubject.create(); rationaleDialog.setOnDismissListener(new DialogInterface.OnDismissListener() { @Override public void onDismiss(DialogInterface dialog) { rationaleSubject.onNext(null); } }); return requestWithRationale( rationaleSubject.doOnSubscribe(new Action0() { @Override public void call() { rationaleDialog.show(); } }), activity, permissions); }
public static boolean onRequestPermissionsResult(int requestCode, String permissions[], int[] grantResults) { final PublishSubject<Boolean> publishSubject = requestMap.get(requestCode); if(publishSubject == null) return false; if (grantResults.length > 0 && grantResults[0] == PackageManager.PERMISSION_GRANTED) { publishSubject.onNext(true); } else { publishSubject.onNext(false); } releaseRequestCode(requestCode); return true; }
@Test public void testErrorResponse() { final Gson gson = new Gson(); final PublishSubject<Response<Integer>> response = PublishSubject.create(); final Observable<Integer> result = response.lift(Operators.apiError(gson)); final TestSubscriber<Integer> resultTest = new TestSubscriber<>(); result.subscribe(resultTest); response.onNext(Response.error(400, ResponseBody.create(null, ""))); resultTest.assertNoValues(); assertEquals(1, resultTest.getOnErrorEvents().size()); }
@Override public Dispatcher create(Store<T> store, Dispatcher nextDispatcher) { PublishSubject<Action> actions = PublishSubject.create(); subscription = epic.run(actions, store).subscribe(store::dispatch); return action -> { nextDispatcher.dispatch(action); if(action instanceof Action) { actions.onNext((Action) action); } }; }
public MediaViewHolder(View itemView, PublishSubject<CardTouchEvent> cardTouchEventPublishSubject, DateCalculator dateCalculator, SpannableFactory spannableFactory) { super(itemView, cardTouchEventPublishSubject); this.dateCalculator = dateCalculator; this.spannableFactory = spannableFactory; this.cardTouchEventPublishSubject = cardTouchEventPublishSubject; publisherAvatar = (ImageView) itemView.findViewById(R.id.card_image); publisherName = (TextView) itemView.findViewById(R.id.card_title); date = (TextView) itemView.findViewById(R.id.card_subtitle); articleTitle = (TextView) itemView.findViewById(R.id.partial_social_timeline_thumbnail_title); articleThumbnail = (ImageView) itemView.findViewById(R.id.featured_graphic); articleHeader = itemView.findViewById(R.id.displayable_social_timeline_article_header); relatedTo = (TextView) itemView.findViewById(R.id.app_name); playIcon = (ImageView) itemView.findViewById(R.id.play_button); likeButton = (LikeButtonView) itemView.findViewById(R.id.social_like_button); likeView = itemView.findViewById(R.id.social_like); commentButton = (TextView) itemView.findViewById(R.id.social_comment); shareButton = (TextView) itemView.findViewById(R.id.social_share); overflowMenu = itemView.findViewById(R.id.overflow_menu); }
public PopularAppViewHolder(View view, PublishSubject<CardTouchEvent> cardTouchEventPublishSubject, DateCalculator dateCalculator) { super(view, cardTouchEventPublishSubject); this.inflater = LayoutInflater.from(itemView.getContext()); this.dateCalculator = dateCalculator; this.headerSubTitle = (TextView) view.findViewById(R.id.displayable_social_timeline_popular_app_card_timestamp); this.appIcon = (ImageView) view.findViewById(R.id.displayable_social_timeline_popular_app_icon); this.appName = (TextView) view.findViewById(R.id.displayable_social_timeline_popular_app_body_title); this.appRating = (RatingBar) view.findViewById(R.id.rating_bar); this.getAppButton = (Button) view.findViewById(R.id.displayable_social_timeline_popular_app_get_app_button); this.headerUsersContainer = (ViewGroup) view.findViewById(R.id.displayable_social_timeline_popular_app_users_container); this.likeButton = (LikeButtonView) itemView.findViewById(R.id.social_like_button); this.like = (LinearLayout) itemView.findViewById(R.id.social_like); this.cardTouchEventPublishSubject = cardTouchEventPublishSubject; this.commentButton = (TextView) view.findViewById(R.id.social_comment); this.shareButton = (TextView) itemView.findViewById(R.id.social_share); this.overflowMenu = itemView.findViewById(R.id.overflow_menu); }
@Inject public RxBindingExampleViewModel() { final long intervalMs = 10; final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("mm:ss:SS"); timeStream = Observable .interval(intervalMs, TimeUnit.MILLISECONDS) .onBackpressureDrop() .map(beats -> Duration.ofMillis(intervalMs * beats)) .map(duration -> formatter.format(LocalTime.MIDNIGHT.plus(duration))); calculateSubject = PublishSubject.create(); highLoadStream = calculateSubject .observeOn(Schedulers.computation()) .scan((sum, value) -> ++sum) .map(iteration -> { // Simulate high processing load try { Thread.sleep(1000); } catch (InterruptedException e) {} return iteration; }); }
@Test public void buffer() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); // First emits immediately subject.onNext(0); o.takeNext(); subject.onNext(1); subject.onNext(2); subject.onNext(3); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); o.assertNoMoreEvents(); subject.onCompleted(); o.assertOnCompleted(); }
/** * Map operations on a RemoteDataSet result in only one onNext * invocation that will return the final IDataSet. */ @Override public <S> Observable<PartialResult<IDataSet<S>>> map(final IMap<T, S> mapper) { final MapOperation<T, S> mapOp = new MapOperation<T, S>(mapper); final byte[] serializedOp = SerializationUtils.serialize(mapOp); final UUID operationId = UUID.randomUUID(); final Command command = Command.newBuilder() .setIdsIndex(this.remoteHandle) .setSerializedOp(ByteString.copyFrom(serializedOp)) .setHighId(operationId.getMostSignificantBits()) .setLowId(operationId.getLeastSignificantBits()) .build(); final PublishSubject<PartialResult<IDataSet<S>>> subj = PublishSubject.create(); final StreamObserver<PartialResponse> responseObserver = new NewDataSetObserver<S>(subj); return subj.doOnSubscribe(() -> this.stub.withDeadlineAfter(TIMEOUT, TimeUnit.MILLISECONDS) .map(command, responseObserver)) .doOnUnsubscribe(() -> this.unsubscribe(operationId)); }
@Override public <S> Observable<PartialResult<IDataSet<S>>> flatMap(IMap<T, List<S>> mapper) { final FlatMapOperation<T, S> mapOp = new FlatMapOperation<T, S>(mapper); final byte[] serializedOp = SerializationUtils.serialize(mapOp); final UUID operationId = UUID.randomUUID(); final Command command = Command.newBuilder() .setIdsIndex(this.remoteHandle) .setSerializedOp(ByteString.copyFrom(serializedOp)) .setHighId(operationId.getMostSignificantBits()) .setLowId(operationId.getLeastSignificantBits()) .build(); final PublishSubject<PartialResult<IDataSet<S>>> subj = PublishSubject.create(); final StreamObserver<PartialResponse> responseObserver = new NewDataSetObserver<S>(subj); return subj.doOnSubscribe(() -> this.stub.withDeadlineAfter(TIMEOUT, TimeUnit.MILLISECONDS) .flatMap(command, responseObserver)) .doOnUnsubscribe(() -> this.unsubscribe(operationId)); }
private void setupSubscriptions() { subject = PublishSubject.create(); subscription = subject.observeOn(Schedulers.io()) .flatMap(new Func1<String, Observable<List<DownloadableItem>>>() { @Override public Observable<List<DownloadableItem>> call(String s) { Timber.d("Constructing downloadables...."); List<DownloadableItem> list = new ArrayList<>(); for (int i = 0; i < URLS.length; i++) { DownloadStatus status = downlink.downloadStatus(URLS[i]); list.add(DownloadableItem.builder() .name("Item " + (i + 1)) .url(URLS[i]) .status(status) .build()); } return Observable.just(list); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<List<DownloadableItem>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Timber.d(e, "Error occurred"); } @Override public void onNext(List<DownloadableItem> downloadableItems) { Timber.d("Setting data in adapter..."); downloadsAdapter.setData(downloadableItems); } }); }
@Test public void should_allow_a_subscription_after_an_unsubscription() { // given PublishSubject<Integer> subject = PublishSubject.create(); Observable<Integer> limitedObservable = SubscriptionLimiter.limitSubscriptions(1, subject); TestSubscriber<Integer> subscriber = new TestSubscriber<>(); TestSubscriber<Integer> subscriber2 = new TestSubscriber<>(); Subscription subscription = limitedObservable.subscribe(subscriber); // when subscription.unsubscribe(); limitedObservable.subscribe(subscriber2); subject.onNext(123); // then assertThat(subscriber2.getOnNextEvents()).hasSize(1).contains(123); }