Java 类io.reactivex.processors.ReplayProcessor 实例源码
项目:RxShell
文件:HarvesterTest.java
/**
 * Checks the output harvester: a regular line must reach the command's output
 * processor, and the marker line must yield exactly one crop with the parsed exit code.
 */
@Test
public void testProcessors_output() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);

    final ReplayProcessor<String> outputProcessor = ReplayProcessor.create();
    when(cmd.getOutputProcessor()).thenReturn(outputProcessor);

    final TestSubscriber<OutputHarvester.Crop> cropSubscriber =
            publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();

    publisher.onNext("some-output");
    // Marker line that carries exit code 255.
    publisher.onNext(marker + " 255");

    // Only the regular line is expected on the command's own processor.
    outputProcessor.test()
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .assertValue("some-output");

    cropSubscriber
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1);
    final OutputHarvester.Crop harvested = cropSubscriber.values().get(0);
    assertThat(harvested.exitCode, is(255));
    assertThat(harvested.buffer, is(nullValue()));
}
项目:RxShell
文件:HarvesterTest.java
/**
 * Checks the error harvester: a regular line must reach the command's error
 * processor, and the bare marker line must yield exactly one crop without a buffer.
 */
@Test
public void testProcessors_errors() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);

    final ReplayProcessor<String> errorProcessor = ReplayProcessor.create();
    when(cmd.getErrorProcessor()).thenReturn(errorProcessor);

    final TestSubscriber<Harvester.Crop> cropSubscriber =
            publisher.compose(harvesterFactory.forError(publisher, cmd)).test();

    publisher.onNext("some-errors");
    // The error stream receives the marker alone, without an exit code.
    publisher.onNext(marker);

    // Only the regular line is expected on the command's own processor.
    errorProcessor.test()
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .assertValue("some-errors");

    cropSubscriber
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1);
    final Harvester.Crop harvested = cropSubscriber.values().get(0);
    assertThat(harvested.buffer, is(nullValue()));
}
项目:wurmloch-crdt
文件:USetTest.java
/**
 * Sends two add commands for distinct elements into a {@code USet} and expects
 * both elements in the set and both commands on the set's outgoing stream.
 */
