public static void main(String[] args) { // TODO Auto-generated method stub ResourceSubscriber<Long> resourceSubscriber = new ResourceSubscriber<Long>() { @Override public void onComplete() { // TODO Auto-generated method stub System.out.println("Its Done!!!"); dispose(); } @Override public void onError(Throwable throwable) { // TODO Auto-generated method stub throwable.printStackTrace(); dispose(); } @Override public void onNext(Long value_long) { // TODO Auto-generated method stub if(value_long==7) dispose(); System.out.println("value :-"+value_long); } @Override protected void onStart() { // TODO Auto-generated method stub request(Long.MAX_VALUE); } }; Flowable.rangeLong(5, 4).subscribe(resourceSubscriber); resourceSubscriber.dispose(); }
public static void main(String[] args) { Observable<Beer> beerData = BeerServer.getData(); // No streaming just yet ResourceSubscriber<Beer> beerSubscriber = new ResourceSubscriber<Beer>() { @Override public void onNext(Beer beer) { System.out.println("Got "+ beer); } @Override public void onError(Throwable throwable) { System.err.println("In Observer.onError(): " + throwable.getMessage()); } @Override public void onComplete() { System.out.println("*** The stream is over ***"); } }; // Converting an Observable to Flowable beerData .toFlowable(BackpressureStrategy.BUFFER) .subscribe(beerSubscriber); // Streaming starts here // If the subscriber is less than 21 year old, cancel subscription beerSubscriber.dispose(); }
/**
 * Builds a {@link Flowable} that emits the integers 0 through 9999 followed
 * by a completion signal, using {@link BackpressureStrategy#DROP}, and
 * subscribes a {@link ResourceSubscriber} whose callbacks are all no-ops.
 *
 * <p>The emitter checks for cancellation on every iteration so that it stops
 * producing values (and does not signal completion) once the downstream has
 * cancelled the subscription.
 */
private void doSomeWork() {
    Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> e) throws Exception {
            // Stop early when the consumer is gone instead of pushing the
            // remaining items into a cancelled emitter.
            for (int i = 0; i < 10000 && !e.isCancelled(); i++) {
                e.onNext(i);
            }
            if (!e.isCancelled()) {
                e.onComplete();
            }
        }
    }, BackpressureStrategy.DROP);

    flowable.subscribe(new ResourceSubscriber<Integer>() {
        @Override
        public void onNext(Integer integer) {
            // Intentionally empty: emitted values are ignored.
        }

        @Override
        public void onError(Throwable t) {
            // Intentionally empty: errors are ignored by this subscriber.
        }

        @Override
        public void onComplete() {
            // Intentionally empty: completion is ignored.
        }
    });
}
/**
 * Subscribes to the given flowable after passing it through
 * {@code applyScheduler}, forwarding every signal to this presenter's own
 * {@code onNext}, {@code onError} and {@code onComplete} handlers. The
 * resulting subscription is added to {@code compositeDisposable}.
 *
 * @param flowable       the source to subscribe to
 * @param contentPresent flag handed to the presenter's {@code onError}
 *                       handler together with the error
 */
public void subscribe(Flowable<M> flowable, final boolean contentPresent) {
    // Thin adapter that relays each reactive signal to the enclosing presenter.
    ResourceSubscriber<M> forwarder = new ResourceSubscriber<M>() {

        @Override
        public void onComplete() {
            CBLceRx2Presenter.this.onComplete();
        }

        @Override
        public void onError(Throwable error) {
            CBLceRx2Presenter.this.onError(error, contentPresent);
        }

        @Override
        public void onNext(M item) {
            CBLceRx2Presenter.this.onNext(item);
        }
    };

    compositeDisposable.add(applyScheduler(flowable).subscribeWith(forwarder));
}