Java 类io.reactivex.Scheduler 实例源码

项目:RIBs    文件:AndroidSchedulersRule.java   
@Override
protected void starting(Description description) {
  if (restoreHandlers) {
    // https://github.com/ReactiveX/RxAndroid/pull/358
    //            originalInitMainThreadInitHandler =
    // RxAndroidPlugins.getInitMainThreadScheduler();
    //            originalMainThreadHandler = RxAndroidPlugins.getMainThreadScheduler();
  }
  RxAndroidPlugins.reset();
  RxAndroidPlugins.setInitMainThreadSchedulerHandler(
      new Function<Callable<Scheduler>, Scheduler>() {
        @Override
        public Scheduler apply(Callable<Scheduler> schedulerCallable) throws Exception {
          return delegatingMainThreadScheduler;
        }
      });
  RxAndroidPlugins.setMainThreadSchedulerHandler(
      new Function<Scheduler, Scheduler>() {
        @Override
        public Scheduler apply(Scheduler scheduler) throws Exception {
          return delegatingMainThreadScheduler;
        }
      });
}
项目:RxSWT    文件:RxSwtPluginsTest.java   
@Test
public void defaultMainThreadSchedulerIsInitializedLazily() {
    Function<Callable<Scheduler>, Scheduler> safeOverride = new Function<Callable<Scheduler>, Scheduler>() {
        @Override
        public Scheduler apply(Callable<Scheduler> scheduler) {
            return new EmptyScheduler();
        }
    };
    Callable<Scheduler> unsafeDefault = new Callable<Scheduler>() {
        @Override
        public Scheduler call() throws Exception {
            throw new AssertionError();
        }
    };

    RxSwtPlugins.setInitMainThreadSchedulerHandler(safeOverride);
    RxSwtPlugins.initMainThreadScheduler(unsafeDefault);
}
项目:Reactive-Android-Programming    文件:Sandbox.java   
private static void demo2() throws Exception {
    final ExecutorService executor = Executors.newFixedThreadPool(1000);
    final Scheduler pooledScheduler = Schedulers.from(executor);

    Observable.range(1, 10000)
            .flatMap(i -> Observable.just(i)
                    .subscribeOn(pooledScheduler)
                    .map(Sandbox::importantLongTask)
            )
            .doOnTerminate(WAIT_LATCH::countDown)
            .map(Objects::toString)
            .subscribe(e -> log("subscribe", e));

    WAIT_LATCH.await();
    executor.shutdown();
}
项目:pyplyn    文件:TaskManager.java   
/**
 * Initializes a scheduler that will be used for offloading IO work performed by {@link Extract}s
 * <p/>
 * <p/> Threads executed on this scheduler have {@link Thread#NORM_PRIORITY}
 * @param ioPoolSize Size of thread pool for this scheduler
 */
private Scheduler initExtractScheduler(Integer ioPoolSize) {
    ThreadFactory factory = newThreadFactory("TaskManager-Extract-%s", Thread.NORM_PRIORITY);
    ExecutorService executor = Executors.newFixedThreadPool(ioPoolSize, factory);
    shutdownHook.registerExecutor(executor);
    return Schedulers.from(executor);
}
项目:RxSWT    文件:RxSwtPluginsTest.java   
@Test(expected = NullPointerException.class)
public void testapplyRequireNonNullThrowsNPE() {
    RxSwtPlugins.applyRequireNonNull(new Function<Callable<Scheduler>, Scheduler>() {

        @Override
        public Scheduler apply(Callable<Scheduler> t) throws Exception {
            return null;
        }
    }, null);
}
项目:GitHub    文件:HomePresenterTest.java   
@BeforeClass public static void init() throws Exception {
  // Tell RxAndroid to not use android main ui thread scheduler
  RxAndroidPlugins.setInitMainThreadSchedulerHandler(
      new Function<Callable<Scheduler>, Scheduler>() {
        @Override public Scheduler apply(@NonNull Callable<Scheduler> schedulerCallable)
            throws Exception {
          return Schedulers.trampoline();
        }
      });
}
项目:pyplyn    文件:TaskManager.java   
/**
 * Initializes a scheduler that will be used for offloading IO work performed by {@link Load}s
 * <p/>
 * <p/> Threads executed on this scheduler have {@link Thread#NORM_PRIORITY}+2
 * @param ioPoolSize Size of thread pool for this scheduler
 */
private Scheduler initLoadScheduler(Integer ioPoolSize) {
    ThreadFactory factory = newThreadFactory("TaskManager-Load-%s", Thread.NORM_PRIORITY + 2);
    ExecutorService executor = Executors.newFixedThreadPool(ioPoolSize, factory);
    shutdownHook.registerExecutor(executor);
    return Schedulers.from(executor);
}
项目:RxSWT    文件:RxSwtPlugins.java   
static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
    Scheduler scheduler = apply(f,s);
    if (scheduler == null) {
        throw new NullPointerException("Scheduler Callable returned null");
    }
    return scheduler;
}
项目:pyplyn    文件:PollingTransform.java   
/**
 * Overrides the default implementation, to simplify implementing async transforms
 * <p/>
 * <p/> {@link PollingTransform}s are observed on the specified scheduler
 */
