Java 类rx.schedulers.TestScheduler 实例源码

项目:RxJavaFlow    文件:OperatorObserveOnTest.java   
@Test
public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
    TestScheduler testScheduler = new TestScheduler();

    Observable<Integer> source = Observable.concat(Observable.<Integer> error(new TestException()), Observable.just(1));

    @SuppressWarnings("unchecked")
    Observer<Integer> o = mock(Observer.class);
    InOrder inOrder = inOrder(o);

    source.observeOn(testScheduler).subscribe(o);

    inOrder.verify(o, never()).onError(any(TestException.class));

    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    inOrder.verify(o).onError(any(TestException.class));
    inOrder.verify(o, never()).onNext(anyInt());
    inOrder.verify(o, never()).onComplete();
}
项目:rxeither    文件:RxEitherTest.java   
@SuppressWarnings("unchecked")
@Test
public void multipleLeftTerminalRightOtherThread() {
    TestScheduler otherScheduler = Schedulers.test();
    TestSubject<EventB> eventBSubject = TestSubject.create(otherScheduler);
    Observable<Either<EventA, EventB>> either = RxEither.from(eventASubject, eventBSubject);
    either.subscribe(subscriber);

    eventASubject.onNext(eventA);
    eventBSubject.onNext(eventB);
    eventBSubject.onCompleted();
    eventASubject.onNext(eventA);

    testScheduler.triggerActions();
    subscriber.assertNotCompleted();
    otherScheduler.triggerActions();

    subscriber.assertNoErrors();
    subscriber.assertCompleted();
    subscriber.assertUnsubscribed();
    subscriber.assertValues(Either.<EventA, EventB>left(eventA),
                            Either.<EventA, EventB>left(eventA),
                            Either.<EventA, EventB>right(eventB));
}
项目:RxNormalize    文件:OperatorNormalizeTest.java   
@Test
public void basic() {
  TestScheduler scheduler = new TestScheduler();
  PublishSubject<Integer> subject = PublishSubject.create();
  RecordingObserver<Integer> o = new RecordingObserver<>();
  subject
      .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler))
      .subscribe(o);

  subject.onNext(0);
  o.takeNext();

  subject.onNext(1);
  o.assertNoMoreEvents();

  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  o.takeNext();

  subject.onCompleted();
  o.assertOnCompleted();
}
项目:RxNormalize    文件:OperatorNormalizeTest.java   
@Test
public void completion() {
  TestScheduler scheduler = new TestScheduler();
  PublishSubject<Integer> subject = PublishSubject.create();
  RecordingObserver<Integer> o = new RecordingObserver<>();
  subject
      .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler))
      .subscribe(o);

  // First emits immediately
  subject.onNext(0);
  o.takeNext();

  subject.onNext(1);
  subject.onCompleted();
  o.assertNoMoreEvents();

  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  o.takeNext();
  o.assertOnCompleted();
}
项目:RxNormalize    文件:OperatorNormalizeTest.java   
@Test
public void error() {
  TestScheduler scheduler = new TestScheduler();
  PublishSubject<Integer> subject = PublishSubject.create();
  RecordingObserver<Integer> o = new RecordingObserver<>();
  subject
      .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler))
      .subscribe(o);

  // First emits immediately
  subject.onNext(0);
  o.takeNext();

  subject.onNext(1);
  subject.onError(new RuntimeException("Blah"));
  o.assertNoMoreEvents();

  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  o.takeNext();
  assertThat(o.takeError()).isInstanceOf(RuntimeException.class);
}
项目:RxNormalize    文件:OperatorNormalizeTest.java   
@Test
public void unsubscription() {
  TestScheduler scheduler = new TestScheduler();
  PublishSubject<Integer> subject = PublishSubject.create();
  RecordingObserver<Integer> o = new RecordingObserver<>();
  Subscription sub = subject
      .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler))
      .subscribe(o);

  // First emits immediately
  subject.onNext(0);
  o.takeNext();

  subject.onNext(1);
  o.assertNoMoreEvents();

  sub.unsubscribe();

  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  o.assertNoMoreEvents();
}
项目:RxNormalize    文件:OperatorNormalizeTest.java   
@Test
public void overDelay_shouldEmitImmediately() {
  TestScheduler scheduler = new TestScheduler();
  PublishSubject<Integer> subject = PublishSubject.create();
  RecordingObserver<Integer> o = new RecordingObserver<>();
  subject
      .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler))
      .subscribe(o);

  // First emits immediately
  subject.onNext(0);
  o.takeNext();

  scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
  subject.onNext(1);
  o.takeNext();
}
项目:MarketData    文件:MulticastEventStreamClientTest.java   
@Test
public void Should_transmit_events_from_the_target_client() {
    // given
    TestScheduler scheduler = Schedulers.test();
    PublishSubject<String> subject = PublishSubject.create();
    EventStreamClient targetClient = mock(EventStreamClient.class);
    when(targetClient.readServerSideEvents()).thenReturn(subject);
    MulticastEventStreamClient multicastEventStreamClient = new MulticastEventStreamClient(targetClient, scheduler);
    Observable<String> events = multicastEventStreamClient.readServerSideEvents();
    // when
    TestSubscriber<String> subscriber = new TestSubscriber<>();
    events.subscribe(subscriber);
    subject.onNext("Hello!");
    // then
    assertThat(subscriber.getOnNextEvents()).hasSize(1).contains("Hello!");
}
项目:akarnokd-misc    文件:ScanReplay.java   
@Test
public void testExpectedReplayBehavior() {
    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<Integer> subject = TestSubject.create(scheduler);
    final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

    final ConnectableObservable<Integer> sums = subject.scan((a, b) -> a + b).replay(1);
    sums.connect();

    subject.onNext(1);
    subject.onNext(2);
    subject.onNext(3);
    scheduler.triggerActions();

    sums.subscribe(subscriber);

    subscriber.assertValueCount(1);
    subscriber.assertValues(6);
}
项目:akarnokd-misc    文件:ScanReplay.java   
@Test
    public void testFlakyReplayBehavior() {
        final TestScheduler scheduler = new TestScheduler();
        final TestSubject<Integer> subject = TestSubject.create(scheduler);
        final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

        final ConnectableObservable<Integer> sums = subject.scan(1, (a, b) -> a + b).replay(1);
        sums.connect();

        subject.onNext(2);
        subject.onNext(3);
        scheduler.triggerActions();

        sums.subscribe(subscriber);

//        subscriber.assertValueCount(1);
        subscriber.assertValues(6);
    }
