Java 类io.reactivex.subjects.Subject 实例源码
项目:Learning-RxJava
文件:Ch5_25.java
public static void main(String[] args) {
Subject<String> subject =
AsyncSubject.create();
subject.subscribe(s ->
System.out.println("Observer 1: " + s),
Throwable::printStackTrace,
() -> System.out.println("Observer 1 done!")
);
subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();
subject.subscribe(s ->
System.out.println("Observer 2: " + s),
Throwable::printStackTrace,
() -> System.out.println("Observer 2 done!")
);
}
项目:Learning-RxJava
文件:Ch5_27.java
public static void main(String[] args) {
Subject<String> subject =
UnicastSubject.create();
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(l -> ((l + 1) * 300) + " milliseconds")
.subscribe(subject);
sleep(2000);
//multicast to support multiple Observers
Observable<String> multicast =
subject.publish().autoConnect();
//bring in first Observer
multicast.subscribe(s -> System.out.println("Observer 1: "
+ s));
sleep(2000);
//bring in second Observer
multicast.subscribe(s -> System.out.println("Observer 2: "
+ s));
sleep(1000);
}
项目:MoligyMvpArms
文件:RxBus.java
/**
* 取消监听
*
* @param tag
* @param observable
* @return
*/
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
@NonNull Observable<?> observable) {
if (null == observable)
return getInstance();
List<Subject> subjects = subjectMapper.get(tag);
if (null != subjects) {
subjects.remove( observable);
if (isEmpty(subjects)) {
subjectMapper.remove(tag);
LogUtils.debugInfo("unregister"+ tag + " size:" + subjects.size());
}
}
return getInstance();
}
项目:RxNetWork
文件:RxBus.java
/**
* 取消订阅
*
* @param tag 标志
* @return true 取消成功
*/
public boolean unregister(@NonNull Object tag) {
RxBusEvent rxBusEvent = rxBusEventArrayMap.get(tag);
if (RxUtils.isEmpty(rxBusEvent)) {
return true;
}
Subject<Object> subject = rxBusEvent.subject;
Disposable disposable = rxBusEvent.disposable;
if (!disposable.isDisposed()) {
disposable.dispose();
}
if (!subject.hasObservers()) {
rxBusEventArrayMap.remove(tag);
return true;
}
return false;
}
项目:pyplyn
文件:ConfigurationUpdateManager.java
public ClusterMigrationListener() {
// init a publish subject, to allow emitting migration events
Subject<MigrationEvent> subj = PublishSubject.create();
migrationEvent = subj.toSerialized();
// collect migration events every 10 seconds and remove any redundant tasks on every node
migrationEvent.buffer(10, TimeUnit.SECONDS)
// filter our windows when no events have been observed
.filter(events -> !events.isEmpty())
// log partition migration event
.doOnNext(events -> logger.info("[CLUSTER] Migrated {} partition", events.size()))
// and update tasks
.doOnNext(events -> updateTasksAfterClusterEvent())
// process async
.subscribeOn(Schedulers.computation())
.subscribe();
}
项目:jobson
文件:JobsDAOTest.java
@Test
public void testPersistStdoutReturnsADisposableThatStopsFurtherReads() {
final JobDAO dao = getInstance();
final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
final Subject<byte[]> stdoutSubject = PublishSubject.create();
final AtomicBoolean stdoutObsWasRead = new AtomicBoolean(false);
final Observable<byte[]> stdoutObs = stdoutSubject.map(data -> {
stdoutObsWasRead.set(true);
return data;
});
final Disposable disposable = dao.appendStdout(jobId, stdoutObs);
disposable.dispose();
stdoutSubject.onNext(TestHelpers.generateRandomBytes());
assertThat(stdoutObsWasRead.get());
}
项目:jobson
文件:JobsDAOTest.java
@Test
public void testPersistStderrReturnsADisposableThatStopsFurtherReads() {
final JobDAO dao = getInstance();
final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
final Subject<byte[]> stderrSubject = PublishSubject.create();
final AtomicBoolean stderrObsWasRead = new AtomicBoolean(false);
final Observable<byte[]> stderrObs = stderrSubject.map(data -> {
stderrObsWasRead.set(true);
return data;
});
final Disposable disposable = dao.appendStderr(jobId, stderrObs);
disposable.dispose();
stderrSubject.onNext(TestHelpers.generateRandomBytes());
assertThat(stderrObsWasRead.get());
}
项目:jobson
文件:JobExecutorTest.java
@Test
public void testExecuteStdoutListenerIsCalledWithCompletedOnceApplicationExecutionEnds() throws Throwable {
final JobExecutor jobExecutor = getInstance();
final AtomicBoolean completedCalled = new AtomicBoolean(false);
final Subject<byte[]> stdoutSubject = PublishSubject.create();
stdoutSubject.doOnComplete(() -> completedCalled.set(true)).subscribe();
final JobEventListeners listeners = createStdoutListener(stdoutSubject);
final CancelablePromise<JobExecutionResult> ret =
jobExecutor.execute(STANDARD_REQUEST, listeners);
promiseAssert(ret, result -> {
try {
// The stdout thread can race with the exit thread
Thread.sleep(50);
assertThat(completedCalled.get()).isTrue();
} catch (InterruptedException ignored) {}
});
}
项目:jobson
文件:JobExecutorTest.java
@Test
public void testExecuteStderrListenerIsCompletedOnceApplicationExecutionEnds() throws Throwable {
final JobExecutor jobExecutor = getInstance();
final AtomicBoolean completedCalled = new AtomicBoolean(false);
final Subject<byte[]> stderrSubject = PublishSubject.create();
stderrSubject.doOnComplete(() -> completedCalled.set(true)).subscribe();
final JobEventListeners listeners = createStderrListener(stderrSubject);
final CancelablePromise<JobExecutionResult> ret =
jobExecutor.execute(STANDARD_REQUEST, listeners);
promiseAssert(ret, result -> {
try {
// The stderr thread can race with the exit thread
Thread.sleep(50);
assertThat(completedCalled.get()).isTrue();
} catch (InterruptedException ignored) {}
});
}
项目:jobson
文件:JobExecutorTest.java
@Test
public void testExecuteEvaluatesJobInputsAsExpected() throws InterruptedException {
final JobExecutor jobExecutor = getInstance();
final PersistedJob req =
standardRequestWithCommand("echo", "${inputs.foo}");
final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
final Subject<byte[]> stdoutSubject = PublishSubject.create();
stdoutSubject.subscribe(bytes ->
bytesEchoedToStdout.getAndUpdate(existingBytes ->
Bytes.concat(existingBytes, bytes)));
final Semaphore s = new Semaphore(1);
s.acquire();
stdoutSubject.doOnComplete(s::release).subscribe();
final JobEventListeners listeners =
createStdoutListener(stdoutSubject);
jobExecutor.execute(req, listeners);
s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);
final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();
assertThat(stringFromStdout).isEqualTo("a"); // from spec
}
项目:jobson
文件:JobManagerTest.java
@Test
public void testGetStdoutUpdatesEchoesUpdatesFromExecutorObservers() throws InterruptedException, ExecutionException, TimeoutException {
final CancelablePromise<JobExecutionResult> executorPromise = new SimpleCancelablePromise<>();
final Subject<byte[]> stdoutSubject = PublishSubject.create();
final JobExecutor executor =
MockJobExecutor.thatUses(executorPromise, stdoutSubject, Observable.just(TestHelpers.generateRandomBytes()));
final JobManager jobManager = createManagerWith(executor);
final Pair<JobId, CancelablePromise<FinalizedJob>> ret =
jobManager.submit(STANDARD_VALID_REQUEST);
final Observable<byte[]> stdoutObservable =
jobManager.stdoutUpdates(ret.getLeft()).get();
final AtomicReference<byte[]> bytesFromObservable = new AtomicReference<>();
stdoutObservable.subscribe(bytesFromObservable::set);
final byte[] bytesExpected = TestHelpers.generateRandomBytes();
stdoutSubject.onNext(bytesExpected);
executorPromise.complete(new JobExecutionResult(FINISHED));
ret.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);
assertThat(bytesFromObservable.get()).isEqualTo(bytesExpected);
}
项目:jobson
文件:JobManagerTest.java
@Test
public void testGetStderrUpdatesEchoesUpdatesFromExecutorObservers() throws InterruptedException, ExecutionException, TimeoutException {
final CancelablePromise<JobExecutionResult> executorPromise = new SimpleCancelablePromise<>();
final Subject<byte[]> stderrSubject = PublishSubject.create();
final JobExecutor executor =
MockJobExecutor.thatUses(executorPromise, Observable.just(TestHelpers.generateRandomBytes()), stderrSubject);
final JobManager jobManager = createManagerWith(executor);
final Pair<JobId, CancelablePromise<FinalizedJob>> ret =
jobManager.submit(STANDARD_VALID_REQUEST);
final Observable<byte[]> stderrObservable =
jobManager.stderrUpdates(ret.getLeft()).get();
final AtomicReference<byte[]> bytesFromObservable = new AtomicReference<>();
stderrObservable.subscribe(bytesFromObservable::set);
final byte[] bytesExpected = TestHelpers.generateRandomBytes();
stderrSubject.onNext(bytesExpected);
executorPromise.complete(new JobExecutionResult(FINISHED));
ret.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);
assertThat(bytesFromObservable.get()).isEqualTo(bytesExpected);
}
项目:Reactive-Android-Programming
文件:Sandbox.java
private static void demo2() {
Subject<Long> subject = PublishSubject.create();
Observable.interval(2, TimeUnit.SECONDS)
.take(3)
.doOnComplete(() -> log("Origin-One-doOnComplete"))
.subscribe(subject);
Observable.interval(1, TimeUnit.SECONDS)
.take(2)
.doOnComplete(() -> log("Origin-Two-doOnComplete"))
.subscribe(subject);
subject
.doOnComplete(() -> log("First-doOnComplete"))
.subscribe(v -> log(v));
}
项目:Reactive-Android-Programming
文件:Sandbox.java
private static void demo1() throws InterruptedException {
Subject<Long> subject = PublishSubject.create();
Observable.interval(2, TimeUnit.SECONDS)
.take(5)
.doOnSubscribe((d) -> log("Original-doOnSubscribe"))
.doOnComplete(() -> log("Original-doOnComplete"))
.subscribe(subject);
subject
.doOnSubscribe((d) -> log("First-doOnSubscribe"))
.doOnComplete(() -> log("First-doOnComplete"))
.subscribe(v -> log("First: " + v));
Thread.sleep(4100);
subject
.doOnSubscribe((d) -> log("Second-doOnSubscribe"))
.doOnComplete(() -> log("Second-doOnComplete"))
.subscribe(v -> log("Second: " + v));
}
项目:Ghost-Android
文件:AuthService.java
private Observable<JsonElement> revokeToken(AuthToken token, String clientSecret) {
// this complexity exists because the access token must be revoked AFTER the refresh token
// why? because the access token is needed for both revocations!
Subject<JsonElement> responses = PublishSubject.create();
RevokeReqBody refreshReqBody = RevokeReqBody.fromRefreshToken(
token.getRefreshToken(), clientSecret);
revokeSingleToken(token.getAuthHeader(), refreshReqBody, responses)
.doOnComplete(() -> {
RevokeReqBody accessReqBody = RevokeReqBody.fromAccessToken(
token.getAccessToken(), clientSecret);
revokeSingleToken(token.getAuthHeader(), accessReqBody, responses)
.subscribe();
})
.subscribe();
return responses;
}
项目:rx-property-android
文件:MainActivity.java
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
viewModel = new MainViewModel(this);
final ActivityMainBinding binding
= DataBindingUtil.setContentView(this, R.layout.activity_main);
// You can bind trigger observable instead of using "rxCommandOnClick" on layout xml.
final Button goToTodoButton = binding.buttonGoToTodo;
final Subject<NoParameter> emitter = PublishSubject.create();
goToTodoButton.setOnClickListener(view -> emitter.onNext(NoParameter.INSTANCE));
viewModel.goToTodoCommand.bindTrigger(emitter);
viewModel.goToTodoCommand.setCancellable(() -> goToTodoButton.setOnClickListener(null));
binding.setViewModel(viewModel);
}
项目:rx-property-android
文件:RxCommandTest.java
@Test
public void emitValueWhenBoundTriggerEmitsValue() {
// given
Subject<NoParameter> trigger = PublishSubject.create();
RxCommand<NoParameter> command = new RxCommand<NoParameter>()
.bindTrigger(trigger);
TestObserver<NoParameter> testObserver = command.test();
// when
trigger.onNext(NoParameter.INSTANCE);
// then
testObserver.assertSubscribed()
.assertValue(NoParameter.INSTANCE)
.assertNoErrors()
.assertNotComplete()
.dispose();
// after
command.dispose();
}
项目:rx-property-android
文件:RxCommandTest.java
@Test
public void throwsErrorWhenBoundTriggerEmitsError() {
// given
Subject<NoParameter> trigger = PublishSubject.create();
RxCommand<NoParameter> command = new RxCommand<NoParameter>()
.bindTrigger(trigger);
TestObserver<NoParameter> testObserver = command.test();
// when
trigger.onError(new RuntimeException("Error in the trigger observable"));
// then
testObserver.assertFailureAndMessage(
RuntimeException.class, "Error in the trigger observable")
.dispose();
// after
command.dispose();
}
项目:rx-property-android
文件:RxCommandTest.java
@Test
public void emitsOnCompleteWhenBoundTriggerIsCompleted() {
// given
Subject<NoParameter> trigger = PublishSubject.create();
RxCommand<NoParameter> command = new RxCommand<NoParameter>()
.bindTrigger(trigger);
TestObserver<NoParameter> testObserver = command.test();
// when
trigger.onNext(NoParameter.INSTANCE);
trigger.onComplete();
// then
testObserver.assertResult(NoParameter.INSTANCE)
.dispose();
// after
command.dispose();
}
项目:rx-property-android
文件:RxCommandTest.java
@Test
public void triggerBindingCanExecuteMoreThanOnce() {
// given
Subject<NoParameter> firstTrigger = PublishSubject.create();
Subject<NoParameter> secondTrigger = PublishSubject.create();
RxCommand<NoParameter> command = new RxCommand<>();
TestObserver<NoParameter> testObserver = command.test();
// when
command.bindTrigger(firstTrigger);
firstTrigger.onNext(NoParameter.INSTANCE);
command.bindTrigger(secondTrigger);
firstTrigger.onNext(NoParameter.INSTANCE);
secondTrigger.onNext(NoParameter.INSTANCE);
// then
testObserver.assertSubscribed()
.assertValues(NoParameter.INSTANCE, NoParameter.INSTANCE)
.assertNoErrors()
.assertNotComplete()
.dispose();
// after
command.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void followsSourceObservable() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = property.test();
// when
source.onNext("First");
source.onNext("Second");
// then
testObserver.assertSubscribed()
.assertValues("First", "Second")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void doesNotEmitSameValuesWhenSourceObservableEmitsSameValuesWithDistinctUntilChangeMode() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = property.test();
// when
source.onNext("RxProperty");
source.onNext("RxProperty");
source.onNext("RxProperty");
// then
testObserver.assertSubscribed()
.assertValue("RxProperty")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void emitsAllValuesWhenSourceObservableEmitsSameValuesWithoutDistinctUntilChangeMode() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source, EnumSet.of(RxProperty.Mode.NONE));
TestObserver<String> testObserver = property.test();
// when
source.onNext("RxProperty");
source.onNext("RxProperty");
source.onNext("RxProperty");
// then
testObserver.assertSubscribed()
.assertValues("RxProperty", "RxProperty", "RxProperty")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void followsSourceObservable() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = propertyObserver(property);
// when
source.onNext("First");
source.onNext("Second");
// then
testObserver.assertSubscribed()
.assertValues("First", "Second")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void distinctUntilChangeWhenSourceObservableEmitsSameValuesWithDistinctUntilChangeMode() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = propertyObserver(property);
// when
source.onNext("RxProperty");
source.onNext("RxProperty");
source.onNext("RxProperty");
// then
testObserver.assertSubscribed()
.assertValue("RxProperty")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void notifyAllValuesWhenSourceObservableEmitsSameValuesWithoutDistinctUntilChangeMode() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source, EnumSet.of(RxProperty.Mode.NONE));
TestObserver<String> testObserver = propertyObserver(property);
// when
source.onNext("RxProperty");
source.onNext("RxProperty");
source.onNext("RxProperty");
// then
testObserver.assertSubscribed()
.assertValues("RxProperty", "RxProperty", "RxProperty")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void noLongerNotifyWhenSourceObservableEmitsError() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = propertyObserver(property);
// when
source.onNext("First");
source.onError(new RuntimeException("Error in source observable"));
source.onNext("Second");
// then
testObserver.assertSubscribed()
.assertValue("First")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void noLongerNotifyWhenSourceObservableIsCompleted() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = propertyObserver(property);
// when
source.onNext("First");
source.onComplete();
source.onNext("Second");
// then
testObserver.assertSubscribed()
.assertValue("First")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void noLongerNotifyWhenSourceObservableEmitsValueAfterDisposed() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = propertyObserver(property);
// when
source.onNext("First");
property.dispose();
source.onNext("Second");
// then
testObserver.assertSubscribed()
.assertValue("First")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void followsSourceObservable() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = valueFieldObserver(property);
// when
source.onNext("First");
source.onNext("Second");
// then
testObserver.assertSubscribed()
.assertValues("First", "Second")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void distinctUntilChangeWhenSourceObservableEmitsSameValuesWithDistinctUntilChangeMode() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = valueFieldObserver(property);
// when
source.onNext("RxProperty");
source.onNext("RxProperty");
source.onNext("RxProperty");
// then
testObserver.assertSubscribed()
.assertValue("RxProperty")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void notifyAllValuesWhenSourceObservableEmitsSameValuesWithoutDistinctUntilChangeMode() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source, EnumSet.of(RxProperty.Mode.NONE));
TestObserver<String> testObserver = valueFieldObserver(property);
// when
source.onNext("RxProperty");
source.onNext("RxProperty");
source.onNext("RxProperty");
// then
testObserver.assertSubscribed()
.assertValues("RxProperty", "RxProperty", "RxProperty")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void noLongerNotifyWhenSourceObservableEmitsError() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = valueFieldObserver(property);
// when
source.onNext("First");
source.onError(new RuntimeException("Error in source observable"));
source.onNext("Second");
// then
testObserver.assertSubscribed()
.assertValue("First")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void noLongerNotifyWhenSourceObservableIsCompleted() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = valueFieldObserver(property);
// when
source.onNext("First");
source.onComplete();
source.onNext("Second");
// then
testObserver.assertSubscribed()
.assertValue("First")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void noLongerNotifyWhenSourceObservableEmitsValueAfterDisposed() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
TestObserver<String> testObserver = valueFieldObserver(property);
// when
source.onNext("First");
property.dispose();
source.onNext("Second");
// then
testObserver.assertSubscribed()
.assertValue("First")
.assertNoErrors()
.assertNotComplete()
.dispose();
}
项目:rx-property-android
文件:RxPropertyTest.java
@Test
public void errorObservablesEmitOnCompleteWhenSourceObservableIsCompleted() {
// given
Subject<String> source = PublishSubject.create();
property = new RxProperty<>(source);
property.setValidator(new AllSuccessValidator());
testObserver = new RxPropertyErrorObserver<>(property);
// when
source.onComplete();
// then
testObserver.assertNoErrors()
.assertNoSummarizedErrors()
.assertNoHasErrors()
.assertComplete()
.dispose();
}
项目:quill
文件:AuthService.java
private Observable<JsonElement> revokeToken(AuthToken token, String clientSecret) {
// this complexity exists because the access token must be revoked AFTER the refresh token
// why? because the access token is needed for both revocations!
Subject<JsonElement> responses = PublishSubject.create();
RevokeReqBody refreshReqBody = RevokeReqBody.fromRefreshToken(
token.getRefreshToken(), clientSecret);
revokeSingleToken(token.getAuthHeader(), refreshReqBody, responses)
.doOnComplete(() -> {
RevokeReqBody accessReqBody = RevokeReqBody.fromAccessToken(
token.getAccessToken(), clientSecret);
revokeSingleToken(token.getAuthHeader(), accessReqBody, responses)
.subscribe();
})
.subscribe();
return responses;
}
项目:lsql
文件:AbstractQuery.java
/**
* Turns this query into an Observable. Each subscription will trigger the underlying database operation.
* <p/>
* This is a low-level API to directly work with the JDBC ResultSet.
*
* @return the Observable
*/
public Observable<ResultSetWithColumns> rxResultSet() {
return Subject.create(emitter -> {
try {
ResultSetWithColumns resultSetWithColumns = createResultSetWithColumns();
checkConformity(resultSetWithColumns.getConverters());
while (resultSetWithColumns.getResultSet().next() && !emitter.isDisposed()) {
emitter.onNext(resultSetWithColumns);
}
resultSetWithColumns.getResultSet().close();
emitter.onComplete();
} catch (SQLException e) {
emitter.onError(e);
}
});
}
项目:Learning-RxJava
文件:Ch5_22.java
public static void main(String[] args) {
Subject<String> subject = PublishSubject.create();
subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();
subject.map(String::length)
.subscribe(System.out::println);
}
项目:Learning-RxJava
文件:Ch5_26.java
public static void main(String[] args) {
Subject<String> subject =
UnicastSubject.create();
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(l -> ((l + 1) * 300) + " milliseconds")
.subscribe(subject);
sleep(2000);
subject.subscribe(s -> System.out.println("Observer 1: " +
s));
sleep(2000);
}