Java 类 io.reactivex.observers.DefaultObserver 实例源码
项目:Demos
文件:ExpressViewModel.java
/**
 * Fetches the express (courier) information for a parcel and publishes the
 * outcome through this view model's fields.
 *
 * <p>The loading flag is set before the request is issued and cleared again
 * when the stream terminates, whether through completion or an error.
 *
 * @param type   the express (courier) type
 * @param postid the express tracking number
 */
public void getExpressInfo(String type, String postid) {
    isShowLoading.set(true);

    // Result handler, declared separately so the request chain below stays flat.
    final DefaultObserver<ExpressInfo> resultObserver = new DefaultObserver<ExpressInfo>() {
        @Override
        public void onNext(@NonNull ExpressInfo expressInfo) {
            ExpressViewModel.this.expressInfo.setExpressInfo(expressInfo);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            errorMessage.set(e.getMessage());
            isShowLoading.set(false);
        }

        @Override
        public void onComplete() {
            isShowLoading.set(false);
        }
    };

    dataManager.getExpressInfo(type, postid)
            // perform the HTTP access on a background thread
            .subscribeOn(Schedulers.io())
            // handle the returned result on the UI thread
            .observeOn(AndroidSchedulers.mainThread())
            // cancel the subscription on onDestroy
            .compose(getProvider().<ExpressInfo>bindUntilEvent(ActivityEvent.DESTROY))
            .subscribe(resultObserver);
}
项目:Demos
文件:ExpressPresenter.java
/**
 * Fetches the express (courier) information for a parcel and forwards the
 * outcome to the view.
 *
 * <p>A progress dialog is shown before the request is issued and hidden again
 * when the stream terminates, whether through completion or an error.
 *
 * @param type   the express (courier) type
 * @param postid the express tracking number
 */
public void getExpressInfo(String type, String postid) {
    expressView.showProgressDialog();

    // Result handler, declared separately so the request chain below stays flat.
    final DefaultObserver<ExpressInfo> resultObserver = new DefaultObserver<ExpressInfo>() {
        @Override
        public void onNext(@NonNull ExpressInfo expressInfo) {
            expressView.updateView(expressInfo);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            expressView.showError(e.getMessage());
            expressView.hideProgressDialog();
        }

        @Override
        public void onComplete() {
            expressView.hideProgressDialog();
        }
    };

    dataManager.getExpressInfo(type, postid)
            // perform the HTTP access on a background thread
            .subscribeOn(Schedulers.io())
            // handle the returned result on the UI thread
            .observeOn(AndroidSchedulers.mainThread())
            // cancel the subscription on onDestroy
            .compose(getProvider().<ExpressInfo>bindUntilEvent(ActivityEvent.DESTROY))
            .subscribe(resultObserver);
}
项目:RxRelay
文件:BehaviorRelayTest.java
/**
 * For ten turns: pushes the turn number into the relay, takes the relay's
 * first element, maps it to "v, v", and verifies that a fresh mock observer
 * sees exactly that value followed by completion and never an error.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    // FIXME was plain null which is not allowed
    BehaviorRelay<String> src = BehaviorRelay.createDefault("null");

    // Stateless mapper shared by every turn: "x" -> "x, x".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        InOrder order = inOrder(mock);
        String value = String.valueOf(turn);
        String expected = value + ", " + value;

        src.accept(value);
        System.out.printf("Turn: %d%n", turn);

        // Forward every signal to the mock so the calls can be verified below.
        src.firstElement()
                .toObservable()
                .flatMap(duplicate)
                .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String t) {
                        mock.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        mock.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        mock.onComplete();
                    }
                });

        order.verify(mock).onNext(expected);
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay
文件:BehaviorRelayTest.java
/**
 * For ten turns: pushes the turn number into the relay, takes the relay's
 * first element, maps it to "v, v", and verifies that a fresh mock observer
 * sees exactly that value followed by completion and never an error.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    // FIXME was plain null which is not allowed
    BehaviorRelay<String> src = BehaviorRelay.createDefault("null");

    // Stateless mapper shared by every turn: "x" -> "x, x".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        InOrder order = inOrder(mock);
        String value = String.valueOf(turn);
        String expected = value + ", " + value;

        src.accept(value);
        System.out.printf("Turn: %d%n", turn);

        // Forward every signal to the mock so the calls can be verified below.
        src.firstElement()
                .toObservable()
                .flatMap(duplicate)
                .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String t) {
                        mock.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        mock.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        mock.onComplete();
                    }
                });

        order.verify(mock).onNext(expected);
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay
文件:ReplayRelayTest.java
/**
 * For ten turns: pushes the turn number into the relay, takes the relay's
 * first element, maps it to "v, v", and verifies the signals seen by a fresh
 * mock observer.
 *
 * <p>The expected value is "0, 0" on every turn, i.e. the first element taken
 * from the relay is expected to be the value accepted on the very first turn.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    ReplayRelay<String> src = ReplayRelay.create();

    // Stateless mapper shared by every turn: "x" -> "x, x".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        InOrder order = inOrder(mock);
        String value = String.valueOf(turn);

        src.accept(value);
        System.out.printf("Turn: %d%n", turn);

        // Print each received value, then forward every signal to the mock.
        src.firstElement()
                .toObservable()
                .flatMap(duplicate)
                .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String t) {
                        System.out.println(t);
                        mock.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        mock.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        mock.onComplete();
                    }
                });

        order.verify(mock).onNext("0, 0");
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay
文件:BehaviorRelayTest.java
/**
 * Repeatedly races an emission against a subscription on a fresh relay.
 *
 * <p>Each iteration schedules a task on a worker that waits for the start
 * latch and then pushes {@code 1} into the relay, subscribes an observer via
 * {@code subscribeOn}/{@code observeOn}, and only then releases the start
 * latch. The observer must record {@code 1} within 5 seconds.
 *
 * @throws Exception if waiting on the finish latch is interrupted
 */
