@Override public void subscribe(Subscriber<? super T> s) { subscriber = s; subscriber.onSubscribe(new Subscription() { @Override public void request(long count) { requested = true; if (item != null) { publish(item); } } @Override public void cancel() { subscriber = null; } }); if (error != null) { publish(error); } }
default CompletableSubscriber<T> andThen(Runnable runnable) { CompletableSubscriber<T> thisSubscriber = this; return new AbstractCompletableSubscriber<T>() { @Override public void onSubscribe(Subscription subscription) { thisSubscriber.onSubscribe(subscription); } @Override public void onNext(T item) { thisSubscriber.onNext(item); } @Override public void onComplete() { runnable.run(); } @Override public void onError(Throwable error) { thisSubscriber.onError(error); } }; }
default CompletableSubscriber<T> exceptionally(Consumer<Throwable> consumer) { CompletableSubscriber<T> thisSubscriber = this; return new AbstractCompletableSubscriber<T>() { @Override public void onSubscribe(Subscription subscription) { thisSubscriber.onSubscribe(subscription); } @Override public void onNext(T item) { thisSubscriber.onNext(item); } @Override public void onComplete() { thisSubscriber.onComplete(); } @Override public void onError(Throwable error) { consumer.accept(error); } }; }
public static <T> CompletableSubscriber<T> pushEach(Consumer<T> consumer) { return new AbstractCompletableSubscriber<T>() { @Override public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); } @Override public void onNext(T item) { consumer.accept(item); } }; }
public static <T> CompletableSubscriber<T> pullEach(BiConsumer<T, Subscription> consumer) { return new AbstractCompletableSubscriber<T>() { @Override public void onSubscribe(Subscription subscription) { super.onSubscribe(subscription); subscription.request(1); } @Override public void onNext(T item) { consumer.accept(item, subscription); } }; }
@Override public void subscribe(Subscriber<? super byte[]> subscriber) { byte[] buffer = new byte[1024]; subscriber.onSubscribe(new Subscription() { @Override public void request(long count) { try { for (long i = 0; i < count; i++) { int read = stream.read(buffer); byte[] item = buffer; if (read != buffer.length) { item = new byte[read]; System.arraycopy(buffer, 0, item, 0, read); } subscriber.onNext(item); } } catch (IOException e) { subscriber.onError(e); } } @Override public void cancel() { } }); }
public Publisher<Integer> save(Publisher<Customer> customers) throws IOException { AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE); AtomicLong offset = new AtomicLong(0); AtomicInteger resultCount = new AtomicInteger(0); SingleItemPublisher<Integer> resultPublisher = new SingleItemPublisher<>(); Semaphore writeSemaphore = new Semaphore(1); writeSemaphore.acquireUninterruptibly(); fileChannel.write(ByteBuffer.wrap("[".getBytes()), 0, resultPublisher, andThen((count, s) -> { writeSemaphore.release(); customers.subscribe(pullEach((Customer customer, Subscription subscription) -> { String json = String.format("%s{\"firstName\": \"%s\", \"lastName\": \"%s\"}", offset.longValue() == 0 ? "" : ",", customer.getFirstName(), customer.getLastName()); offset.addAndGet(count); writeSemaphore.acquireUninterruptibly(); fileChannel.write(ByteBuffer.wrap(json.getBytes()), offset.get(), resultPublisher, andThen((size, c) -> { writeSemaphore.release(); offset.addAndGet(size); resultCount.incrementAndGet(); subscription.request(1); })); }).andThen(() -> { writeSemaphore.acquireUninterruptibly(); fileChannel.write(ByteBuffer.wrap("]".getBytes()), offset.longValue(), resultPublisher, andThen((d, e) -> { writeSemaphore.release(); try { fileChannel.close(); resultPublisher.publish(resultCount.intValue()); } catch (IOException error) { resultPublisher.publish(error); } })); }).exceptionally(error -> resultPublisher.publish(error))); })); return resultPublisher; }
@Override public void onSubscribe(Flow.Subscription subscription) { if (this.subscription != null) { throw new IllegalStateException(); } this.subscription = subscription; subscription.request(1); }
@Override public void onSubscribe(Subscription subscription) { log("Subscribed..."); this.subscription = subscription; this.buffer = new AtomicInteger(); requestItems(); }
@Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; }
@Override public void onSubscribe(Subscription subscription) { (this.subscription = subscription).request(1); }
@Override public void onSubscribe(Subscription subscription) { // Request an unbounded number of items subscription.request(Long.MAX_VALUE); // Long.MAX_VALUE is considered as unbounded }
@Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; System.out.printf(Thread.currentThread().getName()+" subscribed with max count %d\n", maxCount); subscription.request(maxCount); }
@Override public void onSubscribe(Flow.Subscription subscription) { System.out.printf("%s: Consumer 2: Subscription received\n", Thread.currentThread().getName()); this.subscription=subscription; subscription.request(1); }
@Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); System.out.printf("%s: Consumer - Subscription\n",Thread.currentThread().getName()); }
@Override public final void onSubscribe(final Subscription subscription) { this.subscription = subscription; subscription.request(1); }