private void calcNext() { if (completed) { return; } if (next == null) { Notification<T> take = uncheck(queue::take); if (take.isOnNext()) { next = take; } else if (take.isOnError()) { completed = true; throw new RuntimeException(take.getError()); } else { completed = true; } } }
private void calcNext() { if (completed) { return; } if (next == null) { subscription.request(1); Notification<T> take = uncheck(() -> queue.take()); if (take.isOnNext()) { next = take; } else if (take.isOnError()) { completed = true; throw new RuntimeException(take.getError()); } else { completed = true; } } }
private static <T> Flowable<Tx<T>> inTransaction(CallableBuilder b, Function<Single<Connection>, Flowable<Notification<T>>> f) { return Flowable.defer(() -> { AtomicReference<Connection> con = new AtomicReference<Connection>(); // set the atomic reference when transactedConnection emits Single<Connection> transactedConnection = b.connection // .map(c -> Util.toTransactedConnection(con, c)); return f.apply(transactedConnection) // .<Tx<T>>flatMap(n -> Tx.toTx(n, con.get(), b.db)) // .doOnNext(tx -> { if (tx.isComplete()) { ((TxImpl<T>) tx).connection().commit(); } }); }); }
static Flowable<Notification<TupleN<Object>>> createWithNParameters( // Single<Connection> connection, // String sql, // Flowable<List<Object>> parameterGroups, // List<ParameterPlaceholder> parameterPlaceholders, // List<Class<?>> outClasses) { return connection // .toFlowable() // .flatMap( // con -> createWithParameters( // con, // sql, // parameterGroups, // parameterPlaceholders, // (stmt, parameters) -> createWithNParameters(stmt, parameters, parameterPlaceholders, outClasses))); }
private static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Connection con, String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1, int fetchSize) { log.debug("Update.create {}", sql); Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders); final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet1<T1>>>> flowableFactory = // stmt -> parameterGroups // .flatMap(parameters -> { List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt, parameters); Flowable<T1> flowable1 = createFlowable(stmt, f1); return Single.just(new CallableResultSet1<T1>(outputValues, flowable1)).toFlowable(); }) // .materialize() // .doOnComplete(() -> Util.commit(stmt.stmt)) // .doOnError(e -> Util.rollback(stmt.stmt)); Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection; return Flowable.using(resourceFactory, flowableFactory, disposer, true); }
private static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(Connection con, String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1, Function<? super ResultSet, ? extends T2> f2, int fetchSize) { Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders); final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet2<T1, T2>>>> flowableFactory = // stmt -> parameterGroups // .flatMap(parameters -> { List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt, parameters); final Flowable<T1> flowable1 = createFlowable(stmt, f1); stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT); final Flowable<T2> flowable2 = createFlowable(stmt, f2); return Single.just(new CallableResultSet2<T1, T2>(outputValues, flowable1, flowable2)) .toFlowable(); }) // .materialize() // .doOnComplete(() -> Util.commit(stmt.stmt)) // .doOnError(e -> Util.rollback(stmt.stmt)); Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection; return Flowable.using(resourceFactory, flowableFactory, disposer, true); }
private static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets( Connection con, String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1, Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) { Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders); final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet3<T1, T2, T3>>>> flowableFactory = // stmt -> parameterGroups // .flatMap(parameters -> { List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt, parameters); final Flowable<T1> flowable1 = createFlowable(stmt, f1); stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT); final Flowable<T2> flowable2 = createFlowable(stmt, f2); stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT); final Flowable<T3> flowable3 = createFlowable(stmt, f3); return Single.just( new CallableResultSet3<T1, T2, T3>(outputValues, flowable1, flowable2, flowable3)) .toFlowable(); }) // .materialize() // .doOnComplete(() -> Util.commit(stmt.stmt)) // .doOnError(e -> Util.rollback(stmt.stmt)); Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection; return Flowable.using(resourceFactory, flowableFactory, disposer, true); }
private static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Connection con, String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders, List<Function<? super ResultSet, ?>> functions, int fetchSize) { Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders); final Function<NamedCallableStatement, Flowable<Notification<CallableResultSetN>>> flowableFactory = // stmt -> parameterGroups // .flatMap(parameters -> { List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt, parameters); List<Flowable<?>> flowables = Lists.newArrayList(); int i = 0; do { Function<? super ResultSet, ?> f = functions.get(i); flowables.add(createFlowable(stmt, f)); i++; } while (stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT)); return Single.just(new CallableResultSetN(outputValues, flowables)).toFlowable(); }) // .materialize() // .doOnComplete(() -> Util.commit(stmt.stmt)) // .doOnError(e -> Util.rollback(stmt.stmt)); Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection; return Flowable.using(resourceFactory, flowableFactory, disposer, true); }
@Override protected void match() throws Exception { Object[] notifications = new Object[this.observers.size()]; int j = 0; int completedCount = 0; for (JoinObserver1<? extends Object> jo : this.observers) { if (jo.queue().isEmpty()) { return; } Notification<? extends Object> n = jo.queue().peek(); if (n.isOnComplete()) { completedCount++; } notifications[j] = n.getValue(); j++; } if (completedCount == j) { onCompleted.run(); } else { dequeue(); onNext.accept(notifications); } }
@Override protected void match() throws Exception { if (!jo1.queue().isEmpty() && !jo2.queue().isEmpty() && !jo3.queue().isEmpty() && !jo4.queue().isEmpty()) { Notification<T1> n1 = jo1.queue().peek(); Notification<T2> n2 = jo2.queue().peek(); Notification<T3> n3 = jo3.queue().peek(); Notification<T4> n4 = jo4.queue().peek(); if (n1.isOnComplete() || n2.isOnComplete() || n3.isOnComplete() || n4.isOnComplete()) { onCompleted.run(); } else { dequeue(); onNext.accept(n1.getValue(), n2.getValue(), n3.getValue(), n4.getValue()); } } }
@Override protected void match() throws Exception { if (!first.queue().isEmpty() && !second.queue().isEmpty() && !third.queue().isEmpty()) { Notification<T1> n1 = first.queue().peek(); Notification<T2> n2 = second.queue().peek(); Notification<T3> n3 = third.queue().peek(); if (n1.isOnComplete() || n2.isOnComplete() || n3.isOnComplete()) { onCompleted.run(); } else { dequeue(); onNext.accept(n1.getValue(), n2.getValue(), n3.getValue()); } } }
@Test public void completeCompletesInner() { Observable<Message> messages = Observable.just(new Message("Bob", "Hello")); final AtomicInteger seen = new AtomicInteger(); WindowIfChanged.create(messages, userSelector) .switchMap( new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() { @Override public Observable<Notification<String>> apply( GroupedObservable<String, Message> group) { final int count = seen.incrementAndGet(); return group.map(new Function<Message, String>() { @Override public String apply(Message message) throws Exception { return count + " " + message; } }).materialize(); } }) .test() .assertValues( // Notification.createOnNext("1 Bob Hello"), // Notification.<String>createOnComplete()) // .assertComplete(); }
@Test public void errorCompletesInner() { RuntimeException error = new RuntimeException("boom!"); Observable<Message> messages = Observable.just( // Notification.createOnNext(new Message("Bob", "Hello")), Notification.createOnError(error) ).dematerialize(); final AtomicInteger seen = new AtomicInteger(); WindowIfChanged.create(messages, userSelector) .switchMap( new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() { @Override public Observable<Notification<String>> apply( GroupedObservable<String, Message> group) { final int count = seen.incrementAndGet(); return group.map(new Function<Message, String>() { @Override public String apply(Message message) throws Exception { return count + " " + message; } }).materialize(); } }) .test() .assertValues( // Notification.createOnNext("1 Bob Hello"), // Notification.<String>createOnComplete()) // .assertError(error); }
private Notification<T> takeNotification() { Notification<T> notification = events.pollFirst(); if (notification == null) { throw new AssertionError("No event found!"); } return notification; }
public T takeValue() { Notification<T> notification = takeNotification(); assertThat(notification.isOnNext()) .as("Expected onNext event but was " + notification) .isTrue(); return notification.getValue(); }
public Throwable takeError() { Notification<T> notification = takeNotification(); assertThat(notification.isOnError()) .as("Expected onError event but was " + notification) .isTrue(); return notification.getError(); }
public void assertComplete() { Notification<T> notification = takeNotification(); assertThat(notification.isOnComplete()) .as("Expected onCompleted event but was " + notification) .isTrue(); assertNoEvents(); }
private Notification<?> takeNotification() { Notification<?> notification = events.pollFirst(); if (notification == null) { throw new AssertionError("No event found!"); } return notification; }
public Throwable takeError() { Notification<?> notification = takeNotification(); assertThat(notification.isOnError()) .as("Expected onError event but was " + notification) .isTrue(); return notification.getError(); }
public void assertComplete() { Notification<?> notification = takeNotification(); assertThat(notification.isOnComplete()) .as("Expected onCompleted event but was " + notification) .isTrue(); assertNoEvents(); }