private static <T> Flowable<T> create(NamedPreparedStatement ps, List<Object> parameters, Function<? super ResultSet, T> mapper) { Callable<ResultSet> initialState = () -> { Util.convertAndSetParameters(ps.ps, parameters, ps.names); ps.ps.execute(); return ps.ps.getGeneratedKeys(); }; BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> { if (rs.next()) { emitter.onNext(mapper.apply(rs)); } else { emitter.onComplete(); } }; Consumer<ResultSet> disposer = Util::closeSilently; return Flowable.generate(initialState, generator, disposer); }
private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt, Function<? super ResultSet, ? extends T> f) throws SQLException { ResultSet rsActual = stmt.stmt.getResultSet(); Callable<ResultSet> initialState = () -> rsActual; BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> { log.debug("getting row from ps={}, rs={}", stmt.stmt, rs); if (rs.next()) { T v = f.apply(rs); log.debug("emitting {}", v); emitter.onNext(v); } else { log.debug("completed"); emitter.onComplete(); } }; Consumer<ResultSet> disposeState = Util::closeSilently; return Flowable.generate(initialState, generator, disposeState); }
/** * Returns a Flowable stream of byte arrays from the given * {@link InputStream} between 1 and {@code bufferSize} bytes. * * @param is * input stream of bytes * @param bufferSize * max emitted byte array size * @return a stream of byte arrays */ public static Flowable<byte[]> from(final InputStream is, final int bufferSize) { return Flowable.generate(new Consumer<Emitter<byte[]>>() { @Override public void accept(Emitter<byte[]> emitter) throws Exception { byte[] buffer = new byte[bufferSize]; int count = is.read(buffer); if (count == -1) { emitter.onComplete(); } else if (count < bufferSize) { emitter.onNext(Arrays.copyOf(buffer, count)); } else { emitter.onNext(buffer); } } }); }
public static Flowable<ZippedEntry> unzip(final ZipInputStream zis) { return Flowable.generate(new Consumer<Emitter<ZippedEntry>>() { @Override public void accept(Emitter<ZippedEntry> emitter) throws IOException { ZipEntry zipEntry = zis.getNextEntry(); if (zipEntry != null) { emitter.onNext(new ZippedEntry(zipEntry, zis)); } else { // end of stream so eagerly close the stream (might not be a // good idea since this method did not create the zis zis.close(); emitter.onComplete(); } } }); }
public <T> Flowable<T> read(final Class<T> cls, final Input input) { return Flowable.generate(new Consumer<Emitter<T>>() { @Override public void accept(Emitter<T> emitter) throws Exception { if (input.eof()) { emitter.onComplete(); } else { T t = kryo.readObject(input, cls); emitter.onNext(t); } } }); }
@Override public void accept(State state, Emitter<SqsMessage> emitter) throws Exception { final Queue<Message> q = state.queue; Optional<SqsMessage> next = Optional.empty(); while (!next.isPresent()) { while (q.isEmpty()) { final ReceiveMessageResult result = sqs.receiveMessage(request); q.addAll(result.getMessages()); } final Message message = q.poll(); next = getNextMessage(message, queueUrl, bucketName, s3, sqs, service); } emitter.onNext(next.get()); }
private static Flowable<Flowable<byte[]>> createServerSocketFlowable(final ServerSocket serverSocket, final long timeoutMs, final int bufferSize, final Action preAcceptAction, final Predicate<? super Socket> acceptSocket) { return Flowable.generate( // new Consumer<Emitter<Flowable<byte[]>>>() { @Override public void accept(Emitter<Flowable<byte[]>> emitter) throws Exception { acceptConnection(timeoutMs, bufferSize, serverSocket, emitter, preAcceptAction, acceptSocket); } }); }
public static Flowable<String> from(final Reader reader, final int bufferSize) { return Flowable.generate(new Consumer<Emitter<String>>() { final char[] buffer = new char[bufferSize]; @Override public void accept(Emitter<String> emitter) throws Exception { int count = reader.read(buffer); if (count == -1) { emitter.onComplete(); } else { emitter.onNext(String.valueOf(buffer, 0, count)); } } }); }
private static <T> Flowable<? extends T> create(PreparedStatement ps, List<Object> parameters, Function<? super ResultSet, T> mapper, List<String> names, String sql, int fetchSize) { log.debug("parameters={}", parameters); log.debug("names={}", names); Callable<ResultSet> initialState = () -> { List<Parameter> params = Util.toParameters(parameters); boolean hasCollection = params.stream().anyMatch(x -> x.isCollection()); final PreparedStatement ps2; if (hasCollection) { // create a new prepared statement with the collection ? substituted with // ?s to match the size of the collection parameter ps2 = Util.prepare(ps.getConnection(), fetchSize, sql, params); // now wrap the rs to auto close ps2 because it is single use (the next // collection parameter may have a different ordinality so we need to build // a new PreparedStatement with a different number of question marks // substituted return new ResultSetAutoClosesStatement(Util // .setParameters(ps2, params, names) // .executeQuery(), ps2); } else { // use the current prepared statement (normal re-use) ps2 = ps; return Util // .setParameters(ps2, params, names) // .executeQuery(); } }; BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> { log.debug("getting row from ps={}, rs={}", rs.getStatement(), rs); if (rs.next()) { T v = mapper.apply(rs); log.debug("emitting {}", v); emitter.onNext(v); } else { log.debug("completed"); emitter.onComplete(); } }; Consumer<ResultSet> disposeState = Util::closeSilently; return Flowable.generate(initialState, generator, disposeState); }
public ObserverWrapper(Emitter<T> emitter) { mEmitter = emitter; }
void execute(Realm realm, Emitter<T> emitter);