Java 类io.reactivex.Notification 实例源码
项目:Java-EX
文件:RxIterator.java
/**
 * Ensures {@code next} holds the upcoming onNext notification, if there is one.
 * Does nothing when the iterator is already completed or {@code next} is already set.
 * Otherwise takes one notification from {@code queue}: an onNext is stored in
 * {@code next}; anything else marks the iterator completed, and an onError is
 * additionally rethrown wrapped in a {@link RuntimeException}.
 */
private void calcNext() {
    if (completed || next != null) {
        return;
    }
    Notification<T> take = uncheck(queue::take);
    if (take.isOnNext()) {
        next = take;
        return;
    }
    // Both terminal kinds finish the iteration; only an error is surfaced to the caller.
    completed = true;
    if (take.isOnError()) {
        throw new RuntimeException(take.getError());
    }
}
项目:Java-EX
文件:RxIterator.java
/**
 * Ensures {@code next} holds the upcoming onNext notification, if there is one.
 * Does nothing when the iterator is already completed or {@code next} is already set.
 * Otherwise requests one more item from {@code subscription}, then takes one
 * notification from {@code queue}: an onNext is stored in {@code next}; anything else
 * marks the iterator completed, and an onError is additionally rethrown wrapped in a
 * {@link RuntimeException}.
 */
private void calcNext() {
    if (completed || next != null) {
        return;
    }
    // Ask for exactly one item before taking from the queue.
    subscription.request(1);
    Notification<T> take = uncheck(() -> queue.take());
    if (take.isOnNext()) {
        next = take;
        return;
    }
    completed = true;
    if (take.isOnError()) {
        throw new RuntimeException(take.getError());
    }
}
项目:rxjava2-jdbc
文件:TransactedCallableBuilder.java
/**
 * Wraps the notifications produced by {@code f} into {@code Tx} values.
 * The work is deferred per subscription. The connection emitted by
 * {@code b.connection} is passed through {@code Util.toTransactedConnection}, which
 * also receives a holder reference; that holder is later read when converting each
 * notification with {@code Tx.toTx}. When a resulting {@code Tx} reports
 * {@code isComplete()}, its connection is committed.
 *
 * @param b builder supplying the connection source and the database handle
 * @param f maps the transacted connection to a flowable of notifications
 * @return a flowable of {@code Tx} values, one stream per subscription
 */
private static <T> Flowable<Tx<T>> inTransaction(CallableBuilder b,
        Function<Single<Connection>, Flowable<Notification<T>>> f) {
    return Flowable.defer(() -> {
        // set the atomic reference when the transacted connection emits
        AtomicReference<Connection> holder = new AtomicReference<>();
        Single<Connection> transacted = b.connection
                .map(c -> Util.toTransactedConnection(holder, c));
        Flowable<Notification<T>> notifications = f.apply(transacted);
        Flowable<Tx<T>> transactions = notifications
                .<Tx<T>>flatMap(n -> Tx.toTx(n, holder.get(), b.db));
        return transactions.doOnNext(tx -> {
            if (!tx.isComplete()) {
                return;
            }
            ((TxImpl<T>) tx).connection().commit();
        });
    });
}
项目:rxjava2-jdbc
文件:Call.java
/**
 * Creates the notification stream for a call whose out-parameters are returned as a
 * {@code TupleN}. Once {@code connection} emits, delegates to
 * {@code createWithParameters}, supplying the statement-level
 * {@code createWithNParameters} overload as the per-parameter-group function.
 *
 * @param connection source of the connection to run the call on
 * @param sql the call statement text
 * @param parameterGroups one list of parameter values per execution
 * @param parameterPlaceholders placeholders describing the statement parameters
 * @param outClasses classes passed through to the statement-level overload
 * @return flowable of notifications carrying the tuples
 */
