Java 类rx.schedulers.Timestamped 实例源码

项目:Go-RxJava    文件:Fragment_Timestamp.java   
public void runCode() {

        Observable
                .interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .timestamp()
                .subscribe(new Observer<Timestamped>() {
            @Override
            public void onCompleted() {
                println("------>onCompleted()");
            }

            @Override
            public void onError(Throwable e) {
                println("------>onError()" + e);
            }

            @Override
            public void onNext(Timestamped mTimestamped) {
                println("------->onNext()"+mTimestamped);
            }
        });

    }
项目:chaining-rxjava    文件:FlickrDomainService.java   
private Func1<Timestamped<RecentPhotosResponse>, Boolean> getRecentPhotosFilter(final ITimestampedView timestampedView) {
    return new Func1<Timestamped<RecentPhotosResponse>, Boolean>() {
        @Override
        public Boolean call(Timestamped<RecentPhotosResponse> recentPhotosResponseTimestamped) {

            StringBuilder logMessage = new StringBuilder("getMergedPhotos().filter() - Filtering results");
            if (recentPhotosResponseTimestamped == null) {
                logMessage.append(", recentPhotosResponseTimestamped is null");
            } else {
                logMessage.append(", timestamps=").append(recentPhotosResponseTimestamped.getTimestampMillis()).append(">").append(timestampedView.getViewDataTimestampMillis()).append("?");
            }
            logMessage.append(", thread=").append(Thread.currentThread().getName());
            Log.d(CLASSNAME, logMessage.toString());

            // filter it
            // if result is null - ignore it
            // if timestamp of new arrived (emission) data is less than timestamp of already displayed data — ignore it.
            return recentPhotosResponseTimestamped != null
                    && recentPhotosResponseTimestamped.getValue() != null
                    && recentPhotosResponseTimestamped.getValue().photos != null
                    && recentPhotosResponseTimestamped.getTimestampMillis() > timestampedView.getViewDataTimestampMillis();
        }
    };
}
项目:RxJavaFlow    文件:OperatorTimestamp.java   
/**
 * @return a sequence of timestamped values created by adding timestamps to each item in the input sequence.
 */
@Override
public Subscriber<? super T> call(final Subscriber<? super Timestamped<T>> o) {
    return new Subscriber<T>(o) {

        @Override
        public void onComplete() {
            o.onComplete();
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onNext(T t) {
            o.onNext(new Timestamped<T>(scheduler.now(), t));
        }

    };
}
项目:RxJavaFlow    文件:OperatorTimestampTest.java   
@Test
public void timestampWithScheduler() {
    TestScheduler scheduler = new TestScheduler();

    PublishSubject<Integer> source = PublishSubject.create();
    Observable<Timestamped<Integer>> m = source.timestamp(scheduler);
    m.subscribe(observer);

    source.onNext(1);
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    source.onNext(2);
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    source.onNext(3);

    InOrder inOrder = inOrder(observer);

    inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(0, 1));
    inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(100, 2));
    inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(200, 3));

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, never()).onComplete();
}
项目:RxJavaFlow    文件:OperatorTimestampTest.java   
@Test
public void timestampWithScheduler2() {
    TestScheduler scheduler = new TestScheduler();

    PublishSubject<Integer> source = PublishSubject.create();
    Observable<Timestamped<Integer>> m = source.timestamp(scheduler);
    m.subscribe(observer);

    source.onNext(1);
    source.onNext(2);
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    source.onNext(3);

    InOrder inOrder = inOrder(observer);

    inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(0, 1));
    inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(0, 2));
    inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(200, 3));

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, never()).onComplete();
}
项目:org.openntf.domino    文件:OperatorTimestamp.java   
/**
 * @return a sequence of timestamped values created by adding timestamps to each item in the input sequence.
 */