项目:AnDevCon-RxPatterns    文件:Example16Test.java   
@Test
public void should_test_observable_interval() {
    TestScheduler scheduler = new TestScheduler();
    final List<Long> result = new ArrayList<>();
    Observable.interval(1, TimeUnit.SECONDS, scheduler)
            .take(5)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    result.add(aLong);
                }
            });
    assertTrue(result.isEmpty());
    scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
    assertEquals(2, result.size());
    scheduler.advanceTimeBy(10, TimeUnit.SECONDS);
    assertEquals(5, result.size());
}
项目:arctor    文件:WaitViewReplayTransformerTest.java   
@Test
public void shouldEmitErrorAfterViewIsAttached() {
    TestScheduler testScheduler = Schedulers.test();
    BehaviorSubject<Boolean> view = BehaviorSubject.create();
    view.onNext(true);
    WaitViewReplayTransformer<Object> transformer = new WaitViewReplayTransformer<>(
            view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler));
    TestSubscriber<Object> testSubscriber = new TestSubscriber<>();
    Observable.error(new RuntimeException())
            .compose(transformer)
            .subscribe(testSubscriber);
    testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS);
    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertError(RuntimeException.class);
}
项目:arctor    文件:WaitViewLatestTransformerTest.java   
@Test
public void shouldEmitValueAfterViewIsAttached() {
    TestScheduler testScheduler = Schedulers.test();
    BehaviorSubject<Boolean> view = BehaviorSubject.create();
    view.onNext(true);
    WaitViewLatestTransformer<Integer> transformer =
            new WaitViewLatestTransformer<>(view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler));
    TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
    Observable.just(0)
            .compose(transformer)
            .subscribe(testSubscriber);
    testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS);
    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertValue(0);
    testSubscriber.assertCompleted();
}
项目:rxjava-extras    文件:OperatorSampleFirstTest.java   
@Test
public void testSampleWindowIsConstantDuration() {
    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    TestScheduler s = new TestScheduler();
    PublishSubject<Integer> o = PublishSubject.create();
    o.compose(Transformers.<Integer> sampleFirst(1000, TimeUnit.MILLISECONDS, s))
            .subscribe(observer);

    // send events with simulated time increments
    s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
    o.onNext(1);
    s.advanceTimeTo(1200, TimeUnit.MILLISECONDS);
    o.onNext(2);
    s.advanceTimeTo(2100, TimeUnit.MILLISECONDS);
    o.onNext(3);
    o.onCompleted();

    InOrder inOrder = inOrder(observer);
    inOrder.verify(observer).onNext(1);
    inOrder.verify(observer).onNext(2);
    inOrder.verify(observer).onNext(3);
    inOrder.verify(observer).onCompleted();
    inOrder.verifyNoMoreInteractions();
}
项目:rxjava-extras    文件:ObsTest.java   
@Test
public void testCachedScheduledReset() {
    TestScheduler scheduler = new TestScheduler();
    Worker worker = scheduler.createWorker();
    try {
        final AtomicInteger count = new AtomicInteger(0);
        Observable<Integer> source = Observable.defer(new Func0<Observable<Integer>>() {
            @Override
            public Observable<Integer> call() {
                return Observable.just(count.incrementAndGet());
            }
        })
                // cache
                .compose(Transformers.<Integer> cache(5, TimeUnit.MINUTES, worker));
        assertEquals(1, (int) source.toBlocking().single());
        scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        assertEquals(1, (int) source.toBlocking().single());
        scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        assertEquals(1, (int) source.toBlocking().single());
        scheduler.advanceTimeBy(3, TimeUnit.MINUTES);
        assertEquals(2, (int) source.toBlocking().single());
        assertEquals(2, (int) source.toBlocking().single());
    } finally {
        worker.unsubscribe();
    }
}
项目:rxstate    文件:RxStateTest.java   
@Test
public void startSchedule() {
    TestScheduler scheduler = new TestScheduler();
    RxState<Integer> state = new RxState<>(0, scheduler);
    TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    state.values(StartWith.SCHEDULE).subscribe(subscriber);
    subscriber.assertValues();
    state.apply(it -> it + 1);
    scheduler.triggerActions();
    subscriber.assertValues(0, 1);
}
项目:MarbleTest4J    文件:HotObservableTest.java   
@Test
public void should_not_send_notification_occurring_before_subscribe() {
    // given
    TestScheduler scheduler = new TestScheduler();
    Recorded<String> event = new Recorded<>(10, Notification.createOnNext("Hello world!"));
    final HotObservable<String> hotObservable = HotObservable.create(scheduler, event);
    // when
    final TestSubscriber<String> subscriber = new TestSubscriber<>();
    scheduler.createWorker().schedule(new Action0() {
        @Override
        public void call() {
            hotObservable.subscribe(subscriber);
        }
    }, 15, TimeUnit.MILLISECONDS);
    // then
    scheduler.advanceTimeBy(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    assertThat(subscriber.getOnNextEvents()).isEmpty();
}
项目:MarbleTest4J    文件:HotObservableTest.java   
@Test
public void should_not_send_notification_occurring_after_unsubscribe() {
    // given
    TestScheduler scheduler = new TestScheduler();
    Recorded<String> event = new Recorded<>(10, Notification.createOnNext("Hello world!"));
    final HotObservable<String> hotObservable = HotObservable.create(scheduler, event);
    // when
    final TestSubscriber<String> subscriber = new TestSubscriber<>();
    final Subscription subscription = hotObservable.subscribe(subscriber);
    scheduler.createWorker().schedule(new Action0() {
        @Override
        public void call() {
            subscription.unsubscribe();
        }
    }, 5, TimeUnit.MILLISECONDS);
    // then
    scheduler.advanceTimeBy(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    assertThat(subscriber.getOnNextEvents()).isEmpty();
}
项目:MarbleTest4J    文件:HotObservableTest.java   
@Test
public void should_keep_track_of_subscriptions() {
    // given
    TestScheduler scheduler = new TestScheduler();
    final HotObservable<String> hotObservable = HotObservable.create(scheduler);
    // when
    final TestSubscriber<String> subscriber = new TestSubscriber<>();

    scheduler.createWorker().schedule(new Action0() {
        @Override
        public void call() {
            hotObservable.subscribe(subscriber);
        }
    }, 42, TimeUnit.MILLISECONDS);
    // then
    scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS);
    assertThat(hotObservable.subscriptions)
            .containsExactly(
                    new SubscriptionLog(42, Long.MAX_VALUE)
            );
}
项目:MarbleTest4J    文件:HotObservableTest.java   
@Test
public void should_keep_track_of_unsubscriptions() {
    // given
    TestScheduler scheduler = new TestScheduler();
    final HotObservable<String> hotObservable = HotObservable.create(scheduler);
    // when
    final TestSubscriber<String> subscriber = new TestSubscriber<>();
    final Subscription subscription = hotObservable.subscribe(subscriber);
    scheduler.createWorker().schedule(new Action0() {
        @Override
        public void call() {
            subscription.unsubscribe();
        }
    }, 42, TimeUnit.MILLISECONDS);
    // then
    scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS);
    assertThat(hotObservable.subscriptions)
            .containsExactly(
                    new SubscriptionLog(0, 42)
            );
}
项目:MarbleTest4J    文件:ColdObservableTest.java   
@Test
public void should_send_notification_on_subscribe_using_offset() {
    // given
    TestScheduler scheduler = new TestScheduler();
    long offset = 10;
    Recorded<String> event = new Recorded<>(offset, Notification.createOnNext("Hello world!"));
    ColdObservable<String> coldObservable = ColdObservable.create(scheduler, event);
    // when
    TestSubscriber<String> subscriber = new TestSubscriber<>();
    coldObservable.subscribe(subscriber);
    // then
    scheduler.advanceTimeBy(9, TimeUnit.MILLISECONDS);
    assertThat(subscriber.getOnNextEvents()).isEmpty();
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    assertThat(subscriber.getOnNextEvents()).containsExactly("Hello world!");
}
项目:MarbleTest4J    文件:ColdObservableTest.java   
@Test
public void should_not_send_notification_after_unsubscribe() {
    // given
    TestScheduler scheduler = new TestScheduler();
    long offset = 10;
    Recorded<String> event = new Recorded<>(offset, Notification.createOnNext("Hello world!"));
    ColdObservable<String> coldObservable = ColdObservable.create(scheduler, event);
    TestSubscriber<String> subscriber = new TestSubscriber<>();
    final Subscription subscription = coldObservable.subscribe(subscriber);
    // when
    scheduler.createWorker().schedule(new Action0() {
        @Override
        public void call() {
            subscription.unsubscribe();
        }
    }, 5, TimeUnit.MILLISECONDS);
    // then
    scheduler.advanceTimeBy(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    assertThat(subscriber.getOnNextEvents()).isEmpty();
}
项目:MarbleTest4J    文件:ColdObservableTest.java   
@Test
public void should_be_cold_and_send_notification_at_subscribe_time() {
    // given
    TestScheduler scheduler = new TestScheduler();
    Recorded<String> event = new Recorded<>(0, Notification.createOnNext("Hello world!"));
    final ColdObservable<String> coldObservable = ColdObservable.create(scheduler, event);
    // when
    final TestSubscriber<String> subscriber = new TestSubscriber<>();
    scheduler.createWorker().schedule(new Action0() {
        @Override
        public void call() {
            coldObservable.subscribe(subscriber);
        }
    }, 42, TimeUnit.SECONDS);
    // then
    scheduler.advanceTimeBy(42, TimeUnit.SECONDS);
    assertThat(subscriber.getOnNextEvents()).containsExactly("Hello world!");
}
项目:MarbleTest4J    文件:ColdObservableTest.java   
@Test
public void should_keep_track_of_subscriptions() {
    // given
    TestScheduler scheduler = new TestScheduler();
    final ColdObservable<String> coldObservable = ColdObservable.create(scheduler);
    // when
    final TestSubscriber<String> subscriber = new TestSubscriber<>();

    scheduler.createWorker().schedule(new Action0() {
        @Override
        public void call() {
            coldObservable.subscribe(subscriber);
        }
    }, 42, TimeUnit.MILLISECONDS);
    // then
    scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS);
    assertThat(coldObservable.getSubscriptions())
            .containsExactly(
                    new SubscriptionLog(42, Long.MAX_VALUE)
            );
}
项目:MarbleTest4J    文件:ColdObservableTest.java   
@Test
public void should_keep_track_of_unsubscriptions() {
    // given
    TestScheduler scheduler = new TestScheduler();
    final ColdObservable<String> coldObservable = ColdObservable.create(scheduler);
    // when
    final TestSubscriber<String> subscriber = new TestSubscriber<>();
    final Subscription subscription = coldObservable.subscribe(subscriber);
    scheduler.createWorker().schedule(new Action0() {
        @Override
        public void call() {
            subscription.unsubscribe();
        }
    }, 42, TimeUnit.MILLISECONDS);
    // then
    scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS);
    assertThat(coldObservable.getSubscriptions())
            .containsExactly(
                    new SubscriptionLog(0, 42)
            );
}
项目:RxNormalize    文件:OperatorNormalizeTest.java   
@Test
public void buffer() {
  TestScheduler scheduler = new TestScheduler();
  PublishSubject<Integer> subject = PublishSubject.create();
  RecordingObserver<Integer> o = new RecordingObserver<>();
  subject
      .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler))
      .subscribe(o);

  // First emits immediately
  subject.onNext(0);
  o.takeNext();

  subject.onNext(1);
  subject.onNext(2);
  subject.onNext(3);
  o.assertNoMoreEvents();

  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  o.takeNext();
  o.assertNoMoreEvents();
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  o.takeNext();
  o.assertNoMoreEvents();
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  o.takeNext();
  o.assertNoMoreEvents();

  subject.onCompleted();
  o.assertOnCompleted();
}
项目:MarketData    文件:MulticastEventStreamClientTest.java   
/**
 * Test 21
 */