static Flowable<Notification<TupleN<Object>>> createWithNParameters(
        Single<Connection> connection,
        String sql,
        Flowable<List<Object>> parameterGroups,
        List<ParameterPlaceholder> parameterPlaceholders,
        List<Class<?>> outClasses) {
    Flowable<Connection> connections = connection.toFlowable();
    return connections.flatMap(
            con -> createWithParameters(
                    con,
                    sql,
                    parameterGroups,
                    parameterPlaceholders,
                    (stmt, parameters) -> createWithNParameters(stmt, parameters, parameterPlaceholders,
                            outClasses)));
}
项目:rxjava2-jdbc
文件:Call.java
/**
 * Prepares a callable statement for {@code sql} and, for every parameter group,
 * executes it and emits a {@code CallableResultSet1} holding the output values and a
 * flowable over the result set mapped with {@code f1}. Signals are materialized into
 * {@link Notification}s. The statement is released through
 * {@code Util.closeCallableStatementAndConnection} by {@code Flowable.using}
 * (eager mode).
 *
 * <p>The commit/rollback hooks are attached <em>before</em> {@code materialize()}.
 * After materialization an upstream error is delivered as an onNext notification
 * followed by a normal completion, so hooks placed after it would never roll back
 * on failure and would commit instead.
 *
 * @param con connection used to prepare the call
 * @param sql the call statement text
 * @param parameterGroups one list of parameter values per execution
 * @param parameterPlaceholders placeholders describing the statement parameters
 * @param f1 maps a row of the result set to a value
 * @param fetchSize not used by this method
 * @return flowable of materialized results
 */
private static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Connection con,
        String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        Function<? super ResultSet, ? extends T1> f1, int fetchSize) {
    log.debug("Update.create {}", sql);
    Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet1<T1>>>> flowableFactory = //
            stmt -> parameterGroups //
                    .flatMap(parameters -> {
                        List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
                                parameters);
                        Flowable<T1> flowable1 = createFlowable(stmt, f1);
                        return Single.just(new CallableResultSet1<T1>(outputValues, flowable1)).toFlowable();
                    }) //
                    // hooks must see the real terminal signal, hence before materialize()
                    .doOnComplete(() -> Util.commit(stmt.stmt)) //
                    .doOnError(e -> Util.rollback(stmt.stmt)) //
                    .materialize();
    Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
    return Flowable.using(resourceFactory, flowableFactory, disposer, true);
}
项目:rxjava2-jdbc
文件:Call.java
/**
 * Prepares a callable statement for {@code sql} and, for every parameter group,
 * executes it and emits a {@code CallableResultSet2} holding the output values and
 * flowables over the first and second result sets (mapped with {@code f1} and
 * {@code f2}). The statement is advanced with
 * {@code getMoreResults(KEEP_CURRENT_RESULT)} between the two. Signals are
 * materialized into {@link Notification}s, and the statement is released through
 * {@code Util.closeCallableStatementAndConnection} by {@code Flowable.using}
 * (eager mode).
 *
 * <p>The commit/rollback hooks are attached <em>before</em> {@code materialize()}.
 * After materialization an upstream error is delivered as an onNext notification
 * followed by a normal completion, so hooks placed after it would never roll back
 * on failure and would commit instead.
 *
 * @param con connection used to prepare the call
 * @param sql the call statement text
 * @param parameterGroups one list of parameter values per execution
 * @param parameterPlaceholders placeholders describing the statement parameters
 * @param f1 maps a row of the first result set to a value
 * @param f2 maps a row of the second result set to a value
 * @param fetchSize not used by this method
 * @return flowable of materialized results
 */
