@Override protected void starting(Description description) { if (restoreHandlers) { // https://github.com/ReactiveX/RxAndroid/pull/358 // originalInitMainThreadInitHandler = // RxAndroidPlugins.getInitMainThreadScheduler(); // originalMainThreadHandler = RxAndroidPlugins.getMainThreadScheduler(); } RxAndroidPlugins.reset(); RxAndroidPlugins.setInitMainThreadSchedulerHandler( new Function<Callable<Scheduler>, Scheduler>() { @Override public Scheduler apply(Callable<Scheduler> schedulerCallable) throws Exception { return delegatingMainThreadScheduler; } }); RxAndroidPlugins.setMainThreadSchedulerHandler( new Function<Scheduler, Scheduler>() { @Override public Scheduler apply(Scheduler scheduler) throws Exception { return delegatingMainThreadScheduler; } }); }
/**
 * Installs an init handler that ignores its argument and returns an {@code EmptyScheduler},
 * then initializes the main-thread scheduler with a default callable that throws
 * {@link AssertionError} as soon as it is called.
 */
@Test
public void defaultMainThreadSchedulerIsInitializedLazily() {
    RxSwtPlugins.setInitMainThreadSchedulerHandler(
        new Function<Callable<Scheduler>, Scheduler>() {
            @Override
            public Scheduler apply(Callable<Scheduler> scheduler) {
                return new EmptyScheduler();
            }
        });

    RxSwtPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                throw new AssertionError();
            }
        });
}
/**
 * Pushes the integers 1..10000 through {@code Sandbox.importantLongTask}, subscribing each
 * item on a scheduler backed by a 1000-thread fixed pool, logs every result, and blocks
 * until the stream terminates.
 *
 * @throws Exception if waiting on {@code WAIT_LATCH} is interrupted
 */
private static void demo2() throws Exception {
    final ExecutorService executor = Executors.newFixedThreadPool(1000);
    try {
        final Scheduler pooledScheduler = Schedulers.from(executor);
        Observable.range(1, 10000)
            .flatMap(i -> Observable.just(i)
                .subscribeOn(pooledScheduler)
                .map(Sandbox::importantLongTask)
            )
            .doOnTerminate(WAIT_LATCH::countDown)
            .map(Objects::toString)
            .subscribe(e -> log("subscribe", e));
        WAIT_LATCH.await();
    } finally {
        // Always release the pool: without this an interrupted await (or a failure while
        // assembling the pipeline) would leave the executor and its threads alive.
        executor.shutdown();
    }
}
/**
 * Builds the scheduler onto which IO work performed by {@link Extract}s is offloaded.
 * <p>
 * The backing fixed-size pool takes its threads from a factory requested with
 * {@link Thread#NORM_PRIORITY} and is registered with {@code shutdownHook}.
 *
 * @param ioPoolSize number of threads in the pool backing this scheduler
 * @return a scheduler wrapping the newly created executor
 */
private Scheduler initExtractScheduler(Integer ioPoolSize) {
    ThreadFactory extractThreads = newThreadFactory("TaskManager-Extract-%s", Thread.NORM_PRIORITY);
    ExecutorService extractPool = Executors.newFixedThreadPool(ioPoolSize, extractThreads);
    shutdownHook.registerExecutor(extractPool);
    return Schedulers.from(extractPool);
}
/**
 * Expects {@code RxSwtPlugins.applyRequireNonNull} to throw a {@link NullPointerException}
 * when the supplied function returns {@code null}.
 */
