Java 类java.util.concurrent.Flow.Publisher 实例源码

项目:reactive-jax-rs    文件:CustomerRepository.java   
/**
 * Reads {@code customers.json} eagerly and returns a publisher that pushes every parsed
 * {@code Customer} to each subscriber, followed by {@code onComplete}.
 *
 * <p>The file content is parsed by hand: the first and last characters of the trimmed
 * content are dropped, the remainder is split on <code>{</code>, and the first two
 * comma-separated fields of every record become the first and last name.
 *
 * <p>NOTE(review): {@code subscribe} never calls {@code onSubscribe} and ignores demand;
 * all items are pushed synchronously on the subscribing thread.
 *
 * @return a publisher replaying the customers parsed from the file
 * @throws IOException if the file cannot be opened, read or closed
 */
public Publisher<Customer> findAllAsync() throws IOException {
  final String customers;
  // try-with-resources: IOUtils.toString(Reader) does not close the reader it is given,
  // so without this the file handle would leak on every call.
  try (FileReader reader = new FileReader("customers.json")) {
    customers = IOUtils.toString(reader).trim();
  }
  // Drop the first and last character of the trimmed content (the enclosing brackets).
  final String customerData = customers.substring(1, customers.length() - 1);

  return new Publisher<Customer>() {
    @Override
    public void subscribe(Subscriber<? super Customer> subscriber) {
      asList(customerData.split("\\{"))
        .stream()
        .filter(s -> !s.isEmpty())
        // Keep only the text before the record's closing brace.
        .map(c -> c.substring(0, c.lastIndexOf('}')))
        .map(c -> c.split(","))
        .map(c -> {
          // "+ 3" skips the colon plus two more characters (a space and the opening quote
          // in the expected layout); "length() - 1" drops the trailing quote.
          String firstName = c[0].substring(c[0].indexOf(':') + 3, c[0].length() - 1);
          String lastName = c[1].substring(c[1].indexOf(':') + 3, c[1].length() - 1);
          return new Customer(firstName, lastName);
        })
        .forEach(subscriber::onNext);
      subscriber.onComplete();
    }
  };
}
项目:reactive-jax-rs    文件:CustomerRepository.java   
/**
 * Writes the customers emitted by {@code customers} to the file at {@code path} as a JSON
 * array, using asynchronous file-channel writes, and reports the number of written
 * customers through the returned publisher.
 *
 * <p>Sequence: write {@code "["} at position 0, then subscribe to {@code customers} and
 * write one JSON object per received customer, then write {@code "]"}, close the channel
 * and publish the count. The method itself returns immediately after starting the first write.
 *
 * <p>{@code path}, {@code andThen}, {@code pullEach} and {@code SingleItemPublisher} are
 * defined outside this block; the comments below describe them only as far as their use
 * here shows.
 *
 * @param customers source of the customers to persist
 * @return publisher that receives either the number of written customers or an error
 * @throws IOException if the file channel cannot be opened
 */
public Publisher<Integer> save(Publisher<Customer> customers) throws IOException {
  AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
  // Next write position in the file; also used to detect the first record (offset == 0).
  AtomicLong offset = new AtomicLong(0);
  // Number of customer records whose write has completed.
  AtomicInteger resultCount = new AtomicInteger(0);
  SingleItemPublisher<Integer> resultPublisher = new SingleItemPublisher<>();
  // Single permit: acquired before every write and released in that write's callback,
  // so at most one write is in flight at a time.
  Semaphore writeSemaphore = new Semaphore(1);
  writeSemaphore.acquireUninterruptibly();
  // resultPublisher is passed as the write's attachment; andThen(...) presumably adapts the
  // lambda to a CompletionHandler and routes failures to the attachment — TODO confirm.
  fileChannel.write(ByteBuffer.wrap("[".getBytes()), 0, resultPublisher,
      andThen((count, s) -> {
        writeSemaphore.release();
        // The opening bracket is written; only now start consuming the customers.
        customers.subscribe(pullEach((Customer customer, Subscription subscription) -> {
            // Prefix every record except the first (offset still 0) with a comma.
            String json = String.format("%s{\"firstName\": \"%s\", \"lastName\": \"%s\"}", offset.longValue() == 0 ? "" : ",",
                customer.getFirstName(), customer.getLastName());
            // NOTE(review): this runs once per customer, so `count` (the first callback's
            // first argument, presumably the byte count of the "[" write) is added for every
            // record, not just the first — that looks like it skips a byte before each
            // later record; confirm against the produced file.
            offset.addAndGet(count);
            writeSemaphore.acquireUninterruptibly();
            fileChannel.write(ByteBuffer.wrap(json.getBytes()), offset.get(), resultPublisher,
                andThen((size, c) -> {
                  writeSemaphore.release();
                  // Advance past the record just written, count it, then ask for the next one.
                  offset.addAndGet(size);
                  resultCount.incrementAndGet();
                  subscription.request(1);
                }));
          }).andThen(() -> {
            // Presumably invoked when the source completes — TODO confirm pullEach's contract.
            writeSemaphore.acquireUninterruptibly();
              fileChannel.write(ByteBuffer.wrap("]".getBytes()), offset.longValue(), resultPublisher,
                  andThen((d, e) -> {
                    writeSemaphore.release();
                    try {
                      fileChannel.close();
                      resultPublisher.publish(resultCount.intValue());
                    } catch (IOException error) {
                      resultPublisher.publish(error);
                    }
                  }));
          // NOTE(review): this error path only publishes the error; fileChannel is not closed here.
          }).exceptionally(error -> resultPublisher.publish(error)));
      }));
  return resultPublisher;
}
项目:demo-java-9    文件:LoggingRandomDelaySubscriber.java   
/**
 * Creates a {@code LoggingRandomDelaySubscriber} with the given name and subscribes it
 * to the given publisher.
 *
 * @param name      value handed to the subscriber's constructor
 * @param publisher publisher the new subscriber is attached to
 */
public static void createAndSubscribe(String name, Publisher<?> publisher) {
    LoggingRandomDelaySubscriber namedSubscriber = new LoggingRandomDelaySubscriber(name);
    publisher.subscribe(namedSubscriber);
}
项目:Reactive-Programming-With-Java-9    文件:Main_NumberPublisher.java   
/**
 * Demo entry point: builds a {@code NumberPublisher} from the range arguments 10 and 22,
 * then subscribes a {@code NumberSubscriber} constructed with 10.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final long rangeStart = 10;
    final long rangeStop = 22;
    final int subscriberCount = 10;

    // The publisher is constructed before the subscriber, as in the original ordering.
    final Publisher<Long> numbers = new NumberPublisher(rangeStart, rangeStop);
    final NumberSubscriber numberSubscriber = new NumberSubscriber(subscriberCount);

    // Register the subscriber.
    numbers.subscribe(numberSubscriber);
}
项目:Reactive-Programming-With-Java-9    文件:Main_NumberPublisher.java   
/**
 * Demo entry point: builds a {@code NumberPublisher} from the range arguments 10 and 22,
 * then subscribes a {@code NumberSubscriber} constructed with 5.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final long rangeStart = 10;
    final long rangeStop = 22;
    final int subscriberCount = 5;

    // The publisher is constructed before the subscriber, as in the original ordering.
    final Publisher<Long> numbers = new NumberPublisher(rangeStart, rangeStop);
    final NumberSubscriber numberSubscriber = new NumberSubscriber(subscriberCount);

    // Register the subscriber.
    numbers.subscribe(numberSubscriber);
}