private static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(Connection con,
        String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        Function<? super ResultSet, ? extends T1> f1, Function<? super ResultSet, ? extends T2> f2, int fetchSize) {
    Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet2<T1, T2>>>> flowableFactory = //
            stmt -> parameterGroups //
                    .flatMap(parameters -> {
                        List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
                                parameters);
                        final Flowable<T1> flowable1 = createFlowable(stmt, f1);
                        stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                        final Flowable<T2> flowable2 = createFlowable(stmt, f2);
                        return Single.just(new CallableResultSet2<T1, T2>(outputValues, flowable1, flowable2))
                                .toFlowable();
                    }) //
                    // hooks must see the real terminal signal, hence before materialize()
                    .doOnComplete(() -> Util.commit(stmt.stmt)) //
                    .doOnError(e -> Util.rollback(stmt.stmt)) //
                    .materialize();
    Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
    return Flowable.using(resourceFactory, flowableFactory, disposer, true);
}
项目:rxjava2-jdbc
文件:Call.java
/**
 * Prepares a callable statement for {@code sql} and, for every parameter group,
 * executes it and emits a {@code CallableResultSet3} holding the output values and
 * flowables over the first, second and third result sets (mapped with {@code f1},
 * {@code f2} and {@code f3}). The statement is advanced with
 * {@code getMoreResults(KEEP_CURRENT_RESULT)} between result sets. Signals are
 * materialized into {@link Notification}s, and the statement is released through
 * {@code Util.closeCallableStatementAndConnection} by {@code Flowable.using}
 * (eager mode).
 *
 * <p>The commit/rollback hooks are attached <em>before</em> {@code materialize()}.
 * After materialization an upstream error is delivered as an onNext notification
 * followed by a normal completion, so hooks placed after it would never roll back
 * on failure and would commit instead.
 *
 * @param con connection used to prepare the call
 * @param sql the call statement text
 * @param parameterGroups one list of parameter values per execution
 * @param parameterPlaceholders placeholders describing the statement parameters
 * @param f1 maps a row of the first result set to a value
 * @param f2 maps a row of the second result set to a value
 * @param f3 maps a row of the third result set to a value
 * @param fetchSize not used by this method
 * @return flowable of materialized results
 */
private static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets(
        Connection con, String sql, Flowable<List<Object>> parameterGroups,
        List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
        Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) {
    Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet3<T1, T2, T3>>>> flowableFactory = //
            stmt -> parameterGroups //
                    .flatMap(parameters -> {
                        List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
                                parameters);
                        final Flowable<T1> flowable1 = createFlowable(stmt, f1);
                        stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                        final Flowable<T2> flowable2 = createFlowable(stmt, f2);
                        stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                        final Flowable<T3> flowable3 = createFlowable(stmt, f3);
                        return Single.just(
                                new CallableResultSet3<T1, T2, T3>(outputValues, flowable1, flowable2, flowable3))
                                .toFlowable();
                    }) //
                    // hooks must see the real terminal signal, hence before materialize()
                    .doOnComplete(() -> Util.commit(stmt.stmt)) //
                    .doOnError(e -> Util.rollback(stmt.stmt)) //
                    .materialize();
    Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
    return Flowable.using(resourceFactory, flowableFactory, disposer, true);
}
项目:rxjava2-jdbc
文件:Call.java
/**
 * Prepares a callable statement for {@code sql} and, for every parameter group,
 * executes it and emits a {@code CallableResultSetN} holding the output values and
 * one flowable per result set. The i-th result set is mapped with
 * {@code functions.get(i)}, and the loop continues while
 * {@code getMoreResults(KEEP_CURRENT_RESULT)} returns true. Signals are materialized
 * into {@link Notification}s, and the statement is released through
 * {@code Util.closeCallableStatementAndConnection} by {@code Flowable.using}
 * (eager mode).
 *
 * <p>The commit/rollback hooks are attached <em>before</em> {@code materialize()}.
 * After materialization an upstream error is delivered as an onNext notification
 * followed by a normal completion, so hooks placed after it would never roll back
 * on failure and would commit instead.
 *
 * @param con connection used to prepare the call
 * @param sql the call statement text
 * @param parameterGroups one list of parameter values per execution
 * @param parameterPlaceholders placeholders describing the statement parameters
 * @param functions row mappers, indexed by result-set position
 * @param fetchSize not used by this method
 * @return flowable of materialized results
 */