@Test(expected = NullPointerException.class)
public void testapplyRequireNonNullThrowsNPE() {
    Function<Callable<Scheduler>, Scheduler> nullReturningFunction =
        new Function<Callable<Scheduler>, Scheduler>() {
            @Override
            public Scheduler apply(Callable<Scheduler> t) throws Exception {
                return null;
            }
        };

    RxSwtPlugins.applyRequireNonNull(nullReturningFunction, null);
}
@BeforeClass public static void init() throws Exception { // Tell RxAndroid to not use android main ui thread scheduler RxAndroidPlugins.setInitMainThreadSchedulerHandler( new Function<Callable<Scheduler>, Scheduler>() { @Override public Scheduler apply(@NonNull Callable<Scheduler> schedulerCallable) throws Exception { return Schedulers.trampoline(); } }); }
/**
 * Builds the scheduler onto which IO work performed by {@link Load}s is offloaded.
 * <p>
 * The backing fixed-size pool takes its threads from a factory requested with
 * {@link Thread#NORM_PRIORITY}+2 and is registered with {@code shutdownHook}.
 *
 * @param ioPoolSize number of threads in the pool backing this scheduler
 * @return a scheduler wrapping the newly created executor
 */
private Scheduler initLoadScheduler(Integer ioPoolSize) {
    ThreadFactory loadThreads = newThreadFactory("TaskManager-Load-%s", Thread.NORM_PRIORITY + 2);
    ExecutorService loadPool = Executors.newFixedThreadPool(ioPoolSize, loadThreads);
    shutdownHook.registerExecutor(loadPool);
    return Schedulers.from(loadPool);
}
/**
 * Applies {@code f} to {@code s} via {@code apply} and insists on a non-null result.
 *
 * @param f function to apply
 * @param s callable handed to the function
 * @return the scheduler produced by {@code apply(f, s)}
 * @throws NullPointerException if that result is {@code null}
 */
static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
    final Scheduler result = apply(f, s);
    if (result != null) {
        return result;
    }
    throw new NullPointerException("Scheduler Callable returned null");
}
/** * Overrides the default implementation, to simplify implementing async transforms * <p/> * <p/> {@link PollingTransform}s are observed on the specified scheduler */ @Override public Flowable<List<List<Transmutation>>> applyAsync(List<List<Transmutation>> input, Scheduler scheduler) { // determine if the implementing class should run if (skipTransform(input)) { return Flowable.just(input); } // remove items that should not be processed List<List<Transmutation>> toProcess = filter(input); if (isNull(toProcess)) { logger.info("Stopping processing in {} due to threshold not met...", this.getClass().getSimpleName()); return Flowable.empty(); } return Flowable.just(toProcess) .observeOn(scheduler) // send payload .map(this::sendRequest) .flatMap(req -> tryRetrieve(req, -1)) // or timeout after the specified number of ms .timeout(timeoutMillis(), TimeUnit.MILLISECONDS) .doOnError(this::logAsyncError) .onErrorResumeNext(Flowable.empty()); }
/**
 * Checks that the init handler passed to {@code RxAndroidPlugins} is the very instance
 * returned by the matching getter.
 */
@Test
public void getInitMainThreadSchedulerHandlerReturnsHandler() {
    final Function<Callable<Scheduler>, Scheduler> installed =
        new Function<Callable<Scheduler>, Scheduler>() {
            @Override
            public Scheduler apply(Callable<Scheduler> schedulerCallable) throws Exception {
                return Schedulers.trampoline();
            }
        };

    RxAndroidPlugins.setInitMainThreadSchedulerHandler(installed);

    assertSame(installed, RxAndroidPlugins.getInitMainThreadSchedulerHandler());
}
/**
 * Creates the adapter by storing every argument in the field of the same name.
 * No validation or other work is performed here.
 */
RxJava2CallAdapter(Type responseType, Scheduler scheduler, boolean isAsync, boolean isResult, boolean isBody, boolean isFlowable, boolean isSingle, boolean isMaybe, boolean isCompletable) {
    this.responseType = responseType;
    this.scheduler = scheduler;
    this.isAsync = isAsync;
    this.isResult = isResult;
    this.isBody = isBody;
    this.isFlowable = isFlowable;
    this.isSingle = isSingle;
    this.isMaybe = isMaybe;
    this.isCompletable = isCompletable;
}
/**
 * Creates a subscriber, storing the arguments in the matching fields.
 *
 * @param messageTimeoutNanos message timeout in nanoseconds; {@code 0} is stored as
 *                            {@link Long#MAX_VALUE}
 * @param bufferedMaxMessages stored as given
 * @param sender              stored as given
 * @param scheduler           stored as given
 */
private MessageGroupSubscriber(long messageTimeoutNanos, int bufferedMaxMessages, Sender<M, R> sender, Scheduler scheduler) {
    if (messageTimeoutNanos == 0) {
        this.messageTimeoutNanos = Long.MAX_VALUE;
    } else {
        this.messageTimeoutNanos = messageTimeoutNanos;
    }
    this.bufferedMaxMessages = bufferedMaxMessages;
    this.sender = sender;
    this.scheduler = scheduler;
}
/**
 * Schedules a task that calls {@code assertBusy()} while it runs, then triggers the
 * delegate's pending actions.
 */
@Test
public void runningWorkReportsBusy() {
    Scheduler.Worker worker = scheduler.createWorker();
    Runnable busyCheck = new Runnable() {
        @Override
        public void run() {
            assertBusy();
        }
    };
    worker.schedule(busyCheck);

    delegate.triggerActions();
}
/**
 * Injectable constructor that forwards every argument, unchanged and in the same order,
 * to the superclass constructor. {@code pageChangeObservable} is qualified with
 * {@code @RxObservable(PAGE)} and {@code androidScheduler} with {@code @RxScheduler(MAIN)}.
 */
@Inject
@Replaceable
public TodayWeatherViewModel(NavigationController navigation, @RxObservable(PAGE) Observable<Integer> pageChangeObservable, PermissionService permissionService, LocationService locationService, WeatherService weatherService, TodayWeatherResponseFilter weatherParser, @RxScheduler(MAIN) Scheduler androidScheduler) {
    super(navigation, pageChangeObservable, permissionService, locationService, weatherService, weatherParser, androidScheduler);
}
/**
 * Binds {@code fxList} to {@code rxList}: clears {@code fxList}, then subscribes to the
 * element, add, remove, update and move streams of {@code rxList}, applying each change
 * to {@code fxList} on {@code scheduler}. Every subscription is registered with
 * {@code disposer}.
 *
 * @param scheduler scheduler on which all changes are applied to {@code fxList}
 * @param rxList    source list whose change streams are observed
 * @param fxList    JavaFX list that is cleared and then kept in step with the source
 */
public RxToFXListBinding(Scheduler scheduler, ObservableList<E> rxList, javafx.collections.ObservableList<E> fxList) {
    this.rxList = rxList;
    this.fxList = fxList;
    fxList.clear();
    // Elements emitted by observable() are appended.
    disposer.register(rxList.observable().observeOn(scheduler)
        .subscribe(fxList::add));
    disposer.register(rxList.onAdded().observeOn(scheduler)
        .subscribe(c -> fxList.add(c.getIndex(), c.getValue())));
    disposer.register(rxList.onRemoved().observeOn(scheduler)
        .subscribe(c -> fxList.remove(c.getIndex())));
    // An update must store the NEW value at the new index; writing the old value back
    // (as before) meant updates never became visible in fxList.
    disposer.register(rxList.onUpdatedChanged().observeOn(scheduler)
        .subscribe(c -> fxList.set(c.getNewValue().getIndex(), c.getNewValue().getValue())));
    // A move takes the element out of its old index and re-inserts it at the new one.
    disposer.register(rxList.onMoved().observeOn(scheduler)
        .subscribe(c -> fxList.add(c.getNewValue().getIndex(), fxList.remove(c.getOldValue().getIndex()))));
}
/**
 * Checks that the main-thread handler passed to {@code RxSwtPlugins} is the very instance
 * returned by {@code getOnMainThreadSchedulerHandler()}.
 */
@Test
public void getMainThreadSchedulerHandlerReturnsHandler() {
    final Function<Scheduler, Scheduler> installed = new Function<Scheduler, Scheduler>() {
        @Override
        public Scheduler apply(Scheduler scheduler) {
            return Schedulers.trampoline();
        }
    };

    RxSwtPlugins.setMainThreadSchedulerHandler(installed);

    assertSame(installed, RxSwtPlugins.getOnMainThreadSchedulerHandler());
}
/**
 * One-time class setup: installs an init main-thread handler on {@code RxAndroidPlugins}
 * that always answers with {@code Schedulers.trampoline()}.
 */
@BeforeClass
public static void onlyOnce() throws Exception {
    final Function<Callable<Scheduler>, Scheduler> trampolineHandler =
        new Function<Callable<Scheduler>, Scheduler>() {
            @Override
            public Scheduler apply(Callable<Scheduler> schedulerCallable) throws Exception {
                return Schedulers.trampoline();
            }
        };

    RxAndroidPlugins.setInitMainThreadSchedulerHandler(trampolineHandler);
}
/**
 * Creates a scheduler from a fixed-size thread pool and passes it through
 * {@code autoClose}. Pool threads are daemon threads at {@link Thread#NORM_PRIORITY},
 * named {@code FixedSizeScheduler(size=N)-k} with {@code k} counting up from 1.
 *
 * @param size number of threads in the pool
 * @return the scheduler returned by {@code autoClose} for the new pool
 */
public static Scheduler fixedSize(int size) {
    return autoClose(Executors.newFixedThreadPool(size, new ThreadFactory() {
        // The pool may ask for threads from several submitting threads at once, so the
        // name counter must be atomic; a plain int could hand out duplicate numbers.
        private final java.util.concurrent.atomic.AtomicInteger counter =
            new java.util.concurrent.atomic.AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("FixedSizeScheduler(size=" + size + ")-" + counter.incrementAndGet());
            t.setPriority(Thread.NORM_PRIORITY);
            t.setDaemon(true);
            return t;
        }
    }));
}
/**
 * Returns the scheduler obtained from {@code AndroidSchedulers.mainThread()}.
 */
