/**
 * Installs {@code s} as the current subscriber and hands it a subscription.
 *
 * <p>The subscription's {@code request} marks demand as present and, when an
 * item is already stored, publishes it. Its {@code cancel} clears the current
 * subscriber. After {@code onSubscribe} returns, a stored error (if any) is
 * published.
 *
 * @param s the subscriber to register
 */
@Override
public void subscribe(Subscriber<? super T> s) {
    subscriber = s;

    Subscription subscription = new Subscription() {
        @Override
        public void request(long count) {
            // Any request counts as demand; the requested amount is not inspected.
            requested = true;
            if (item == null) {
                return;
            }
            publish(item);
        }

        @Override
        public void cancel() {
            subscriber = null;
        }
    };
    subscriber.onSubscribe(subscription);

    if (error == null) {
        return;
    }
    publish(error);
}
/**
 * Reads {@code customers.json} and returns a publisher that emits one
 * {@link Customer} per entry, followed by {@code onComplete}.
 *
 * <p>The file is read eagerly when this method is called; parsing and emission
 * happen synchronously inside {@code subscribe}.
 *
 * @return a publisher over the customers parsed from the file
 * @throws IOException if the file cannot be opened or read
 */
public Publisher<Customer> findAllAsync() throws IOException {
    final String customers;
    // try-with-resources guarantees the reader is closed, even when reading fails.
    try (FileReader reader = new FileReader("customers.json")) {
        customers = IOUtils.toString(reader).trim();
    }
    // Drop the first and last character of the trimmed content
    // (presumably the enclosing '[' and ']' of a JSON array - confirm against the file).
    final String customerData = customers.substring(1, customers.length() - 1);
    return new Publisher<Customer>() {
        // NOTE(review): onSubscribe is never called and demand is not tracked before
        // onNext/onComplete are signalled - confirm whether subscribers rely on that.
        @Override
        public void subscribe(Subscriber<? super Customer> subscriber) {
            asList(customerData.split("\\{"))
                    .stream()
                    .filter(s -> !s.isEmpty())
                    // Keep only the text before the closing brace of each entry.
                    .map(c -> c.substring(0, c.lastIndexOf('}')))
                    .map(c -> c.split(","))
                    .map(c -> {
                        // Value starts three characters after the colon and the last character
                        // is dropped (presumably a space plus surrounding quotes - confirm).
                        String firstName = c[0].substring(c[0].indexOf(':') + 3, c[0].length() - 1);
                        String lastName = c[1].substring(c[1].indexOf(':') + 3, c[1].length() - 1);
                        return new Customer(firstName, lastName);
                    })
                    .forEach(subscriber::onNext);
            subscriber.onComplete();
        }
    };
}
/**
 * Delivers {@code i} to the current subscriber, or stores it for later.
 *
 * <p>When demand has been signalled and a subscriber is present, the subscriber
 * is detached and receives {@code onNext(i)} followed by {@code onComplete()}.
 * Otherwise the value is kept in {@code item}.
 *
 * @param i the value to publish
 */
public void publish(T i) {
    if (!requested || subscriber == null) {
        // Not deliverable yet: remember the value instead.
        item = i;
        return;
    }

    // Detach before signalling so the subscriber is only notified once.
    Subscriber<? super T> target = subscriber;
    subscriber = null;
    target.onNext(i);
    target.onComplete();
}
public void publish(Throwable error) { if (subscriber != null) { Subscriber<? super T> s = subscriber; subscriber = null; s.onError(error); } }
/**
 * Hands the subscriber a subscription that reads chunks of up to 1024 bytes
 * from {@code stream} on demand.
 *
 * <p>Each {@code request(count)} performs at most {@code count} reads and emits
 * every chunk as a freshly allocated array. End of stream is signalled with
 * {@code onComplete}, an {@link IOException} with {@code onError}; after either
 * signal, or after {@code cancel}, no further reads are performed.
 *
 * @param subscriber the subscriber that receives the byte chunks
 */
@Override
public void subscribe(Subscriber<? super byte[]> subscriber) {
    byte[] buffer = new byte[1024];
    subscriber.onSubscribe(new Subscription() {
        // Set once the subscription has terminated (completed, failed or cancelled).
        private volatile boolean done;

        @Override
        public void request(long count) {
            try {
                for (long i = 0; i < count && !done; i++) {
                    int read = stream.read(buffer);
                    if (read < 0) {
                        // read() returns -1 at end of stream: complete instead of
                        // trying to allocate a negative-length array.
                        done = true;
                        subscriber.onComplete();
                        return;
                    }
                    // Always copy: the shared buffer is overwritten by the next read,
                    // so it must never be handed to the subscriber directly.
                    byte[] item = new byte[read];
                    System.arraycopy(buffer, 0, item, 0, read);
                    subscriber.onNext(item);
                }
            } catch (IOException e) {
                done = true;
                subscriber.onError(e);
            }
        }

        @Override
        public void cancel() {
            // Stops the read loop and turns later requests into no-ops.
            done = true;
        }
    });
}
/**
 * Creates a subscription that keeps the given executor, subscriber and range
 * bounds for later use.
 *
 * @param executor    executor stored in this subscription
 * @param subscriber  subscriber stored in this subscription
 * @param start_range value stored as {@code start_range}
 * @param stop_range  value stored as {@code stop_range}
 */