@Override
public Flowable<List<List<Transmutation>>> applyAsync(List<List<Transmutation>> input, Scheduler scheduler) {
    // determine if the implementing class should run
    if (skipTransform(input)) {
        return Flowable.just(input);
    }

    // remove items that should not be processed
    List<List<Transmutation>> toProcess = filter(input);
    if (isNull(toProcess)) {
        logger.info("Stopping processing in {} due to threshold not met...", this.getClass().getSimpleName());
        return Flowable.empty();
    }

    return Flowable.just(toProcess)
            .observeOn(scheduler)

            // send payload
            .map(this::sendRequest)

            .flatMap(req -> tryRetrieve(req, -1))

            // or timeout after the specified number of ms
            .timeout(timeoutMillis(), TimeUnit.MILLISECONDS)
            .doOnError(this::logAsyncError)
            .onErrorResumeNext(Flowable.empty());
}
项目:GitHub    文件:RxAndroidPluginsTest.java   
@Test
public void getInitMainThreadSchedulerHandlerReturnsHandler() {
    Function<Callable<Scheduler>, Scheduler> handler = new Function<Callable<Scheduler>, Scheduler>() {
        @Override public Scheduler apply(Callable<Scheduler> schedulerCallable) throws Exception {
            return Schedulers.trampoline();
        }
    };
    RxAndroidPlugins.setInitMainThreadSchedulerHandler(handler);
    assertSame(handler, RxAndroidPlugins.getInitMainThreadSchedulerHandler());
}
项目:GitHub    文件:RxJava2CallAdapter.java   
RxJava2CallAdapter(Type responseType, Scheduler scheduler, boolean isAsync, boolean isResult,
    boolean isBody, boolean isFlowable, boolean isSingle, boolean isMaybe,
    boolean isCompletable) {
  this.responseType = responseType;
  this.scheduler = scheduler;
  this.isAsync = isAsync;
  this.isResult = isResult;
  this.isBody = isBody;
  this.isFlowable = isFlowable;
  this.isSingle = isSingle;
  this.isMaybe = isMaybe;
  this.isCompletable = isCompletable;
}
项目:buffer-slayer    文件:RxReporter.java   
private MessageGroupSubscriber(long messageTimeoutNanos,
                               int bufferedMaxMessages,
                               Sender<M, R> sender,
                               Scheduler scheduler) {
  this.messageTimeoutNanos = messageTimeoutNanos == 0 ? Long.MAX_VALUE : messageTimeoutNanos;
  this.bufferedMaxMessages = bufferedMaxMessages;
  this.sender = sender;
  this.scheduler = scheduler;
}
项目:RxIdler    文件:DelegatingIdlingResourceSchedulerTest.java   
@Test public void runningWorkReportsBusy() {
  Scheduler.Worker worker = scheduler.createWorker();
  worker.schedule(new Runnable() {
    @Override public void run() {
      assertBusy();
    }
  });
  delegate.triggerActions();
}
项目:dagger-test-example    文件:TodayWeatherViewModel.java   
@Inject @Replaceable
public TodayWeatherViewModel(NavigationController navigation,
                             @RxObservable(PAGE) Observable<Integer> pageChangeObservable,
                             PermissionService permissionService,
                             LocationService locationService, WeatherService weatherService,
                             TodayWeatherResponseFilter weatherParser,
                             @RxScheduler(MAIN) Scheduler androidScheduler) {
    super(navigation, pageChangeObservable, permissionService, locationService, weatherService, weatherParser, androidScheduler);
}
项目:RxJavaCollections    文件:RxToFXListBinding.java   
public RxToFXListBinding(Scheduler scheduler, ObservableList<E> rxList, javafx.collections.ObservableList<E> fxList) {
    this.rxList = rxList;
    this.fxList = fxList;
    fxList.clear();
    disposer.register(rxList.observable().observeOn(scheduler).subscribe(fxList::add));
    disposer.register(rxList.onAdded().observeOn(scheduler).subscribe(c -> fxList.add(c.getIndex(), c.getValue())));
    disposer.register(rxList.onRemoved().observeOn(scheduler).subscribe(c -> fxList.remove(c.getIndex())));
    disposer.register(rxList.onUpdatedChanged().observeOn(scheduler).subscribe(c ->
            fxList.set(c.getNewValue().getIndex(), c.getOldValue().getValue())));
    disposer.register(rxList.onMoved().observeOn(scheduler).subscribe(c ->
            fxList.add(c.getNewValue().getIndex(), fxList.remove(c.getOldValue().getIndex()))));
}
项目:RxSWT    文件:RxSwtPluginsTest.java   
@Test
public void getMainThreadSchedulerHandlerReturnsHandler() {
    Function<Scheduler, Scheduler> handler = new Function<Scheduler, Scheduler>() {
        @Override
        public Scheduler apply(Scheduler scheduler) {
            return Schedulers.trampoline();
        }
    };
    RxSwtPlugins.setMainThreadSchedulerHandler(handler);
    assertSame(handler, RxSwtPlugins.getOnMainThreadSchedulerHandler());
}
项目:InstantAppStarter    文件:NewsSourcePresenterTest.java   
@BeforeClass
public static void onlyOnce() throws Exception {
    RxAndroidPlugins.setInitMainThreadSchedulerHandler(
            new Function<Callable<Scheduler>, Scheduler>() {
                @Override
                public Scheduler apply(Callable<Scheduler> schedulerCallable)
                        throws Exception {
                    return Schedulers.trampoline();
                }
            });
}
项目:Java-EX    文件:RxSchedulers.java   
public static Scheduler fixedSize(int size) {
  return autoClose(Executors.newFixedThreadPool(size, new ThreadFactory() {
    int i = 0;

    @Override
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setName("FixedSizeScheduler(size=" + size + ")-" + (++i));
      t.setPriority(Thread.NORM_PRIORITY);
      t.setDaemon(true);
      return t;
    }
  }));
}
项目:GitHub    文件:AppSchedulerProvider.java   
@Override
public Scheduler ui() {
    return AndroidSchedulers.mainThread();
}
项目:DailyTech    文件:AppSchedulerProvider.java   
@Override
public Scheduler computation() {
    return Schedulers.computation();
}
项目:sample-code-posts    文件:RxSchedulers.java   
public Scheduler getIoScheduler() {
    return ioScheduler;
}
项目:rxbus    文件:RxBus.java   
public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
    return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
}
项目:Wallpapers-Android-Clean-Architecture    文件:Rx.java   
@Provides @Singleton @Named(Rx.IO) Scheduler provideIoScheduler() {
    return Schedulers.io();
}
项目:PhoneMall    文件:AppSchedulerProvider.java   
@Override
public Scheduler ui() {
  return AndroidSchedulers.mainThread();
}
项目:GitHub    文件:TestSchedulerProvider.java   
@Override
public Scheduler io() {
    return mTestScheduler;
}
项目:GitHub    文件:SchedulerProvider.java   
@Override
@NonNull
public Scheduler io() {
    return Schedulers.io();
}
项目:RetroMusicPlayer    文件:BaseSchedulerProvider.java   
@NonNull
Scheduler io();
项目:richeditor    文件:RxBus.java   
public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer<Throwable> onError) {
    return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
}
项目:GitHub    文件:ImmediateSchedulerProvider.java   
@NonNull
@Override
public Scheduler io() {
    return Schedulers.trampoline();
}
项目:cleanarchitecture-unidirectional    文件:TimezoneUpdateUseCase.java   
public TimezoneUpdateUseCase(Scheduler workload, Scheduler delivery, RestService restService, IAuthService authService) {
    super(workload, delivery);
    this.restService = restService;
    this.authService = authService;
}
项目:Wallpapers-Android-Clean-Architecture    文件:OnBoardingScreenInteractor.java   
@Inject public OnBoardingScreenInteractor(@NonNull Scheduler observerScheduler,
        @NonNull Scheduler subscribeScheduler) {
    super(observerScheduler, subscribeScheduler);
}
项目:RxSWT    文件:SwtSchedulers.java   
/**
 * A {@link Scheduler} which executes actions on a {@code Display}.
 */
