/** * 获取快递信息 * * @param type 快递类型 * @param postid 快递单号 */ public void getExpressInfo(String type, String postid) { isShowLoading.set(true); dataManager.getExpressInfo(type, postid) .subscribeOn(Schedulers.io()) // 在子线程中进行Http访问 .observeOn(AndroidSchedulers.mainThread()) // UI线程处理返回接口 .compose(getProvider().<ExpressInfo>bindUntilEvent(ActivityEvent.DESTROY)) // onDestroy取消订阅 .subscribe(new DefaultObserver<ExpressInfo>() { // 订阅 @Override public void onNext(@NonNull ExpressInfo expressInfo) { ExpressViewModel.this.expressInfo.setExpressInfo(expressInfo); } @Override public void onError(@NonNull Throwable e) { errorMessage.set(e.getMessage()); isShowLoading.set(false); } @Override public void onComplete() { isShowLoading.set(false); } }); }
/** * 获取快递信息 * * @param type 快递类型 * @param postid 快递单号 */ public void getExpressInfo(String type, String postid) { expressView.showProgressDialog(); dataManager.getExpressInfo(type, postid) .subscribeOn(Schedulers.io()) // 在子线程中进行Http访问 .observeOn(AndroidSchedulers.mainThread()) // UI线程处理返回接口 .compose(getProvider().<ExpressInfo>bindUntilEvent(ActivityEvent.DESTROY)) // onDestroy取消订阅 .subscribe(new DefaultObserver<ExpressInfo>() { // 订阅 @Override public void onNext(@NonNull ExpressInfo expressInfo) { expressView.updateView(expressInfo); } @Override public void onError(@NonNull Throwable e) { expressView.showError(e.getMessage()); expressView.hideProgressDialog(); } @Override public void onComplete() { expressView.hideProgressDialog(); } }); }
@Test(timeout = 1000) public void testUnsubscriptionCase() { BehaviorRelay<String> src = BehaviorRelay.createDefault("null"); // FIXME was plain null which is not allowed for (int i = 0; i < 10; i++) { final Observer<Object> o = TestHelper.mockObserver(); InOrder inOrder = inOrder(o); String v = "" + i; src.accept(v); System.out.printf("Turn: %d%n", i); src.firstElement() .toObservable() .flatMap(new Function<String, Observable<String>>() { @Override public Observable<String> apply(String t1) { return Observable.just(t1 + ", " + t1); } }) .subscribe(new DefaultObserver<String>() { @Override public void onNext(String t) { o.onNext(t); } @Override public void onError(Throwable e) { o.onError(e); } @Override public void onComplete() { o.onComplete(); } }); inOrder.verify(o).onNext(v + ", " + v); inOrder.verify(o).onComplete(); verify(o, never()).onError(any(Throwable.class)); } }
/**
 * Subscribes to the same {@code ReplayRelay} ten times through {@code firstElement()}.
 *
 * <p>A new value is accepted on every turn before subscribing, yet each fresh mock observer
 * is expected to see "0, 0" (the first value ever accepted, doubled), then completion, and
 * never an error.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    ReplayRelay<String> relay = ReplayRelay.create();

    // Stateless mapper shared by all turns: "x" -> just("x, x").
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String item) {
            return Observable.just(item + ", " + item);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        InOrder order = inOrder(mock);
        String value = String.valueOf(turn);

        relay.accept(value);
        System.out.printf("Turn: %d%n", turn);

        relay.firstElement()
                .toObservable()
                .flatMap(duplicate)
                .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String item) {
                        System.out.println(item);
                        mock.onNext(item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        mock.onError(error);
                    }

                    @Override
                    public void onComplete() {
                        mock.onComplete();
                    }
                });

        order.verify(mock).onNext("0, 0");
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
/**
 * Races one emission against an asynchronous subscription on a fresh {@code BehaviorRelay},
 * 50000 times in a row.
 *
 * <p>Per iteration a worker task waits on {@code start} and then accepts {@code 1}, while the
 * relay is subscribed via {@code subscribeOn}/{@code observeOn} on the io scheduler. The
 * observer must record {@code 1} within five seconds, otherwise the test fails with the
 * iteration index.
 *
 * <p>NOTE(review): the test is still ignored. Previously no iteration ever disposed its
 * subscription; that leak may be what the "OOMs" reason refers to - confirm before
 * re-enabling.
 *
 * @throws Exception if waiting on the latch is interrupted
 */
@Test
@Ignore("OOMs")
public void testEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final BehaviorRelay<Object> rs = BehaviorRelay.create();

            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);

            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });

            final AtomicReference<Object> o = new AtomicReference<Object>();

            rs.subscribeOn(s).observeOn(Schedulers.io())
                    .subscribe(new DefaultObserver<Object>() {

                        @Override
                        public void onComplete() {
                            o.set(-1);
                            finish.countDown();
                        }

                        @Override
                        public void onError(Throwable e) {
                            o.set(e);
                            finish.countDown();
                        }

                        @Override
                        public void onNext(Object t) {
                            o.set(t);
                            finish.countDown();
                            // A relay never sends a terminal event, so without an explicit
                            // cancel this subscription (and the scheduler worker that
                            // observeOn holds for it) would stay alive after the iteration.
                            cancel();
                        }
                    });
            start.countDown();

            if (!finish.await(5, TimeUnit.SECONDS)) {
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                // fail() always throws, so nothing after it runs on this path.
                fail("Timeout @ " + i);
            }
            Assert.assertEquals(1, o.get());
        }
    } finally {
        worker.dispose();
    }
}
/**
 * Races one emission against an asynchronous subscription on a fresh {@code ReplayRelay}
 * (created with {@code ReplayRelay.create()}), 50000 times in a row.
 *
 * <p>Per iteration a worker task waits on {@code start} and then accepts {@code 1}, while the
 * relay is subscribed via {@code subscribeOn}/{@code observeOn} on the io scheduler. The
 * observer must record {@code 1} within five seconds, otherwise the test fails with the
 * iteration index.
 *
 * @throws Exception if waiting on the latch is interrupted
 */