private static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Connection con, String sql,
        Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        List<Function<? super ResultSet, ?>> functions, int fetchSize) {
    Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSetN>>> flowableFactory = //
            stmt -> parameterGroups //
                    .flatMap(parameters -> {
                        List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
                                parameters);
                        List<Flowable<?>> flowables = Lists.newArrayList();
                        int i = 0;
                        // NOTE(review): functions.get(i) throws if the statement yields more
                        // result sets than there are mapping functions - confirm callers
                        // guarantee matching counts.
                        do {
                            Function<? super ResultSet, ?> f = functions.get(i);
                            flowables.add(createFlowable(stmt, f));
                            i++;
                        } while (stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT));
                        return Single.just(new CallableResultSetN(outputValues, flowables)).toFlowable();
                    }) //
                    // hooks must see the real terminal signal, hence before materialize()
                    .doOnComplete(() -> Util.commit(stmt.stmt)) //
                    .doOnError(e -> Util.rollback(stmt.stmt)) //
                    .materialize();
    Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
    return Flowable.using(resourceFactory, flowableFactory, disposer, true);
}
项目:RxJava2Extensions
文件:ActivePlanN.java
/**
 * Tries to match the head notification of every observer queue.
 * Returns without doing anything if any queue is empty. If every head is an
 * onComplete notification, runs {@code onCompleted}; otherwise dequeues and passes
 * the collected head values to {@code onNext}.
 */
@Override
protected void match() throws Exception {
    Object[] values = new Object[this.observers.size()];
    int filled = 0;
    int finished = 0;
    for (JoinObserver1<? extends Object> observer : this.observers) {
        if (observer.queue().isEmpty()) {
            // not every source has a pending notification yet
            return;
        }
        Notification<? extends Object> head = observer.queue().peek();
        finished += head.isOnComplete() ? 1 : 0;
        values[filled++] = head.getValue();
    }
    if (finished != filled) {
        dequeue();
        onNext.accept(values);
        return;
    }
    onCompleted.run();
}
项目:RxJava2Extensions
文件:ActivePlan4.java
/**
 * Tries to match the head notifications of the four observer queues.
 * Does nothing unless all four queues are non-empty. If any head is an onComplete
 * notification, runs {@code onCompleted}; otherwise dequeues and passes the four
 * head values to {@code onNext}.
 */
@Override
protected void match() throws Exception {
    if (jo1.queue().isEmpty()
            || jo2.queue().isEmpty()
            || jo3.queue().isEmpty()
            || jo4.queue().isEmpty()) {
        return;
    }
    Notification<T1> head1 = jo1.queue().peek();
    Notification<T2> head2 = jo2.queue().peek();
    Notification<T3> head3 = jo3.queue().peek();
    Notification<T4> head4 = jo4.queue().peek();
    boolean anyComplete = head1.isOnComplete()
            || head2.isOnComplete()
            || head3.isOnComplete()
            || head4.isOnComplete();
    if (anyComplete) {
        onCompleted.run();
        return;
    }
    dequeue();
    onNext.accept(head1.getValue(), head2.getValue(), head3.getValue(), head4.getValue());
}
项目:RxJava2Extensions
文件:ActivePlan3.java
/**
 * Tries to match the head notifications of the three observer queues.
 * Does nothing unless all three queues are non-empty. If any head is an onComplete
 * notification, runs {@code onCompleted}; otherwise dequeues and passes the three
 * head values to {@code onNext}.
 */
@Override
protected void match() throws Exception {
    if (first.queue().isEmpty()
            || second.queue().isEmpty()
            || third.queue().isEmpty()) {
        return;
    }
    Notification<T1> head1 = first.queue().peek();
    Notification<T2> head2 = second.queue().peek();
    Notification<T3> head3 = third.queue().peek();
    boolean anyComplete = head1.isOnComplete() || head2.isOnComplete() || head3.isOnComplete();
    if (anyComplete) {
        onCompleted.run();
        return;
    }
    dequeue();
    onNext.accept(head1.getValue(), head2.getValue(), head3.getValue());
}
项目:Java-EX
文件:RxIterator.java
/**
 * Loads the next onNext notification into {@code next} when needed.
 * A no-op once {@code completed} is set or while {@code next} is still populated.
 * A non-onNext notification taken from {@code queue} marks the iterator completed;
 * an onError is rethrown wrapped in a {@link RuntimeException}.
 */
