/**
 * Reads {@code customers.json} from the working directory and returns a publisher that
 * emits one {@link Customer} per JSON object found in the file.
 *
 * <p>The file is read eagerly and completely before this method returns; the reader is
 * closed before parsing starts. Parsing is a hand-rolled split, not a JSON parser: the
 * outer {@code [} and {@code ]} are stripped, the rest is split on <code>{</code>, and each
 * object is split on {@code ,}. The index arithmetic expects every field to be laid out as
 * {@code "key": "value"} (colon, one character, opening quote) and expects the first field
 * to hold the first name and the second the last name.
 *
 * <p>NOTE(review): the returned publisher never calls {@code onSubscribe} and does not
 * honour demand; every customer is pushed synchronously from inside {@code subscribe},
 * followed by {@code onComplete}. Subscribers that need a {@code Subscription} will not
 * get one from this publisher.
 *
 * @return a publisher that replays the parsed customers to each subscriber
 * @throws IOException if {@code customers.json} cannot be opened or read
 */
public Publisher<Customer> findAllAsync() throws IOException {
    final String customers;
    // try-with-resources guarantees the reader is closed even when reading fails.
    try (FileReader reader = new FileReader("customers.json")) {
        customers = IOUtils.toString(reader).trim();
    }
    // Drop the enclosing '[' and ']' of the JSON array.
    final String customerData = customers.substring(1, customers.length() - 1);
    return new Publisher<Customer>() {
        @Override
        public void subscribe(Subscriber<? super Customer> subscriber) {
            asList(customerData.split("\\{"))
                    .stream()
                    // The text before the first '{' yields an empty leading element.
                    .filter(s -> !s.isEmpty())
                    // Cut off the closing '}' and whatever follows it (the separator comma).
                    .map(c -> c.substring(0, c.lastIndexOf('}')))
                    .map(c -> c.split(","))
                    .map(c -> {
                        // +3 skips the colon, the following character and the opening quote;
                        // length - 1 drops the closing quote.
                        String firstName = c[0].substring(c[0].indexOf(':') + 3, c[0].length() - 1);
                        String lastName = c[1].substring(c[1].indexOf(':') + 3, c[1].length() - 1);
                        return new Customer(firstName, lastName);
                    })
                    .forEach(subscriber::onNext);
            subscriber.onComplete();
        }
    };
}
/**
 * Asynchronously writes the customers emitted by {@code customers} to the file at
 * {@code path} as a JSON array of {@code {"firstName": "...", "lastName": "..."}} objects.
 *
 * <p>Sequence: the opening {@code [} is written at position 0; once that write completes
 * the method subscribes to {@code customers}. Each received customer is formatted, written
 * at the current offset, and only after its write completes is one more element requested
 * from the subscription. When the source completes, {@code ]} is written, the channel is
 * closed and the number of completed customer writes is published.
 *
 * <p>{@code writeSemaphore} (a single permit) is held for the duration of every write so
 * that at most one write is in flight. {@code offset} is advanced before the permit is
 * released, so the next writer always reads the up-to-date position.
 *
 * <p>NOTE(review): {@code andThen} and {@code pullEach} are helpers defined outside this
 * method. This code assumes {@code andThen} builds a completion handler whose first lambda
 * argument is the number of bytes written, and that failures are reported through the
 * {@code resultPublisher} attachment — confirm against their definitions. A write is also
 * assumed to consume the whole buffer.
 *
 * @param customers source of the customers to persist
 * @return a publisher that publishes the number of customers written, or the error that
 *         ended the operation
 * @throws IOException if the file channel cannot be opened
 */
public Publisher<Integer> save(Publisher<Customer> customers) throws IOException {
    AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
    // Position in the file at which the next write has to start.
    AtomicLong offset = new AtomicLong(0);
    // Number of customer writes that have completed; this is the published result.
    AtomicInteger resultCount = new AtomicInteger(0);
    // Number of customers handed to the writer so far; only used to pick the separator.
    AtomicInteger submittedCount = new AtomicInteger(0);
    SingleItemPublisher<Integer> resultPublisher = new SingleItemPublisher<>();
    Semaphore writeSemaphore = new Semaphore(1);

    writeSemaphore.acquireUninterruptibly();
    fileChannel.write(ByteBuffer.wrap("[".getBytes()), 0, resultPublisher, andThen((count, s) -> {
        // Step past the opening bracket exactly once. Doing this per customer left a
        // one-byte hole in front of every record after the first, and never doing it for
        // an empty source made the closing bracket overwrite the opening one.
        offset.set(count);
        writeSemaphore.release();
        customers.subscribe(pullEach((Customer customer, Subscription subscription) -> {
            // Every record except the first is preceded by a comma.
            String json = String.format("%s{\"firstName\": \"%s\", \"lastName\": \"%s\"}",
                    submittedCount.getAndIncrement() == 0 ? "" : ",",
                    customer.getFirstName(),
                    customer.getLastName());
            writeSemaphore.acquireUninterruptibly();
            fileChannel.write(ByteBuffer.wrap(json.getBytes()), offset.get(), resultPublisher, andThen((size, c) -> {
                // Advance the offset before releasing the permit so a waiting writer
                // cannot observe the old position.
                offset.addAndGet(size);
                writeSemaphore.release();
                resultCount.incrementAndGet();
                // Pull the next customer only after this one is on disk.
                subscription.request(1);
            }));
        }).andThen(() -> {
            // Source completed: terminate the JSON array and finish up.
            writeSemaphore.acquireUninterruptibly();
            fileChannel.write(ByteBuffer.wrap("]".getBytes()), offset.longValue(), resultPublisher, andThen((d, e) -> {
                writeSemaphore.release();
                try {
                    fileChannel.close();
                    resultPublisher.publish(resultCount.intValue());
                } catch (IOException error) {
                    resultPublisher.publish(error);
                }
            }));
        }).exceptionally(error -> resultPublisher.publish(error)));
    }));
    return resultPublisher;
}
/**
 * Creates a {@code LoggingRandomDelaySubscriber} with the given name and subscribes it to
 * the given publisher.
 *
 * @param name      name passed to the new subscriber's constructor
 * @param publisher publisher the new subscriber is attached to
 */
public static void createAndSubscribe(String name, Publisher<?> publisher) {
    LoggingRandomDelaySubscriber namedSubscriber = new LoggingRandomDelaySubscriber(name);
    publisher.subscribe(namedSubscriber);
}
public static void main(String[] args) { long start_range=10, stop_range=22; Publisher<Long> publisher = new NumberPublisher(start_range,stop_range); // Register Subscriber int count=10; NumberSubscriber subscriber = new NumberSubscriber(count); publisher.subscribe(subscriber); }
public static void main(String[] args) { long start_range=10, stop_range=22; Publisher<Long> publisher = new NumberPublisher(start_range,stop_range); // Register Subscriber int count=5; NumberSubscriber subscriber = new NumberSubscriber(count); publisher.subscribe(subscriber); }