@Test
@Ignore("OOMs")
public void testEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            // Progress marker every 1000 iterations.
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final BehaviorRelay<Object> rs = BehaviorRelay.create();
            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);
            // Emitter: held back until the start latch is released below.
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });
            // Holds whatever signal the observer saw first/last.
            final AtomicReference<Object> o = new AtomicReference<Object>();
            rs.subscribeOn(s).observeOn(Schedulers.io())
                    .subscribe(new DefaultObserver<Object>() {
                        @Override
                        public void onComplete() {
                            // -1 makes the assertion below fail if completion is seen.
                            o.set(-1);
                            finish.countDown();
                        }

                        @Override
                        public void onError(Throwable e) {
                            o.set(e);
                            finish.countDown();
                        }

                        @Override
                        public void onNext(Object t) {
                            o.set(t);
                            finish.countDown();
                        }
                    });
            start.countDown();
            if (!finish.await(5, TimeUnit.SECONDS)) {
                // Dump diagnostics before failing; fail(...) throws, so nothing
                // after it in this branch would ever run.
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                fail("Timeout @ " + i);
            }
            Assert.assertEquals(1, o.get());
        }
    } finally {
        worker.dispose();
    }
}
项目:RxRelay
文件:ReplayRelayConcurrencyTest.java
/**
 * Repeatedly races an emission against a subscription on a fresh relay.
 *
 * <p>Each iteration schedules a task on a worker that waits for the start
 * latch and then pushes {@code 1} into the relay, subscribes an observer via
 * {@code subscribeOn}/{@code observeOn}, and only then releases the start
 * latch. The observer must record {@code 1} within 5 seconds.
 *
 * @throws Exception if waiting on the finish latch is interrupted
 */