private void calcNext() {
    if (completed || next != null) {
        return;
    }
    Notification<T> taken = uncheck(queue::take);
    if (taken.isOnNext()) {
        next = taken;
        return;
    }
    completed = true;
    if (taken.isOnError()) {
        throw new RuntimeException(taken.getError());
    }
}
项目:Java-EX
文件:RxIterator.java
/**
 * Loads the next onNext notification into {@code next} when needed.
 * A no-op once {@code completed} is set or while {@code next} is still populated.
 * Otherwise requests one item from {@code subscription} and takes one notification
 * from {@code queue}. A non-onNext notification marks the iterator completed; an
 * onError is rethrown wrapped in a {@link RuntimeException}.
 */
private void calcNext() {
    if (completed || next != null) {
        return;
    }
    subscription.request(1);
    Notification<T> taken = uncheck(() -> queue.take());
    if (taken.isOnNext()) {
        next = taken;
        return;
    }
    completed = true;
    if (taken.isOnError()) {
        throw new RuntimeException(taken.getError());
    }
}
项目:RxWindowIfChanged
文件:WindowIfChangedTest.java
/**
 * A single message followed by completion: the materialized inner window is
 * expected to emit the numbered message and then an onComplete notification, and
 * the outer stream is expected to complete.
 */
@Test public void completeCompletesInner() {
  Observable<Message> messages = Observable.just(new Message("Bob", "Hello"));
  final AtomicInteger seen = new AtomicInteger();
  // Prefixes each message of a group with the group's ordinal and materializes the result.
  Function<GroupedObservable<String, Message>, Observable<Notification<String>>> numberAndMaterialize =
      new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
        @Override public Observable<Notification<String>> apply(
            GroupedObservable<String, Message> group) {
          final int count = seen.incrementAndGet();
          Observable<String> numbered = group.map(new Function<Message, String>() {
            @Override public String apply(Message message) throws Exception {
              return count + " " + message;
            }
          });
          return numbered.materialize();
        }
      };
  WindowIfChanged.create(messages, userSelector)
      .switchMap(numberAndMaterialize)
      .test()
      .assertValues(
          Notification.createOnNext("1 Bob Hello"),
          Notification.<String>createOnComplete())
      .assertComplete();
}
项目:RxWindowIfChanged
文件:WindowIfChangedTest.java
/**
 * A single message followed by an error: the materialized inner window is expected
 * to emit the numbered message and then an onComplete notification, while the outer
 * stream is expected to terminate with the original error.
 */
@Test public void errorCompletesInner() {
  RuntimeException error = new RuntimeException("boom!");
  Observable<Message> messages = Observable.just(
      Notification.createOnNext(new Message("Bob", "Hello")),
      Notification.createOnError(error)
  ).dematerialize();
  final AtomicInteger seen = new AtomicInteger();
  // Prefixes each message of a group with the group's ordinal and materializes the result.
  Function<GroupedObservable<String, Message>, Observable<Notification<String>>> numberAndMaterialize =
      new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
        @Override public Observable<Notification<String>> apply(
            GroupedObservable<String, Message> group) {
          final int count = seen.incrementAndGet();
          Observable<String> numbered = group.map(new Function<Message, String>() {
            @Override public String apply(Message message) throws Exception {
              return count + " " + message;
            }
          });
          return numbered.materialize();
        }
      };
  WindowIfChanged.create(messages, userSelector)
      .switchMap(numberAndMaterialize)
      .test()
      .assertValues(
          Notification.createOnNext("1 Bob Hello"),
          Notification.<String>createOnComplete())
      .assertError(error);
}
项目:GitHub
文件:RecordingObserver.java
/**
 * Removes and returns the first element of {@code events}.
 *
 * @throws AssertionError if {@code events} has no element to return
 */
