/**
 * Registers this mission under its url, obtains its event processor and then
 * initializes every contained mission with the same two maps.
 *
 * @param missionMap   url-to-mission registry that this mission is stored in
 * @param processorMap url-to-processor registry passed on to {@code createProcessor}
 * @throws IllegalArgumentException if the url is already mapped to a mission that is not canceled
 */
@Override
public void init(Map<String, DownloadMission> missionMap,
                 Map<String, FlowableProcessor<DownloadEvent>> processorMap) {
    DownloadMission existing = missionMap.get(getUrl());
    // Only an absent or canceled entry may be replaced.
    if (existing != null && !existing.isCanceled()) {
        throw new IllegalArgumentException(formatStr(Constant.DOWNLOAD_URL_EXISTS, getUrl()));
    }
    missionMap.put(getUrl(), this);

    this.processor = createProcessor(getUrl(), processorMap);

    for (SingleMission child : missions) {
        child.init(missionMap, processorMap);
    }
}
/** * Receive the url download event. * <p> * Will receive the following event: * {@link DownloadFlag#NORMAL}、{@link DownloadFlag#WAITING}、 * {@link DownloadFlag#STARTED}、{@link DownloadFlag#PAUSED}、 * {@link DownloadFlag#COMPLETED}、{@link DownloadFlag#FAILED}; * <p> * Every event has {@link DownloadStatus}, you can get it and display it on the interface. * * @param url url * @return DownloadEvent */ public FlowableProcessor<DownloadEvent> receiveDownloadEvent(String url) { FlowableProcessor<DownloadEvent> processor = createProcessor(url, processorMap); DownloadMission mission = missionMap.get(url); if (mission == null) { //Not yet add this url mission. DownloadRecord record = dataBaseHelper.readSingleRecord(url); if (record == null) { processor.onNext(normal(null)); } else { File file = getFiles(record.getSaveName(), record.getSavePath())[0]; if (file.exists()) { processor.onNext(createEvent(record.getFlag(), record.getStatus())); } else { processor.onNext(normal(null)); } } } return processor; }
/**
 * Creates the store and wires its reactive pipeline.
 * <p>
 * The given updater is first combined with the middlewares via {@code applyMiddlewares}.
 * Actions come from a {@code Flowable} created with {@code BackpressureStrategy.BUFFER};
 * its emitter is stored in {@code this.actionEmitter} once the flowable is subscribed.
 * A {@code BehaviorProcessor} carries the {@code Update}s: {@code statePublisher} emits
 * {@code initialState} followed by the state of every update, and each state is zipped
 * with the next action through the chained updater, whose result is fed back into the
 * update processor. {@code commandPublisher} emits the individual commands of every update.
 *
 * @param initialState the first state emitted by {@code statePublisher}
 * @param updater      function computing an {@code Update} from a state and an action
 * @param middlewares  middlewares applied around {@code updater}
 */
@SafeVarargs public ReduxFXStore(S initialState, BiFunction<S, Object, Update<S>> updater, Middleware<S>... middlewares) { final BiFunction<S, Object, Update<S>> chainedUpdater = applyMiddlewares(updater, middlewares); final Publisher<Object> actionPublisher = Flowable.create(actionEmitter -> this.actionEmitter = actionEmitter, BackpressureStrategy.BUFFER); final FlowableProcessor<Update<S>> updateProcessor = BehaviorProcessor.create(); statePublisher = updateProcessor.map(Update::getState) .startWith(initialState); statePublisher.zipWith(actionPublisher, chainedUpdater::apply) .subscribe(updateProcessor); commandPublisher = updateProcessor .map(Update::getCommands) .flatMapIterable(commands -> commands); }
/**
 * Registers this mission under its url and obtains its event processor.
 *
 * @param missionMap   url-to-mission registry that this mission is stored in
 * @param processorMap url-to-processor registry passed on to {@code createProcessor}
 * @throws IllegalArgumentException if the url is already mapped to a mission that is not canceled
 */
@Override
public void init(Map<String, DownloadMission> missionMap,
                 Map<String, FlowableProcessor<DownloadEvent>> processorMap) {
    DownloadMission existing = missionMap.get(getUrl());
    // Only an absent or canceled entry may be replaced.
    if (existing != null && !existing.isCanceled()) {
        throw new IllegalArgumentException(formatStr(Constant.DOWNLOAD_URL_EXISTS, getUrl()));
    }
    missionMap.put(getUrl(), this);

    this.processor = createProcessor(getUrl(), processorMap);
}
/**
 * Returns the processor mapped to the mission id, creating and storing a
 * serialized {@code BehaviorProcessor} first when the map has none.
 *
 * @param missionId    key to look up in {@code processorMap}
 * @param processorMap registry of processors keyed by mission id
 * @return the existing or newly stored processor for {@code missionId}
 */
public static FlowableProcessor<DownloadEvent> createProcessor(
        String missionId, Map<String, FlowableProcessor<DownloadEvent>> processorMap) {
    FlowableProcessor<DownloadEvent> existing = processorMap.get(missionId);
    if (existing != null) {
        return existing;
    }
    FlowableProcessor<DownloadEvent> created =
            BehaviorProcessor.<DownloadEvent>create().toSerialized();
    processorMap.put(missionId, created);
    return created;
}
/**
 * Creates a new {@code PublishProcessor}, records it in the list kept for the tag
 * (creating that list on first use) and returns it.
 *
 * @param tag key under which the processor is registered
 * @param <T> element type of the returned flowable
 * @return the newly created processor
 */
public <T> Flowable<T> register(@NonNull Object tag) {
    List<FlowableProcessor> registered = mProcessorMapper.get(tag);
    if (registered == null) {
        registered = new ArrayList<FlowableProcessor>();
        mProcessorMapper.put(tag, registered);
    }

    FlowableProcessor<T> processor = PublishProcessor.create();
    registered.add(processor);
    return processor;
}
/**
 * Removes the whole entry for the tag when a processor list is mapped to it.
 *
 * @param tag key whose processor list is dropped
 */
@SuppressWarnings("rawtypes")
public void unregister(@NonNull Object tag) {
    List<FlowableProcessor> registered = mProcessorMapper.get(tag);
    if (registered == null) {
        return;
    }
    mProcessorMapper.remove(tag);
}
/**
 * Stops listening: removes one registered flowable from the tag's list and
 * drops the tag entry once its list is empty.
 * <p>
 * A flowable that is not in the list (for example one derived from the
 * registered processor through an operator) is ignored.
 *
 * @param tag      key the flowable was registered under
 * @param flowable the flowable previously returned by {@code register}; {@code null} is ignored
 * @return the bus instance obtained from {@code getInstance()}
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag, @NonNull Flowable<?> flowable) {
    if (null == flowable) {
        return getInstance();
    }
    List<FlowableProcessor> processors = mProcessorMapper.get(tag);
    if (null != processors) {
        // List.remove(Object) accepts any object, so no cast is needed; casting to
        // FlowableProcessor would throw ClassCastException for any other Flowable.
        processors.remove(flowable);
        if (isEmpty(processors)) {
            mProcessorMapper.remove(tag);
        }
    }
    return getInstance();
}
/**
 * Triggers an event: passes the content to every processor registered for the tag.
 *
 * @param tag     key whose registered processors receive the event
 * @param content the value handed to each processor's {@code onNext}
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void post(@NonNull Object tag, @NonNull Object content) {
    List<FlowableProcessor> targets = mProcessorMapper.get(tag);
    if (isEmpty(targets)) {
        return;
    }
    for (FlowableProcessor target : targets) {
        target.onNext(content);
    }
}
/**
 * Wraps a Flow.Processor (identity) into a FlowableProcessor.
 * <p>
 * A source that already is a FlowableProcessor is returned as is.
 *
 * @param source the source Flow.Processor, not null
 * @param <T> the input and output type of the Flow.Processor
 * @return the new FlowableProcessor instance, or {@code source} itself if it already is one
 * @throws NullPointerException if source is null
 */
@SuppressWarnings("unchecked")
public static <T> FlowableProcessor<T> fromFlowProcessor(Flow.Processor<T, T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    if (!(source instanceof FlowableProcessor)) {
        return new FlowableProcessorFromFlowProcessor<>(source);
    }
    return (FlowableProcessor<T>) source;
}
/** Checks state reporting and delivery of a wrapped Flow.Processor that completes normally. */
@Test
public void flowProcessorToFlowableProcessor() {
    TestFlowProcessor<Integer> source = new TestFlowProcessor<>();
    FlowableProcessor<Integer> wrapped = FlowInterop.fromFlowProcessor(source);

    // Fresh processor: nobody subscribed, not terminated.
    assertFalse(wrapped.hasSubscribers());
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    TestSubscriber<Integer> subscriber = wrapped.test();

    // One subscriber attached, still not terminated.
    assertTrue(wrapped.hasSubscribers());
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    for (int value = 1; value <= 5; value++) {
        wrapped.onNext(value);
    }
    wrapped.onComplete();

    // Completed without an error.
    assertFalse(wrapped.hasSubscribers());
    assertTrue(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    subscriber.assertResult(1, 2, 3, 4, 5);
}
/** Checks that a {@code take(3)} subscriber detaches after three items while the processor stays active. */
@Test
public void flowProcessorToFlowableProcessorTake() {
    TestFlowProcessor<Integer> source = new TestFlowProcessor<>();
    FlowableProcessor<Integer> wrapped = FlowInterop.fromFlowProcessor(source);

    // Fresh processor: nobody subscribed, not terminated.
    assertFalse(wrapped.hasSubscribers());
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    TestSubscriber<Integer> subscriber = wrapped.take(3).test();

    // One subscriber attached, still not terminated.
    assertTrue(wrapped.hasSubscribers());
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    for (int value = 1; value <= 3; value++) {
        wrapped.onNext(value);
    }

    // The subscriber is gone after three items, but the processor has not terminated.
    assertFalse(wrapped.hasSubscribers());
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    for (int value = 4; value <= 5; value++) {
        wrapped.onNext(value);
    }
    wrapped.onComplete();

    // Completed without an error.
    assertFalse(wrapped.hasSubscribers());
    assertTrue(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    subscriber.assertResult(1, 2, 3);
}
/** Checks state reporting and delivery of a wrapped Flow.Processor that terminates with an error. */
@Test
public void flowProcessorToFlowableProcessorError() {
    TestFlowProcessor<Integer> source = new TestFlowProcessor<>();
    FlowableProcessor<Integer> wrapped = FlowInterop.fromFlowProcessor(source);

    // Fresh processor: nobody subscribed, not terminated.
    assertFalse(wrapped.hasSubscribers());
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    TestSubscriber<Integer> subscriber = wrapped.test();

    // One subscriber attached, still not terminated.
    assertTrue(wrapped.hasSubscribers());
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    for (int value = 1; value <= 5; value++) {
        wrapped.onNext(value);
    }
    wrapped.onError(new IOException());

    // Terminated with an error rather than a completion.
    assertFalse(wrapped.hasSubscribers());
    assertFalse(wrapped.hasComplete());
    assertTrue(wrapped.hasThrowable());
    assertNotNull(wrapped.getThrowable());

    subscriber.assertFailure(IOException.class, 1, 2, 3, 4, 5);
}
/**
 * Returns the flowable of {@code IntegerChangedCommand}s, creating it on first use
 * by subscribing a {@code PublishProcessor} to the filtered command stream.
 *
 * @return the cached flowable of integer-changed commands
 */
private Flowable<IntegerChangedCommand> getIntegerChangedCommandFlowable() {
    if (integerChangedCommandFlowable != null) {
        return integerChangedCommandFlowable;
    }

    final FlowableProcessor<IntegerChangedCommand> integerCommands = PublishProcessor.create();
    commandProcessor
            .filter(cmd -> cmd instanceof IntegerChangedCommand)
            .map(cmd -> (IntegerChangedCommand) cmd)
            .subscribe(integerCommands);
    integerChangedCommandFlowable = integerCommands;
    return integerChangedCommandFlowable;
}
/**
 * Returns the flowable of {@code ObjectChangedCommand}s, creating it on first use
 * by subscribing a {@code PublishProcessor} to the filtered command stream.
 *
 * @return the cached flowable of object-changed commands
 */
private Flowable<ObjectChangedCommand<?>> getObjectChangedCommandFlowable() {
    if (objectChangedCommandFlowable != null) {
        return objectChangedCommandFlowable;
    }

    final FlowableProcessor<ObjectChangedCommand<?>> objectCommands = PublishProcessor.create();
    commandProcessor
            .filter(cmd -> cmd instanceof ObjectChangedCommand)
            .map(cmd -> (ObjectChangedCommand<?>) cmd)
            .subscribe(objectCommands);
    objectChangedCommandFlowable = objectCommands;
    return objectChangedCommandFlowable;
}
/**
 * Returns the flowable of {@code FireEventCommand}s, creating it on first use
 * by subscribing a {@code PublishProcessor} to the filtered command stream.
 *
 * @return the cached flowable of fire-event commands
 */
private Flowable<FireEventCommand<? extends Event>> getFireEventCommandFlowable() {
    if (fireEventCommandFlowable != null) {
        return fireEventCommandFlowable;
    }

    final FlowableProcessor<FireEventCommand<? extends Event>> fireEventCommands =
            PublishProcessor.create();
    commandProcessor
            .filter(cmd -> cmd instanceof FireEventCommand)
            .map(cmd -> (FireEventCommand<? extends Event>) cmd)
            .subscribe(fireEventCommands);
    fireEventCommandFlowable = fireEventCommands;
    return fireEventCommandFlowable;
}
/**
 * Initializes this mission against the given registries.
 * <p>
 * NOTE(review): the concrete behavior is defined by subclasses; presumably they
 * register the mission and obtain its event processor — confirm against the implementations.
 *
 * @param missionMap   map of {@code DownloadMission}s keyed by string
 * @param processorMap map of {@code DownloadEvent} processors keyed by string
 */
public abstract void init(Map<String, DownloadMission> missionMap, Map<String, FlowableProcessor<DownloadEvent>> processorMap);
/**
 * Stores the collaborators of this subscriber.
 *
 * @param tag       value kept in the {@code tag} field
 * @param customer  downstream subscriber kept in the {@code customer} field
 * @param buffer    optional list kept in the {@code buffer} field, may be {@code null}
 * @param processor optional processor kept in the {@code processor} field, may be {@code null}
 */
BaseSub(String tag,
        Subscriber<? super T> customer,
        @Nullable List<String> buffer,
        @Nullable FlowableProcessor<String> processor) {
    this.tag = tag;
    this.customer = customer;
    this.buffer = buffer;
    this.processor = processor;
}
/**
 * Returns the output processor held by this object.
 *
 * @return the value of the {@code outputProcessor} field
 */
public FlowableProcessor<String> getOutputProcessor() {
    return this.outputProcessor;
}
/**
 * Returns the error processor held by this object.
 *
 * @return the value of the {@code errorProcessor} field
 */
public FlowableProcessor<String> getErrorProcessor() {
    return this.errorProcessor;
}
/**
 * Tells whether a processor collection is absent or has no elements.
 *
 * @param collection the collection to inspect, may be {@code null}
 * @return {@code true} if {@code collection} is {@code null} or empty
 */
@SuppressWarnings("rawtypes")
public static boolean isEmpty(Collection<FlowableProcessor> collection) {
    if (collection == null) {
        return true;
    }
    return collection.isEmpty();
}
/**
 * Returns the upload processor held by this object.
 *
 * @return the value of the {@code mUploadProcessor} field
 */
public FlowableProcessor<BaseUploadBean> getUploadProcessor() {
    return this.mUploadProcessor;
}
/**
 * Creates the proxy by handing both arguments to the superclass constructor.
 *
 * @param proc     the processor passed to the superclass
 * @param tePolicy the policy passed to the superclass
 */
public RxJava2ProcProxy(FlowableProcessor proc, TePolicy tePolicy) { super(proc, tePolicy); }
/**
 * Wraps the given processor, starting with an empty upstream reference and
 * the shared {@code EMPTY} subscriber array.
 *
 * @param actual the processor kept in the {@code actual} field
 */
@SuppressWarnings("unchecked")
RefCountProcessor(FlowableProcessor<T> actual) {
    this.subscribers = new AtomicReference<RefCountSubscriber<T>[]>(EMPTY);
    this.upstream = new AtomicReference<Subscription>();
    this.actual = actual;
}
/**
 * Creates the plugin with a serialized {@code PublishProcessor} as its event stream.
 */
public WebRtcPlugin() {
    this.eventStream = PublishProcessor.<EventObject>create().toSerialized();
}
/**
 * Converts the EventPublisher into a Flowable.
 * <p>
 * Every event passed to the callback registered with {@code onEvent} is forwarded
 * to a serialized {@code PublishProcessor}, which is returned.
 *
 * @param eventPublisher the event publisher
 * @param <T> the type of the event
 * @return the Flowable
 */
public static <T> Flowable<T> toFlowable(EventPublisher<T> eventPublisher) {
    FlowableProcessor<T> serialized = PublishProcessor.<T>create().toSerialized();
    eventPublisher.onEvent(serialized::onNext);
    return serialized;
}