Java 类java.util.concurrent.Flow.Publisher 实例源码
项目:reactive-jax-rs
文件:CustomerRepository.java
public Publisher<Customer> findAllAsync() throws IOException {
String customers = IOUtils.toString(new FileReader("customers.json")).trim();
final String customerData = customers.substring(1, customers.length() - 1);
return new Publisher<Customer>() {
@Override
public void subscribe(Subscriber<? super Customer> subscriber) {
asList(customerData.split("\\{"))
.stream()
.filter(s -> !s.isEmpty())
.map(c -> c.substring(0, c.lastIndexOf('}')))
.map(c -> c.split(","))
.map(c -> {
String firstName = c[0].substring(c[0].indexOf(':') + 3, c[0].length() - 1);
String lastName = c[1].substring(c[1].indexOf(':') + 3, c[1].length() - 1);
return new Customer(firstName, lastName);
})
.forEach(subscriber::onNext);
subscriber.onComplete();
}
};
}
项目:reactive-jax-rs
文件:CustomerRepository.java
public Publisher<Integer> save(Publisher<Customer> customers) throws IOException {
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
AtomicLong offset = new AtomicLong(0);
AtomicInteger resultCount = new AtomicInteger(0);
SingleItemPublisher<Integer> resultPublisher = new SingleItemPublisher<>();
Semaphore writeSemaphore = new Semaphore(1);
writeSemaphore.acquireUninterruptibly();
fileChannel.write(ByteBuffer.wrap("[".getBytes()), 0, resultPublisher,
andThen((count, s) -> {
writeSemaphore.release();
customers.subscribe(pullEach((Customer customer, Subscription subscription) -> {
String json = String.format("%s{\"firstName\": \"%s\", \"lastName\": \"%s\"}", offset.longValue() == 0 ? "" : ",",
customer.getFirstName(), customer.getLastName());
offset.addAndGet(count);
writeSemaphore.acquireUninterruptibly();
fileChannel.write(ByteBuffer.wrap(json.getBytes()), offset.get(), resultPublisher,
andThen((size, c) -> {
writeSemaphore.release();
offset.addAndGet(size);
resultCount.incrementAndGet();
subscription.request(1);
}));
}).andThen(() -> {
writeSemaphore.acquireUninterruptibly();
fileChannel.write(ByteBuffer.wrap("]".getBytes()), offset.longValue(), resultPublisher,
andThen((d, e) -> {
writeSemaphore.release();
try {
fileChannel.close();
resultPublisher.publish(resultCount.intValue());
} catch (IOException error) {
resultPublisher.publish(error);
}
}));
}).exceptionally(error -> resultPublisher.publish(error)));
}));
return resultPublisher;
}
项目:demo-java-9
文件:LoggingRandomDelaySubscriber.java
public static void createAndSubscribe(String name, Publisher<?> publisher) {
publisher.subscribe(new LoggingRandomDelaySubscriber(name));
}
项目:Reactive-Programming-With-Java-9
文件:Main_NumberPublisher.java
public static void main(String[] args) {
long start_range=10, stop_range=22;
Publisher<Long> publisher = new NumberPublisher(start_range,stop_range);
// Register Subscriber
int count=10;
NumberSubscriber subscriber = new NumberSubscriber(count);
publisher.subscribe(subscriber);
}
项目:Reactive-Programming-With-Java-9
文件:Main_NumberPublisher.java
public static void main(String[] args) {
long start_range=10, stop_range=22;
Publisher<Long> publisher = new NumberPublisher(start_range,stop_range);
// Register Subscriber
int count=5;
NumberSubscriber subscriber = new NumberSubscriber(count);
publisher.subscribe(subscriber);
}