@Override public Observable<String> getNewMessage() { return Observable.create(emitter -> { MainThreadDisposable.verifyMainThread(); ivSend.setOnClickListener(view -> { if (!etMessageContent.getText().toString().isEmpty()) { emitter.onNext(etMessageContent.getText().toString()); etMessageContent.setText(""); } }); emitter.setDisposable(new MainThreadDisposable() { @Override protected void onDispose() { ivSend.setOnClickListener(null); } }); }); }
@Override public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception { checkUiThread(); View.OnClickListener listener = v -> { if (!emitter.isCancelled()) { emitter.onNext(1); } }; view.setOnClickListener(listener); emitter.setDisposable(new MainThreadDisposable() { @Override protected void onDispose() { view.setOnClickListener(null); } }); }
private void demo3() { Observable.create(emitter -> { emitter.setDisposable(new MainThreadDisposable() { @Override protected void onDispose() { helloText.setOnClickListener(null); } }); helloText.setOnClickListener(v -> emitter.onNext(v)); }) .subscribe(); }
@Override protected void subscribeActual(final Observer<? super RxWebViewClientData> observer) { MainThreadDisposable.verifyMainThread(); webView.setWebViewClient(new ClientWrapper(observer)); observer.onSubscribe(new MainThreadDisposable() { @Override protected void onDispose() { webView.setWebViewClient(null); } }); }
@Override protected void subscribeActual(Observer<? super RxWebChromeClientData> observer) { webView.setWebChromeClient(new ClientWrapper(observer)); observer.onSubscribe(new MainThreadDisposable() { @Override protected void onDispose() { webView.setWebChromeClient(null); } }); }