Java 类io.reactivex.processors.FlowableProcessor 实例源码
项目:GitHub
文件:MultiMission.java
@Override
public void init(Map<String, DownloadMission> missionMap,
Map<String, FlowableProcessor<DownloadEvent>> processorMap) {
DownloadMission mission = missionMap.get(getUrl());
if (mission == null) {
missionMap.put(getUrl(), this);
} else {
if (mission.isCanceled()) {
missionMap.put(getUrl(), this);
} else {
throw new IllegalArgumentException(formatStr(Constant.DOWNLOAD_URL_EXISTS, getUrl()));
}
}
this.processor = createProcessor(getUrl(), processorMap);
for (SingleMission each : missions) {
each.init(missionMap, processorMap);
}
}
项目:GitHub
文件:DownloadService.java
/**
* Receive the url download event.
* <p>
* Will receive the following event:
* {@link DownloadFlag#NORMAL}、{@link DownloadFlag#WAITING}、
* {@link DownloadFlag#STARTED}、{@link DownloadFlag#PAUSED}、
* {@link DownloadFlag#COMPLETED}、{@link DownloadFlag#FAILED};
* <p>
* Every event has {@link DownloadStatus}, you can get it and display it on the interface.
*
* @param url url
* @return DownloadEvent
*/
public FlowableProcessor<DownloadEvent> receiveDownloadEvent(String url) {
FlowableProcessor<DownloadEvent> processor = createProcessor(url, processorMap);
DownloadMission mission = missionMap.get(url);
if (mission == null) { //Not yet add this url mission.
DownloadRecord record = dataBaseHelper.readSingleRecord(url);
if (record == null) {
processor.onNext(normal(null));
} else {
File file = getFiles(record.getSaveName(), record.getSavePath())[0];
if (file.exists()) {
processor.onNext(createEvent(record.getFlag(), record.getStatus()));
} else {
processor.onNext(normal(null));
}
}
}
return processor;
}
项目:reduxfx
文件:ReduxFXStore.java
@SafeVarargs
public ReduxFXStore(S initialState, BiFunction<S, Object, Update<S>> updater, Middleware<S>... middlewares) {
final BiFunction<S, Object, Update<S>> chainedUpdater = applyMiddlewares(updater, middlewares);
final Publisher<Object> actionPublisher =
Flowable.create(actionEmitter -> this.actionEmitter = actionEmitter, BackpressureStrategy.BUFFER);
final FlowableProcessor<Update<S>> updateProcessor = BehaviorProcessor.create();
statePublisher = updateProcessor.map(Update::getState)
.startWith(initialState);
statePublisher.zipWith(actionPublisher, chainedUpdater::apply)
.subscribe(updateProcessor);
commandPublisher = updateProcessor
.map(Update::getCommands)
.flatMapIterable(commands -> commands);
}
项目:GitHub
文件:SingleMission.java
@Override
public void init(Map<String, DownloadMission> missionMap,
Map<String, FlowableProcessor<DownloadEvent>> processorMap) {
DownloadMission mission = missionMap.get(getUrl());
if (mission == null) {
missionMap.put(getUrl(), this);
} else {
if (mission.isCanceled()) {
missionMap.put(getUrl(), this);
} else {
throw new IllegalArgumentException(formatStr(Constant.DOWNLOAD_URL_EXISTS, getUrl()));
}
}
this.processor = createProcessor(getUrl(), processorMap);
}
项目:GitHub
文件:Utils.java
public static FlowableProcessor<DownloadEvent> createProcessor(
String missionId, Map<String, FlowableProcessor<DownloadEvent>> processorMap) {
if (processorMap.get(missionId) == null) {
FlowableProcessor<DownloadEvent> processor =
BehaviorProcessor.<DownloadEvent>create().toSerialized();
processorMap.put(missionId, processor);
}
return processorMap.get(missionId);
}
项目:MVPtemplate
文件:RxBus.java
public <T> Flowable<T> register(@NonNull Object tag) {
List<FlowableProcessor> processors = mProcessorMapper.get(tag);
if (null == processors) {
processors = new ArrayList<FlowableProcessor>();
mProcessorMapper.put(tag, processors);
}
FlowableProcessor<T> processor;
processors.add(processor = PublishProcessor.create());
return processor;
}
项目:MVPtemplate
文件:RxBus.java
@SuppressWarnings("rawtypes")
public void unregister(@NonNull Object tag) {
List<FlowableProcessor> processors = mProcessorMapper.get(tag);
if (null != processors) {
mProcessorMapper.remove(tag);
}
}
项目:MVPtemplate
文件:RxBus.java
/**
* 取消监听
* @param tag
* @param flowable
* @return
*/
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
@NonNull Flowable<?> flowable) {
if (null == flowable)
return getInstance();
List<FlowableProcessor> processors = mProcessorMapper.get(tag);
if (null != processors) {
processors.remove((FlowableProcessor<?>) flowable);
if (isEmpty(processors)) {
mProcessorMapper.remove(tag);
}
}
return getInstance();
}
项目:MVPtemplate
文件:RxBus.java
/**
* 触发事件
*
* @param content
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void post(@NonNull Object tag, @NonNull Object content) {
List<FlowableProcessor> processors = mProcessorMapper.get(tag);
if (!isEmpty(processors)) {
for (FlowableProcessor processor : processors) {
processor.onNext(content);
}
}
}
项目:RxJava2Jdk9Interop
文件:FlowInterop.java
/**
* Wraps a Flow.Processor (identity) into a FlowableProcessor.
* @param source the source Flow.Processor, not null
* @param <T> the input and output type of the Flow.Processor
* @return the new FlowableProcessor instance
* @throws NullPointerException if source is null
*/
@SuppressWarnings("unchecked")
public static <T> FlowableProcessor<T> fromFlowProcessor(Flow.Processor<T, T> source) {
if (source instanceof FlowableProcessor) {
return (FlowableProcessor<T>)source;
}
ObjectHelper.requireNonNull(source, "source is null");
return new FlowableProcessorFromFlowProcessor<>(source);
}
项目:RxJava2Jdk9Interop
文件:FlowInteropTest.java
@Test
public void flowProcessorToFlowableProcessor() {
TestFlowProcessor<Integer> tfp = new TestFlowProcessor<>();
FlowableProcessor<Integer> fp = FlowInterop.fromFlowProcessor(tfp);
assertFalse(fp.hasSubscribers());
assertFalse(fp.hasComplete());
assertFalse(fp.hasThrowable());
assertNull(fp.getThrowable());
TestSubscriber<Integer> ts = fp.test();
assertTrue(fp.hasSubscribers());
assertFalse(fp.hasComplete());
assertFalse(fp.hasThrowable());
assertNull(fp.getThrowable());
fp.onNext(1);
fp.onNext(2);
fp.onNext(3);
fp.onNext(4);
fp.onNext(5);
fp.onComplete();
assertFalse(fp.hasSubscribers());
assertTrue(fp.hasComplete());
assertFalse(fp.hasThrowable());
assertNull(fp.getThrowable());
ts.assertResult(1, 2, 3, 4, 5);
}
项目:RxJava2Jdk9Interop
文件:FlowInteropTest.java
@Test
public void flowProcessorToFlowableProcessorTake() {
TestFlowProcessor<Integer> tfp = new TestFlowProcessor<>();
FlowableProcessor<Integer> fp = FlowInterop.fromFlowProcessor(tfp);
assertFalse(fp.hasSubscribers());
assertFalse(fp.hasComplete());
assertFalse(fp.hasThrowable());
assertNull(fp.getThrowable());
TestSubscriber<Integer> ts = fp.take(3).test();
assertTrue(fp.hasSubscribers());
assertFalse(fp.hasComplete());
assertFalse(fp.hasThrowable());
assertNull(fp.getThrowable());
fp.onNext(1);
fp.onNext(2);
fp.onNext(3);
assertFalse(fp.hasSubscribers());
assertFalse(fp.hasComplete());
assertFalse(fp.hasThrowable());
assertNull(fp.getThrowable());
fp.onNext(4);
fp.onNext(5);
fp.onComplete();
assertFalse(fp.hasSubscribers());
assertTrue(fp.hasComplete());
assertFalse(fp.hasThrowable());
assertNull(fp.getThrowable());
ts.assertResult(1, 2, 3);
}
项目:RxJava2Jdk9Interop
文件:FlowInteropTest.java
@Test
public void flowProcessorToFlowableProcessorError() {
TestFlowProcessor<Integer> tfp = new TestFlowProcessor<>();
FlowableProcessor<Integer> fp = FlowInterop.fromFlowProcessor(tfp);
assertFalse(fp.hasSubscribers());
assertFalse(fp.hasComplete());
assertFalse(fp.hasThrowable());
assertNull(fp.getThrowable());
TestSubscriber<Integer> ts = fp.test();
assertTrue(fp.hasSubscribers());
assertFalse(fp.hasComplete());
assertFalse(fp.hasThrowable());
assertNull(fp.getThrowable());
fp.onNext(1);
fp.onNext(2);
fp.onNext(3);
fp.onNext(4);
fp.onNext(5);
fp.onError(new IOException());
assertFalse(fp.hasSubscribers());
assertFalse(fp.hasComplete());
assertTrue(fp.hasThrowable());
assertNotNull(fp.getThrowable());
ts.assertFailure(IOException.class, 1, 2, 3, 4, 5);
}
项目:reduxfx
文件:ComponentDriver.java
private Flowable<IntegerChangedCommand> getIntegerChangedCommandFlowable() {
if (integerChangedCommandFlowable == null) {
final FlowableProcessor<IntegerChangedCommand> processor = PublishProcessor.create();
commandProcessor
.filter(command -> command instanceof IntegerChangedCommand)
.map(command -> (IntegerChangedCommand) command)
.subscribe(processor);
integerChangedCommandFlowable = processor;
}
return integerChangedCommandFlowable;
}
项目:reduxfx
文件:ComponentDriver.java
private Flowable<ObjectChangedCommand<?>> getObjectChangedCommandFlowable() {
if (objectChangedCommandFlowable == null) {
final FlowableProcessor<ObjectChangedCommand<?>> processor = PublishProcessor.create();
commandProcessor
.filter(command -> command instanceof ObjectChangedCommand)
.map(command -> (ObjectChangedCommand<?>) command)
.subscribe(processor);
objectChangedCommandFlowable = processor;
}
return objectChangedCommandFlowable;
}
项目:reduxfx
文件:ComponentDriver.java
private Flowable<FireEventCommand<? extends Event>> getFireEventCommandFlowable() {
if (fireEventCommandFlowable == null) {
final FlowableProcessor<FireEventCommand<? extends Event>> processor = PublishProcessor.create();
commandProcessor
.filter(command -> command instanceof FireEventCommand)
.map(command -> (FireEventCommand<? extends Event>) command)
.subscribe(processor);
fireEventCommandFlowable = processor;
}
return fireEventCommandFlowable;
}
项目:GitHub
文件:DownloadMission.java
public abstract void init(Map<String, DownloadMission> missionMap,
Map<String, FlowableProcessor<DownloadEvent>> processorMap);
项目:RxShell
文件:Harvester.java
BaseSub(String tag, Subscriber<? super T> customer, @Nullable List<String> buffer, @Nullable FlowableProcessor<String> processor) {
this.tag = tag;
this.customer = customer;
this.processor = processor;
this.buffer = buffer;
}
项目:RxShell
文件:Cmd.java
public FlowableProcessor<String> getOutputProcessor() {
return outputProcessor;
}
项目:RxShell
文件:Cmd.java
public FlowableProcessor<String> getErrorProcessor() {
return errorProcessor;
}
项目:MVPtemplate
文件:RxBus.java
@SuppressWarnings("rawtypes")
public static boolean isEmpty(Collection<FlowableProcessor> collection) {
return null == collection || collection.isEmpty();
}
项目:richeditor
文件:RequestBodyWrapper.java
public FlowableProcessor<BaseUploadBean> getUploadProcessor() {
return mUploadProcessor;
}
项目:RHub
文件:RxJava2ProcProxy.java
public RxJava2ProcProxy(FlowableProcessor proc, TePolicy tePolicy) {
super(proc, tePolicy);
}
项目:RxJava2Extensions
文件:RefCountProcessor.java
@SuppressWarnings("unchecked")
RefCountProcessor(FlowableProcessor<T> actual) {
this.actual = actual;
this.upstream = new AtomicReference<Subscription>();
this.subscribers = new AtomicReference<RefCountSubscriber<T>[]>(EMPTY);
}
项目:viska-android
文件:WebRtcPlugin.java
public WebRtcPlugin() {
final FlowableProcessor<EventObject> unsafeStream = PublishProcessor.create();
this.eventStream = unsafeStream.toSerialized();
}
项目:resilience4j
文件:RxJava2Adapter.java
/**
* Converts the EventPublisher into a Flowable.
*
* @param eventPublisher the event publisher
* @param <T> the type of the event
* @return the Flowable
*/
public static <T> Flowable<T> toFlowable(EventPublisher<T> eventPublisher) {
PublishProcessor<T> publishProcessor = PublishProcessor.create();
FlowableProcessor<T> flowableProcessor = publishProcessor.toSerialized();
eventPublisher.onEvent(flowableProcessor::onNext);
return flowableProcessor;
}