Java 类java.util.concurrent.Flow.Subscription 实例源码

项目:reactive-jax-rs    文件:SingleItemPublisher.java   
@Override
public void subscribe(Subscriber<? super T> s) {
  subscriber = s;
  subscriber.onSubscribe(new Subscription() {

    @Override
    public void request(long count) {
      requested = true;
      if (item != null) {
        publish(item);
      }
    }

    @Override
    public void cancel() {
      subscriber = null;
    }
  });
  if (error != null) {
    publish(error);
  }
}
项目:reactive-jax-rs    文件:CompletableSubscriber.java   
default CompletableSubscriber<T> andThen(Runnable runnable) {
  CompletableSubscriber<T> thisSubscriber = this;
  return new AbstractCompletableSubscriber<T>() {

    @Override
    public void onSubscribe(Subscription subscription) {
      thisSubscriber.onSubscribe(subscription);
    }

    @Override
    public void onNext(T item) {
      thisSubscriber.onNext(item);
    }

    @Override
    public void onComplete() {
      runnable.run();
    }

    @Override
    public void onError(Throwable error) {
      thisSubscriber.onError(error);
    }
  };
}
项目:reactive-jax-rs    文件:CompletableSubscriber.java   
default CompletableSubscriber<T> exceptionally(Consumer<Throwable> consumer) {
  CompletableSubscriber<T> thisSubscriber = this;
  return new AbstractCompletableSubscriber<T>() {

    @Override
    public void onSubscribe(Subscription subscription) {
      thisSubscriber.onSubscribe(subscription);
    }

    @Override
    public void onNext(T item) {
      thisSubscriber.onNext(item);
    }

    @Override
    public void onComplete() {
      thisSubscriber.onComplete();
    }

    @Override
    public void onError(Throwable error) {
      consumer.accept(error);
    }
  };
}
项目:reactive-jax-rs    文件:CompletableSubscriber.java   
public static <T> CompletableSubscriber<T> pushEach(Consumer<T> consumer) {
  return new AbstractCompletableSubscriber<T>() {

    @Override
    public void onSubscribe(Subscription subscription) {
      subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
      consumer.accept(item);
    }
  };
}
项目:reactive-jax-rs    文件:CompletableSubscriber.java   
public static <T> CompletableSubscriber<T> pullEach(BiConsumer<T, Subscription> consumer) {
  return new AbstractCompletableSubscriber<T>() {

    @Override
    public void onSubscribe(Subscription subscription) {
      super.onSubscribe(subscription);
      subscription.request(1);
    }

    @Override
    public void onNext(T item) {
      consumer.accept(item, subscription);
    }
  };
}
项目:reactive-jax-rs    文件:InputStreamPublisher.java   
@Override
public void subscribe(Subscriber<? super byte[]> subscriber) {
  byte[] buffer = new byte[1024];
  subscriber.onSubscribe(new Subscription() {

    @Override
    public void request(long count) {

      try {
        for (long i = 0; i < count; i++) {
          int read = stream.read(buffer);
          byte[] item = buffer;
          if (read != buffer.length) {
            item = new byte[read];
            System.arraycopy(buffer, 0, item, 0, read);
          }
          subscriber.onNext(item);
        }
      } catch (IOException e) {
        subscriber.onError(e);
      }
    }

    @Override
    public void cancel() {
    }
  });
}
项目:reactive-jax-rs    文件:CustomerRepository.java   
public Publisher<Integer> save(Publisher<Customer> customers) throws IOException {
  AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
  AtomicLong offset = new AtomicLong(0);
  AtomicInteger resultCount = new AtomicInteger(0);
  SingleItemPublisher<Integer> resultPublisher = new SingleItemPublisher<>();
  Semaphore writeSemaphore = new Semaphore(1);
  writeSemaphore.acquireUninterruptibly();
  fileChannel.write(ByteBuffer.wrap("[".getBytes()), 0, resultPublisher,
      andThen((count, s) -> {
        writeSemaphore.release();
        customers.subscribe(pullEach((Customer customer, Subscription subscription) -> {
            String json = String.format("%s{\"firstName\": \"%s\", \"lastName\": \"%s\"}", offset.longValue() == 0 ? "" : ",",
                customer.getFirstName(), customer.getLastName());
            offset.addAndGet(count);
            writeSemaphore.acquireUninterruptibly();
            fileChannel.write(ByteBuffer.wrap(json.getBytes()), offset.get(), resultPublisher,
                andThen((size, c) -> {
                  writeSemaphore.release();
                  offset.addAndGet(size);
                  resultCount.incrementAndGet();
                  subscription.request(1);
                }));
          }).andThen(() -> {
            writeSemaphore.acquireUninterruptibly();
              fileChannel.write(ByteBuffer.wrap("]".getBytes()), offset.longValue(), resultPublisher,
                  andThen((d, e) -> {
                    writeSemaphore.release();
                    try {
                      fileChannel.close();
                      resultPublisher.publish(resultCount.intValue());
                    } catch (IOException error) {
                      resultPublisher.publish(error);
                    }
                  }));
          }).exceptionally(error -> resultPublisher.publish(error)));
      }));
  return resultPublisher;
}
项目:openjdk-jdk10    文件:Stream.java   
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription != null) {
        throw new IllegalStateException();
    }
    this.subscription = subscription;
    subscription.request(1);
}
项目:demo-java-9    文件:LoggingRandomDelaySubscriber.java   
@Override
public void onSubscribe(Subscription subscription) {
    log("Subscribed...");
    this.subscription = subscription;
    this.buffer = new AtomicInteger();
    requestItems();
}
项目:reactive-jax-rs    文件:CompletableSubscriber.java   
@Override
public void onSubscribe(Subscription subscription) {
  this.subscription = subscription;
}
项目:Reactive-Programming-With-Java-9    文件:NumberSubscriber.java   
@Override
public void onSubscribe(Subscription subscription) {
    (this.subscription = subscription).request(1);
}
项目:Reactive-Programming-With-Java-9    文件:WelcomeProcessor.java   
@Override
public void onSubscribe(Subscription subscription) {
    // Request an unbounded number of items
    subscription.request(Long.MAX_VALUE);
    // Long.MAX_VALUE is considered as unbounded 
}
项目:Reactive-Programming-With-Java-9    文件:WelcomeSubscriber.java   
@Override
public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    System.out.printf(Thread.currentThread().getName()+" subscribed with max count %d\n", maxCount);
    subscription.request(maxCount);
}
项目:Reactive-Programming-With-Java-9    文件:NumberSubscriber.java   
@Override
public void onSubscribe(Subscription subscription) {
    (this.subscription = subscription).request(1);
}
项目:Java-9-Concurrency-Cookbook-Second-Edition    文件:Consumer2.java   
@Override
public void onSubscribe(Flow.Subscription subscription) {
    System.out.printf("%s: Consumer 2: Subscription received\n", Thread.currentThread().getName());
    this.subscription=subscription;
    subscription.request(1);
}
项目:Java-9-Concurrency-Cookbook-Second-Edition    文件:Consumer.java   
@Override
public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
    System.out.printf("%s: Consumer - Subscription\n",Thread.currentThread().getName());
}
项目:java-feature-set    文件:FlowAPITest.java   
@Override
public final void onSubscribe(final Subscription subscription) {
  this.subscription = subscription;
  subscription.request(1);
}