public Subscriber<? super T> call(final Subscriber<? super TimeInterval<T>> subscriber) { return new Subscriber<T>(subscriber) { private long lastTimestamp = OperatorTimeInterval.this.scheduler.now(); public void onNext(T args) { long nowTimestamp = OperatorTimeInterval.this.scheduler.now(); subscriber.onNext(new TimeInterval(nowTimestamp - this.lastTimestamp, args)); this.lastTimestamp = nowTimestamp; } public void onCompleted() { subscriber.onCompleted(); } public void onError(Throwable e) { subscriber.onError(e); } }; }
private void startTimer() { updateDisplayTime(); videoTimerSubscription = Observable.interval(1, TimeUnit.SECONDS).timeInterval() .compose(DatabaseUtils.<TimeInterval<Long>>applySchedulers()) .subscribe(new Action1<TimeInterval<Long>>() { @Override public void call(TimeInterval<Long> longTimeInterval) { updateDisplayTime(); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { Log.e(LOG_TAG, "Timer error", throwable); } }); }
@Override public Subscriber<? super T> call(final Subscriber<? super TimeInterval<T>> subscriber) { return new Subscriber<T>(subscriber) { // The beginning time is the time when the observer subscribes. private long lastTimestamp = scheduler.now(); @Override public void onNext(T args) { long nowTimestamp = scheduler.now(); subscriber.onNext(new TimeInterval<T>(nowTimestamp - lastTimestamp, args)); lastTimestamp = nowTimestamp; } @Override public void onComplete() { subscriber.onComplete(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; }
@Test public void testTimeInterval() { InOrder inOrder = inOrder(observer); observable.subscribe(observer); testScheduler.advanceTimeBy(1000, TIME_UNIT); subject.onNext(1); testScheduler.advanceTimeBy(2000, TIME_UNIT); subject.onNext(2); testScheduler.advanceTimeBy(3000, TIME_UNIT); subject.onNext(3); subject.onComplete(); inOrder.verify(observer, times(1)).onNext( new TimeInterval<Integer>(1000, 1)); inOrder.verify(observer, times(1)).onNext( new TimeInterval<Integer>(2000, 2)); inOrder.verify(observer, times(1)).onNext( new TimeInterval<Integer>(3000, 3)); inOrder.verify(observer, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); }
@Override public Subscriber<? super T> call(final Subscriber<? super TimeInterval<T>> subscriber) { return new Subscriber<T>(subscriber) { // The beginning time is the time when the observer subscribes. private long lastTimestamp = scheduler.now(); @Override public void onNext(T args) { long nowTimestamp = scheduler.now(); subscriber.onNext(new TimeInterval<T>(nowTimestamp - lastTimestamp, args)); lastTimestamp = nowTimestamp; } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; }
public void runCode() { //将一个Observable转换为发射两个数据之间所耗费时间的Observable Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS); values.take(3) .timeInterval() .subscribe(new Action1<TimeInterval>() { @Override public void call(TimeInterval mTimeInterval) { println(mTimeInterval.toString()); } }); }
Observable.Transformer<Void, List<TimeInterval<Void>>> collectTapSequence() { return observable -> observable .observeOn(Schedulers.io()) .doOnNext(clickEvent -> Timber.d("tap")) .timeInterval() .skip(1) .timeout(timeInterval -> Observable.timer(1500, TimeUnit.MILLISECONDS), Observable.empty()) .toList() .observeOn(AndroidSchedulers.mainThread()); }
private void initializeRecorder() { clickSubscription = RxView.clicks(recordingMessage) .compose(collectTapSequence()) .subscribe(new Subscriber<List<TimeInterval<Void>>>() { @Override public void onCompleted() { Timber.v("tapsSubscriber onCompleted"); } @Override public void onError(Throwable e) { Timber.v(e, "ruh roh"); } @Override public void onNext(List<TimeInterval<Void>> durations) { Timber.v("onNext"); if (patternIsValid(durations)) { storePattern(durations); activateLock(); } else { Timber.v("pattern invalid"); Toast.makeText(MainActivity.this, R.string.pattern_invalid, Toast.LENGTH_SHORT).show(); Handler handler = new Handler(); handler.post(()-> initializeRecorder()); } } } ); }
public void initializeTapToUnlock() { clickSubscription = RxView.clicks(hiddenMessage) .doOnNext(clickEvent -> { Timber.v("unlock tap"); showBorderAnimation(); }) .compose(collectTapSequence()) .subscribe(new Subscriber<List<TimeInterval<Void>>>() { @Override public void onCompleted() { hideBorderAnimation(); } @Override public void onError(Throwable e) { Timber.v(e, "ruh roh"); } @Override public void onNext(List<TimeInterval<Void>> durations) { Timber.v("onNext"); if (patternMatches(durations)) { Timber.v("lock deactivated"); deactivateLock(); } else { Timber.v("pattern incorrect"); Toast.makeText(MainActivity.this, R.string.pattern_incorrect, Toast.LENGTH_SHORT).show(); hiddenMessage.setBackgroundResource(R.drawable.message_background); Handler handler = new Handler(); handler.post(()-> initializeTapToUnlock()); } } }); }
private boolean patternMatches(List<TimeInterval<Void>> durations) { Long[] durationArray = convertTimeIntervalsToLongs(durations); List<Long> storedPattern = retrievePattern(); Timber.v("storedPattern: %s", storedPattern.toString()); Timber.v("enteredPattern: %s", durations.toString()); if (durations.size() != storedPattern.size()) { Timber.v("patterns not same number of taps"); return false; } // scale entered pattern to be same length as stored pattern Long totalStoredPatternDuration = 0L; Long totalEnteredPatternDuration = 0L; for (int i = 0; i < storedPattern.size(); i++) { totalStoredPatternDuration += storedPattern.get(i); totalEnteredPatternDuration += durationArray[i]; } double scalingFactor = (double) totalStoredPatternDuration / (double) totalEnteredPatternDuration; // check stored pattern versus scaled pattern for (int i = 0; i < storedPattern.size(); i++) { Timber.v("stored:%s entered:%s scaled:%s", storedPattern.get(i), durations.get(i), Math.round(scalingFactor * durationArray[i])); if (Math.abs(storedPattern.get(i) - Math.round(scalingFactor * durationArray[i])) > ALLOWED_ERROR) { return false; } } return true; }
private Long[] convertTimeIntervalsToLongs(List<TimeInterval<Void>> durations) { Long[] durationArray = new Long[durations.size()]; for (int i = 0; i < durations.size(); i++) { durationArray[i] = durations.get(i).getIntervalInMilliseconds(); } return durationArray; }
/** * creates an observable that emits object with time between */ public static Observable timedObservable(int delay,TimeUnit timeUnit, final Observable delayedObservable) { Observable<TimeInterval<Long>> timer = Observable.interval(delay, timeUnit).timeInterval(); return Observable.zip(timer, delayedObservable, new Func2() { @Override public Object call(Object o, Object o2) { return o2; } }); }
public final ParallelObservable<TimeInterval<T>> timeInterval() { return create(new Func1<Observable<T>, Observable<TimeInterval<T>>>() { @Override public Observable<TimeInterval<T>> call(Observable<T> o) { return o.timeInterval(); } }); }
public final ParallelObservable<TimeInterval<T>> timeInterval( final Scheduler scheduler) { return create(new Func1<Observable<T>, Observable<TimeInterval<T>>>() { @Override public Observable<TimeInterval<T>> call(Observable<T> o) { return o.timeInterval(scheduler); } }); }
public final Observable<TimeInterval<T>> timeInterval() { return timeInterval(Schedulers.immediate()); }
public final Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) { return lift(new OperatorTimeInterval(scheduler)); }
private boolean patternIsValid(List<TimeInterval<Void>> durations) { return durations.size() > 1; }
private void storePattern(List<TimeInterval<Void>> durations) { Long[] durationArray = convertTimeIntervalsToLongs(durations); editor.putString(DURATIONS, TextUtils.join(",", durationArray)).apply(); Timber.v("Pattern stored."); }
/** * @return * @see rx.Observable#timeInterval() */ public final Observable<TimeInterval<T>> timeInterval() { return boxed.timeInterval(); }
/** * @param scheduler * @return * @see rx.Observable#timeInterval(rx.Scheduler) */ public final Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) { return boxed.timeInterval(scheduler); }