@Override
public Scheduler ui() {
    return AndroidSchedulers.mainThread();
}
/**
 * Returns the scheduler obtained from {@code Schedulers.computation()}.
 */
@Override
public Scheduler computation() {
    return Schedulers.computation();
}
/**
 * Returns the scheduler currently held in the {@code ioScheduler} field.
 */
public Scheduler getIoScheduler() {
    return ioScheduler;
}
/**
 * Subscribes {@code onNext} to the stream returned by {@code toObservable(eventType)},
 * observed on {@code scheduler}.
 *
 * @param eventType class passed to {@code toObservable} to select the stream
 * @param scheduler scheduler on which {@code onNext} is invoked
 * @param onNext    consumer receiving each emitted item
 * @return the {@code Disposable} of the subscription
 */
public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
    return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
}
/**
 * Provider method for the {@code Scheduler} qualified with {@code @Named(Rx.IO)};
 * returns {@code Schedulers.io()}.
 */
@Provides
@Singleton
@Named(Rx.IO)
Scheduler provideIoScheduler() {
    return Schedulers.io();
}
/**
 * Returns the scheduler held in {@code mTestScheduler} as the IO scheduler.
 */
@Override
public Scheduler io() {
    return mTestScheduler;
}
/**
 * Returns the scheduler obtained from {@code Schedulers.io()}.
 */
