Java 类io.reactivex.flowables.ConnectableFlowable 实例源码
项目:streamingpool-core
文件:OverlapBufferStreamTest.java
@Test
public void ifStartEmitsOnlyOnceBeforeDataStreamNeverEnds() throws InterruptedException {
CountDownLatch sync = new CountDownLatch(1);
ConnectableFlowable<?> sourceStream = just(0L).publish();
ConnectableFlowable<?> startStream = just(new Object()).publish();
sourceStream.buffer(startStream, opening -> never()).doOnTerminate(sync::countDown)
.subscribe(System.out::println);
sourceStream.connect();
startStream.connect();
sync.await(5, SECONDS);
assertThat(sync.getCount()).isEqualTo(0L);
}
项目:reactivejournal
文件:HelloWorldRemote.java
public static void main(String... args) throws IOException, InterruptedException {
//Create the rxRecorder but don't delete the cache that has been created.
ReactiveJournal reactiveJournal = new ReactiveJournal(HelloWorldApp_JounalAsObserver.FILE_NAME);
//Get the input from the remote process
RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
PlayOptions options = new PlayOptions().filter(HelloWorldApp_JounalAsObserver.INPUT_FILTER)
.playFromNow(true).replayRate(PlayOptions.ReplayRate.FAST);
ConnectableFlowable<Byte> remoteInput = rxPlayer.play(options).publish();
BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
Flowable<String> flowableOutput = bytesToWords.process(remoteInput);
flowableOutput.subscribe(
s->LOG.info("Remote input [{}]", s),
e-> LOG.error("Problem in remote [{}]", e),
()->{
LOG.info("Remote input ended");
System.exit(0);
});
remoteInput.connect();
}
项目:reactivejournal
文件:RxJournalBackPressureTestingFasterConsumer.java
public static void main(String[] args) throws IOException {
ReactiveJournal reactiveJournal = new ReactiveJournal("src/main/java/org/reactivejournal/examples/fastproducerslowconsumer/resources/");
//Get the input from the recorder note that we have to set the replayRate to ACTUAL_TIME
//to replicate the conditions in the 'real world'.
PlayOptions options = new PlayOptions().filter("input").replayRate(ReplayRate.ACTUAL_TIME);
ConnectableFlowable journalInput = new RxJavaPlayer(reactiveJournal).play(options).publish();
//Reduce the latency of the consumer to 5ms - try reducing or increasing to study the effects.
Consumer onNextSlowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(3);
long startTime = System.currentTimeMillis();
journalInput.observeOn(Schedulers.io()).subscribe(onNextSlowConsumer::accept,
e -> System.out.println("ReactiveRecorder " + " " + e),
() -> System.out.println("ReactiveRecorder complete [" + (System.currentTimeMillis()-startTime) + "ms]")
);
journalInput.connect();
DSUtil.sleep(1000);
}
项目:streamingpool-core
文件:OverlapBufferStreamFactory.java
@SuppressWarnings("unchecked")
@Override
public <T> ErrorStreamPair<T> create(StreamId<T> id, DiscoveryService discoveryService) {
if (!(id instanceof OverlapBufferStreamId)) {
return ErrorStreamPair.empty();
}
OverlapBufferStreamId<?> analysisId = (OverlapBufferStreamId<?>) id;
BufferSpecification bufferSpecification = analysisId.bufferSpecification();
StreamId<?> startId = bufferSpecification.startId();
StreamId<?> sourceId = analysisId.sourceId();
Flowable<?> timeout = bufferSpecification.timeout();
ConnectableFlowable<?> startStream = Flowable.fromPublisher(discoveryService.discover(startId)).publish();
ConnectableFlowable<?> sourceStream = Flowable.fromPublisher(discoveryService.discover(sourceId)).publish();
Set<EndStreamMatcher<?, ?>> matchers = bufferSpecification.endStreamMatchers();
Map<EndStreamMatcher<Object, Object>, ConnectableFlowable<?>> endStreams = matchers.stream()
.collect(Collectors.toMap(m -> (EndStreamMatcher<Object, Object>) m,
m -> Flowable.fromPublisher(discoveryService.discover(m.endStreamId())).publish()));
StreamConnector sourceStreamConnector = new StreamConnector(sourceStream);
Flowable<?> bufferStream = sourceStream
.compose(new DoAfterFirstSubscribe<>(() -> {
endStreams.values().forEach(ConnectableFlowable::connect);
startStream.connect();
}))
.buffer(startStream,
opening -> closingStreamFor(opening, endStreams, timeout, sourceStreamConnector));
return ErrorStreamPair.ofData((Publisher<T>) bufferStream);
}
项目:streamingpool-core
文件:OverlapBufferStreamFactory.java
private Flowable<?> closingStreamFor(Object opening,
Map<EndStreamMatcher<Object, Object>, ConnectableFlowable<?>> endStreams, Flowable<?> timeout,
StreamConnector sourceStreamConnector) {
Set<Flowable<?>> matchingEndStreams = endStreams.entrySet().stream()
.map(e -> e.getValue().filter(v -> e.getKey().matching().test(opening, v))).collect(Collectors.toSet());
matchingEndStreams.add(timeout);
return Flowable.merge(matchingEndStreams)
.compose(new DoAfterFirstSubscribe<>(sourceStreamConnector::connect))
.take(1);
}
项目:reactivejournal
文件:HelloWorldApp_JounalAsObserver.java
public static void main(String[] args) throws IOException {
ConnectableFlowable flowableInput =
Flowable.fromArray(new Byte[]{72,101,108,108,111,32,87,111,114,108,100,32}).map(
i->{
DSUtil.sleep(INTERVAL_MS);
return i;
}).publish();
//Create the reactiveRecorder and delete any previous content by clearing the cache
ReactiveJournal reactiveJournal = new ReactiveJournal(FILE_NAME);
reactiveJournal.clearCache();
//Pass the input stream into the reactiveRecorder which will subscribe to it and record all events.
//The subscription will not be activated until 'connect' is called on the input stream.
ReactiveRecorder reactiveRecorder = reactiveJournal.createReactiveRecorder();
reactiveRecorder.record(flowableInput, INPUT_FILTER);
BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
//Pass the input Byte stream into the BytesToWordsProcessor class which subscribes to the stream and returns
//a stream of words.
//The subscription will not be activated until 'connect' is called on the input stream.
Flowable<String> flowableOutput = bytesToWords.process(flowableInput);
//Pass the output stream (of words) into the reactiveRecorder which will subscribe to it and record all events.
flowableOutput.subscribe(LOG::info);
reactiveRecorder.record(flowableOutput, OUTPUT_FILTER);
//Activate the subscriptions
flowableInput.connect();
reactiveJournal.writeToFile("/tmp/Demo/demo.txt",true);
}
项目:reactivejournal
文件:HelloWorldApp_JournalPlayThrough.java
public static void main(String[] args) throws IOException {
//Create the reactiveRecorder and delete any previous content by clearing the cache
ReactiveJournal reactiveJournal = new ReactiveJournal(FILE_NAME);
reactiveJournal.clearCache();
//Pass the input stream into the reactiveRecorder which will subscribe to it and record all events.
//The subscription will not be activated on a new thread which will allow this program to continue.
ReactiveRecorder reactiveRecorder = reactiveJournal.createReactiveRecorder();
reactiveRecorder.recordAsync(observableInput, INPUT_FILTER);
BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
//Retrieve a stream of
RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
PlayOptions options = new PlayOptions().filter(INPUT_FILTER).playFromNow(true);
ConnectableFlowable recordedObservable = rxPlayer.play(options).publish();
//Pass the input Byte stream into the BytesToWordsProcessor class which subscribes to the stream and returns
//a stream of words.
Flowable<String> flowableOutput = bytesToWords.process(recordedObservable);
//Pass the output stream (of words) into the reactiveRecorder which will subscribe to it and record all events.
reactiveRecorder.record(flowableOutput, OUTPUT_FILTER);
flowableOutput.subscribe(s -> LOG.info("HelloWorldHot->" + s),
throwable -> LOG.error("", throwable),
()->LOG.info("HelloWorldHot Complete"));
//Only start the recording now because we want to make sure that the BytesToWordsProcessor and the reactiveRecorder
//are both setup up to receive subscriptions.
recordedObservable.connect();
//Sometimes useful to see the recording written to a file
reactiveJournal.writeToFile("/tmp/Demo/demo.txt",true);
}
项目:reactivejournal
文件:RxJournalBackPressureLatest.java
public static void main(String[] args) throws IOException {
ReactiveJournal reactiveJournal = new ReactiveJournal("/tmp/fastproducer");
reactiveJournal.clearCache();
Flowable<Long> fastProducer = FastProducerSlowConsumer.createFastProducer(BackpressureStrategy.MISSING, 2500);
ReactiveRecorder recorder = reactiveJournal.createReactiveRecorder();
recorder.recordAsync(fastProducer,"input");
//Set the replay strategy to ReplayRate.FAST as e want to process the event as soon as it is
//received from the publisher.
PlayOptions options = new PlayOptions().filter("input").replayRate(PlayOptions.ReplayRate.FAST);
ConnectableFlowable journalInput = new RxJavaPlayer(reactiveJournal).play(options).publish();
Consumer onNextSlowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(10);
recorder.record(journalInput, "consumed");
long startTime = System.currentTimeMillis();
journalInput.observeOn(Schedulers.io()).subscribe(onNextSlowConsumer::accept,
e -> System.out.println("ReactiveRecorder " + " " + e),
() -> System.out.println("ReactiveRecorder complete [" + (System.currentTimeMillis()-startTime) + "]")
);
journalInput.connect();
DSUtil.sleep(3000);
}
项目:RxJava2Extensions
文件:FlowableRefCountTimeout.java
FlowableRefCountTimeout(ConnectableFlowable<T> source, int n, long timeout, TimeUnit unit,
Scheduler scheduler) {
this.source = source;
this.n = n;
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
}
项目:RxJava2Extensions
文件:FlowableRefCountTimeout.java
@Override
public Publisher<T> apply(Flowable<T> upstream) {
if (upstream instanceof ConnectableFlowable) {
return new FlowableRefCountTimeout<T>((ConnectableFlowable<T>)upstream, n, timeout, unit, scheduler);
}
throw new IllegalArgumentException("This transformer requires an upstream ConnectableFlowable");
}
项目:Attendance
文件:ApproveListFragment.java
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
String githubToken = Constants.ETEST_API_KEY;
String urlx = "http:\\" + SettingsActivity.getServerName(getActivity());
String usicox = SettingsActivity.getUsIco(getActivity());
if( usicox.equals("44551142")) {
urlx = "http:\\" + Constants.EDCOM_url;
}
_rfetestService = RfEtestService.createGithubService(githubToken, urlx);
_disposables = new CompositeDisposable();
_rxBus = getRxBusSingleton();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof ApproveListFragment.TapEvent) {
///_showTapText();
}
if (event instanceof Attendance) {
String keys = ((Attendance) event).getRok();
//Log.d("In FRGM longClick", keys);
getApproveDialog( keys, (Attendance) event);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:AbsTypesListRxFragment.java
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
String githubToken = Constants.ETEST_API_KEY;
String urlx = SettingsActivity.getServerName(getActivity());
_disposables = new CompositeDisposable();
_rxBus = getRxBusSingleton();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof AbsTypesListRxFragment.TapEvent) {
///_showTapText();
}
if (event instanceof Abstype) {
String keys = ((Abstype) event).getRok();
//Log.d("In FRGM longClick", keys);
getAbsTypesDialog( keys, (Abstype) event);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:RxBusDemo_Bottom3Fragment.java
@Override
public void onStart() {
super.onStart();
_disposables = new CompositeDisposable();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof RxBusDemoFragment.TapEvent) {
_showTapText();
}
if (event instanceof EventRxBus.Message) {
tvContent = (TextView) getActivity().findViewById(R.id.tvContent);
tvContent.setText(((EventRxBus.Message) event).message);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
_showTapCount(taps.size());
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:PostsFragment.java
@Override public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
_rxBus = getRxBusSingleton();
_disposables = new CompositeDisposable();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof PostsFragment.TapEvent) {
///_showTapText();
}
if (event instanceof BlogPostEntity) {
//saveAbsServer(((Attendance) event).daod + " / " + ((Attendance) event).dado, ((Attendance) event));
String keys = ((BlogPostEntity) event).getAuthor();
//blogPostsAdapter.remove(0);
showProgress(true);
Log.d("In FRGM shortClick", keys);
BlogPostEntity postx = new BlogPostEntity(null, null, null);
delBlogPostRx(postx,1, keys);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:StompProtocolAndroid
文件:StompClient.java
public Flowable<Void> send(StompMessage stompMessage) {
Flowable<Void> flowable = mConnectionProvider.send(stompMessage.compile());
if (!mConnected) {
ConnectableFlowable<Void> deferred = flowable.publish();
mWaitConnectionFlowables.add(deferred);
return deferred;
} else {
return flowable;
}
}
项目:akarnokd-misc
文件:DoOnErrorFusion.java
public static void main(String[] args) {
ConnectableFlowable<Integer> f = Flowable.just(1)
.doOnNext(i -> {
throw new IllegalArgumentException();
})
.doOnError(e -> {
throw new IllegalStateException(e);
}).publish();
f.subscribe(
i -> { throw new AssertionError(); },
e -> e.printStackTrace());
f.connect();
}
项目:RxJava-Android-Samples
文件:RxBusDemo_Bottom3Fragment.java
@Override
public void onStart() {
super.onStart();
_disposables = new CompositeDisposable();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables //
.add(
tapEventEmitter.subscribe(
event -> {
if (event instanceof RxBusDemoFragment.TapEvent) {
_showTapText();
}
}));
_disposables.add(
tapEventEmitter
.publish(stream -> stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
taps -> {
_showTapCount(taps.size());
}));
_disposables.add(tapEventEmitter.connect());
}
项目:contentful.java
文件:Callbacks.java
static <O extends CDAResource, C extends CDAResource> CDACallback<C> subscribeAsync(
Flowable<O> flowable, CDACallback<C> callback, CDAClient client) {
ConnectableFlowable<O> connectable = flowable.observeOn(Schedulers.io()).publish();
callback.setSubscription(connectable.subscribe(
new SuccessAction<O>(callback, client),
new FailureAction(callback, client)));
connectable.connect();
return callback;
}
项目:streamingpool-core
文件:OverlapBufferStreamFactory.java
private StreamConnector(ConnectableFlowable<?> stream) {
this.stream = stream;
}
项目:reactivejournal
文件:HelloWorldTest.java
@Test
public void testHelloWorld() throws IOException, InterruptedException {
//Create the rxRecorder but don't delete the cache that has been created.
ReactiveJournal reactiveJournal = new ReactiveJournal(HelloWorldApp_JounalAsObserver.FILE_NAME);
//reactiveJournal.writeToFile("/tmp/Demo/demo.txt", true);
//Get the input from the recorder
RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
//In this case we can play the data stream in FAST mode.
PlayOptions options= new PlayOptions().filter(HelloWorldApp_JounalAsObserver.INPUT_FILTER)
.replayRate(PlayOptions.ReplayRate.FAST).sameThread(true);
//Use a ConnectableObservable as we only want to kick off the stream when all
//connections have been wired together.
ConnectableFlowable<Byte> observableInput = rxPlayer.play(options).publish();
BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
Flowable<String> flowableOutput = bytesToWords.process(observableInput);
CountDownLatch latch = new CountDownLatch(1);
//Send the output stream to the recorder to be validated against the recorded output
ReactiveValidator reactiveValidator = reactiveJournal.createReactiveValidator();
reactiveValidator.validate(HelloWorldApp_JounalAsObserver.FILE_NAME + "/.reactiveJournal",
flowableOutput, HelloWorldApp_JounalAsObserver.OUTPUT_FILTER, new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
LOG.info(o.toString());
}
@Override
public void onError(Throwable throwable) {
LOG.error("Problem in process test [{}]", throwable);
}
@Override
public void onComplete() {
LOG.info("Summary[" + reactiveValidator.getValidationResult().summaryResult()
+ "] items compared[" + reactiveValidator.getValidationResult().summaryItemsCompared()
+ "] items valid[" + reactiveValidator.getValidationResult().summaryItemsValid() +"]");
latch.countDown();
}
});
observableInput.connect();
boolean completedWithoutTimeout = latch.await(200, TimeUnit.SECONDS);
Assert.assertEquals(ValidationResult.Result.OK, reactiveValidator.getValidationResult().getResult());
Assert.assertTrue(completedWithoutTimeout);
}
项目:reactivejournal
文件:ReactiveRecorderTest.java
@Test
public void recorderTest() throws IOException{
//Flowable used to create the control run.
Flowable<Byte> observableInput = HelloWorldApp_JournalPlayThrough.observableInput;
//Create the rxRecorder and delete any previous content by clearing the cache
ReactiveJournal reactiveJournal = new ReactiveJournal(tmpDir +"/playTest");
reactiveJournal.clearCache();
//Pass the input stream into the rxRecorder which will subscribe to it and record all events.
//The subscription will happen on a new thread which will allow this program to continue.
ReactiveRecorder reactiveRecorder = reactiveJournal.createReactiveRecorder();
reactiveRecorder.recordAsync(observableInput, "input");
BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
PlayOptions options = new PlayOptions().filter("input").playFromNow(true).sameThread(true);
ConnectableFlowable recordedObservable = rxPlayer.play(options).publish();
//Pass the input Byte stream into the BytesToWordsProcessor class which subscribes to the stream and returns
//a stream of words.
Flowable<String> flowableOutput = bytesToWords.process(recordedObservable);
//Pass the output stream (of words) into the rxRecorder which will subscribe to it and record all events.
reactiveRecorder.record(flowableOutput, "output");
//Only start the recording now because we want to make sure that the BytesToWordsProcessor and the rxRecorder
//are both setup up to receive subscriptions.
recordedObservable.connect();
reactiveJournal.writeToFile(tmpDir + "/playTest/playTest.txt",true);
List<String> toBeTested = Files.readAllLines(Paths.get(tmpDir + "/playTest/playTest.txt"));
List<String> controlSet = Files.readAllLines(Paths.get("src/test/resources/playTest.txt"));
Assert.assertEquals(controlSet.size(), toBeTested.size());
//Asert all the values are in both files - they might not be in exactly the same order
String[] controlSetInput = getFilterLinesFromFiles(controlSet, "input");
String[] toBeTestedInput= getFilterLinesFromFiles(toBeTested, "input");
Assert.assertArrayEquals(controlSetInput, toBeTestedInput);
String[] controlSetOutput = getFilterLinesFromFiles(controlSet, "output");
String[] toBeTestedOutput= getFilterLinesFromFiles(toBeTested, "output");
Assert.assertArrayEquals(controlSetOutput, toBeTestedOutput);
String[] controlSetEOS = getFilterLinesFromFiles(controlSet, "endOfStream");
String[] toBeTestedEOS= getFilterLinesFromFiles(toBeTested, "endOfStream");
Assert.assertArrayEquals(controlSetEOS, toBeTestedEOS);
}
项目:reactivejournal
文件:ReactivePlayerTest.java
@Test
public void testPlay() throws IOException, InterruptedException {
//Create the rxRecorder but don't delete the cache that has been created.
ReactiveJournal reactiveJournal = new ReactiveJournal("src/test/resources/", "");
reactiveJournal.writeToFile(tmpDir +"/rctext.txt", true);
//Get the input from the recorder
RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
PlayOptions options= new PlayOptions()
.filter(HelloWorldApp_JounalAsObserver.INPUT_FILTER)
.replayRate(REPLAY_RATE_STRATEGY)
.completeAtEndOfFile(false);
ConnectableFlowable<Byte> observableInput = rxPlayer.play(options).publish();
BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
Flowable<String> flowableOutput = bytesToWords.process(observableInput);
CountDownLatch latch = new CountDownLatch(1);
//Send the output stream to the recorder to be validated against the recorded output
ReactiveValidator reactiveValidator = reactiveJournal.createReactiveValidator();
reactiveValidator.validate("src/test/resources/",
flowableOutput, HelloWorldApp_JounalAsObserver.OUTPUT_FILTER, new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
}
@Override
public void onNext(Object o) {
LOG.info(o.toString());
}
@Override
public void onError(Throwable throwable) {
LOG.error("Problem in process test [{}]", throwable);
}
@Override
public void onComplete() {
LOG.info("Summary[" + reactiveValidator.getValidationResult().summaryResult()
+ "] items compared[" + reactiveValidator.getValidationResult().summaryItemsCompared()
+ "] items valid[" + reactiveValidator.getValidationResult().summaryItemsValid() +"]");
latch.countDown();
}
});
observableInput.connect();
boolean completedWithoutTimeout = latch.await(2, TimeUnit.SECONDS);
Assert.assertEquals(ValidationResult.Result.OK, reactiveValidator.getValidationResult().getResult());
Assert.assertTrue(completedWithoutTimeout);
}
项目:RxJava2Debug
文件:FlowableOnAssemblyConnectable.java
FlowableOnAssemblyConnectable(ConnectableFlowable<T> source) {
this.source = source;
this.assembled = new RxJavaAssemblyException();
}
项目:RxjavaExample
文件:ExampleUnitTest.java
@Test
public void ConnectableObservable() throws Exception{
ConnectableFlowable<Integer> connect = Flowable.just(1,2,3).publish();
connect.subscribe(this::print);
connect.connect();//此时开始发射数据
}
项目:RxJava2Extensions
文件:ConnectableFlowableValidator.java
ConnectableFlowableValidator(ConnectableFlowable<T> source, PlainConsumer<ProtocolNonConformanceException> onViolation) {
this.source = source;
this.onViolation = onViolation;
}
项目:RxJava2Extensions
文件:FlowableOnAssemblyConnectable.java
FlowableOnAssemblyConnectable(ConnectableFlowable<T> source) {
this.source = source;
this.assembled = new RxJavaAssemblyException();
}
项目:RxJava2Extensions
文件:RxJavaProtocolValidatorTest.java
@Test
public void connectableFlowable() {
ConnectableFlowable<Integer> source = new ConnectableFlowable<Integer>() {
@Override
protected void subscribeActual(Subscriber<? super Integer> s) {
s.onComplete();
s.onError(null);
s.onError(new IOException());
s.onNext(null);
s.onNext(1);
s.onSubscribe(null);
s.onSubscribe(new BooleanSubscription());
s.onSubscribe(new BooleanSubscription());
s.onComplete();
s.onNext(2);
}
@Override
public void connect(Consumer<? super Disposable> connection) {
}
};
RxJavaProtocolValidator.setOnViolationHandler(this);
Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());
SavedHooks h = RxJavaProtocolValidator.enableAndChain();
Assert.assertTrue(RxJavaProtocolValidator.isEnabled());
try {
Flowable.just(1).publish().autoConnect().test().assertResult(1);
Flowable.empty().publish().autoConnect().test().assertResult();
Flowable.error(new IOException()).test().assertFailure(IOException.class);
ConnectableFlowable<Integer> c = RxJavaPlugins.onAssembly(source);
c.test(0);
c.connect();
Assert.assertEquals(15, errors.size());
TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);
Assert.assertTrue("" + errors.get(4).getCause(), errors.get(4).getCause() instanceof IOException);
TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
} finally {
h.restore();
RxJavaProtocolValidator.setOnViolationHandler(null);
}
}
项目:Attendance
文件:CompanyChooseBaseSearchActivity.java
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.companychoose_activity);
mActionBarToolbar = (Toolbar) findViewById(R.id.tool_bar);
setSupportActionBar(mActionBarToolbar);
getSupportActionBar().setTitle(getString(R.string.choosecompany));
mSubscription = new CompositeSubscription();
FloatingActionButton fab = (FloatingActionButton) findViewById(R.id.fab);
fab.setOnClickListener(v -> {
mSubscription.add(getNewCompanyDialog(getString(R.string.newcompany), getString(R.string.fullfirma))
.subscribeOn(rx.android.schedulers.AndroidSchedulers.mainThread())
.observeOn(Schedulers.computation())
.subscribe(this::setBoolean)
);
}
);
mQueryEditText = (EditText) findViewById(R.id.query_edit_text);
mSearchButton = (Button) findViewById(R.id.search_button);
mProgressBar = (ProgressBar) findViewById(R.id.progress_bar);
_rxBus = getRxBusSingleton();
RecyclerView list = (RecyclerView) findViewById(R.id.list);
list.setLayoutManager(new LinearLayoutManager(this));
list.setAdapter(mAdapter = new CompanyChooseAdapter(_rxBus));
_disposables = new CompositeDisposable();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof CompanyChooseBaseSearchActivity.OnItemClickEvent) {
}
if (event instanceof Company) {
Company model = (Company) event;
saveIcoId(model.cmico+ " " + model.cmname, model);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:DgAllEmpsAbsListFragment.java
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
//create mvvm without dagger2
//mViewModel = getAllEmpsAbsMvvmViewModel();
_disposables = new CompositeDisposable();
_rxBus = ((AttendanceApplication) getActivity().getApplication()).getRxBusSingleton();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof DgAllEmpsAbsListFragment.ClickFobEvent) {
Log.d("DgAllEmpsAbsActivity ", " fobClick ");
String serverx = "DgAllEmpsAbsListFragment fobclick";
Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();
}
if (event instanceof RealmEmployee) {
String idemp = ((RealmEmployee) event).getKeyf();
RealmEmployee model= (RealmEmployee) event;
//Log.d("AllEmpsAbsListFragment ", icos);
//String serverx = "AllEmpsAbsListFragment shortclick";
//Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();
Intent i = new Intent(getActivity(), AbsenceActivity.class);
Bundle extras = new Bundle();
extras.putString("fromact", "1");
extras.putString("idemp", idemp);
i.putExtras(extras);
startActivity(i);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:AbsenceListRxFragment.java
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
String githubToken = Constants.ETEST_API_KEY;
String urlx = SettingsActivity.getServerName(getActivity());
fromact = getArguments().getString("fromact");
Log.d("idemp inFrg fromact", fromact);
idemp = getArguments().getString("idemp");
Log.d("idemp inFrg idemp", idemp);
gettimestramp = FirebaseDatabase.getInstance().getReference("gettimestamp");
getTimeListener = new ValueEventListener() {
public void onDataChange(DataSnapshot dataSnapshot) {
timestampx=dataSnapshot.getValue().toString();
Log.d(TAG, "ServerValue.TIMESTAMP oncreate " + timestampx);
}
public void onCancelled(DatabaseError databaseError) { }
};
gettimestramp.addValueEventListener(getTimeListener);
gettimestramp.setValue(ServerValue.TIMESTAMP);
_disposables = new CompositeDisposable();
_rxBus = getRxBusSingleton();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof AbsenceListRxFragment.TapEvent) {
///_showTapText();
}
if (event instanceof Attendance) {
String keys = ((Attendance) event).getRok();
//Log.d("In FRGM longClick", keys);
Attendance model= (Attendance) event;
final String datsx = model.getDatsString();
//Log.d(TAG, "datsx " + datsx);
gettimestramp.setValue(ServerValue.TIMESTAMP);
//Log.d(TAG, "ServerValue.TIMESTAMP " + timestampx);
long timestampl = Long.parseLong(timestampx);
long datsl = Long.parseLong(datsx);
long rozdiel = timestampl - datsl;
//Log.d(TAG, "rozdiel " + rozdiel);
//Toast.makeText(getActivity(), "Longclick " + keys,Toast.LENGTH_SHORT).show();
if( model.aprv.equals("2")) {
rozdiel=1;
}
if( rozdiel < 180000 ) {
getAbsenceDialog( keys, (Attendance) event);
}else{
Toast.makeText(getActivity(), getResources().getString(R.string.cantdel),Toast.LENGTH_SHORT).show();
}
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:AttendanceListRxFragment.java
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
String githubToken = Constants.ETEST_API_KEY;
String urlx = SettingsActivity.getServerName(getActivity());
gettimestramp = FirebaseDatabase.getInstance().getReference("gettimestamp");
getTimeListener = new ValueEventListener() {
public void onDataChange(DataSnapshot dataSnapshot) {
timestampx=dataSnapshot.getValue().toString();
Log.d("Att TIMES oncreate ", timestampx);
}
public void onCancelled(DatabaseError databaseError) { }
};
gettimestramp.addValueEventListener(getTimeListener);
gettimestramp.setValue(ServerValue.TIMESTAMP);
_disposables = new CompositeDisposable();
_rxBus = getRxBusSingleton();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof AttendanceListRxFragment.TapEvent) {
///_showTapText();
}
if (event instanceof Attendance) {
String keys = ((Attendance) event).getRok();
//Log.d("In FRGM longClick", keys);
getAttendanceDialog( keys, (Attendance) event);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:AllEmpsAbsListFragment.java
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
//create mvvm without dagger2
//mViewModel = getAllEmpsAbsMvvmViewModel();
_disposables = new CompositeDisposable();
_rxBus = ((AttendanceApplication) getActivity().getApplication()).getRxBusSingleton();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof AllEmpsAbsListFragment.ClickFobEvent) {
Log.d("AllEmpsAbsActivity ", " fobClick ");
String serverx = "AllEmpsAbsListFragment fobclick";
Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();
}
if (event instanceof RealmEmployee) {
String idemp = ((RealmEmployee) event).getKeyf();
RealmEmployee model= (RealmEmployee) event;
//Log.d("AllEmpsAbsListFragment ", icos);
//String serverx = "AllEmpsAbsListFragment shortclick";
//Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();
Intent i = new Intent(getActivity(), AbsenceActivity.class);
Bundle extras = new Bundle();
extras.putString("fromact", "1");
extras.putString("idemp", idemp);
i.putExtras(extras);
startActivity(i);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:AbsServerAsBaseSearchActivity.java
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_absserver);
//ActivityAbsserverDbindBinding binding = DataBindingUtil.setContentView(this, R.layout.activity_absserver_dbind);
//binding.setServerabs(new Attendance());
mActionBarToolbar = (Toolbar) findViewById(R.id.tool_bar);
setSupportActionBar(mActionBarToolbar);
getSupportActionBar().setTitle(getString(R.string.action_absmysql));
mQueryEditText = (EditText) findViewById(R.id.query_edit_text);
mSearchButton = (Button) findViewById(R.id.search_button);
mProgressBar = (ProgressBar) findViewById(R.id.progress_bar);
String githubToken = Constants.ETEST_API_KEY;
String urlx = "http:\\" + SettingsActivity.getServerName(this);
String usicox = SettingsActivity.getUsIco(this);
if( usicox.equals("44551142")) {
urlx = "http:\\" + Constants.EDCOM_url;
}
_githubService = RfEtestService.createGithubService(githubToken, urlx);
//cheeses = Arrays.asList(getResources().getStringArray(R.array.cheeses3));
_rxBus = getRxBusSingleton();
RecyclerView list = (RecyclerView) findViewById(R.id.list);
list.setLayoutManager(new LinearLayoutManager(this));
list.setAdapter(mAdapter = new AbsServerAsAdapter(_rxBus));
_disposables = new CompositeDisposable();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof AbsServerAsBaseSearchActivity.TapEvent) {
///_showTapText();
}
if (event instanceof Attendance) {
///tvContent = (TextView) findViewById(R.id.tvContent);
///tvContent.setText(((EventRxBus.Message) event).message); OK change event instanceof to EventRxBus.Message
///_showTapTextToast(((EventRxBus.Absence) event).daod + " / " + ((EventRxBus.Absence) event).dado); OK change event instanceof to EventRxBus.Absence
saveAbsServer(((Attendance) event).daod + " / " + ((Attendance) event).dado, ((Attendance) event));
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:EmployeeMvvmActivity.java
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_mvvm_employees);
coordinatorLayout = (CoordinatorLayout) findViewById(R.id
.coordinatorLayout);
mActionBarToolbar = (Toolbar) findViewById(R.id.tool_bar);
setSupportActionBar(mActionBarToolbar);
getSupportActionBar().setTitle(getString(R.string.action_myemployee));
mViewModel = getEmployeeMvvmViewModel();
_rxBus = ((AttendanceApplication) getApplication()).getRxBusSingleton();
_disposables = new CompositeDisposable();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
//Log.d("rxBus ", "tapEventEmitter");
if (event instanceof EmployeeMvvmActivity.FobTapEvent) {
Log.d("EmpoloyeeActivity ", " fobClick ");
//attention - activity leaked
//mSubscription.add(getNewEmployeeDialog(getString(R.string.newcompany), getString(R.string.fullfirma))
// .subscribeOn(rx.android.schedulers.AndroidSchedulers.mainThread())
// .observeOn(Schedulers.computation())
// .subscribe(this::setBoolean)
//);;
}
if (event instanceof Employee) {
String keys = ((Employee) event).getUsatw();
//Log.d("In FRGM longClick", keys);
Employee model= (Employee) event;
//Toast.makeText(this, "Longclick " + keys,Toast.LENGTH_SHORT).show();
getEditEmloyeeDialog(model);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(io.reactivex.android.schedulers.AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
setupViews();
FloatingActionButton fab = (FloatingActionButton) findViewById(R.id.fab);
fab.setOnClickListener(new View.OnClickListener() {
public void onClick(View v) {
Toast.makeText(EmployeeMvvmActivity.this, R.string.createemployee, Toast.LENGTH_LONG).show();
}
});
}
项目:Attendance
文件:CompaniesListFragment.java
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
mViewModel = getCompaniesMvvmViewModel();
_disposables = new CompositeDisposable();
_rxBus = ((AttendanceApplication) getActivity().getApplication()).getRxBusSingleton();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof CompaniesListFragment.ClickFobEvent) {
Log.d("CompaniesActivity ", " fobClick ");
mSubscription.add(getNewCompanyDialog(getString(R.string.newcompany), getString(R.string.fullfirma))
.subscribeOn(rx.android.schedulers.AndroidSchedulers.mainThread())
.observeOn(Schedulers.computation())
.subscribe(this::setBoolean)
);
}
if (event instanceof Company) {
String icos = ((Company) event).getCmico();
Company model= (Company) event;
Log.d("CompaniesListFragment ", icos);
getEditCompanyDialog(model);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:Attendance
文件:DgAeaListFragment.java
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
//create mvvm without dagger2
//mViewModel = getAllEmpsAbsMvvmViewModel();
_disposables = new CompositeDisposable();
_rxBus = ((AttendanceApplication) getActivity().getApplication()).getRxBusSingleton();
ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
_disposables
.add(tapEventEmitter.subscribe(event -> {
if (event instanceof DgAllEmpsAbsListFragment.ClickFobEvent) {
Log.d("DgAllEmpsAbsActivity ", " fobClick ");
String serverx = "DgAllEmpsAbsListFragment fobclick";
Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();
}
if (event instanceof RealmEmployee) {
String idemp = ((RealmEmployee) event).getKeyf();
RealmEmployee model= (RealmEmployee) event;
//Log.d("AllEmpsAbsListFragment ", icos);
//String serverx = "AllEmpsAbsListFragment shortclick";
//Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();
Intent i = new Intent(getActivity(), AbsenceActivity.class);
Bundle extras = new Bundle();
extras.putString("fromact", "1");
extras.putString("idemp", idemp);
i.putExtras(extras);
startActivity(i);
}
}));
_disposables
.add(tapEventEmitter.publish(stream ->
stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
.observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
///_showTapCount(taps.size()); OK
}));
_disposables.add(tapEventEmitter.connect());
}
项目:StompProtocolAndroid
文件:StompClient.java
/**
* If already connected and reconnect=false - nope
*
* @param _headers might be null
*/
public void connect(List<StompHeader> _headers, boolean reconnect) {
if (reconnect) disconnect();
if (mConnected) return;
mLifecycleDisposable = mConnectionProvider.getLifecycleReceiver()
.subscribe(lifecycleEvent -> {
switch (lifecycleEvent.getType()) {
case OPENED:
List<StompHeader> headers = new ArrayList<>();
headers.add(new StompHeader(StompHeader.VERSION, SUPPORTED_VERSIONS));
if (_headers != null) headers.addAll(_headers);
mConnectionProvider.send(new StompMessage(StompCommand.CONNECT, headers, null).compile())
.subscribe();
break;
case CLOSED:
mConnected = false;
isConnecting = false;
break;
case ERROR:
mConnected = false;
isConnecting = false;
break;
}
});
isConnecting = true;
mMessagesDisposable = mConnectionProvider.messages()
.map(StompMessage::from)
.subscribe(stompMessage -> {
if (stompMessage.getStompCommand().equals(StompCommand.CONNECTED)) {
mConnected = true;
isConnecting = false;
for (ConnectableFlowable<Void> flowable : mWaitConnectionFlowables) {
flowable.connect();
}
mWaitConnectionFlowables.clear();
}
callSubscribers(stompMessage);
});
}
项目:cyclops
文件:FlowableKind.java
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public ConnectableFlowable<T> publish() {
return boxed.publish();
}
项目:cyclops
文件:FlowableKind.java
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public ConnectableFlowable<T> publish(int bufferSize) {
return boxed.publish(bufferSize);
}
项目:cyclops
文件:FlowableKind.java
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public ConnectableFlowable<T> replay() {
return boxed.replay();
}