@Test public void completeTask_retrievedTaskIsComplete() { // Given a new task in the persistent repository final Task newTask = new Task(TITLE, ""); mLocalDataSource.saveTask(newTask); // When completed in the persistent repository mLocalDataSource.completeTask(newTask); // Then the task can be retrieved from the persistent repository and is complete TestSubscriber<Optional<Task>> testSubscriber = new TestSubscriber<>(); mLocalDataSource.getTask(newTask.getId()).subscribe(testSubscriber); testSubscriber.assertValueCount(1); Task result = testSubscriber.values().get(0).get(); assertThat(result.isCompleted(), is(true)); }
@Test public void clearCompletedTask_taskNotRetrievable() { // Given 2 new completed tasks and 1 active task in the persistent repository final Task newTask1 = new Task(TITLE, ""); mLocalDataSource.saveTask(newTask1); mLocalDataSource.completeTask(newTask1); final Task newTask2 = new Task(TITLE2, ""); mLocalDataSource.saveTask(newTask2); mLocalDataSource.completeTask(newTask2); final Task newTask3 = new Task(TITLE3, ""); mLocalDataSource.saveTask(newTask3); // When completed tasks are cleared in the repository mLocalDataSource.clearCompletedTasks(); // Then the completed tasks cannot be retrieved and the active one can TestSubscriber<List<Task>> testSubscriber = new TestSubscriber<>(); mLocalDataSource.getTasks().subscribe(testSubscriber); List<Task> result = testSubscriber.values().get(0); assertThat(result, not(hasItems(newTask1, newTask2))); }
@Test public void testInsertWithError(){ models.clear(); testStore.shouldThrowError(true); // enable error TestModel model = new TestModel(99); TestSubscriber<Optional<TestModel>> observer = new TestSubscriber<>(); disposables.add(testStore.insert(model) .subscribeOn(Schedulers.io()) .subscribeWith(observer)); observer.awaitTerminalEvent(2, SECONDS); observer.assertError(Throwable.class); observer.assertErrorMessage("insertSingle.error"); testStore.shouldThrowError(false); // disable error Assert.assertEquals(0, models.size()); }
@Test public void testATestScheduler() throws Exception { TestSubscriber<Long> testSubscriber = new TestSubscriber<>(); TestScheduler testScheduler = new TestScheduler(); Flowable.interval(5, TimeUnit.MILLISECONDS, testScheduler) .map(x -> x + 1) .filter(x -> x % 2 == 0) .subscribe(testSubscriber); testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); testSubscriber.assertNoErrors(); testSubscriber.assertValues(2L); testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); testSubscriber.assertNoErrors(); testSubscriber.assertValues(2L, 4L); }
@Test public void getTasks_repositoryCachesAfterFirstSubscription_whenTasksAvailableInLocalStorage() { // Given that the local data source has data available setTasksAvailable(mTasksLocalDataSource, TASKS); // And the remote data source does not have any data available setTasksNotAvailable(mTasksRemoteDataSource); // When two subscriptions are set TestSubscriber<List<Task>> testSubscriber1 = new TestSubscriber<>(); mTasksRepository.getTasks().subscribe(testSubscriber1); TestSubscriber<List<Task>> testSubscriber2 = new TestSubscriber<>(); mTasksRepository.getTasks().subscribe(testSubscriber2); // Then tasks were only requested once from remote and local sources verify(mTasksRemoteDataSource).getTasks(); verify(mTasksLocalDataSource).getTasks(); // assertFalse(mTasksRepository.mCacheIsDirty); testSubscriber1.assertValue(TASKS); testSubscriber2.assertValue(TASKS); }
@Test public void getTasks_repositoryCachesAfterFirstSubscription_whenTasksAvailableInRemoteStorage() { // Given that the local data source has data available setTasksAvailable(mTasksRemoteDataSource, TASKS); // And the remote data source does not have any data available setTasksNotAvailable(mTasksLocalDataSource); // When two subscriptions are set TestSubscriber<List<Task>> testSubscriber1 = new TestSubscriber<>(); mTasksRepository.getTasks().subscribe(testSubscriber1); TestSubscriber<List<Task>> testSubscriber2 = new TestSubscriber<>(); mTasksRepository.getTasks().subscribe(testSubscriber2); // Then tasks were only requested once from remote and local sources verify(mTasksRemoteDataSource).getTasks(); verify(mTasksLocalDataSource).getTasks(); assertFalse(mTasksRepository.mCacheIsDirty); testSubscriber1.assertValue(TASKS); testSubscriber2.assertValue(TASKS); }
@Test public void getTask_requestsSingleTaskFromLocalDataSource() { // Given a stub completed task with title and description in the local repository Task task = new Task(TASK_TITLE, "Some Task Description", true); Optional<Task> taskOptional = Optional.of(task); setTaskAvailable(mTasksLocalDataSource, taskOptional); // And the task not available in the remote repository setTaskNotAvailable(mTasksRemoteDataSource, taskOptional.get().getId()); // When a task is requested from the tasks repository TestSubscriber<Optional<Task>> testSubscriber = new TestSubscriber<>(); mTasksRepository.getTask(task.getId()).subscribe(testSubscriber); // Then the task is loaded from the database verify(mTasksLocalDataSource).getTask(eq(task.getId())); testSubscriber.assertValue(taskOptional); }
@Test public void shouldHandleDuplicateCommands() { // given: final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final ORSet<String> set = new ORSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final ORSet.AddCommand<String> command = new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID())); // when: inputStream.onNext(command); inputStream.onNext(command); // then: assertThat(set, hasSize(1)); assertThat(subscriber.valueCount(), is(1)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void shouldHandleAddCommands() { // given: final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final TwoPSet<String> set = new TwoPSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final TwoPSet.AddCommand<String> command1 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1"); final TwoPSet.AddCommand<String> command2 = new TwoPSet.AddCommand<>(set.getCrdtId(), "2"); final TwoPSet.AddCommand<String> command3 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1"); // when: inputStream.onNext(command1); inputStream.onNext(command2); inputStream.onNext(command3); // then: assertThat(set, hasSize(2)); assertThat(subscriber.valueCount(), is(2)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void testOneGist() { ServiceLocator.put(OkHttpClient.class, OkHttpClientUtil.getOkHttpClient(null, MockBehavior.MOCK)); Flowable<Gist> flowable = ServiceInjector.resolve(RxEndpoints.class).getGist("3d7cbc2f66cf5d61b8014d957a270c7c"); TestSubscriber<Gist> testSubscriber = new TestSubscriber<>(); flowable.subscribe(testSubscriber); testSubscriber.assertComplete(); List<Gist> gistList = testSubscriber.values(); Gist gist = gistList.get(0); assertEquals("Bootstrap Customizer Config", gist.getDescription()); GistFile file = gist.getFile(gist.getFilenames().iterator().next()); assertEquals(file.getContent().length(), file.getSize()); assertEquals("config.json", file.getFilename()); flowable = ServiceInjector.resolve(RxEndpoints.class).getGist("not actually an ID"); testSubscriber = new TestSubscriber<>(); flowable.subscribe(testSubscriber); testSubscriber.assertNotComplete(); testSubscriber.assertNoValues(); List<Throwable> errorList = testSubscriber.errors(); assertEquals(errorList.size(), 1); assertEquals("Not Found", errorList.get(0).getMessage()); }
@Test public void clientCanCancelServerStreamImplicitly() throws InterruptedException { RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel); TestSubscriber<NumberProto.Number> subscription = stub .responsePressure(Single.just(Empty.getDefaultInstance())) .doOnNext(number -> System.out.println(number.getNumber(0))) .doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnComplete(() -> System.out.println("Completed")) .doOnCancel(() -> System.out.println("Client canceled")) .take(10) .test(); // Consume some work Thread.sleep(TimeUnit.SECONDS.toMillis(1)); subscription.dispose(); subscription.awaitTerminalEvent(3, TimeUnit.SECONDS); subscription.assertValueCount(10); subscription.assertTerminated(); assertThat(svc.wasCanceled()).isTrue(); }
@Test public void testBasicTransform() { TestSubscriber<Update<Integer>> testSubscriber = new TestSubscriber<>(); SimpleFlowableList<Integer> list = new SimpleFlowableList<>(Arrays.asList(1, 2, 3)); FlowableList<Integer> transformedList = list.map(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) { return integer + 12; } }); transformedList.updates().subscribe(testSubscriber); testSubscriber.assertValueCount(1); List<Update<Integer>> onNextEvents = testSubscriber.values(); assertEquals(Arrays.asList(Change.reloaded()), onNextEvents.get(0).changes); assertEquals(Arrays.asList(13, 14, 15), onNextEvents.get(0).list); }
@Test public void serverToClientBackpressure() throws InterruptedException { RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel); Single<Empty> rxRequest = Single.just(Empty.getDefaultInstance()); TestSubscriber<NumberProto.Number> rxResponse = stub.responsePressure(rxRequest) .doOnNext(n -> System.out.println(n.getNumber(0) + " <--")) .doOnNext(n -> waitIfValuesAreEqual(n.getNumber(0), 3)) .test(); rxResponse.awaitTerminalEvent(5, TimeUnit.SECONDS); rxResponse.assertComplete() .assertValueCount(NUMBER_OF_STREAM_ELEMENTS); assertThat(numberOfWaits.get()).isEqualTo(1); }
@Test public void bidiRequestBackpressure() throws InterruptedException { RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel); Flowable<NumberProto.Number> rxRequest = Flowable .fromIterable(IntStream.range(0, NUMBER_OF_STREAM_ELEMENTS)::iterator) .doOnNext(i -> System.out.println(i + " --> ")) .doOnNext(i -> updateNumberOfWaits(lastValueTime, numberOfWaits)) .map(BackpressureIntegrationTest::protoNum); TestSubscriber<NumberProto.Number> rxResponse = stub.twoWayRequestPressure(rxRequest).test(); rxResponse.awaitTerminalEvent(5, TimeUnit.SECONDS); rxResponse.assertComplete() .assertValue(v -> v.getNumber(0) == NUMBER_OF_STREAM_ELEMENTS - 1); assertThat(numberOfWaits.get()).isEqualTo(1); }
@Test public void createGist() throws IOException { ServiceLocator.put(OkHttpClient.class, OkHttpClientUtil.getOkHttpClient(null, MockBehavior.MOCK_ONLY)); Gist gist = new GistImpl(); gist.setDescription(CREATE_DESCRIPTION); gist.addFile(CREATE_FILE_NAME, readFromAsset("mocks/javaclass")); Flowable<Gist> flowable = ServiceInjector.resolve(RxEndpoints.class).createGist(gist); TestSubscriber<Gist> testSubscriber = new TestSubscriber<>(); flowable.subscribe(testSubscriber); testSubscriber.assertComplete(); List<Gist> gistList = testSubscriber.values(); Gist resultGist = gistList.get(0); Flowable<Gist> gistFlowable = ServiceInjector.resolve(RxEndpoints.class).getGist(resultGist.getId()); TestSubscriber<Gist> gistTestSubscriber = new TestSubscriber<>(); gistFlowable.subscribe(gistTestSubscriber); Gist detailGist = gistTestSubscriber.values().get(0); assertEquals(detailGist.getDescription(), CREATE_DESCRIPTION); }
@Test public void testCommand_callback_sync() throws IOException, InterruptedException { processor.attach(session); int cnt = 100; List<Pair<TestObserver<Cmd.Result>, TestSubscriber<String>>> testSubscribers = new ArrayList<>(); for (int j = 0; j < cnt; j++) { List<String> cmds = new ArrayList<>(); for (int i = 0; i < 10; i++) cmds.add("echo " + i); cmds.add("echo " + j); PublishProcessor<String> outputListener = PublishProcessor.create(); TestSubscriber<String> outputObserver = outputListener.doOnEach(stringNotification -> TestHelper.sleep(1)).test(); final Cmd cmd = Cmd.builder(cmds).outputProcessor(outputListener).build(); final TestObserver<Cmd.Result> resultObserver = processor.submit(cmd).subscribeOn(Schedulers.newThread()).test(); testSubscribers.add(new Pair<>(resultObserver, outputObserver)); } for (Pair<TestObserver<Cmd.Result>, TestSubscriber<String>> pair : testSubscribers) { pair.first.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertComplete(); pair.second.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(11); } }
@Test public void testChangeAndUpdate() { final SimpleFlowableList<Integer> list = new SimpleFlowableList<>(); TestSubscriber<Update<Integer>> test = list.updates().test(); assertEquals("list=[0], changes={reloaded}", test.values().get(0).toString()); list.batch(new Consumer<SimpleFlowableList<Integer>>() { @Override public void accept(SimpleFlowableList<Integer> integerSimpleFlowableList) throws Exception { list.add(1); list.add(2); list.add(3); } }); assertEquals("list=[3], changes={inserted(0), inserted(1), inserted(2)}", test.values().get(1).toString()); list.remove(1); assertEquals("list=[2], changes={removed(1)}", test.values().get(2).toString()); list.move(0, 1); assertEquals("list=[2], changes={moved(0 -> 1)}", test.values().get(3).toString()); }
@Test public void testUpstreamPrematureCompletion_output() { String uuid = UUID.randomUUID().toString(); when(cmd.getMarker()).thenReturn(uuid); when(cmd.isOutputBufferEnabled()).thenReturn(true); TestSubscriber<OutputHarvester.Crop> testSubscriber = publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test(); testSubscriber.assertNotTerminated(); publisher.onNext("some-output"); publisher.onComplete(); OutputHarvester.Crop crop = testSubscriber.assertValueCount(1).assertComplete().values().get(0); assertThat(crop.isComplete, is(false)); assertThat(crop.exitCode, is(Cmd.ExitCode.INITIAL)); assertThat(crop.buffer.size(), is(1)); assertThat(crop.buffer, contains("some-output")); }
@Test public void testProcessors_errors() { String uuid = UUID.randomUUID().toString(); when(cmd.getMarker()).thenReturn(uuid); ReplayProcessor<String> processor = ReplayProcessor.create(); when(cmd.getErrorProcessor()).thenReturn(processor); TestSubscriber<Harvester.Crop> testSubscriber = publisher.compose(harvesterFactory.forError(publisher, cmd)).test(); publisher.onNext("some-errors"); publisher.onNext(uuid); processor.test().awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(1).assertValue("some-errors"); Harvester.Crop crop = testSubscriber.awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(1).values().get(0); assertThat(crop.buffer, is(nullValue())); }
@Test public void testGetOneObject(){ models.clear(); List<TestModel> list = new ArrayList<>(); list.add(new TestModel(10)); list.add(new TestModel(20)); list.add(new TestModel(30)); memoryStore.insertOrUpdate(list); TestModel toFind = new TestModel(20); TestSubscriber<Optional<TestModel>> observer = new TestSubscriber<>(); disposables.add(testStore.getOne(toFind) .subscribeOn(Schedulers.io()) .subscribeWith(observer)); observer.awaitTerminalEvent(5, SECONDS); observer.assertComplete(); observer.assertNoErrors(); Assert.assertEquals(3, models.size()); TestModel tm = observer.values().get(0).get(); Assert.assertEquals(20, tm.getId()); }
@Test public void test() throws InterruptedException { // The stream id specify which stream we want to obtain IntegerRangeId streamId = new IntegerRangeId(0, 10); // Using the RxJava 2 test subscriber TestSubscriber<Integer> subscriber = TestSubscriber.create(); // The stream id will be discovered (and created by the factory) discovery.discover(streamId).subscribe(subscriber); // Wait for the end of the stream subscriber.await(); List<Integer> values = subscriber.values(); List<Integer> expectedValues = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); assertThat(values).containsExactlyElementsOf(expectedValues); }
@Test public void testBufferingWithNoUpdates() throws InterruptedException { TestScheduler testScheduler = new TestScheduler(); SimpleFlowableList<Integer> list = new SimpleFlowableList<>(); FlowableList<Integer> bufferedList = list.buffer(50, TimeUnit.MILLISECONDS, testScheduler); TestSubscriber testSubscriber = new TestSubscriber(); bufferedList.updates().subscribe(testSubscriber); testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS); testScheduler.triggerActions(); testSubscriber.awaitCount(1); testSubscriber.assertNoErrors(); testSubscriber.assertValueCount(1); testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS); testScheduler.triggerActions(); testSubscriber.awaitCount(1); testSubscriber.dispose(); }
@Test public void testSingleList() { List<FlowableList<Integer>> flowableLists = Arrays.asList(FlowableList.of(1, 2, 3)); FlowableList<?> list = FlowableList.concat(flowableLists); TestSubscriber testSubscriber = new TestSubscriber(); list.updates().subscribe(testSubscriber); testSubscriber.assertValueCount(1); List<Update> onNextEvents = testSubscriber.values(); Update update = onNextEvents.get(0); assertEquals(Change.reloaded(), update.changes.get(0)); assertEquals(Arrays.asList(1, 2, 3), update.list); }
@Test public void testInsertListWithError(){ models.clear(); testStore.shouldThrowError(true); // enable error List<TestModel> list = new ArrayList<>(); list.add(new TestModel(1)); list.add(new TestModel(2)); list.add(new TestModel(3)); TestSubscriber<Optional<List<TestModel>>> observer = new TestSubscriber<>(); disposables.add(testStore.insert(list) .subscribeOn(Schedulers.io()) .subscribeWith(observer)); observer.awaitTerminalEvent(2, SECONDS); observer.assertError(Throwable.class); observer.assertErrorMessage("insert.error"); testStore.shouldThrowError(false); // disable error Assert.assertEquals(0, models.size()); // should have been cleared }
@Test public void test() { PublishProcessor<String> subject = PublishProcessor.create(); Flowable<String> source = subject.hide(); TestSubscriber testSubscriber = new TestSubscriber(); CompositeDisposable composite = new CompositeDisposable(); Disposable disposable = source .compose(DisposableAttach.<String>to(composite)) .subscribeWith(testSubscriber); subject.onNext("Foo"); testSubscriber.assertValue("Foo"); assertTrue(composite.size() == 1); composite.dispose(); assertTrue(composite.size() == 0); assertTrue(composite.isDisposed()); assertTrue(disposable.isDisposed()); assertTrue(testSubscriber.isDisposed()); }
@SuppressWarnings("unchecked") @Test public void shouldSendNotificationForRemoves() { // given: final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final ORSet<String> set = new ORSet<>("ID_1"); set.subscribe(subscriber); set.add("1"); set.add("1"); // when: final Iterator<String> it = set.iterator(); it.next(); it.remove(); // then: subscriber.assertNotComplete(); subscriber.assertNoErrors(); assertThat(subscriber.values(), contains( new AddCommandMatcher<>(set.getCrdtId(), "1"), new AddCommandMatcher<>(set.getCrdtId(), "1"), new RemoveCommandMatcher<>(set.getCrdtId(), "1", "1") )); }
@Test public void testGetOneWithError(){ testStore.shouldThrowError(true); models.clear(); List<TestModel> list = new ArrayList<>(); list.add(new TestModel(10)); list.add(new TestModel(20)); list.add(new TestModel(30)); memoryStore.insertOrUpdate(list); TestSubscriber<Optional<TestModel>> observer = new TestSubscriber<>(); disposables.add(testStore.getOne() .subscribeOn(Schedulers.io()) .subscribeWith(observer)); observer.awaitTerminalEvent(2, SECONDS); observer.assertError(Throwable.class); observer.assertErrorMessage("getOne.error"); testStore.shouldThrowError(false); Assert.assertEquals(3, models.size()); }
@Test public void testUpdatesChecked() throws Exception { RxPaperBook book = RxPaperBook.with("UPDATES_CH", Schedulers.trampoline()); final String key = "hello"; final ComplexObject value = ComplexObject.random(); final TestSubscriber<ComplexObject> updatesSubscriber = TestSubscriber.create(); book.observe(key, ComplexObject.class, BackpressureStrategy.MISSING).subscribe(updatesSubscriber); updatesSubscriber.assertValueCount(0); book.write(key, value).subscribe(); updatesSubscriber.assertValueCount(1); updatesSubscriber.assertValues(value); final ComplexObject newValue = ComplexObject.random(); book.write(key, newValue).subscribe(); updatesSubscriber.assertValueCount(2); updatesSubscriber.assertValues(value, newValue); // Error value final int wrongValue = 3; book.write(key, wrongValue).test().assertComplete().assertNoErrors(); updatesSubscriber.assertValueCount(2); updatesSubscriber.assertValues(value, newValue); updatesSubscriber.assertNoErrors(); }
@Test public void shouldHandleDuplicateCommands() { // given: final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final GSet<String> set = new GSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final GSet.AddCommand<String> command = new GSet.AddCommand<>(set.getCrdtId(), "1"); // when: inputStream.onNext(command); inputStream.onNext(command); // then: assertThat(set, hasSize(1)); assertThat(subscriber.valueCount(), is(1)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void test_just_Flowable() { Flowable<String> observable = Flowable.just("mango", "papaya", "guava"); TestSubscriber<String> testSubscriber = new TestSubscriber<>(); observable.subscribe(testSubscriber); List<String> items = testSubscriber.values(); testSubscriber.assertComplete(); testSubscriber.assertSubscribed(); testSubscriber.assertNoErrors(); testSubscriber.assertValueCount(3); testSubscriber.assertValues("mango", "papaya", "guava"); }
@SuppressWarnings("unchecked") @Test public void shouldSendNotificationForAdds() { // given: final UUID uuid1 = UUID.randomUUID(); final UUID uuid2 = UUID.randomUUID(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final USet<UUID> set = new USet<>("ID_1"); set.subscribe(subscriber); // when: set.add(uuid1); set.add(uuid2); // then: subscriber.assertNotComplete(); subscriber.assertNoErrors(); assertThat(subscriber.values(), contains( new AddCommandMatcher<>(set.getCrdtId(), uuid1), new AddCommandMatcher<>(set.getCrdtId(), uuid2) )); }
@SuppressWarnings("unchecked") @Test public void shouldSendNotificationForRemoves() { // given: final UUID uuid1 = UUID.randomUUID(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final USet<UUID> set = new USet<>("ID_1"); set.subscribe(subscriber); set.add(uuid1); // when: final Iterator<UUID> it = set.iterator(); it.next(); it.remove(); // then: subscriber.assertNotComplete(); subscriber.assertNoErrors(); assertThat(subscriber.values(), contains( new AddCommandMatcher<>(set.getCrdtId(), uuid1), new RemoveCommandMatcher<>(set.getCrdtId(), uuid1) )); }
@Test public void shouldHandleRemoveCommands() { // given: final UUID uuid1 = UUID.randomUUID(); final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final USet<UUID> set = new USet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final USet.AddCommand<UUID> command1 = new USet.AddCommand<>(set.getCrdtId(), uuid1); final USet.RemoveCommand<UUID> command2 = new USet.RemoveCommand<>(set.getCrdtId(), uuid1); // when: inputStream.onNext(command1); inputStream.onNext(command2); // then: assertThat(set, empty()); assertThat(subscriber.valueCount(), is(2)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void testBasicTransform() { BehaviorProcessor<List<Integer>> processor = BehaviorProcessor.create(); FlowableList<Integer> list = FlowableList.diff(processor); TestSubscriber<Update<Integer>> test = list.updates().test(); processor.onNext(Arrays.asList(1, 2, 3, 4)); Update<Integer> firstUpdate = test.values().get(0); assertEquals(Collections.singletonList(Change.reloaded()), firstUpdate.changes); processor.onNext(Arrays.asList(2, 4, 5)); Update<Integer> secondUpdate = test.values().get(1); assertEquals(Arrays.asList(2, 4, 5), secondUpdate.list); assertEquals(Arrays.asList( 2, 4, 5), TestTools.applyChanges(firstUpdate.list, secondUpdate.list, secondUpdate.changes)); }
@Test public void shouldPassMessageToLocalStreamWhenSendMessage() throws Exception { String message = "hello @alex http://youtube.com/q=look (love) there @yui you go http://twitter.com"; final List<Link> expectedLinks = asList( new Link("http://youtube.com/q=look", ""), new Link("http://twitter.com", "")); when(userResolver.getLoggedInUser()).thenReturn(TestUtils.createMockUser()); final TestSubscriber<Message> userTestSubscriber = new TestSubscriber<>(); final Observable<Message> messageViewModelMessages = messageViewModel.localMessageStream(); messageViewModelMessages.toFlowable(BackpressureStrategy.LATEST).subscribe(userTestSubscriber); final Message m = new Message(id, message, Arrays.asList("alex", "yui"), Arrays.asList("love"), expectedLinks, TestUtils.createMockUser()); messageViewModel.sendMessage(message, Arrays.asList("love"), sendScheduler); userTestSubscriber.assertValue(m); }
@Test public void testInsertList(){ models.clear(); List<TestModel> list = new ArrayList<>(); list.add(new TestModel(1)); list.add(new TestModel(2)); list.add(new TestModel(3)); TestSubscriber<Optional<List<TestModel>>> observer = new TestSubscriber<>(); disposables.add(testStore.insert(list) .subscribeOn(Schedulers.io()) .subscribeWith(observer)); observer.awaitTerminalEvent(2, SECONDS); observer.assertComplete(); observer.assertNoErrors(); Assert.assertEquals(3, models.size()); }
@Test public void testItemRemoval() { VisibleItem<Integer> item1 = new VisibleItem<>(1, true); VisibleItem<Integer> item2 = new VisibleItem<>(2, true); VisibleItem<Integer> item3 = new VisibleItem<>(3, true); SimpleFlowableList<VisibleItem<Integer>> simpleList = new SimpleFlowableList<>(); FlowableList<Integer> list = FlowableList.collapseVisibility(simpleList); TestSubscriber testSubscriber = new TestSubscriber(); simpleList.add(item1); simpleList.add(item2); simpleList.add(item3); list.updates().subscribe(testSubscriber); simpleList.remove(1); List<Update> onNextEvents = testSubscriber.values(); testSubscriber.assertValueCount(2); Update update1 = onNextEvents.get(0); Update update2 = onNextEvents.get(1); assertEquals(Arrays.asList(Change.reloaded()), update1.changes); assertEquals(Arrays.asList(1, 2, 3), update1.list); assertEquals(Arrays.asList(Change.removed(1)), update2.changes); assertEquals(Arrays.asList(1, 3), update2.list); }
@Test public void shouldReturnApiStream() throws Exception { final TestSubscriber<Message> userTestSubscriber = new TestSubscriber<>(); final Observable<Message> messageViewModelMessages = messageViewModel.apiSendMessageStream(); messageViewModelMessages.toFlowable(BackpressureStrategy.LATEST).subscribe(userTestSubscriber); userTestSubscriber.assertNoErrors(); userTestSubscriber.assertNoValues(); }
@Test public void loadQuestions_ShouldReturnFromRemoteService() { QuestionResponse questionResponse = new QuestionResponse(); TestSubscriber<List<Question>> subscriber = new TestSubscriber<>(); given(questionService.loadQuestionsByTag(Config.ANDROID_QUESTION_TAG)).willReturn(Flowable.just(questionResponse)); remoteDataSource.loadQuestions(anyBoolean()).subscribe(subscriber); then(questionService).should().loadQuestionsByTag(Config.ANDROID_QUESTION_TAG); }
@Test public void getTasks_retrieveSavedTasks() { // Given 2 new tasks in the persistent repository final Task newTask1 = new Task(TITLE, ""); mLocalDataSource.saveTask(newTask1); final Task newTask2 = new Task(TITLE, ""); mLocalDataSource.saveTask(newTask2); // Then the tasks can be retrieved from the persistent repository TestSubscriber<List<Task>> testSubscriber = new TestSubscriber<>(); mLocalDataSource.getTasks().subscribe(testSubscriber); List<Task> result = testSubscriber.values().get(0); assertThat(result, hasItems(newTask1, newTask2)); }