public static Scheduler from(Display display) {
    return RxSwtPlugins.onMainThreadScheduler(new DisplayScheduler(display));
}
项目:ZgzFromWithin    文件:MainModule.java   
@Provides
@Named("executor_schedule")
Scheduler provideExecutorThread() {
    return Schedulers.io();
}
项目:GitHub    文件:AndroidSchedulers.java   
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
项目:GitHub    文件:AndroidSchedulers.java   
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
    if (looper == null) throw new NullPointerException("looper == null");
    return new HandlerScheduler(new Handler(looper));
}
项目:GitHub    文件:RxAndroidPlugins.java   
public static void setInitMainThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
    onInitMainThreadHandler = handler;
}
项目:Coin-Tracker    文件:SchedulerProvider.java   
@NonNull
@Override
public Scheduler io() {
    return Schedulers.io();
}
项目:cleanarchitecture-unidirectional    文件:GetUserUseCase.java   
public GetUserUseCase(Scheduler workload, Scheduler delivery, RestService restService, IAuthService authService) {
    super(workload, delivery);
    this.restService = restService;
    this.authService = authService;
}
项目:Coin-Tracker    文件:BaseSchedulerProvider.java   
@NonNull
Scheduler io();
项目:Coin-Tracker    文件:SchedulerProvider.java   
@NonNull
@Override
public Scheduler computation() {
    return Schedulers.computation();
}