/**
 * Demonstrates {@code timestamp()}: subscribes to three interval ticks spaced 100 ms apart,
 * each wrapped in a {@code Timestamped}, and passes every notification to {@code println}.
 */
public void runCode() {
    Observable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .timestamp()
            // interval() emits Long ticks, so the element type is spelled out instead of
            // falling back to the raw Timestamped type.
            .subscribe(new Observer<Timestamped<Long>>() {
                @Override
                public void onCompleted() {
                    println("------>onCompleted()");
                }

                @Override
                public void onError(Throwable e) {
                    println("------>onError()" + e);
                }

                @Override
                public void onNext(Timestamped<Long> timestamped) {
                    println("------->onNext()" + timestamped);
                }
            });
}
private Func1<Timestamped<RecentPhotosResponse>, Boolean> getRecentPhotosFilter(final ITimestampedView timestampedView) { return new Func1<Timestamped<RecentPhotosResponse>, Boolean>() { @Override public Boolean call(Timestamped<RecentPhotosResponse> recentPhotosResponseTimestamped) { StringBuilder logMessage = new StringBuilder("getMergedPhotos().filter() - Filtering results"); if (recentPhotosResponseTimestamped == null) { logMessage.append(", recentPhotosResponseTimestamped is null"); } else { logMessage.append(", timestamps=").append(recentPhotosResponseTimestamped.getTimestampMillis()).append(">").append(timestampedView.getViewDataTimestampMillis()).append("?"); } logMessage.append(", thread=").append(Thread.currentThread().getName()); Log.d(CLASSNAME, logMessage.toString()); // filter it // if result is null - ignore it // if timestamp of new arrived (emission) data is less than timestamp of already displayed data — ignore it. return recentPhotosResponseTimestamped != null && recentPhotosResponseTimestamped.getValue() != null && recentPhotosResponseTimestamped.getValue().photos != null && recentPhotosResponseTimestamped.getTimestampMillis() > timestampedView.getViewDataTimestampMillis(); } }; }
/**
 * Wraps the downstream subscriber so that every upstream item is forwarded together with the
 * scheduler's current time; terminal events are passed through unchanged.
 *
 * @param o the downstream subscriber receiving {@code Timestamped<T>} values
 * @return a subscriber for the input sequence that adds a timestamp to each item
 */
@Override
public Subscriber<? super T> call(final Subscriber<? super Timestamped<T>> o) {
    return new Subscriber<T>(o) {
        @Override
        public void onNext(T t) {
            final long now = scheduler.now();
            o.onNext(new Timestamped<T>(now, t));
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onComplete() {
            o.onComplete();
        }
    };
}
/**
 * Emits three values with the test scheduler advanced by 100 ms between them and expects the
 * timestamps 0, 100 and 200, with no terminal event.
 */
@Test
public void timestampWithScheduler() {
    TestScheduler testScheduler = new TestScheduler();
    PublishSubject<Integer> subject = PublishSubject.create();

    Observable<Timestamped<Integer>> stamped = subject.timestamp(testScheduler);
    stamped.subscribe(observer);

    subject.onNext(1);
    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    subject.onNext(2);
    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    subject.onNext(3);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onNext(new Timestamped<Integer>(0, 1));
    ordered.verify(observer, times(1)).onNext(new Timestamped<Integer>(100, 2));
    ordered.verify(observer, times(1)).onNext(new Timestamped<Integer>(200, 3));

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, never()).onComplete();
}
/**
 * Emits two values before the test scheduler moves, advances it twice by 100 ms, then emits
 * a third value; expects the timestamps 0, 0 and 200, with no terminal event.
 */
@Test
public void timestampWithScheduler2() {
    TestScheduler testScheduler = new TestScheduler();
    PublishSubject<Integer> subject = PublishSubject.create();

    Observable<Timestamped<Integer>> stamped = subject.timestamp(testScheduler);
    stamped.subscribe(observer);

    subject.onNext(1);
    subject.onNext(2);
    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    subject.onNext(3);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onNext(new Timestamped<Integer>(0, 1));
    ordered.verify(observer, times(1)).onNext(new Timestamped<Integer>(0, 2));
    ordered.verify(observer, times(1)).onNext(new Timestamped<Integer>(200, 3));

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, never()).onComplete();
}
/**
 * Adapts the downstream subscriber: each incoming item is paired with the value of
 * {@code scheduler.now()} at the moment it arrives, while completion and errors are relayed
 * as they are.
 *
 * @param o the downstream subscriber of {@code Timestamped<T>} items
 * @return a subscriber for the input sequence that emits timestamped values to {@code o}
 */
@Override
public Subscriber<? super T> call(final Subscriber<? super Timestamped<T>> o) {
    return new Subscriber<T>(o) {
        @Override
        public void onNext(T t) {
            Timestamped<T> stamped = new Timestamped<T>(scheduler.now(), t);
            o.onNext(stamped);
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onCompleted() {
            o.onCompleted();
        }
    };
}
/**
 * Wraps the downstream subscriber so that every item is forwarded as a {@code Timestamped<T>}
 * carrying the operator scheduler's current time; terminal events are relayed unchanged.
 *
 * @param o the downstream subscriber of timestamped items
 * @return a subscriber for the input sequence
 */
@Override
public Subscriber<? super T> call(final Subscriber<? super Timestamped<T>> o) {
    return new Subscriber<T>(o) {
        @Override
        public void onCompleted() {
            o.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onNext(T t) {
            // Parameterized construction instead of the raw Timestamped type, so the call
            // into o.onNext is checked by the compiler.
            o.onNext(new Timestamped<T>(OperatorTimestamp.this.scheduler.now(), t));
        }
    };
}
/**
 * Merges the photos read from disk with the photos fetched from the network, delaying errors
 * until both sources have finished. Network results are timestamped and written to disk as
 * they arrive. Both sources are subscribed on the io scheduler.
 *
 * @return the merged stream of timestamped responses
 */
@RxLogObservable
private Observable<Timestamped<RecentPhotosResponse>> getMergedPhotos() {
    Observable<Timestamped<RecentPhotosResponse>> fromDisk =
            flickrDiskRepository.getRecentPhotos()
                    .subscribeOn(Schedulers.io());

    Observable<Timestamped<RecentPhotosResponse>> fromNetwork =
            flickrNetworkRepository.getRecentPhotos()
                    .timestamp()
                    .doOnNext(new Action1<Timestamped<RecentPhotosResponse>>() {
                        @Override
                        public void call(Timestamped<RecentPhotosResponse> recentPhotosResponse) {
                            Log.d(CLASSNAME, "flickrApiRepository.getRecentPhotos().doOnNext() - Saving photos to disk - thread=" + Thread.currentThread().getName());
                            flickrDiskRepository.savePhotos(recentPhotosResponse);
                        }
                    })
                    .subscribeOn(Schedulers.io());

    return Observable.mergeDelayError(fromDisk, fromNetwork);
}
@RxLogObservable public Observable<Timestamped<RecentPhotosResponse>> getRecentPhotos() { return Observable.fromCallable(new Callable<Timestamped<RecentPhotosResponse>>() { @Override public Timestamped<RecentPhotosResponse> call() throws Exception { // if (true) throw new RuntimeException("DISK.getRecentPhotos() fake Exception!"); String serializedPhotoList = sharedPreferences.getString(RECENT_PHOTOS_RESPONSE_KEY, ""); Timestamped<RecentPhotosResponse> photos = null; if (!TextUtils.isEmpty(serializedPhotoList)) { photos = flickrPhotosJsonAdapter.fromJson(serializedPhotoList); } return photos; } }); }
private void setupView(View view) { swipeRefreshLayout = (SwipeRefreshLayout) view.findViewById(R.id.flickr_swipe_refresh); swipeRefreshLayout.setColorSchemeResources(android.R.color.holo_orange_dark); swipeRefreshLayout.setOnRefreshListener(this); recyclerView = (RecyclerView) view.findViewById(R.id.my_recycler_view); // use this setting to improve performance if you know that changes // in content do not change the layout size of the RecyclerView recyclerView.setHasFixedSize(true); // use a linear layout manager recyclerView.setLayoutManager(new GridLayoutManager(getContext(), 2)); // specify an adapter recyclerView.setAdapter(flickrListAdapter = new FlickrListAdapter(new Timestamped<>(getViewDataTimestampMillis(), Collections.<FlickrCardVM>emptyList()))); }
private void fetchFlickrItems() { isRefreshing(true); unsubscribe(); Observable<Timestamped<List<FlickrCardVM>>> recentPhotosObservable = flickrDomainService .getRecentPhotos(this) .observeOn(AndroidSchedulers.mainThread(), true); // delayError = true flickrListSubscription = recentPhotosObservable.subscribe(flickrRecentPhotosOnNext, flickrRecentPhotosOnError, flickrRecenPhotosOnComplete); }
/**
 * Fetches a new value, caches it and immediately pushes it to {@code updates}. If the fetch
 * errors, the error will <em>not</em> be relayed to subscribers of {@link #get()}.
 *
 * @return a completable tracking the fetch
 */
public Completable sync() {
    final Observable<Timestamped<T>> fetched = fetch();
    final Action1<Timestamped<T>> publishValue = new Action1<Timestamped<T>>() {
        @Override
        public void call(Timestamped<T> stamped) {
            updates.onNext(stamped.getValue());
        }
    };
    return fetched.doOnNext(publishValue).toCompletable();
}
/**
 * Creates a deferred observable that, on subscription, timestamps and caches the cold source,
 * stores that observable in {@code cache} and returns it. An error calls {@code invalidate()}.
 *
 * @return the deferred, timestamped fetch
 */
private Observable<Timestamped<T>> fetch() {
    final Func0<Observable<Timestamped<T>>> startFetch = new Func0<Observable<Timestamped<T>>>() {
        @Override
        public Observable<Timestamped<T>> call() {
            final Observable<Timestamped<T>> fresh = coldSource.timestamp(scheduler).cache();
            cache = fresh;
            return fresh;
        }
    };
    final Action1<Throwable> dropCache = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            invalidate();
        }
    };
    return Observable.defer(startFetch).doOnError(dropCache);
}
/**
 * Requests the current location (with a timeout), maps it to a weather forecast request and
 * updates the UI with the result. The resulting subscription is added to
 * {@code mCompositeSubscription}.
 */
private void loadData() {
    Observable<WeatherData> forecast = new LocationHelper(this, mGoogleApiClient)
            .getLocation()
            .timeout(LOCATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .timestamp()
            .flatMap(new Func1<Timestamped<Location>, Observable<WeatherData>>() {
                @Override
                public Observable<WeatherData> call(final Timestamped<Location> timestampedLocation) {
                    final Location where = timestampedLocation.getValue();
                    return getWeatherForecast(where.getLatitude(), where.getLongitude());
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread());

    Subscriber<WeatherData> weatherSubscriber = new Subscriber<WeatherData>() {
        @Override
        public void onNext(WeatherData weatherData) {
            updateUi(weatherData);
        }

        @Override
        public void onError(Throwable e) {
            Timber.d("onError: %s", e.getMessage());
            hideLoadingIndicator();
        }

        @Override
        public void onCompleted() {
            Timber.d("onCompleted()");
        }
    };

    Subscription subscription = forecast.subscribe(weatherSubscriber);
    mCompositeSubscription.add(subscription);
}
/** Timestamps the values 1, 2 and 3 and prints each timestamped item to standard output. */
@Test
public void test_10() {
    Action1<Timestamped<Integer>> printer = new Action1<Timestamped<Integer>>() {
        @Override
        public void call(Timestamped<Integer> integerTimestamped) {
            String line = integerTimestamped.toString();
            System.out.println(line);
        }
    };
    Observable.just(1, 2, 3)
            .timestamp()
            .subscribe(printer);
}
/**
 * Walks the node chain from the head and drops leading nodes while either {@code size}
 * exceeds {@code limit} or the node's timestamp is at or before
 * {@code scheduler.now() - maxAgeInMillis}. If anything was dropped, the last dropped node
 * is passed to {@code setFirst}.
 */
@Override
void truncate() {
    final long timeLimit = scheduler.now() - maxAgeInMillis;

    Node prev = get();
    Node next = prev.get();
    int dropped = 0;

    while (next != null) {
        if (size <= limit) {
            // Within the size bound: only nodes at or before the time limit may still go.
            Timestamped<?> stamped = (Timestamped<?>) next.value;
            if (stamped.getTimestampMillis() > timeLimit) {
                break;
            }
        }
        dropped++;
        size--;
        prev = next;
        next = next.get();
    }

    if (dropped != 0) {
        setFirst(prev);
    }
}
/**
 * Walks the node chain from the head and, while more than one element is counted in
 * {@code size}, drops leading nodes whose timestamp is at or before
 * {@code scheduler.now() - maxAgeInMillis}. If anything was dropped, the last dropped node
 * is passed to {@code setFirst}.
 */
@Override
void truncateFinal() {
    final long timeLimit = scheduler.now() - maxAgeInMillis;

    Node prev = get();
    Node next = prev.get();
    int dropped = 0;

    while (next != null && size > 1) {
        Timestamped<?> stamped = (Timestamped<?>) next.value;
        if (stamped.getTimestampMillis() > timeLimit) {
            break;
        }
        dropped++;
        size--;
        prev = next;
        next = next.get();
    }

    if (dropped != 0) {
        setFirst(prev);
    }
}
/**
 * Applies {@code Observable.timestamp()} to the observable handed to the transformation.
 *
 * @return the parallel observable produced by {@code create} for that transformation
 */
public final ParallelObservable<Timestamped<T>> timestamp() {
    Func1<Observable<T>, Observable<Timestamped<T>>> addTimestamps =
            new Func1<Observable<T>, Observable<Timestamped<T>>>() {
                @Override
                public Observable<Timestamped<T>> call(Observable<T> o) {
                    return o.timestamp();
                }
            };
    return create(addTimestamps);
}
/**
 * Applies {@code Observable.timestamp(scheduler)} to the observable handed to the
 * transformation.
 *
 * @param scheduler the scheduler passed on to {@code Observable.timestamp}
 * @return the parallel observable produced by {@code create} for that transformation
 */
public final ParallelObservable<Timestamped<T>> timestamp(
        final Scheduler scheduler) {
    Func1<Observable<T>, Observable<Timestamped<T>>> addTimestamps =
            new Func1<Observable<T>, Observable<Timestamped<T>>>() {
                @Override
                public Observable<Timestamped<T>> call(Observable<T> o) {
                    return o.timestamp(scheduler);
                }
            };
    return create(addTimestamps);
}
/**
 * Timestamps each item using {@code Schedulers.immediate()} as the time source.
 *
 * @return the result of the scheduler-taking {@code timestamp} overload for that scheduler
 */
public final Observable<Timestamped<T>> timestamp() {
    return timestamp(Schedulers.immediate());
}
/**
 * Lifts an {@code OperatorTimestamp} built around {@code scheduler} onto this observable.
 *
 * @param scheduler the scheduler handed to the timestamp operator
 * @return an observable of {@code Timestamped<T>} items
 */
public final Observable<Timestamped<T>> timestamp(Scheduler scheduler) {
    // Parameterized instead of raw, so lift() is type-checked without an unchecked conversion.
    return lift(new OperatorTimestamp<T>(scheduler));
}
/**
 * Wraps {@code t1} in a {@code Timestamped} carrying the scheduler's current time.
 *
 * @param t1 the value to wrap
 * @return the new {@code Timestamped} instance
 */
public Object call(Object t1) {
    // Explicit type argument instead of the raw Timestamped type.
    return new Timestamped<Object>(this.scheduler.now(), t1);
}
/**
 * Unwraps the value held by a {@code Timestamped}.
 *
 * @param t1 the object to unwrap; must be a {@code Timestamped}, otherwise the cast fails
 * @return the wrapped value
 */
public Object call(Object t1) {
    // Wildcard cast instead of the raw Timestamped type.
    return ((Timestamped<?>) t1).getValue();
}
/**
 * Checks whether the timestamp carried by {@code value} is at or before
 * {@code now - maxAgeMillis}.
 *
 * @param value the object to inspect; must be a {@code Timestamped}, otherwise the cast fails
 * @param now   the reference time in milliseconds
 * @return {@code true} when the timestamp is at or before {@code now - maxAgeMillis}
 */
public boolean test(Object value, long now) {
    // Wildcard cast instead of the raw Timestamped type.
    return ((Timestamped<?>) value).getTimestampMillis() <= now - this.maxAgeMillis;
}
/**
 * Wraps {@code value} in a {@code Timestamped} carrying the scheduler's current time.
 *
 * @param value the value to wrap
 * @return the new {@code Timestamped} instance
 */
Object enterTransform(Object value) {
    // Explicit type argument instead of the raw Timestamped type.
    return new Timestamped<Object>(this.scheduler.now(), value);
}
/**
 * Unwraps the value held by a {@code Timestamped}.
 *
 * @param value the object to unwrap; must be a {@code Timestamped}, otherwise the cast fails
 * @return the wrapped value
 */
Object leaveTransform(Object value) {
    // Wildcard cast instead of the raw Timestamped type.
    return ((Timestamped<?>) value).getValue();
}
public RxBus(Scheduler scheduler) { this.scheduler = scheduler; this.subject = ReplaySubject.<Timestamped<Object>>createWithTime(TTL_SECONDS, TimeUnit.SECONDS, scheduler) .toSerialized(); }
/**
 * Emits {@code value} on the subject, wrapped together with the scheduler's current time.
 *
 * @param value the value to publish
 */
private void post(Object value) {
    final long now = scheduler.now();
    Timestamped<Object> stamped = new Timestamped<>(now, value);
    subject.onNext(stamped);
}
/**
 * Returns the subject filtered to items whose timestamp is not older than a cut-off.
 *
 * @param savedElapsedRealtime the cut-off to use when non-negative; a negative value means
 *                             the scheduler's current time is used instead
 * @return the filtered stream
 */
private Observable<Timestamped<Object>> observable(long savedElapsedRealtime) {
    final long since;
    if (savedElapsedRealtime < 0) {
        since = scheduler.now();
    } else {
        since = savedElapsedRealtime;
    }
    return subject.filter(stamped -> since <= stamped.getTimestampMillis());
}
/**
 * Produces the recent photos as view models: takes the merged photo stream, applies the
 * filter built for {@code timestampedView} and maps what remains with
 * {@code FlickrModelToVmMapping}.
 *
 * @param timestampedView the view passed to the filter factory
 * @return the stream of timestamped view-model lists
 */
@RxLogObservable
public Observable<Timestamped<List<FlickrCardVM>>> getRecentPhotos(ITimestampedView timestampedView) {
    Observable<Timestamped<RecentPhotosResponse>> merged = getMergedPhotos();
    Observable<Timestamped<RecentPhotosResponse>> accepted =
            merged.filter(getRecentPhotosFilter(timestampedView));
    return accepted.map(FlickrModelToVmMapping.instance());
}
/**
 * Sets up the repository: obtains the default shared preferences for {@code context} and a
 * Moshi JSON adapter for {@code Timestamped<RecentPhotosResponse>}.
 *
 * @param context used to look up the default shared preferences
 */
public FlickrDiskRepository(Context context) {
    sharedPreferences = PreferenceManager.getDefaultSharedPreferences(context);

    Type adapterType = Types.newParameterizedType(Timestamped.class, RecentPhotosResponse.class);
    flickrPhotosJsonAdapter = new Moshi.Builder()
            .build()
            .adapter(adapterType);
}
/**
 * Serializes {@code photos} to JSON and stores the result in shared preferences under
 * {@code RECENT_PHOTOS_RESPONSE_KEY}.
 *
 * @param photos the timestamped response to store
 */
public void savePhotos(Timestamped<RecentPhotosResponse> photos) {
    final String json = flickrPhotosJsonAdapter.toJson(photos);
    sharedPreferences
            .edit()
            .putString(RECENT_PHOTOS_RESPONSE_KEY, json)
            .apply();
}
@Override public void call(Timestamped<List<FlickrCardVM>> flickrCardVMs) { Log.d(CLASSNAME, "flickrRecentPhotosOnNext.call() - Displaying card VMs in Adapter"); // refresh the list adapter recyclerView.swapAdapter(flickrListAdapter = new FlickrListAdapter(flickrCardVMs), false); }
/**
 * Creates an adapter that keeps a reference to the given data set.
 *
 * @param dataSet the timestamped list of card view models
 */
public FlickrListAdapter(Timestamped<List<FlickrCardVM>> dataSet) {
    this.dataSet = dataSet;
}
/**
 * Unwraps a timestamped item.
 *
 * @param tTimestamped the timestamped item
 * @return the value it carries
 */
@Override
public T call(Timestamped<T> tTimestamped) {
    final T value = tTimestamped.getValue();
    return value;
}
/**
 * Checks whether the time elapsed since the item's timestamp, measured against
 * {@code scheduler.now()}, is still below {@code expiryMs}.
 *
 * @param tTimestamped the timestamped item
 * @return {@code true} when the elapsed time is less than {@code expiryMs}
 */
@Override
public Boolean call(Timestamped<T> tTimestamped) {
    final long age = scheduler.now() - tTimestamped.getTimestampMillis();
    return age < expiryMs;
}
/**
 * Wraps {@code value} together with the scheduler's current time.
 *
 * @param value the value to wrap
 * @return a {@code Timestamped<Object>} holding the time and the value
 */
@Override
Object enterTransform(Object value) {
    final long now = scheduler.now();
    return new Timestamped<Object>(now, value);
}
/**
 * Extracts the value carried by a timestamped wrapper.
 *
 * @param value the object to unwrap; must be a {@code Timestamped}, otherwise the cast fails
 * @return the wrapped value
 */
@Override
Object leaveTransform(Object value) {
    final Timestamped<?> stamped = (Timestamped<?>) value;
    return stamped.getValue();
}