public NumberSubscription(ExecutorService executor,
                          Subscriber<? super Long> subscriber,
                          long start_range,
                          long stop_range) {
    this.start_range = start_range;
    this.stop_range = stop_range;
    this.subscriber = subscriber;
    this.executor = executor;
}
/**
 * Demo entry point: subscribes two consumers to a {@code MyPublisher} and
 * publishes two news items with a one-second pause between them.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    MyPublisher publisher = new MyPublisher();

    Subscriber<News> consumer1, consumer2;
    consumer1 = new Consumer("Consumer 1");
    consumer2 = new Consumer("Consumer 2");

    publisher.subscribe(consumer1);
    publisher.subscribe(consumer2);

    System.out.printf("Main: Start\n");

    News news = new News();
    news.setTitle("My first news");
    news.setContent("This is the content");
    news.setDate(new Date());
    publisher.publish(news);

    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt flag so the interruption is not silently lost.
        Thread.currentThread().interrupt();
    }

    news = new News();
    news.setTitle("My second news");
    news.setContent("This is the content of the second news");
    news.setDate(new Date());
    publisher.publish(news);

    System.out.printf("Main: End\n");
}
/**
 * Builds a new {@code Sub} for the given subscriber.
 *
 * <p>The start value is the smallest current {@code nextValue} among the
 * registered subscriptions, or {@code 0} when there are none.
 *
 * @param subscriber the subscriber the new subscription belongs to
 * @return the newly created subscription
 */
private Sub createNewSubscriptionFor(Subscriber<? super Integer> subscriber) {
    boolean foundAny = false;
    int lowest = 0;
    for (Sub existing : subscriptions) {
        int candidate = existing.nextValue.get();
        if (!foundAny || candidate < lowest) {
            lowest = candidate;
            foundAny = true;
        }
    }
    return new Sub(subscriber, lowest);
}
@Override public void subscribe(Subscriber<? super Long> subscriber) { // TODO Auto-generated method stub subscriber.onSubscribe(new NumberSubscription(executor,subscriber,start_range,stop_range)); }
/**
 * Hands the subscriber a new {@code JustSubscription} wrapping it together
 * with this publisher's {@code value}.
 *
 * @param s the subscriber to receive the subscription
 */
@Override
public void subscribe(Subscriber<? super T> s) {
    JustSubscription<T> subscription = new JustSubscription<>(s, value);
    s.onSubscribe(subscription);
}
/**
 * Creates a subscription holding the downstream subscriber and a value.
 *
 * @param actual the subscriber stored in this subscription
 * @param value  the value stored in this subscription
 */
public JustSubscription(Subscriber<? super T> actual, T value) {
    this.value = value;
    this.actual = actual;
}
/**
 * Creates a subscription for the subscriber, registers it, and then passes it
 * to the subscriber via {@code onSubscribe}.
 *
 * @param subscriber the subscriber to attach
 */
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
    final Sub created = createNewSubscriptionFor(subscriber);

    // Register first so the subscription is tracked before the subscriber sees it.
    registerSubscription(created);
    subscriber.onSubscribe(created);
}
/**
 * Creates a subscription that is not canceled and whose next value is
 * {@code startValue}.
 *
 * @param subscriber the subscriber stored in this subscription
 * @param startValue initial value of the {@code nextValue} counter
 */
public Sub(Subscriber<? super Integer> subscriber, int startValue) {
    this.canceled = new AtomicBoolean(false);
    this.nextValue = new AtomicInteger(startValue);
    this.subscriber = subscriber;
}
/**
 * Registers a subscriber: wraps it and a new {@code MySubscription} in a
 * {@code ConsumerData}, passes the subscription to the subscriber, and adds
 * the record to {@code consumers}.
 *
 * <p>The subscriber is cast to {@code Consumer}, so a subscriber of any other
 * type fails with a {@link ClassCastException}.
 *
 * @param subscriber the subscriber to register
 */
@Override
public void subscribe(Subscriber<? super News> subscriber) {
    ConsumerData record = new ConsumerData();
    Consumer consumer = (Consumer) subscriber;
    record.setConsumer(consumer);

    MySubscription newSubscription = new MySubscription();
    record.setSubscription(newSubscription);

    // The subscriber is notified before the record is added to the list.
    subscriber.onSubscribe(newSubscription);
    consumers.add(record);
}