@Override
public Subscriber<? super T> call(final Subscriber<? super Timestamped<T>> o) {
    return new Subscriber<T>(o) {

        @Override
        public void onCompleted() {
            o.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onNext(T t) {
            o.onNext(new Timestamped<T>(scheduler.now(), t));
        }

    };
}
项目:boohee_v5.6    文件:OperatorTimestamp.java   
public Subscriber<? super T> call(final Subscriber<? super Timestamped<T>> o) {
    return new Subscriber<T>(o) {
        public void onCompleted() {
            o.onCompleted();
        }

        public void onError(Throwable e) {
            o.onError(e);
        }

        public void onNext(T t) {
            o.onNext(new Timestamped(OperatorTimestamp.this.scheduler.now(), t));
        }
    };
}
项目:chaining-rxjava    文件:FlickrDomainService.java   
@RxLogObservable
private Observable<Timestamped<RecentPhotosResponse>> getMergedPhotos() {
    return Observable.mergeDelayError(
            flickrDiskRepository.getRecentPhotos().subscribeOn(Schedulers.io()),
            flickrNetworkRepository.getRecentPhotos().timestamp().doOnNext(new Action1<Timestamped<RecentPhotosResponse>>() {
                @Override
                public void call(Timestamped<RecentPhotosResponse> recentPhotosResponse) {
                    Log.d(CLASSNAME, "flickrApiRepository.getRecentPhotos().doOnNext() - Saving photos to disk - thread=" + Thread.currentThread().getName());
                    flickrDiskRepository.savePhotos(recentPhotosResponse);
                }
            }).subscribeOn(Schedulers.io())
    );
}
项目:chaining-rxjava    文件:FlickrDiskRepository.java   
@RxLogObservable
    public Observable<Timestamped<RecentPhotosResponse>> getRecentPhotos() {
        return Observable.fromCallable(new Callable<Timestamped<RecentPhotosResponse>>() {
            @Override
            public Timestamped<RecentPhotosResponse> call() throws Exception {
//                if (true) throw new RuntimeException("DISK.getRecentPhotos() fake Exception!");
                String serializedPhotoList = sharedPreferences.getString(RECENT_PHOTOS_RESPONSE_KEY, "");
                Timestamped<RecentPhotosResponse> photos = null;
                if (!TextUtils.isEmpty(serializedPhotoList)) {
                    photos = flickrPhotosJsonAdapter.fromJson(serializedPhotoList);
                }
                return photos;
            }
        });
    }
项目:chaining-rxjava    文件:FlickrListFragment.java   
private void setupView(View view) {
    swipeRefreshLayout = (SwipeRefreshLayout) view.findViewById(R.id.flickr_swipe_refresh);
    swipeRefreshLayout.setColorSchemeResources(android.R.color.holo_orange_dark);
    swipeRefreshLayout.setOnRefreshListener(this);

    recyclerView = (RecyclerView) view.findViewById(R.id.my_recycler_view);
    // use this setting to improve performance if you know that changes
    // in content do not change the layout size of the RecyclerView
    recyclerView.setHasFixedSize(true);
    // use a linear layout manager
    recyclerView.setLayoutManager(new GridLayoutManager(getContext(), 2));
    // specify an adapter
    recyclerView.setAdapter(flickrListAdapter = new FlickrListAdapter(new Timestamped<>(getViewDataTimestampMillis(), Collections.<FlickrCardVM>emptyList())));
}
项目:chaining-rxjava    文件:FlickrListFragment.java   
private void fetchFlickrItems() {
    isRefreshing(true);
    unsubscribe();
    Observable<Timestamped<List<FlickrCardVM>>> recentPhotosObservable = flickrDomainService
            .getRecentPhotos(this)
            .observeOn(AndroidSchedulers.mainThread(), true); // delayError = true

    flickrListSubscription = recentPhotosObservable.subscribe(flickrRecentPhotosOnNext, flickrRecentPhotosOnError, flickrRecenPhotosOnComplete);
}
项目:RxCache    文件:RxCache.java   
/**
 * Fetches a new value, caches it and immediately emits it to subscribers. If this errors,
 * the error will <em>not</em> be relayed to subscribers of {@link #get()}
 */
public Completable sync() {
  return fetch()
      .doOnNext(new Action1<Timestamped<T>>() {
        @Override public void call(Timestamped<T> tTimestamped) {
          updates.onNext(tTimestamped.getValue());
        }
      })
      .toCompletable();
}
项目:RxCache    文件:RxCache.java   
private Observable<Timestamped<T>> fetch() {
  return Observable.defer(new Func0<Observable<Timestamped<T>>>() {
    @Override public Observable<Timestamped<T>> call() {
      Observable<Timestamped<T>> newCache = coldSource.timestamp(scheduler).cache();
      cache = newCache;
      return newCache;
    }
  }).doOnError(new Action1<Throwable>() {
    @Override public void call(Throwable throwable) {
      invalidate();
    }
  });
}
项目:RxWeather    文件:WeatherActivity.java   
private void loadData() {
    LocationHelper locationHelper = new LocationHelper(this, mGoogleApiClient);
    Subscription subscription = locationHelper.getLocation()
            .timeout(LOCATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .timestamp()
            .flatMap(new Func1<Timestamped<Location>, Observable<WeatherData>>() {
                @Override
                public Observable<WeatherData> call(final Timestamped<Location> timestampedLocation) {
                    final Location location = timestampedLocation.getValue();
                    double lat = location.getLatitude();
                    double lon = location.getLongitude();
                    return getWeatherForecast(lat, lon);
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<WeatherData>() {
                @Override
                public void onCompleted() {
                    Timber.d("onCompleted()");
                }

                @Override
                public void onError(Throwable e) {
                    Timber.d("onError: %s", e.getMessage());
                    hideLoadingIndicator();
                }

                @Override
                public void onNext(WeatherData weatherData) {
                    updateUi(weatherData);
                }
            });

    mCompositeSubscription.add(subscription);
}
项目:Integration    文件:ExampleUnitTest.java   
@Test
public void test_10() {
    Observable.just(1, 2, 3)
            .timestamp().subscribe(new Action1<Timestamped<Integer>>() {
        @Override
        public void call(Timestamped<Integer> integerTimestamped) {
            System.out.println(integerTimestamped.toString());
        }
    });
}
项目:android-bt    文件:OperatorReplayFix.java   
@Override
void truncate() {
  long timeLimit = scheduler.now() - maxAgeInMillis;

  Node prev = get();
  Node next = prev.get();

  int e = 0;
  for (;;) {
    if (next != null) {
      if (size > limit) {
        e++;
        size--;
        prev = next;
        next = next.get();
      } else {
        Timestamped<?> v = (Timestamped<?>)next.value;
        if (v.getTimestampMillis() <= timeLimit) {
          e++;
          size--;
          prev = next;
          next = next.get();
        } else {
          break;
        }
      }
    } else {
      break;
    }
  }
  if (e != 0) {
    setFirst(prev);
  }
}
项目:android-bt    文件:OperatorReplayFix.java   
@Override
void truncateFinal() {
  long timeLimit = scheduler.now() - maxAgeInMillis;

  Node prev = get();
  Node next = prev.get();

  int e = 0;
  for (;;) {
    if (next != null && size > 1) {
      Timestamped<?> v = (Timestamped<?>)next.value;
      if (v.getTimestampMillis() <= timeLimit) {
        e++;
        size--;
        prev = next;
        next = next.get();
      } else {
        break;
      }
    } else {
      break;
    }
  }
  if (e != 0) {
    setFirst(prev);
  }
}
项目:rxjava-parallel    文件:ParallelObservable.java   
public final ParallelObservable<Timestamped<T>> timestamp() {
    return create(new Func1<Observable<T>, Observable<Timestamped<T>>>() {
        @Override
        public Observable<Timestamped<T>> call(Observable<T> o) {
            return o.timestamp();
        }
    });
}
项目:rxjava-parallel    文件:ParallelObservable.java   
public final ParallelObservable<Timestamped<T>> timestamp(
        final Scheduler scheduler) {
    return create(new Func1<Observable<T>, Observable<Timestamped<T>>>() {
        @Override
        public Observable<Timestamped<T>> call(Observable<T> o) {
            return o.timestamp(scheduler);
        }
    });
}
项目:letv    文件:Observable.java   
public final Observable<Timestamped<T>> timestamp() {
    return timestamp(Schedulers.immediate());
}
项目:letv    文件:Observable.java   
public final Observable<Timestamped<T>> timestamp(Scheduler scheduler) {
    return lift(new OperatorTimestamp(scheduler));
}
项目:boohee_v5.6    文件:ReplaySubject.java   
public Object call(Object t1) {
    return new Timestamped(this.scheduler.now(), t1);
}
项目:boohee_v5.6    文件:ReplaySubject.java   
public Object call(Object t1) {
    return ((Timestamped) t1).getValue();
}
项目:boohee_v5.6    文件:ReplaySubject.java   
public boolean test(Object value, long now) {
    return ((Timestamped) value).getTimestampMillis() <= now - this.maxAgeMillis;
}
项目:boohee_v5.6    文件:OperatorReplay.java   
Object enterTransform(Object value) {
    return new Timestamped(this.scheduler.now(), value);
}
项目:boohee_v5.6    文件:OperatorReplay.java   
Object leaveTransform(Object value) {
    return ((Timestamped) value).getValue();
}
项目:boohee_v5.6    文件:Observable.java   
public final Observable<Timestamped<T>> timestamp() {
    return timestamp(Schedulers.immediate());
}
项目:boohee_v5.6    文件:Observable.java   
public final Observable<Timestamped<T>> timestamp(Scheduler scheduler) {
    return lift(new OperatorTimestamp(scheduler));
}
项目:DataSyncDemo    文件:RxBus.java   
public RxBus(Scheduler scheduler) {
    this.scheduler = scheduler;
    this.subject = ReplaySubject.<Timestamped<Object>>createWithTime(TTL_SECONDS, TimeUnit.SECONDS, scheduler)
            .toSerialized();
}
项目:DataSyncDemo    文件:RxBus.java   
private void post(Object value) {
    subject.onNext(new Timestamped<>(scheduler.now(), value));
}
项目:DataSyncDemo    文件:RxBus.java   
private Observable<Timestamped<Object>> observable(long savedElapsedRealtime) {
    long since = savedElapsedRealtime >= 0 ? savedElapsedRealtime : scheduler.now();
    return subject.filter(t -> t.getTimestampMillis() >= since);
}
项目:chaining-rxjava    文件:FlickrDomainService.java   
@RxLogObservable
public Observable<Timestamped<List<FlickrCardVM>>> getRecentPhotos(ITimestampedView timestampedView) {
    return getMergedPhotos()
            .filter(getRecentPhotosFilter(timestampedView))
            .map(FlickrModelToVmMapping.instance());
}
项目:chaining-rxjava    文件:FlickrDiskRepository.java   
public FlickrDiskRepository(Context context) {
    sharedPreferences = PreferenceManager.getDefaultSharedPreferences(context);
    Moshi moshi = new Moshi.Builder().build();
    Type adapterType = Types.newParameterizedType(Timestamped.class, RecentPhotosResponse.class);
    flickrPhotosJsonAdapter = moshi.adapter(adapterType);
}
项目:chaining-rxjava    文件:FlickrDiskRepository.java   
public void savePhotos(Timestamped<RecentPhotosResponse> photos) {
    String serializedPhotoList = flickrPhotosJsonAdapter.toJson(photos);
    sharedPreferences.edit().putString(RECENT_PHOTOS_RESPONSE_KEY, serializedPhotoList).apply();
}
项目:chaining-rxjava    文件:FlickrListFragment.java   
@Override
public void call(Timestamped<List<FlickrCardVM>> flickrCardVMs) {
    Log.d(CLASSNAME, "flickrRecentPhotosOnNext.call() - Displaying card VMs in Adapter");
    // refresh the list adapter
    recyclerView.swapAdapter(flickrListAdapter = new FlickrListAdapter(flickrCardVMs), false);
}
项目:chaining-rxjava    文件:FlickrListAdapter.java   
public FlickrListAdapter(Timestamped<List<FlickrCardVM>> dataSet) {
    this.dataSet = dataSet;
}
项目:RxCache    文件:RxCache.java   
@Override public T call(Timestamped<T> tTimestamped) {
  return tTimestamped.getValue();
}
项目:RxCache    文件:RxCache.java   
@Override public Boolean call(Timestamped<T> tTimestamped) {
  return scheduler.now() - tTimestamped.getTimestampMillis() < expiryMs;
}
项目:android-bt    文件:OperatorReplayFix.java   
@Override
Object enterTransform(Object value) {
  return new Timestamped<Object>(scheduler.now(), value);
}
项目:android-bt    文件:OperatorReplayFix.java   
@Override
Object leaveTransform(Object value) {
  return ((Timestamped<?>)value).getValue();
}