@Test
@Ignore
public void Should_generate_only_one_subscription_side_effect_with_multiple_subscribers() {
    // given
    Observable<String> source = Observable.create(subscriber -> {
        subscriber.onNext("open");
    });
    TestScheduler scheduler = Schedulers.test();
    EventStreamClient targetClient = mock(EventStreamClient.class);
    when(targetClient.readServerSideEvents()).thenReturn(source);
    MulticastEventStreamClient multicastEventStreamClient = new MulticastEventStreamClient(targetClient, scheduler);
    Observable<String> events = multicastEventStreamClient.readServerSideEvents();
    // when
    TestSubscriber<String> subscriber1 = new TestSubscriber<>();
    TestSubscriber<String> subscriber2 = new TestSubscriber<>();
    events.subscribe(subscriber1);
    events.subscribe(subscriber2);
    // then
    assertThat(subscriber1.getOnNextEvents()).hasSize(1);
    assertThat(subscriber2.getOnNextEvents()).isEmpty();
}
项目:RxCache    文件:RxCacheTest.java   
@Before
public void setUp() {
  fetchFails = new AtomicBoolean(false);
  fetchError = new RuntimeException("Fail!");

  scheduler = new TestScheduler();
  producer = new Func0<Single<Long>>() {
    @Override public Single<Long> call() {
      if (fetchFails.get()) {
        return Single.error(fetchError);
      } else {
        return Single.just(scheduler.now());
      }
    }
  };

  cache = new RxCache<>(EXPIRY, scheduler, Single.defer(
      new Func0<Single<Long>>() {
        @Override public Single<Long> call() {
          return producer.call();
        }
      }));

  subscriber = new TestSubscriber<>();
}
项目:android-oss    文件:SearchViewModelTest.java   
@Test
public void testNoResults() {
  final TestScheduler scheduler = new TestScheduler();

  final List<Project> projects = Arrays.asList(
  );

  final MockApiClient apiClient = new MockApiClient() {
    @Override public @NonNull Observable<DiscoverEnvelope> fetchProjects(final @NonNull DiscoveryParams params) {
      return Observable.just(DiscoverEnvelopeFactory.discoverEnvelope(projects));
    }
  };

  final Environment env = environment().toBuilder()
    .scheduler(scheduler)
    .apiClient(apiClient)
    .build();

  setUpEnvironment(env);

  // populate search and overcome debounce
  this.vm.inputs.search("__");
  scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

  this.searchProjects.assertValueCount(2);
}
项目:RxTestWrapper    文件:RxTestWrapperTest.java   
@Test
public void testWithScheduler() throws Exception {
    TestScheduler scheduler = new TestScheduler();

    RxTestWrapper<Long> assertion =
          RxTestWrapper.assertThat(Observable.interval(1, TimeUnit.MILLISECONDS, scheduler).take(3));
    assertion
          .notCompleted()
          .hasNoValues()
          .hasNoErrors();

    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    assertion.hasValueCount(1);

    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    assertion.hasValueCount(2);

    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    assertion.hasValueCount(3)
          .completed();
}
项目:satellite    文件:RestartableSetTest.java   
@Override
public Subscription invoke(DeliveryMethod method, TestSubscriber<Notification<Long>> testSubscriber, final TestScheduler scheduler, RestartableSet set) {
    return set
        .channel(RESTARTABLE_ID, method, new ObservableFactory<String, Long>() {
            @Override
            public Observable<Long> call(final String a) {
                return interval(1, 1, TimeUnit.SECONDS, scheduler).map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong + Long.parseLong(a);
                    }
                });
            }
        })
        .subscribe(testSubscriber);
}
项目:RxJavaFlow    文件:ObservableTests.java   
@Test
public void testStartWithWithScheduler() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Integer> observable = Observable.just(3, 4).startWith(Arrays.asList(1, 2)).subscribeOn(scheduler);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    InOrder inOrder = inOrder(observer);
    inOrder.verify(observer, times(1)).onNext(1);
    inOrder.verify(observer, times(1)).onNext(2);
    inOrder.verify(observer, times(1)).onNext(3);
    inOrder.verify(observer, times(1)).onNext(4);
    inOrder.verify(observer, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
项目:RxJavaFlow    文件:ObservableTests.java   
@Test
public void testRangeWithScheduler() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Integer> observable = Observable.range(3, 4, scheduler);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    InOrder inOrder = inOrder(observer);
    inOrder.verify(observer, times(1)).onNext(3);
    inOrder.verify(observer, times(1)).onNext(4);
    inOrder.verify(observer, times(1)).onNext(5);
    inOrder.verify(observer, times(1)).onNext(6);
    inOrder.verify(observer, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
项目:RxJavaFlow    文件:BlockingOperatorLatestTest.java   
@Test(timeout = 1000)
public void testSimple() {
    TestScheduler scheduler = new TestScheduler();

    BlockingObservable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlocking();

    Iterable<Long> iter = source.latest();

    Iterator<Long> it = iter.iterator();

    // only 9 because take(10) will immediately call onComplete() when receiving the 10th item
    // which onComplete() will overwrite the previous value
    for (int i = 0; i < 9; i++) {
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

        Assert.assertEquals(true, it.hasNext());

        Assert.assertEquals(Long.valueOf(i), it.next());
    }

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(false, it.hasNext());
}
项目:RxJavaFlow    文件:BlockingOperatorLatestTest.java   
@Test(timeout = 1000)
public void testSameSourceMultipleIterators() {
    TestScheduler scheduler = new TestScheduler();

    BlockingObservable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlocking();

    Iterable<Long> iter = source.latest();

    for (int j = 0; j < 3; j++) {
        Iterator<Long> it = iter.iterator();

        // only 9 because take(10) will immediately call onComplete() when receiving the 10th item
        // which onComplete() will overwrite the previous value
        for (int i = 0; i < 9; i++) {
            scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

            Assert.assertEquals(true, it.hasNext());

            Assert.assertEquals(Long.valueOf(i), it.next());
        }

        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        Assert.assertEquals(false, it.hasNext());
    }
}
项目:RxJavaFlow    文件:BlockingOperatorLatestTest.java   
@Test(timeout = 1000, expected = NoSuchElementException.class)
public void testSimpleJustNext() {
    TestScheduler scheduler = new TestScheduler();

    BlockingObservable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlocking();

    Iterable<Long> iter = source.latest();

    Iterator<Long> it = iter.iterator();

    // only 9 because take(10) will immediately call onComplete() when receiving the 10th item
    // which onComplete() will overwrite the previous value
    for (int i = 0; i < 10; i++) {
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

        Assert.assertEquals(Long.valueOf(i), it.next());
    }
}
项目:RxJavaFlow    文件:OperatorSwitchTest.java   
@Before
@SuppressWarnings("unchecked")
public void before() {
    scheduler = new TestScheduler();
    innerScheduler = scheduler.createWorker();
    observer = mock(Observer.class);
}
项目:RxJavaFlow    文件:OperatorSkipLastTimedTest.java   
@Test
public void testSkipLastTimed() {
    TestScheduler scheduler = new TestScheduler();

    PublishSubject<Integer> source = PublishSubject.create();

    Observable<Integer> result = source.skipLast(1, TimeUnit.SECONDS, scheduler);

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);

    result.subscribe(o);

    source.onNext(1);
    source.onNext(2);
    source.onNext(3);

    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

    source.onNext(4);
    source.onNext(5);
    source.onNext(6);

    scheduler.advanceTimeBy(950, TimeUnit.MILLISECONDS);
    source.onComplete();

    InOrder inOrder = inOrder(o);
    inOrder.verify(o).onNext(1);
    inOrder.verify(o).onNext(2);
    inOrder.verify(o).onNext(3);
    inOrder.verify(o, never()).onNext(4);
    inOrder.verify(o, never()).onNext(5);
    inOrder.verify(o, never()).onNext(6);
    inOrder.verify(o).onComplete();
    inOrder.verifyNoMoreInteractions();

    verify(o, never()).onError(any(Throwable.class));
}
项目:RxJavaFlow    文件:OperatorSkipLastTimedTest.java   
@Test
public void testSkipLastTimedWhenAllElementsAreValid() {
    TestScheduler scheduler = new TestScheduler();

    PublishSubject<Integer> source = PublishSubject.create();

    Observable<Integer> result = source.skipLast(1, TimeUnit.MILLISECONDS, scheduler);

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);

    result.subscribe(o);

    source.onNext(1);
    source.onNext(2);
    source.onNext(3);

    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

    source.onComplete();

    InOrder inOrder = inOrder(o);
    inOrder.verify(o).onNext(1);
    inOrder.verify(o).onNext(2);
    inOrder.verify(o).onNext(3);
    inOrder.verify(o).onComplete();
    inOrder.verifyNoMoreInteractions();
}
项目:RxJavaFlow    文件:OperatorTakeTimedTest.java   
@Test
public void testTakeTimed() {
    TestScheduler scheduler = new TestScheduler();

    PublishSubject<Integer> source = PublishSubject.create();

    Observable<Integer> result = source.take(1, TimeUnit.SECONDS, scheduler);

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);

    result.subscribe(o);

    source.onNext(1);
    source.onNext(2);
    source.onNext(3);

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    source.onNext(4);

    InOrder inOrder = inOrder(o);
    inOrder.verify(o).onNext(1);
    inOrder.verify(o).onNext(2);
    inOrder.verify(o).onNext(3);
    inOrder.verify(o).onComplete();
    inOrder.verifyNoMoreInteractions();

    verify(o, never()).onNext(4);
    verify(o, never()).onError(any(Throwable.class));
}