@Test
public void testReplayRelayEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            // Progress marker every 1000 iterations.
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final ReplayRelay<Object> rs = ReplayRelay.create();
            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);
            // Emitter: held back until the start latch is released below.
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });
            // Holds whatever signal the observer saw first/last.
            final AtomicReference<Object> o = new AtomicReference<Object>();
            rs.subscribeOn(s).observeOn(Schedulers.io())
                    .subscribe(new DefaultObserver<Object>() {
                        @Override
                        public void onComplete() {
                            // -1 makes the assertion below fail if completion is seen.
                            o.set(-1);
                            finish.countDown();
                        }

                        @Override
                        public void onError(Throwable e) {
                            o.set(e);
                            finish.countDown();
                        }

                        @Override
                        public void onNext(Object t) {
                            o.set(t);
                            finish.countDown();
                        }
                    });
            start.countDown();
            if (!finish.await(5, TimeUnit.SECONDS)) {
                // Dump diagnostics before failing; Assert.fail(...) throws, so
                // nothing after it in this branch would ever run.
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                Assert.fail("Timeout @ " + i);
            }
            Assert.assertEquals(1, o.get());
        }
    } finally {
        worker.dispose();
    }
}
项目:RxRelay
文件:PublishRelayTest.java
/**
 * For ten turns: subscribes a forwarding observer to the relay's first
 * element (mapped to "v, v"), then pushes the turn number into the relay and
 * verifies that a fresh mock observer sees exactly that value followed by
 * completion and never an error.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    PublishRelay<String> src = PublishRelay.create();

    // Stateless mapper shared by every turn: "x" -> "x, x".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        InOrder order = inOrder(mock);
        String value = String.valueOf(turn);
        String expected = value + ", " + value;

        System.out.printf("Turn: %d%n", turn);

        // Subscribe first; the value is only pushed after the observer is attached.
        src.firstElement()
                .toObservable()
                .flatMap(duplicate)
                .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String t) {
                        mock.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        mock.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        mock.onComplete();
                    }
                });

        src.accept(value);

        order.verify(mock).onNext(expected);
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay
文件:ReplayRelayBoundedConcurrencyTest.java
/**
 * Repeatedly races an emission against a subscription on a fresh
 * size-bounded replay relay ({@code createWithSize(2)}).
 *
 * <p>Each iteration schedules a task on a worker that waits for the start
 * latch and then pushes {@code 1} into the relay, subscribes an observer via
 * {@code subscribeOn}/{@code observeOn}, and only then releases the start
 * latch. The observer must record {@code 1} within 5 seconds.
 *
 * @throws Exception if waiting on the finish latch is interrupted
 */
@Test
public void testReplaySubjectEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            // Progress marker every 1000 iterations.
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final ReplayRelay<Object> rs = ReplayRelay.createWithSize(2);
            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);
            // Emitter: held back until the start latch is released below.
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });
            // Holds whatever signal the observer saw first/last.
            final AtomicReference<Object> o = new AtomicReference<Object>();
            rs
                    .subscribeOn(s)
                    .observeOn(Schedulers.io())
                    .subscribe(new DefaultObserver<Object>() {
                        @Override
                        public void onComplete() {
                            // -1 makes the assertion below fail if completion is seen.
                            o.set(-1);
                            finish.countDown();
                        }

                        @Override
                        public void onError(Throwable e) {
                            o.set(e);
                            finish.countDown();
                        }

                        @Override
                        public void onNext(Object t) {
                            o.set(t);
                            finish.countDown();
                        }
                    });
            start.countDown();
            if (!finish.await(5, TimeUnit.SECONDS)) {
                // Dump diagnostics before failing; Assert.fail(...) throws, so
                // nothing after it in this branch would ever run.
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                Assert.fail("Timeout @ " + i);
            }
            Assert.assertEquals(1, o.get());
        }
    } finally {
        worker.dispose();
    }
}
项目:RetrofitRxErrorHandler
文件:RealExampleTest.java
/**
 * This test executes a real query against the GitHub server and prints the
 * repositories it receives; it blocks until the stream completes or fails.
 * Test created by Robert Zagorski on 19.10.2016
 */