private Notification<T> takeNotification() {
    Notification<T> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
项目:GitHub
文件:RecordingObserver.java
/**
 * Takes the next recorded event, asserts that it is an onNext notification and
 * returns the value it carries.
 */
public T takeValue() {
    Notification<T> event = takeNotification();
    boolean isNext = event.isOnNext();
    assertThat(isNext).as("Expected onNext event but was " + event).isTrue();
    return event.getValue();
}
项目:GitHub
文件:RecordingObserver.java
/**
 * Takes the next recorded event, asserts that it is an onError notification and
 * returns the error it carries.
 */
public Throwable takeError() {
    Notification<T> event = takeNotification();
    boolean isError = event.isOnError();
    assertThat(isError).as("Expected onError event but was " + event).isTrue();
    return event.getError();
}
项目:GitHub
文件:RecordingObserver.java
/**
 * Takes the next recorded event, asserts that it is an onComplete notification and
 * then calls {@code assertNoEvents()}.
 */
public void assertComplete() {
    Notification<T> event = takeNotification();
    boolean isComplete = event.isOnComplete();
    assertThat(isComplete).as("Expected onCompleted event but was " + event).isTrue();
    assertNoEvents();
}
项目:GitHub
文件:RecordingSubscriber.java
/**
 * Removes and returns the first element of {@code events}.
 *
 * @throws AssertionError if {@code events} has no element to return
 */
private Notification<T> takeNotification() {
    Notification<T> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
项目:GitHub
文件:RecordingSubscriber.java
/**
 * Takes the next recorded event, asserts that it is an onNext notification and
 * returns the value it carries.
 */
public T takeValue() {
    Notification<T> event = takeNotification();
    boolean isNext = event.isOnNext();
    assertThat(isNext).as("Expected onNext event but was " + event).isTrue();
    return event.getValue();
}
项目:GitHub
文件:RecordingSubscriber.java
/**
 * Takes the next recorded event, asserts that it is an onError notification and
 * returns the error it carries.
 */
public Throwable takeError() {
    Notification<T> event = takeNotification();
    boolean isError = event.isOnError();
    assertThat(isError).as("Expected onError event but was " + event).isTrue();
    return event.getError();
}
项目:GitHub
文件:RecordingSubscriber.java
/**
 * Takes the next recorded event, asserts that it is an onComplete notification and
 * then calls {@code assertNoEvents()}.
 */
public void assertComplete() {
    Notification<T> event = takeNotification();
    boolean isComplete = event.isOnComplete();
    assertThat(isComplete).as("Expected onCompleted event but was " + event).isTrue();
    assertNoEvents();
}
项目:GitHub
文件:RecordingCompletableObserver.java
/**
 * Removes and returns the first element of {@code events}.
 *
 * @throws AssertionError if {@code events} has no element to return
 */
private Notification<?> takeNotification() {
    Notification<?> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
项目:GitHub
文件:RecordingCompletableObserver.java
/**
 * Takes the next recorded event, asserts that it is an onError notification and
 * returns the error it carries.
 */
public Throwable takeError() {
    Notification<?> event = takeNotification();
    boolean isError = event.isOnError();
    assertThat(isError).as("Expected onError event but was " + event).isTrue();
    return event.getError();
}
项目:GitHub
文件:RecordingCompletableObserver.java
/**
 * Takes the next recorded event, asserts that it is an onComplete notification and
 * then calls {@code assertNoEvents()}.
 */
public void assertComplete() {
    Notification<?> event = takeNotification();
    boolean isComplete = event.isOnComplete();
    assertThat(isComplete).as("Expected onCompleted event but was " + event).isTrue();
    assertNoEvents();
}
项目:GitHub
文件:RecordingSingleObserver.java
/**
 * Removes and returns the first element of {@code events}.
 *
 * @throws AssertionError if {@code events} has no element to return
 */
private Notification<T> takeNotification() {
    Notification<T> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
项目:GitHub
文件:RecordingSingleObserver.java
/**
 * Takes the next recorded event, asserts that it is an onNext notification and
 * returns the value it carries.
 */
public T takeValue() {
    Notification<T> event = takeNotification();
    boolean isNext = event.isOnNext();
    assertThat(isNext).as("Expected onNext event but was " + event).isTrue();
    return event.getValue();
}
项目:GitHub
文件:RecordingSingleObserver.java
/**
 * Takes the next recorded event, asserts that it is an onError notification and
 * returns the error it carries.
 */
public Throwable takeError() {
    Notification<T> event = takeNotification();
    boolean isError = event.isOnError();
    assertThat(isError).as("Expected onError event but was " + event).isTrue();
    return event.getError();
}
项目:GitHub
文件:RecordingMaybeObserver.java
/**
 * Removes and returns the first element of {@code events}.
 *
 * @throws AssertionError if {@code events} has no element to return
 */
private Notification<T> takeNotification() {
    Notification<T> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
项目:GitHub
文件:RecordingMaybeObserver.java
/**
 * Takes the next recorded event, asserts that it is an onNext notification and
 * returns the value it carries.
 */
public T takeValue() {
    Notification<T> event = takeNotification();
    boolean isNext = event.isOnNext();
    assertThat(isNext).as("Expected onNext event but was " + event).isTrue();
    return event.getValue();
}
项目:GitHub
文件:RecordingMaybeObserver.java
/**
 * Takes the next recorded event, asserts that it is an onError notification and
 * returns the error it carries.
 */
public Throwable takeError() {
    Notification<T> event = takeNotification();
    boolean isError = event.isOnError();
    assertThat(isError).as("Expected onError event but was " + event).isTrue();
    return event.getError();
}
项目:GitHub
文件:RecordingObserver.java
/**
 * Polls the first element of {@code events} and returns it.
 *
 * @throws AssertionError if the poll yields nothing
 */
private Notification<T> takeNotification() {
    Notification<T> polled = events.pollFirst();
    if (polled != null) {
        return polled;
    }
    throw new AssertionError("No event found!");
}
项目:GitHub
文件:RecordingObserver.java
/**
 * Consumes the next recorded event, which must be an onNext notification, and
 * returns its value.
 */
public T takeValue() {
    Notification<T> polled = takeNotification();
    boolean isNext = polled.isOnNext();
    assertThat(isNext).as("Expected onNext event but was " + polled).isTrue();
    return polled.getValue();
}
项目:GitHub
文件:RecordingObserver.java
/**
 * Consumes the next recorded event, which must be an onError notification, and
 * returns its error.
 */
public Throwable takeError() {
    Notification<T> polled = takeNotification();
    boolean isError = polled.isOnError();
    assertThat(isError).as("Expected onError event but was " + polled).isTrue();
    return polled.getError();
}
项目:GitHub
文件:RecordingObserver.java
/**
 * Consumes the next recorded event, which must be an onComplete notification, and
 * then calls {@code assertNoEvents()}.
 */
public void assertComplete() {
    Notification<T> polled = takeNotification();
    boolean isComplete = polled.isOnComplete();
    assertThat(isComplete).as("Expected onCompleted event but was " + polled).isTrue();
    assertNoEvents();
}
项目:GitHub
文件:RecordingSubscriber.java
/**
 * Polls the first element of {@code events} and returns it.
 *
 * @throws AssertionError if the poll yields nothing
 */
private Notification<T> takeNotification() {
    Notification<T> polled = events.pollFirst();
    if (polled != null) {
        return polled;
    }
    throw new AssertionError("No event found!");
}
项目:GitHub
文件:RecordingSubscriber.java
/**
 * Consumes the next recorded event, which must be an onNext notification, and
 * returns its value.
 */
public T takeValue() {
    Notification<T> polled = takeNotification();
    boolean isNext = polled.isOnNext();
    assertThat(isNext).as("Expected onNext event but was " + polled).isTrue();
    return polled.getValue();
}
项目:GitHub
文件:RecordingSubscriber.java
/**
 * Consumes the next recorded event, which must be an onError notification, and
 * returns its error.
 */
public Throwable takeError() {
    Notification<T> polled = takeNotification();
    boolean isError = polled.isOnError();
    assertThat(isError).as("Expected onError event but was " + polled).isTrue();
    return polled.getError();
}
项目:GitHub
文件:RecordingSubscriber.java
/**
 * Consumes the next recorded event, which must be an onComplete notification, and
 * then calls {@code assertNoEvents()}.
 */
public void assertComplete() {
    Notification<T> polled = takeNotification();
    boolean isComplete = polled.isOnComplete();
    assertThat(isComplete).as("Expected onCompleted event but was " + polled).isTrue();
    assertNoEvents();
}