@Test
public void shouldHandleAddCommands() {
    // given: a set fed by a processor and observed by a test subscriber
    UUID firstElement = UUID.randomUUID();
    UUID secondElement = UUID.randomUUID();
    Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    USet<UUID> uSet = new USet<>("ID_1");
    uSet.subscribeTo(incoming);
    uSet.subscribe(observer);
    USet.AddCommand<UUID> addFirst = new USet.AddCommand<>(uSet.getCrdtId(), firstElement);
    USet.AddCommand<UUID> addSecond = new USet.AddCommand<>(uSet.getCrdtId(), secondElement);

    // when: both add commands arrive
    incoming.onNext(addFirst);
    incoming.onNext(addSecond);

    // then: two elements stored, two commands emitted, stream still open
    assertThat(uSet, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:USetTest.java
/**
 * Sends an add followed by a remove of the same element into a {@code USet} and
 * expects an empty set with both commands visible on the outgoing stream.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given: a set fed by a processor and observed by a test subscriber
    UUID element = UUID.randomUUID();
    Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    USet<UUID> uSet = new USet<>("ID_1");
    uSet.subscribeTo(incoming);
    uSet.subscribe(observer);
    USet.AddCommand<UUID> add = new USet.AddCommand<>(uSet.getCrdtId(), element);
    USet.RemoveCommand<UUID> remove = new USet.RemoveCommand<>(uSet.getCrdtId(), element);

    // when: the element is added and then removed
    incoming.onNext(add);
    incoming.onNext(remove);

    // then: nothing left in the set, two commands emitted, stream still open
    assertThat(uSet, empty());
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:MVRegisterTest.java
/**
 * Two registers are written independently; register 2 then receives register 1's
 * commands one at a time. After register 1 is overwritten with "42" and that command
 * is delivered, register 2 is expected to hold "42" alongside its own value.
 */
@Test
public void itShouldOverwriteOnlyPartialCommandsFromReceivedCommand() {
    // given: register 1 publishes to a test subscriber, register 2 listens to a processor
    TestSubscriber<MVRegister.SetCommand<String>> sentByRegister1 = TestSubscriber.create();
    Processor<MVRegister.SetCommand<String>, MVRegister.SetCommand<String>> toRegister2 = ReplayProcessor.create();
    MVRegister<String> register1 = new MVRegister<>(NODE_ID_1, CRDT_ID);
    register1.subscribe(sentByRegister1);
    MVRegister<String> register2 = new MVRegister<>(NODE_ID_2, CRDT_ID);
    register2.subscribeTo(toRegister2);

    register1.set("Hello World");
    register2.set("Goodbye World");
    MVRegister.SetCommand<String> firstCommand = sentByRegister1.values().get(0);
    toRegister2.onNext(firstCommand);

    // when: register 1 is overwritten and the new command is delivered to register 2
    register1.set("42");
    MVRegister.SetCommand<String> secondCommand = sentByRegister1.values().get(1);
    toRegister2.onNext(secondCommand);

    // then
    assertThat(register1.get(), containsInAnyOrder("42"));
    assertThat(register2.get(), containsInAnyOrder("42", "Goodbye World"));
}
项目:wurmloch-crdt
文件:ORSetTest.java
/**
 * Sends three add commands into an {@code ORSet}, two of which carry the value "1"
 * with different UUIDs, and expects two distinct values in the set while all three
 * commands appear on the outgoing stream.
 */
@Test
public void shouldHandleAddCommands() {
    // given: a set fed by a processor and observed by a test subscriber
    Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    ORSet<String> orSet = new ORSet<>("ID_1");
    orSet.subscribeTo(incoming);
    orSet.subscribe(observer);
    ORSet.AddCommand<String> addOne =
            new ORSet.AddCommand<>(orSet.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));
    ORSet.AddCommand<String> addTwo =
            new ORSet.AddCommand<>(orSet.getCrdtId(), new ORSet.Element<>("2", UUID.randomUUID()));
    ORSet.AddCommand<String> addOneAgain =
            new ORSet.AddCommand<>(orSet.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));

    // when: all three commands arrive
    incoming.onNext(addOne);
    incoming.onNext(addTwo);
    incoming.onNext(addOneAgain);

    // then: two values stored, three commands emitted, stream still open
    assertThat(orSet, hasSize(2));
    assertThat(observer.valueCount(), is(3));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:ORSetTest.java
/**
 * Delivers the very same {@code ORSet} add command twice and expects it to be
 * applied and emitted only once.
 */
@Test
public void shouldHandleDuplicateCommands() {
    // given: a set fed by a processor and observed by a test subscriber
    Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    ORSet<String> orSet = new ORSet<>("ID_1");
    orSet.subscribeTo(incoming);
    orSet.subscribe(observer);
    ORSet.AddCommand<String> add =
            new ORSet.AddCommand<>(orSet.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));

    // when: the identical command instance arrives twice
    incoming.onNext(add);
    incoming.onNext(add);

    // then: one element stored, one command emitted, stream still open
    assertThat(orSet, hasSize(1));
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:GSetTest.java
/**
 * Sends three add commands ("1", "2", "1") into a {@code GSet} and expects two
 * elements in the set and two commands on the outgoing stream.
 */
@Test
public void shouldHandleAddCommands() {
    // given: a set fed by a processor and observed by a test subscriber
    Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    GSet<String> gSet = new GSet<>("ID_1");
    gSet.subscribeTo(incoming);
    gSet.subscribe(observer);
    GSet.AddCommand<String> addOne = new GSet.AddCommand<>(gSet.getCrdtId(), "1");
    GSet.AddCommand<String> addTwo = new GSet.AddCommand<>(gSet.getCrdtId(), "2");
    GSet.AddCommand<String> addOneAgain = new GSet.AddCommand<>(gSet.getCrdtId(), "1");

    // when: all three commands arrive
    incoming.onNext(addOne);
    incoming.onNext(addTwo);
    incoming.onNext(addOneAgain);

    // then: two elements stored, two commands emitted, stream still open
    assertThat(gSet, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:GSetTest.java
/**
 * Delivers the very same {@code GSet} add command twice and expects it to be
 * applied and emitted only once.
 */
@Test
public void shouldHandleDuplicateCommands() {
    // given: a set fed by a processor and observed by a test subscriber
    Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    GSet<String> gSet = new GSet<>("ID_1");
    gSet.subscribeTo(incoming);
    gSet.subscribe(observer);
    GSet.AddCommand<String> add = new GSet.AddCommand<>(gSet.getCrdtId(), "1");

    // when: the identical command instance arrives twice
    incoming.onNext(add);
    incoming.onNext(add);

    // then: one element stored, one command emitted, stream still open
    assertThat(gSet, hasSize(1));
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:TwoPSetTest.java
/**
 * Sends three add commands ("1", "2", "1") into a {@code TwoPSet} and expects two
 * elements in the set and two commands on the outgoing stream.
 */
@Test
public void shouldHandleAddCommands() {
    // given: a set fed by a processor and observed by a test subscriber
    Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    TwoPSet<String> twoPSet = new TwoPSet<>("ID_1");
    twoPSet.subscribeTo(incoming);
    twoPSet.subscribe(observer);
    TwoPSet.AddCommand<String> addOne = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    TwoPSet.AddCommand<String> addTwo = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "2");
    TwoPSet.AddCommand<String> addOneAgain = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");

    // when: all three commands arrive
    incoming.onNext(addOne);
    incoming.onNext(addTwo);
    incoming.onNext(addOneAgain);

    // then: two elements stored, two commands emitted, stream still open
    assertThat(twoPSet, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:TwoPSetTest.java
/**
 * Sends two adds of "1" followed by a remove of "1" into a {@code TwoPSet} and
 * expects an empty set with two commands on the outgoing stream.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given: a set fed by a processor and observed by a test subscriber
    Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    TwoPSet<String> twoPSet = new TwoPSet<>("ID_1");
    twoPSet.subscribeTo(incoming);
    twoPSet.subscribe(observer);
    TwoPSet.AddCommand<String> add = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    TwoPSet.AddCommand<String> addAgain = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    TwoPSet.RemoveCommand<String> remove = new TwoPSet.RemoveCommand<>(twoPSet.getCrdtId(), "1");

    // when: add, duplicate add, then remove
    incoming.onNext(add);
    incoming.onNext(addAgain);
    incoming.onNext(remove);

    // then: nothing left in the set, two commands emitted, stream still open
    assertThat(twoPSet, empty());
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:TwoPSetTest.java
/**
 * Delivers a remove of "1" before two adds of "1" into a {@code TwoPSet} and
 * expects the set to stay empty with a single command on the outgoing stream.
 */
@Test
public void shouldHandleRemoveCommandArrivesBeforeAddCommand() {
    // given: a set fed by a processor and observed by a test subscriber
    Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    TwoPSet<String> twoPSet = new TwoPSet<>("ID_1");
    twoPSet.subscribeTo(incoming);
    twoPSet.subscribe(observer);
    TwoPSet.RemoveCommand<String> remove = new TwoPSet.RemoveCommand<>(twoPSet.getCrdtId(), "1");
    TwoPSet.AddCommand<String> add = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    TwoPSet.AddCommand<String> addAgain = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");

    // when: the remove arrives first, followed by both adds
    incoming.onNext(remove);
    incoming.onNext(add);
    incoming.onNext(addAgain);

    // then: set remains empty, one command emitted, stream still open
    assertThat(twoPSet, empty());
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:MVRegisterTest.java
/**
 * Registers 1 and 2 are connected to each other; register 3 only receives what the
 * test feeds it. The newer command is delivered to register 3 before the older one,
 * and register 3 is expected to end up holding only "Goodbye World".
 */
@Test
public void itShouldIgnoreOlderValueFromReceivedCommands() {
    // given: two mutually subscribed registers plus an isolated third one
    TestSubscriber<MVRegister.SetCommand<String>> sentByRegister1 = TestSubscriber.create();
    TestSubscriber<MVRegister.SetCommand<String>> sentByRegister2 = TestSubscriber.create();
    Processor<MVRegister.SetCommand<String>, MVRegister.SetCommand<String>> toRegister3 = ReplayProcessor.create();

    MVRegister<String> register1 = new MVRegister<>(NODE_ID_1, CRDT_ID);
    register1.subscribe(sentByRegister1);
    MVRegister<String> register2 = new MVRegister<>(NODE_ID_2, CRDT_ID);
    register2.subscribe(sentByRegister2);
    register1.subscribeTo(register2);
    register2.subscribeTo(register1);

    MVRegister<String> register3 = new MVRegister<>(NODE_ID_3, CRDT_ID);
    register3.subscribeTo(toRegister3);

    // when: both connected registers are written, then register 3 sees the commands newest-first
    register1.set("Hello World");
    register2.set("Goodbye World");
    MVRegister.SetCommand<String> older = sentByRegister1.values().get(0);
    MVRegister.SetCommand<String> newer = sentByRegister2.values().get(1);
    toRegister3.onNext(newer);
    toRegister3.onNext(older);

    // then
    assertThat(register3.get(), contains("Goodbye World"));
}
项目:wurmloch-crdt
文件:LWWRegisterTest.java
/**
 * Registers 1 and 2 are connected to each other; register 3 only receives what the
 * test feeds it. The newer command is delivered to register 3 before the older one,
 * and register 3 is expected to end up with the value "Goodbye World".
 */
@SuppressWarnings("unchecked")
@Test
public void itShouldIgnoreOlderValueFromReceivedCommands() {
    // given: two mutually subscribed registers plus an isolated third one
    TestSubscriber<LWWRegister.SetCommand<String>> sentByRegister1 = TestSubscriber.create();
    TestSubscriber<LWWRegister.SetCommand<String>> sentByRegister2 = TestSubscriber.create();
    Processor<LWWRegister.SetCommand<String>, LWWRegister.SetCommand<String>> toRegister3 = ReplayProcessor.create();

    LWWRegister<String> register1 = new LWWRegister<>(NODE_ID_1, CRDT_ID);
    register1.subscribe(sentByRegister1);
    LWWRegister<String> register2 = new LWWRegister<>(NODE_ID_2, CRDT_ID);
    register2.subscribe(sentByRegister2);
    register1.subscribeTo(register2);
    register2.subscribeTo(register1);

    LWWRegister<String> register3 = new LWWRegister<>(NODE_ID_3, CRDT_ID);
    register3.subscribeTo(toRegister3);

    // when: both connected registers are written, then register 3 sees the commands newest-first
    register1.set("Hello World");
    register2.set("Goodbye World");
    LWWRegister.SetCommand<String> older = sentByRegister1.values().get(0);
    LWWRegister.SetCommand<String> newer = sentByRegister2.values().get(1);
    toRegister3.onNext(newer);
    toRegister3.onNext(older);

    // then
    assertThat(register3.get(), is("Goodbye World"));
}
项目:wurmloch-crdt
文件:ORSetTest.java
/**
 * Adds two {@code ORSet} elements that share the value "1" but have different UUIDs,
 * then removes both in a single remove command. Expects an empty set and all three
 * commands on the outgoing stream.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given: a set fed by a processor and observed by a test subscriber
    Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> incoming = ReplayProcessor.create();
    TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    ORSet<String> orSet = new ORSet<>("ID_1");
    orSet.subscribeTo(incoming);
    orSet.subscribe(observer);

    ORSet.Element<String> firstTag = new ORSet.Element<>("1", UUID.randomUUID());
    ORSet.Element<String> secondTag = new ORSet.Element<>("1", UUID.randomUUID());
    Set<ORSet.Element<String>> bothTags = new HashSet<>(Arrays.asList(firstTag, secondTag));

    ORSet.AddCommand<String> addFirst = new ORSet.AddCommand<>(orSet.getCrdtId(), firstTag);
    ORSet.AddCommand<String> addSecond = new ORSet.AddCommand<>(orSet.getCrdtId(), secondTag);
    ORSet.RemoveCommand<String> removeBoth = new ORSet.RemoveCommand<>(orSet.getCrdtId(), bothTags);

    // when: both adds arrive, followed by the combined remove
    incoming.onNext(addFirst);
    incoming.onNext(addSecond);
    incoming.onNext(removeBoth);

    // then: nothing left in the set, three commands emitted, stream still open
    assertThat(orSet, empty());
    assertThat(observer.valueCount(), is(3));
    observer.assertNotComplete()
            .assertNoErrors();
}
项目:wurmloch-crdt
文件:MVRegister.java
/**
 * Creates a register by passing the given ids and a newly created
 * {@link ReplayProcessor} to the superclass constructor.
 *
 * @param nodeId node identifier, forwarded unchanged to the superclass
 * @param crdtId CRDT identifier, forwarded unchanged to the superclass
 */
public MVRegister(String nodeId, String crdtId) {
super(nodeId, crdtId, ReplayProcessor.create());
}
项目:wurmloch-crdt
文件:RGATest.java
/**
 * Two RGA replicas insert at the same position concurrently in four rounds and then
 * exchange the resulting commands; after every round both replicas must contain the
 * same sequence.
 */
@Test
public void itShouldAddElementsConcurrently() {
    // given: two replicas, each with its own inbound processor and outbound test subscriber
    Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> toRga1 = ReplayProcessor.create();
    TestSubscriber<RGA.RGACommand<String>> fromRga1 = TestSubscriber.create();
    RGA<String> rga1 = new RGA<>(NODE_ID_1, CRDT_ID);
    rga1.subscribeTo(toRga1);
    rga1.subscribe(fromRga1);

    Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> toRga2 = ReplayProcessor.create();
    TestSubscriber<RGA.RGACommand<String>> fromRga2 = TestSubscriber.create();
    RGA<String> rga2 = new RGA<>(NODE_ID_2, CRDT_ID);
    rga2.subscribeTo(toRga2);
    rga2.subscribe(fromRga2);

    // Index of the command to exchange in the current round; the same index is used for
    // both outbound streams and it advances by two per round (presumably because each
    // replica also re-emits the command it received - confirm against RGA).
    int cursor = 0;

    // when: round A
    rga1.add(0, "A1");
    rga2.add(0, "A2");
    toRga2.onNext(fromRga1.values().get(cursor));
    toRga1.onNext(fromRga2.values().get(cursor));
    // then
    assertThat(rga1, contains("A2", "A1"));
    assertThat(rga2, contains("A2", "A1"));

    // when: round B
    rga1.add(0, "B1");
    rga2.add(0, "B2");
    cursor += 2;
    toRga2.onNext(fromRga1.values().get(cursor));
    toRga1.onNext(fromRga2.values().get(cursor));
    // then
    assertThat(rga1, contains("B2", "B1", "A2", "A1"));
    assertThat(rga2, contains("B2", "B1", "A2", "A1"));

    // when: round C
    rga1.add(1, "C1");
    rga2.add(1, "C2");
    cursor += 2;
    toRga2.onNext(fromRga1.values().get(cursor));
    toRga1.onNext(fromRga2.values().get(cursor));
    // then
    assertThat(rga1, contains("B2", "C2", "C1", "B1", "A2", "A1"));
    assertThat(rga2, contains("B2", "C2", "C1", "B1", "A2", "A1"));

    // when: round D
    rga1.add(6, "D1");
    rga2.add(6, "D2");
    cursor += 2;
    toRga2.onNext(fromRga1.values().get(cursor));
    toRga1.onNext(fromRga2.values().get(cursor));
    // then
    assertThat(rga1, contains("B2", "C2", "C1", "B1", "A2", "A1", "D2", "D1"));
    assertThat(rga2, contains("B2", "C2", "C1", "B1", "A2", "A1", "D2", "D1"));
}
项目:wurmloch-crdt
文件:RGATest.java
/**
 * One RGA replica removes the only element while the other inserts concurrently;
 * after the commands are exchanged both replicas must contain just the inserted element.
 * The scenario is played twice.
 */
@Test
public void itShouldAddAndRemoveSingleElementConcurrently() {
    // given: two replicas, each with its own inbound processor and outbound test subscriber
    Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> toRga1 = ReplayProcessor.create();
    TestSubscriber<RGA.RGACommand<String>> fromRga1 = TestSubscriber.create();
    RGA<String> rga1 = new RGA<>(NODE_ID_1, CRDT_ID);
    rga1.subscribeTo(toRga1);
    rga1.subscribe(fromRga1);

    Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> toRga2 = ReplayProcessor.create();
    TestSubscriber<RGA.RGACommand<String>> fromRga2 = TestSubscriber.create();
    RGA<String> rga2 = new RGA<>(NODE_ID_2, CRDT_ID);
    rga2.subscribeTo(toRga2);
    rga2.subscribe(fromRga2);

    // Seed both replicas with "A": command 0 of replica 1 is handed to replica 2.
    rga1.add("A");
    toRga2.onNext(fromRga1.values().get(0));

    // when: replica 1 removes "A" while replica 2 inserts "B"; commands at index 1 are swapped
    rga1.remove(0);
    rga2.add(0, "B");
    toRga2.onNext(fromRga1.values().get(1));
    toRga1.onNext(fromRga2.values().get(1));
    // then
    assertThat(rga1, contains("B"));
    assertThat(rga2, contains("B"));

    // when: replica 1 removes "B" while replica 2 inserts "C"; commands at index 3 are swapped
    rga1.remove(0);
    rga2.add(1, "C");
    toRga2.onNext(fromRga1.values().get(3));
    toRga1.onNext(fromRga2.values().get(3));
    // then
    assertThat(rga1, contains("C"));
    assertThat(rga2, contains("C"));
}
项目:wurmloch-crdt
文件:SimpleCrdt.java
/**
 * Package-private constructor that passes the given ids and a newly created
 * {@link ReplayProcessor} to the superclass constructor.
 *
 * @param nodeId node identifier, forwarded unchanged to the superclass
 * @param crdtId CRDT identifier, forwarded unchanged to the superclass
 */
SimpleCrdt(String nodeId, String crdtId) {
super(nodeId, crdtId, ReplayProcessor.create());
}
项目:eternity
文件:GoogleApiProcessor.java
/**
 * Creates an instance backed by a new {@link ReplayProcessor}, delegating to the
 * constructor that accepts the processor.
 */
protected GoogleApiProcessor() {
this(ReplayProcessor.create());
}
项目:eternity
文件:GoogleApiProcessor.java
/**
 * Stores the given processor in the {@code processor} field.
 *
 * @param processor the replay processor of {@link GoogleApiProcessor.Event}s to keep;
 *                  no null check is performed here
 */
private GoogleApiProcessor(ReplayProcessor<GoogleApiProcessor.Event> processor) {
this.processor = processor;
}
项目:RHub
文件:RxJava2Proxies.java
/**
 * Builds a proxy around a new {@link ReplayProcessor} using {@code Roxy.TePolicy.PASS}.
 *
 * @return a new {@code RxJava2ProcProxy} instance
 */
public static RxJava2ProcProxy replayProcessorProxy() {
    final ReplayProcessor<Object> replay = ReplayProcessor.create();
    return new RxJava2ProcProxy(replay, Roxy.TePolicy.PASS);
}
项目:RHub
文件:RxJava2Proxies.java
/**
 * Builds a proxy around the serialized view of a new {@link ReplayProcessor}
 * using {@code Roxy.TePolicy.PASS}.
 *
 * @return a new {@code RxJava2ProcProxy} instance
 */
public static RxJava2ProcProxy serializedReplayProcessorProxy() {
    final ReplayProcessor<Object> replay = ReplayProcessor.create();
    return new RxJava2ProcProxy(replay.toSerialized(), Roxy.TePolicy.PASS);
}
项目:RHub
文件:RxJava2Proxies.java
/**
 * Builds a proxy around a new {@link ReplayProcessor} using {@code Roxy.TePolicy.WRAP}.
 *
 * @return a new {@code RxJava2ProcProxy} instance
 */
public static RxJava2ProcProxy safeReplayProcessorProxy() {
    final ReplayProcessor<Object> replay = ReplayProcessor.create();
    return new RxJava2ProcProxy(replay, Roxy.TePolicy.WRAP);
}
项目:RHub
文件:RxJava2Proxies.java
/**
 * Builds a proxy around the serialized view of a new {@link ReplayProcessor}
 * using {@code Roxy.TePolicy.WRAP}.
 *
 * @return a new {@code RxJava2ProcProxy} instance
 */
public static RxJava2ProcProxy safeSerializedReplayProcessorProxy() {
    final ReplayProcessor<Object> replay = ReplayProcessor.create();
    return new RxJava2ProcProxy(replay.toSerialized(), Roxy.TePolicy.WRAP);
}