@Test
public void main() throws IOException, InterruptedException {
    // Call adapter with an Exponential backoff strategy registered for
    // UnknownHostException and SocketTimeoutException, with setMaxRetries(3).
    RxCallAdapter rxCallAdapter = new RxCallAdapter.Builder()
            .addBackoffStrategy(Exponential.init()
                    .addThrowable(UnknownHostException.class)
                    .addThrowable(SocketTimeoutException.class)
                    .setMaxRetries(3).build())
            .build();

    // A very simple REST adapter pointing at the GitHub API.
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(API_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(new RxErrorHandingFactory(rxCallAdapter))
            .build();

    // Instance of our GitHub API interface.
    GitHub github = retrofit.create(GitHub.class);

    // Observable that looks up the repositories of "square".
    Observable<List<Repository>> call = github.repos("square");

    // Released once the stream has terminated (completion or error).
    final CountDownLatch latch = new CountDownLatch(1);

    final DefaultObserver<List<Repository>> printer = new DefaultObserver<List<Repository>>() {
        @Override
        public void onNext(List<Repository> repositories) {
            // Print one "name (description)" line per repository.
            for (Repository repo : repositories) {
                System.out.println(repo.name + " (" + repo.description + ")");
            }
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(new GregorianCalendar().toInstant() + " Finished with error: " + e);
            // Reuse the completion path so the latch is released on failure too.
            onComplete();
        }

        @Override
        public void onComplete() {
            System.out.println(new GregorianCalendar().toInstant() + " Finished");
            latch.countDown();
        }
    };

    // Fetch and print the list of repositories.
    call.subscribe(printer);
    latch.await();
}
项目:RxRelay
文件:ReplayRelayTest.java
/**
 * For ten turns: pushes the turn number into the relay, takes the relay's
 * first element, maps it to "v, v", and verifies the signals seen by a fresh
 * mock observer.
 *
 * <p>The expected value is "0, 0" on every turn, i.e. the first element taken
 * from the relay is expected to be the value accepted on the very first turn.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    ReplayRelay<String> src = ReplayRelay.create();

    // Stateless mapper shared by every turn: "x" -> "x, x".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        InOrder order = inOrder(mock);
        String value = String.valueOf(turn);

        src.accept(value);
        System.out.printf("Turn: %d%n", turn);

        // Print each received value, then forward every signal to the mock.
        src.firstElement()
                .toObservable()
                .flatMap(duplicate)
                .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String t) {
                        System.out.println(t);
                        mock.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        mock.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        mock.onComplete();
                    }
                });

        order.verify(mock).onNext("0, 0");
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay
文件:BehaviorRelayTest.java
/**
 * Repeatedly races an emission against a subscription on a fresh relay.
 *
 * <p>Each iteration schedules a task on a worker that waits for the start
 * latch and then pushes {@code 1} into the relay, subscribes an observer via
 * {@code subscribeOn}/{@code observeOn}, and only then releases the start
 * latch. The observer must record {@code 1} within 5 seconds.
 *
 * @throws Exception if waiting on the finish latch is interrupted
 */
@Test
@Ignore("OOMs")
public void testEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            // Progress marker every 1000 iterations.
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final BehaviorRelay<Object> rs = BehaviorRelay.create();
            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);
            // Emitter: held back until the start latch is released below.
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });
            // Holds whatever signal the observer saw first/last.
            final AtomicReference<Object> o = new AtomicReference<Object>();
            rs.subscribeOn(s).observeOn(Schedulers.io())
                    .subscribe(new DefaultObserver<Object>() {
                        @Override
                        public void onComplete() {
                            // -1 makes the assertion below fail if completion is seen.
                            o.set(-1);
                            finish.countDown();
                        }

                        @Override
                        public void onError(Throwable e) {
                            o.set(e);
                            finish.countDown();
                        }

                        @Override
                        public void onNext(Object t) {
                            o.set(t);
                            finish.countDown();
                        }
                    });
            start.countDown();
            if (!finish.await(5, TimeUnit.SECONDS)) {
                // Dump diagnostics before failing; fail(...) throws, so nothing
                // after it in this branch would ever run.
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                fail("Timeout @ " + i);
            }
            Assert.assertEquals(1, o.get());
        }
    } finally {
        worker.dispose();
    }
}
项目:RxRelay
文件:ReplayRelayConcurrencyTest.java
/**
 * Repeatedly races an emission against a subscription on a fresh relay.
 *
 * <p>Each iteration schedules a task on a worker that waits for the start
 * latch and then pushes {@code 1} into the relay, subscribes an observer via
 * {@code subscribeOn}/{@code observeOn}, and only then releases the start
 * latch. The observer must record {@code 1} within 5 seconds.
 *
 * @throws Exception if waiting on the finish latch is interrupted
 */