@Override
@NonNull
public Scheduler io() {
    return Schedulers.io();
}
/**
 * Supplies the scheduler this abstraction exposes under the name {@code io}.
 * The return value is declared {@code @NonNull}.
 */
@NonNull
Scheduler io();
/**
 * Subscribes {@code onNext} and {@code onError} to the stream returned by
 * {@code toObservable(eventType)}, observed on {@code scheduler}.
 *
 * @param eventType class passed to {@code toObservable} to select the stream
 * @param scheduler scheduler on which the consumers are invoked
 * @param onNext    consumer receiving each emitted item
 * @param onError   consumer receiving an error signalled by the stream
 * @return the {@code Disposable} of the subscription
 */
public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer<Throwable> onError) {
    return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
}
/**
 * Returns {@code Schedulers.trampoline()} in place of a dedicated IO scheduler.
 */
@NonNull
@Override
public Scheduler io() {
    return Schedulers.trampoline();
}
/**
 * Creates the use case: {@code workload} and {@code delivery} are handed to the superclass
 * constructor, while {@code restService} and {@code authService} are stored in fields.
 */
public TimezoneUpdateUseCase(Scheduler workload, Scheduler delivery, RestService restService, IAuthService authService) {
    super(workload, delivery);
    this.restService = restService;
    this.authService = authService;
}
/**
 * Injectable constructor that forwards {@code observerScheduler} and
 * {@code subscribeScheduler}, in that order, to the superclass constructor.
 */
@Inject
public OnBoardingScreenInteractor(@NonNull Scheduler observerScheduler, @NonNull Scheduler subscribeScheduler) {
    super(observerScheduler, subscribeScheduler);
}
/**
 * A {@link Scheduler} which executes actions on a {@code Display}.
 * <p>
 * A new {@code DisplayScheduler} is created for {@code display} and passed through
 * {@code RxSwtPlugins.onMainThreadScheduler}, whose result is returned.
 */
public static Scheduler from(Display display) {
    return RxSwtPlugins.onMainThreadScheduler(new DisplayScheduler(display));
}
/**
 * Provider method for the {@code Scheduler} qualified with
 * {@code @Named("executor_schedule")}; returns {@code Schedulers.io()}.
 */
@Provides
@Named("executor_schedule")
Scheduler provideExecutorThread() {
    return Schedulers.io();
}
/**
 * A {@link Scheduler} which executes actions on the Android main thread.
 * <p>
 * The {@code MAIN_THREAD} scheduler is passed through
 * {@code RxAndroidPlugins.onMainThreadScheduler}, whose result is returned.
 */
public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/**
 * Creates a {@link Scheduler} that executes actions on {@code looper}, by way of a
 * {@code Handler} bound to it.
 *
 * @param looper the looper to run actions on
 * @return a new {@code HandlerScheduler} for that looper
 * @throws NullPointerException if {@code looper} is {@code null}
 */
public static Scheduler from(Looper looper) {
    if (looper == null) {
        throw new NullPointerException("looper == null");
    }
    Handler looperHandler = new Handler(looper);
    return new HandlerScheduler(looperHandler);
}
/**
 * Stores {@code handler} in the static {@code onInitMainThreadHandler} field, replacing
 * whatever was there before. No validation is performed on the argument.
 *
 * @param handler the handler to install
 */
public static void setInitMainThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
    onInitMainThreadHandler = handler;
}
/**
 * Returns the scheduler obtained from {@code Schedulers.io()}.
 */
@NonNull
@Override
public Scheduler io() {
    return Schedulers.io();
}
/**
 * Creates the use case: {@code workload} and {@code delivery} are handed to the superclass
 * constructor, while {@code restService} and {@code authService} are stored in fields.
 */
public GetUserUseCase(Scheduler workload, Scheduler delivery, RestService restService, IAuthService authService) {
    super(workload, delivery);
    this.restService = restService;
    this.authService = authService;
}
@NonNull @Override public Scheduler computation() { return Schedulers.computation(); }