/**
 * Pushes one regular line followed by a marker line carrying " 255" through the output
 * harvester and checks that the regular line reaches the command's output processor while
 * the resulting crop reports exit code 255 and a {@code null} buffer.
 */
@Test
public void testProcessors_output() {
    String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);
    ReplayProcessor<String> outputProcessor = ReplayProcessor.create();
    when(cmd.getOutputProcessor()).thenReturn(outputProcessor);

    TestSubscriber<OutputHarvester.Crop> cropSubscriber =
            publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();

    publisher.onNext("some-output");
    publisher.onNext(marker + " 255");

    // The processor must have seen exactly the non-marker line.
    TestSubscriber<String> forwardedLines = outputProcessor.test();
    forwardedLines
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .assertValue("some-output");

    OutputHarvester.Crop harvested = cropSubscriber
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(harvested.exitCode, is(255));
    assertThat(harvested.buffer, is(nullValue()));
}
/**
 * Pushes one regular line followed by the bare marker through the error harvester and
 * checks that the regular line reaches the command's error processor while the resulting
 * crop has a {@code null} buffer.
 */
@Test
public void testProcessors_errors() {
    String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);
    ReplayProcessor<String> errorProcessor = ReplayProcessor.create();
    when(cmd.getErrorProcessor()).thenReturn(errorProcessor);

    TestSubscriber<Harvester.Crop> cropSubscriber =
            publisher.compose(harvesterFactory.forError(publisher, cmd)).test();

    publisher.onNext("some-errors");
    publisher.onNext(marker);

    // The processor must have seen exactly the non-marker line.
    TestSubscriber<String> forwardedLines = errorProcessor.test();
    forwardedLines
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .assertValue("some-errors");

    Harvester.Crop harvested = cropSubscriber
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(harvested.buffer, is(nullValue()));
}
@Test public void shouldHandleAddCommands() { // given: final UUID uuid1 = UUID.randomUUID(); final UUID uuid2 = UUID.randomUUID(); final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final USet<UUID> set = new USet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final USet.AddCommand<UUID> command1 = new USet.AddCommand<>(set.getCrdtId(), uuid1); final USet.AddCommand<UUID> command2 = new USet.AddCommand<>(set.getCrdtId(), uuid2); // when: inputStream.onNext(command1); inputStream.onNext(command2); // then: assertThat(set, hasSize(2)); assertThat(subscriber.valueCount(), is(2)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void shouldHandleRemoveCommands() { // given: final UUID uuid1 = UUID.randomUUID(); final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final USet<UUID> set = new USet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final USet.AddCommand<UUID> command1 = new USet.AddCommand<>(set.getCrdtId(), uuid1); final USet.RemoveCommand<UUID> command2 = new USet.RemoveCommand<>(set.getCrdtId(), uuid1); // when: inputStream.onNext(command1); inputStream.onNext(command2); // then: assertThat(set, empty()); assertThat(subscriber.valueCount(), is(2)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void itShouldOverwriteOnlyPartialCommandsFromReceivedCommand() { // given final TestSubscriber<MVRegister.SetCommand<String>> outCommands1 = TestSubscriber.create(); final Processor<MVRegister.SetCommand<String>, MVRegister.SetCommand<String>> inCommands2 = ReplayProcessor.create(); final MVRegister<String> register1 = new MVRegister<>(NODE_ID_1, CRDT_ID); register1.subscribe(outCommands1); final MVRegister<String> register2 = new MVRegister<>(NODE_ID_2, CRDT_ID); register2.subscribeTo(inCommands2); register1.set("Hello World"); register2.set("Goodbye World"); inCommands2.onNext(outCommands1.values().get(0)); // when register1.set("42"); inCommands2.onNext(outCommands1.values().get(1)); // then assertThat(register1.get(), containsInAnyOrder("42")); assertThat(register2.get(), containsInAnyOrder("42", "Goodbye World")); }
@Test public void shouldHandleAddCommands() { // given: final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final ORSet<String> set = new ORSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final ORSet.AddCommand<String> command1 = new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID())); final ORSet.AddCommand<String> command2 = new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("2", UUID.randomUUID())); final ORSet.AddCommand<String> command3 = new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID())); // when: inputStream.onNext(command1); inputStream.onNext(command2); inputStream.onNext(command3); // then: assertThat(set, hasSize(2)); assertThat(subscriber.valueCount(), is(3)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void shouldHandleDuplicateCommands() { // given: final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final ORSet<String> set = new ORSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final ORSet.AddCommand<String> command = new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID())); // when: inputStream.onNext(command); inputStream.onNext(command); // then: assertThat(set, hasSize(1)); assertThat(subscriber.valueCount(), is(1)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void shouldHandleAddCommands() { // given: final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final GSet<String> set = new GSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final GSet.AddCommand<String> command1 = new GSet.AddCommand<>(set.getCrdtId(), "1"); final GSet.AddCommand<String> command2 = new GSet.AddCommand<>(set.getCrdtId(), "2"); final GSet.AddCommand<String> command3 = new GSet.AddCommand<>(set.getCrdtId(), "1"); // when: inputStream.onNext(command1); inputStream.onNext(command2); inputStream.onNext(command3); // then: assertThat(set, hasSize(2)); assertThat(subscriber.valueCount(), is(2)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void shouldHandleDuplicateCommands() { // given: final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final GSet<String> set = new GSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final GSet.AddCommand<String> command = new GSet.AddCommand<>(set.getCrdtId(), "1"); // when: inputStream.onNext(command); inputStream.onNext(command); // then: assertThat(set, hasSize(1)); assertThat(subscriber.valueCount(), is(1)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void shouldHandleAddCommands() { // given: final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final TwoPSet<String> set = new TwoPSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final TwoPSet.AddCommand<String> command1 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1"); final TwoPSet.AddCommand<String> command2 = new TwoPSet.AddCommand<>(set.getCrdtId(), "2"); final TwoPSet.AddCommand<String> command3 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1"); // when: inputStream.onNext(command1); inputStream.onNext(command2); inputStream.onNext(command3); // then: assertThat(set, hasSize(2)); assertThat(subscriber.valueCount(), is(2)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void shouldHandleRemoveCommands() { // given: final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final TwoPSet<String> set = new TwoPSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final TwoPSet.AddCommand<String> command1 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1"); final TwoPSet.AddCommand<String> command2 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1"); final TwoPSet.RemoveCommand<String> command3 = new TwoPSet.RemoveCommand<>(set.getCrdtId(), "1"); // when: inputStream.onNext(command1); inputStream.onNext(command2); inputStream.onNext(command3); // then: assertThat(set, empty()); assertThat(subscriber.valueCount(), is(2)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void shouldHandleRemoveCommandArrivesBeforeAddCommand() { // given: final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final TwoPSet<String> set = new TwoPSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final TwoPSet.RemoveCommand<String> command1 = new TwoPSet.RemoveCommand<>(set.getCrdtId(), "1"); final TwoPSet.AddCommand<String> command2 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1"); final TwoPSet.AddCommand<String> command3 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1"); // when: inputStream.onNext(command1); inputStream.onNext(command2); inputStream.onNext(command3); // then: assertThat(set, empty()); assertThat(subscriber.valueCount(), is(1)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
@Test public void itShouldIgnoreOlderValueFromReceivedCommands() { // given final TestSubscriber<MVRegister.SetCommand<String>> outCommands1 = TestSubscriber.create(); final TestSubscriber<MVRegister.SetCommand<String>> outCommands2 = TestSubscriber.create(); final Processor<MVRegister.SetCommand<String>, MVRegister.SetCommand<String>> inCommands3 = ReplayProcessor.create(); final MVRegister<String> register1 = new MVRegister<>(NODE_ID_1, CRDT_ID); register1.subscribe(outCommands1); final MVRegister<String> register2 = new MVRegister<>(NODE_ID_2, CRDT_ID); register2.subscribe(outCommands2); register1.subscribeTo(register2); register2.subscribeTo(register1); final MVRegister<String> register3 = new MVRegister<>(NODE_ID_3, CRDT_ID); register3.subscribeTo(inCommands3); // when register1.set("Hello World"); register2.set("Goodbye World"); final MVRegister.SetCommand<String> oldCommand = outCommands1.values().get(0); final MVRegister.SetCommand<String> newCommand = outCommands2.values().get(1); inCommands3.onNext(newCommand); inCommands3.onNext(oldCommand); // then assertThat(register3.get(), contains("Goodbye World")); }
@SuppressWarnings("unchecked") @Test public void itShouldIgnoreOlderValueFromReceivedCommands() { // given final TestSubscriber<LWWRegister.SetCommand<String>> outCommands1 = TestSubscriber.create(); final TestSubscriber<LWWRegister.SetCommand<String>> outCommands2 = TestSubscriber.create(); final Processor<LWWRegister.SetCommand<String>, LWWRegister.SetCommand<String>> inCommands3 = ReplayProcessor.create(); final LWWRegister<String> register1 = new LWWRegister<>(NODE_ID_1, CRDT_ID); register1.subscribe(outCommands1); final LWWRegister<String> register2 = new LWWRegister<>(NODE_ID_2, CRDT_ID); register2.subscribe(outCommands2); register1.subscribeTo(register2); register2.subscribeTo(register1); final LWWRegister<String> register3 = new LWWRegister<>(NODE_ID_3, CRDT_ID); register3.subscribeTo(inCommands3); // when register1.set("Hello World"); register2.set("Goodbye World"); final LWWRegister.SetCommand<String> oldCommand = outCommands1.values().get(0); final LWWRegister.SetCommand<String> newCommand = outCommands2.values().get(1); inCommands3.onNext(newCommand); inCommands3.onNext(oldCommand); // then assertThat(register3.get(), is("Goodbye World")); }
@Test public void shouldHandleRemoveCommands() { // given: final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> inputStream = ReplayProcessor.create(); final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create(); final ORSet<String> set = new ORSet<>("ID_1"); set.subscribeTo(inputStream); set.subscribe(subscriber); final ORSet.Element<String> elem1 = new ORSet.Element<>("1", UUID.randomUUID()); final ORSet.Element<String> elem2 = new ORSet.Element<>("1", UUID.randomUUID()); final Set<ORSet.Element<String>> elements = new HashSet<>(Arrays.asList(elem1, elem2)); final ORSet.AddCommand<String> command1 = new ORSet.AddCommand<>(set.getCrdtId(), elem1); final ORSet.AddCommand<String> command2 = new ORSet.AddCommand<>(set.getCrdtId(), elem2); final ORSet.RemoveCommand<String> command3 = new ORSet.RemoveCommand<>(set.getCrdtId(), elements); // when: inputStream.onNext(command1); inputStream.onNext(command2); inputStream.onNext(command3); // then: assertThat(set, empty()); assertThat(subscriber.valueCount(), is(3)); subscriber.assertNotComplete(); subscriber.assertNoErrors(); }
/**
 * Creates a register, handing the given identifiers and a freshly created
 * {@code ReplayProcessor} to the superclass constructor.
 *
 * @param nodeId identifier passed through to the superclass as its first argument
 * @param crdtId identifier passed through to the superclass as its second argument
 */
public MVRegister(String nodeId, String crdtId) {
    super(
            nodeId,
            crdtId,
            ReplayProcessor.create());
}
@Test public void itShouldAddElementsConcurrently() { int i1 = 0; int i2 = 0; // given final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> inCommands1 = ReplayProcessor.create(); final TestSubscriber<RGA.RGACommand<String>> outCommands1 = TestSubscriber.create(); final RGA<String> rga1 = new RGA<>(NODE_ID_1, CRDT_ID); rga1.subscribeTo(inCommands1); rga1.subscribe(outCommands1); final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> inCommands2 = ReplayProcessor.create(); final TestSubscriber<RGA.RGACommand<String>> outCommands2 = TestSubscriber.create(); final RGA<String> rga2 = new RGA<>(NODE_ID_2, CRDT_ID); rga2.subscribeTo(inCommands2); rga2.subscribe(outCommands2); // when rga1.add(0, "A1"); rga2.add(0, "A2"); inCommands2.onNext(outCommands1.values().get(i1)); inCommands1.onNext(outCommands2.values().get(i2)); // then assertThat(rga1, contains("A2", "A1")); assertThat(rga2, contains("A2", "A1")); // when rga1.add(0, "B1"); rga2.add(0, "B2"); inCommands2.onNext(outCommands1.values().get(i1+=2)); inCommands1.onNext(outCommands2.values().get(i2+=2)); // then assertThat(rga1, contains("B2", "B1", "A2", "A1")); assertThat(rga2, contains("B2", "B1", "A2", "A1")); // when rga1.add(1, "C1"); rga2.add(1, "C2"); inCommands2.onNext(outCommands1.values().get(i1+=2)); inCommands1.onNext(outCommands2.values().get(i2+=2)); // then assertThat(rga1, contains("B2", "C2", "C1", "B1", "A2", "A1")); assertThat(rga2, contains("B2", "C2", "C1", "B1", "A2", "A1")); // when rga1.add(6, "D1"); rga2.add(6, "D2"); inCommands2.onNext(outCommands1.values().get(i1 + 2)); inCommands1.onNext(outCommands2.values().get(i2 + 2)); // then assertThat(rga1, contains("B2", "C2", "C1", "B1", "A2", "A1", "D2", "D1")); assertThat(rga2, contains("B2", "C2", "C1", "B1", "A2", "A1", "D2", "D1")); }
@Test public void itShouldAddAndRemoveSingleElementConcurrently() { int i1 = 0; // given final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> inCommands1 = ReplayProcessor.create(); final TestSubscriber<RGA.RGACommand<String>> outCommands1 = TestSubscriber.create(); final RGA<String> rga1 = new RGA<>(NODE_ID_1, CRDT_ID); rga1.subscribeTo(inCommands1); rga1.subscribe(outCommands1); final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> inCommands2 = ReplayProcessor.create(); final TestSubscriber<RGA.RGACommand<String>> outCommands2 = TestSubscriber.create(); final RGA<String> rga2 = new RGA<>(NODE_ID_2, CRDT_ID); rga2.subscribeTo(inCommands2); rga2.subscribe(outCommands2); rga1.add("A"); inCommands2.onNext(outCommands1.values().get(i1)); int i2 = i1; // when rga1.remove(0); rga2.add(0, "B"); inCommands2.onNext(outCommands1.values().get(++i1)); inCommands1.onNext(outCommands2.values().get(++i2)); // then assertThat(rga1, contains("B")); assertThat(rga2, contains("B")); // when rga1.remove(0); rga2.add(1, "C"); inCommands2.onNext(outCommands1.values().get(i1 + 2)); inCommands1.onNext(outCommands2.values().get(i2 + 2)); // then assertThat(rga1, contains("C")); assertThat(rga2, contains("C")); }
/**
 * Creates an instance, handing the given identifiers and a freshly created
 * {@code ReplayProcessor} to the superclass constructor.
 *
 * @param nodeId identifier passed through to the superclass as its first argument
 * @param crdtId identifier passed through to the superclass as its second argument
 */
SimpleCrdt(String nodeId, String crdtId) {
    super(
            nodeId,
            crdtId,
            ReplayProcessor.create());
}
/**
 * Creates an instance backed by a freshly created {@code ReplayProcessor}, delegating to
 * the constructor that accepts the processor.
 */
protected GoogleApiProcessor() {
    this(ReplayProcessor.<GoogleApiProcessor.Event>create());
}
/**
 * Creates an instance that stores the supplied processor in the {@code processor} field.
 *
 * @param processor the replay processor to store
 */
private GoogleApiProcessor(
        ReplayProcessor<GoogleApiProcessor.Event> processor) {
    this.processor = processor;
}
/**
 * Builds a proxy around a new {@code ReplayProcessor} using {@code Roxy.TePolicy.PASS}.
 *
 * @return the newly constructed proxy
 */
public static RxJava2ProcProxy replayProcessorProxy() {
    final Roxy.TePolicy policy = Roxy.TePolicy.PASS;
    return new RxJava2ProcProxy(ReplayProcessor.create(), policy);
}
/**
 * Builds a proxy around the {@code toSerialized()} view of a new {@code ReplayProcessor}
 * using {@code Roxy.TePolicy.PASS}.
 *
 * @return the newly constructed proxy
 */
public static RxJava2ProcProxy serializedReplayProcessorProxy() {
    final Roxy.TePolicy policy = Roxy.TePolicy.PASS;
    return new RxJava2ProcProxy(ReplayProcessor.create().toSerialized(), policy);
}
/**
 * Builds a proxy around a new {@code ReplayProcessor} using {@code Roxy.TePolicy.WRAP}.
 *
 * @return the newly constructed proxy
 */
public static RxJava2ProcProxy safeReplayProcessorProxy() {
    final Roxy.TePolicy policy = Roxy.TePolicy.WRAP;
    return new RxJava2ProcProxy(ReplayProcessor.create(), policy);
}
/**
 * Builds a proxy around the {@code toSerialized()} view of a new {@code ReplayProcessor}
 * using {@code Roxy.TePolicy.WRAP}.
 *
 * @return the newly constructed proxy
 */
public static RxJava2ProcProxy safeSerializedReplayProcessorProxy() {
    final Roxy.TePolicy policy = Roxy.TePolicy.WRAP;
    return new RxJava2ProcProxy(ReplayProcessor.create().toSerialized(), policy);
}