@Test
public void testReplayRelayEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            // Progress marker every 1000 iterations.
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final ReplayRelay<Object> rs = ReplayRelay.create();
            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);
            // Emitter: held back until the start latch is released below.
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });
            // Holds whatever signal the observer saw first/last.
            final AtomicReference<Object> o = new AtomicReference<Object>();
            rs.subscribeOn(s).observeOn(Schedulers.io())
                    .subscribe(new DefaultObserver<Object>() {
                        @Override
                        public void onComplete() {
                            // -1 makes the assertion below fail if completion is seen.
                            o.set(-1);
                            finish.countDown();
                        }

                        @Override
                        public void onError(Throwable e) {
                            o.set(e);
                            finish.countDown();
                        }

                        @Override
                        public void onNext(Object t) {
                            o.set(t);
                            finish.countDown();
                        }
                    });
            start.countDown();
            if (!finish.await(5, TimeUnit.SECONDS)) {
                // Dump diagnostics before failing; Assert.fail(...) throws, so
                // nothing after it in this branch would ever run.
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                Assert.fail("Timeout @ " + i);
            }
            Assert.assertEquals(1, o.get());
        }
    } finally {
        worker.dispose();
    }
}
项目:RxRelay
文件:PublishRelayTest.java
/**
 * For ten turns: subscribes a forwarding observer to the relay's first
 * element (mapped to "v, v"), then pushes the turn number into the relay and
 * verifies that a fresh mock observer sees exactly that value followed by
 * completion and never an error.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    PublishRelay<String> src = PublishRelay.create();

    // Stateless mapper shared by every turn: "x" -> "x, x".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        InOrder order = inOrder(mock);
        String value = String.valueOf(turn);
        String expected = value + ", " + value;

        System.out.printf("Turn: %d%n", turn);

        // Subscribe first; the value is only pushed after the observer is attached.
        src.firstElement()
                .toObservable()
                .flatMap(duplicate)
                .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String t) {
                        mock.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        mock.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        mock.onComplete();
                    }
                });

        src.accept(value);

        order.verify(mock).onNext(expected);
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay
文件:ReplayRelayBoundedConcurrencyTest.java
/**
 * Repeatedly races an emission against a subscription on a fresh
 * size-bounded replay relay ({@code createWithSize(2)}).
 *
 * <p>Each iteration schedules a task on a worker that waits for the start
 * latch and then pushes {@code 1} into the relay, subscribes an observer via
 * {@code subscribeOn}/{@code observeOn}, and only then releases the start
 * latch. The observer must record {@code 1} within 5 seconds.
 *
 * @throws Exception if waiting on the finish latch is interrupted
 */
@Test
public void testReplaySubjectEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            // Progress marker every 1000 iterations.
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final ReplayRelay<Object> rs = ReplayRelay.createWithSize(2);
            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);
            // Emitter: held back until the start latch is released below.
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });
            // Holds whatever signal the observer saw first/last.
            final AtomicReference<Object> o = new AtomicReference<Object>();
            rs
                    .subscribeOn(s)
                    .observeOn(Schedulers.io())
                    .subscribe(new DefaultObserver<Object>() {
                        @Override
                        public void onComplete() {
                            // -1 makes the assertion below fail if completion is seen.
                            o.set(-1);
                            finish.countDown();
                        }

                        @Override
                        public void onError(Throwable e) {
                            o.set(e);
                            finish.countDown();
                        }

                        @Override
                        public void onNext(Object t) {
                            o.set(t);
                            finish.countDown();
                        }
                    });
            start.countDown();
            if (!finish.await(5, TimeUnit.SECONDS)) {
                // Dump diagnostics before failing; Assert.fail(...) throws, so
                // nothing after it in this branch would ever run.
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                Assert.fail("Timeout @ " + i);
            }
            Assert.assertEquals(1, o.get());
        }
    } finally {
        worker.dispose();
    }
}