@Test
public void testReplayRelayEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final ReplayRelay<Object> rs = ReplayRelay.create();

            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);

            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });

            final AtomicReference<Object> o = new AtomicReference<Object>();

            rs.subscribeOn(s).observeOn(Schedulers.io())
                    .subscribe(new DefaultObserver<Object>() {

                        @Override
                        public void onComplete() {
                            o.set(-1);
                            finish.countDown();
                        }

                        @Override
                        public void onError(Throwable e) {
                            o.set(e);
                            finish.countDown();
                        }

                        @Override
                        public void onNext(Object t) {
                            o.set(t);
                            finish.countDown();
                            // A relay never sends a terminal event, so without an explicit
                            // cancel this subscription (and the scheduler worker that
                            // observeOn holds for it) would stay alive after the iteration.
                            cancel();
                        }
                    });
            start.countDown();

            if (!finish.await(5, TimeUnit.SECONDS)) {
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                // Assert.fail() always throws, so nothing after it runs on this path.
                Assert.fail("Timeout @ " + i);
            }
            Assert.assertEquals(1, o.get());
        }
    } finally {
        worker.dispose();
    }
}
/**
 * Subscribes to the same {@code PublishRelay} ten times through {@code firstElement()}.
 *
 * <p>On every turn a fresh mock observer is subscribed first and only then is the turn's
 * value accepted; the mock must see that value doubled ("v, v"), then completion, and never
 * an error.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    PublishRelay<String> relay = PublishRelay.create();

    // Stateless mapper shared by all turns: "x" -> just("x, x").
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String item) {
            return Observable.just(item + ", " + item);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        InOrder order = inOrder(mock);
        String value = String.valueOf(turn);
        String expected = value + ", " + value;

        System.out.printf("Turn: %d%n", turn);

        relay.firstElement()
                .toObservable()
                .flatMap(duplicate)
                .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String item) {
                        mock.onNext(item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        mock.onError(error);
                    }

                    @Override
                    public void onComplete() {
                        mock.onComplete();
                    }
                });

        // Emit only after the observer is attached.
        relay.accept(value);

        order.verify(mock).onNext(expected);
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
@Test public void testReplaySubjectEmissionSubscriptionRace() throws Exception { Scheduler s = Schedulers.io(); Scheduler.Worker worker = Schedulers.io().createWorker(); try { for (int i = 0; i < 50000; i++) { if (i % 1000 == 0) { System.out.println(i); } final ReplayRelay<Object> rs = ReplayRelay.createWithSize(2); final CountDownLatch finish = new CountDownLatch(1); final CountDownLatch start = new CountDownLatch(1); // int j = i; worker.schedule(new Runnable() { @Override public void run() { try { start.await(); } catch (Exception e1) { e1.printStackTrace(); } // System.out.println("> " + j); rs.accept(1); } }); final AtomicReference<Object> o = new AtomicReference<Object>(); rs // .doOnSubscribe(v -> System.out.println("!! " + j)) // .doOnNext(e -> System.out.println(">> " + j)) .subscribeOn(s) .observeOn(Schedulers.io()) // .doOnNext(e -> System.out.println(">>> " + j)) .subscribe(new DefaultObserver<Object>() { @Override protected void onStart() { super.onStart(); } @Override public void onComplete() { o.set(-1); finish.countDown(); } @Override public void onError(Throwable e) { o.set(e); finish.countDown(); } @Override public void onNext(Object t) { o.set(t); finish.countDown(); } }); start.countDown(); if (!finish.await(5, TimeUnit.SECONDS)) { System.out.println(o.get()); System.out.println(rs.hasObservers()); Assert.fail("Timeout @ " + i); break; } else { Assert.assertEquals(1, o.get()); } } } finally { worker.dispose(); } }
/** * This test executes the real query to github server. * Test created by Robert Zagorski on 19.10.2016 */ @Test public void main() throws IOException, InterruptedException { // Create a very simple REST adapter which points the GitHub API. RxCallAdapter rxCallAdapter = new RxCallAdapter.Builder() .addBackoffStrategy(Exponential.init() .addThrowable(UnknownHostException.class) .addThrowable(SocketTimeoutException.class) .setMaxRetries(3).build()) .build(); Retrofit retrofit = new Retrofit.Builder() .baseUrl(API_URL) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(new RxErrorHandingFactory(rxCallAdapter)) .build(); // Create an instance of our GitHub API interface. GitHub github = retrofit.create(GitHub.class); // Create a call instance for looking up Retrofit contributors. Observable<List<Repository>> call = github.repos("square"); final CountDownLatch latch = new CountDownLatch(1); // Fetch and print a list of the contributors to the retrofiterrorhandler. call.subscribe(new DefaultObserver<List<Repository>>() { @Override public void onComplete() { System.out.println(new GregorianCalendar().toInstant().toString() + " Finished"); latch.countDown(); } @Override public void onError(Throwable e) { System.out.println(new GregorianCalendar().toInstant().toString() + " Finished with error: " + e); onComplete(); } @Override public void onNext(List<Repository> repositories) { for (Repository repository : repositories) { System.out.println(repository.name + " (" + repository.description + ")"); } } }); latch.await(); }