Java 类java.util.concurrent.Flow.Subscription 实例源码
项目:reactive-jax-rs
文件:SingleItemPublisher.java
@Override
public void subscribe(Subscriber<? super T> s) {
subscriber = s;
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long count) {
requested = true;
if (item != null) {
publish(item);
}
}
@Override
public void cancel() {
subscriber = null;
}
});
if (error != null) {
publish(error);
}
}
项目:reactive-jax-rs
文件:CompletableSubscriber.java
default CompletableSubscriber<T> andThen(Runnable runnable) {
CompletableSubscriber<T> thisSubscriber = this;
return new AbstractCompletableSubscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
thisSubscriber.onSubscribe(subscription);
}
@Override
public void onNext(T item) {
thisSubscriber.onNext(item);
}
@Override
public void onComplete() {
runnable.run();
}
@Override
public void onError(Throwable error) {
thisSubscriber.onError(error);
}
};
}
项目:reactive-jax-rs
文件:CompletableSubscriber.java
default CompletableSubscriber<T> exceptionally(Consumer<Throwable> consumer) {
CompletableSubscriber<T> thisSubscriber = this;
return new AbstractCompletableSubscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
thisSubscriber.onSubscribe(subscription);
}
@Override
public void onNext(T item) {
thisSubscriber.onNext(item);
}
@Override
public void onComplete() {
thisSubscriber.onComplete();
}
@Override
public void onError(Throwable error) {
consumer.accept(error);
}
};
}
项目:reactive-jax-rs
文件:CompletableSubscriber.java
public static <T> CompletableSubscriber<T> pushEach(Consumer<T> consumer) {
return new AbstractCompletableSubscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(T item) {
consumer.accept(item);
}
};
}
项目:reactive-jax-rs
文件:CompletableSubscriber.java
public static <T> CompletableSubscriber<T> pullEach(BiConsumer<T, Subscription> consumer) {
return new AbstractCompletableSubscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
super.onSubscribe(subscription);
subscription.request(1);
}
@Override
public void onNext(T item) {
consumer.accept(item, subscription);
}
};
}
项目:reactive-jax-rs
文件:InputStreamPublisher.java
@Override
public void subscribe(Subscriber<? super byte[]> subscriber) {
byte[] buffer = new byte[1024];
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long count) {
try {
for (long i = 0; i < count; i++) {
int read = stream.read(buffer);
byte[] item = buffer;
if (read != buffer.length) {
item = new byte[read];
System.arraycopy(buffer, 0, item, 0, read);
}
subscriber.onNext(item);
}
} catch (IOException e) {
subscriber.onError(e);
}
}
@Override
public void cancel() {
}
});
}
项目:reactive-jax-rs
文件:CustomerRepository.java
public Publisher<Integer> save(Publisher<Customer> customers) throws IOException {
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
AtomicLong offset = new AtomicLong(0);
AtomicInteger resultCount = new AtomicInteger(0);
SingleItemPublisher<Integer> resultPublisher = new SingleItemPublisher<>();
Semaphore writeSemaphore = new Semaphore(1);
writeSemaphore.acquireUninterruptibly();
fileChannel.write(ByteBuffer.wrap("[".getBytes()), 0, resultPublisher,
andThen((count, s) -> {
writeSemaphore.release();
customers.subscribe(pullEach((Customer customer, Subscription subscription) -> {
String json = String.format("%s{\"firstName\": \"%s\", \"lastName\": \"%s\"}", offset.longValue() == 0 ? "" : ",",
customer.getFirstName(), customer.getLastName());
offset.addAndGet(count);
writeSemaphore.acquireUninterruptibly();
fileChannel.write(ByteBuffer.wrap(json.getBytes()), offset.get(), resultPublisher,
andThen((size, c) -> {
writeSemaphore.release();
offset.addAndGet(size);
resultCount.incrementAndGet();
subscription.request(1);
}));
}).andThen(() -> {
writeSemaphore.acquireUninterruptibly();
fileChannel.write(ByteBuffer.wrap("]".getBytes()), offset.longValue(), resultPublisher,
andThen((d, e) -> {
writeSemaphore.release();
try {
fileChannel.close();
resultPublisher.publish(resultCount.intValue());
} catch (IOException error) {
resultPublisher.publish(error);
}
}));
}).exceptionally(error -> resultPublisher.publish(error)));
}));
return resultPublisher;
}
项目:openjdk-jdk10
文件:Stream.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
throw new IllegalStateException();
}
this.subscription = subscription;
subscription.request(1);
}
项目:demo-java-9
文件:LoggingRandomDelaySubscriber.java
@Override
public void onSubscribe(Subscription subscription) {
log("Subscribed...");
this.subscription = subscription;
this.buffer = new AtomicInteger();
requestItems();
}
项目:reactive-jax-rs
文件:CompletableSubscriber.java
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
}
项目:Reactive-Programming-With-Java-9
文件:NumberSubscriber.java
@Override
public void onSubscribe(Subscription subscription) {
(this.subscription = subscription).request(1);
}
项目:Reactive-Programming-With-Java-9
文件:WelcomeProcessor.java
@Override
public void onSubscribe(Subscription subscription) {
// Request an unbounded number of items
subscription.request(Long.MAX_VALUE);
// Long.MAX_VALUE is considered as unbounded
}
项目:Reactive-Programming-With-Java-9
文件:WelcomeSubscriber.java
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
System.out.printf(Thread.currentThread().getName()+" subscribed with max count %d\n", maxCount);
subscription.request(maxCount);
}
项目:Reactive-Programming-With-Java-9
文件:NumberSubscriber.java
@Override
public void onSubscribe(Subscription subscription) {
(this.subscription = subscription).request(1);
}
项目:Java-9-Concurrency-Cookbook-Second-Edition
文件:Consumer2.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf("%s: Consumer 2: Subscription received\n", Thread.currentThread().getName());
this.subscription=subscription;
subscription.request(1);
}
项目:Java-9-Concurrency-Cookbook-Second-Edition
文件:Consumer.java
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
System.out.printf("%s: Consumer - Subscription\n",Thread.currentThread().getName());
}
项目:java-feature-set
文件:FlowAPITest.java
@Override
public final void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}