private Observable<NullableContainer<String>> filterThenDistinctUntilChanged( Observable<NullableContainer<String>> observable) { return observable .filter( new Predicate<NullableContainer<String>>() { @Override public boolean test(NullableContainer<String> container) throws Exception { return container.get() != null; } }) .distinctUntilChanged( new BiPredicate<NullableContainer<String>, NullableContainer<String>>() { @Override public boolean test(NullableContainer<String> nc1, NullableContainer<String> nc2) { return nc1.get().length() == nc2.get().length() && nc1.get().contains(nc2.get()) && nc2.get().contains(nc1.get()); } }); }
@Test public void inWithTrueBiPredicateThenReturnTrue() { Collection<TestClass> testClasses = new ArrayList<>(2); TestClass testClass = new TestClass(); testClasses.add(testClass); testClasses.add(new TestClass()); testClasses.add(null); boolean result = Chain.let(testClass) .in(testClasses, new BiPredicate<TestClass, TestClass>() { @Override public boolean test(@NonNull TestClass original, @NonNull TestClass collectionItem) { return original.equals(collectionItem); } }) .call() .getValue1(); assertTrue(result); }
@Test public void inWithFalseBiPredicateThenReturnTrue() { Collection<TestClass> testClasses = new ArrayList<>(2); TestClass testClass = new TestClass(); testClasses.add(testClass); testClasses.add(new TestClass()); testClasses.add(null); boolean result = Chain.let(testClass) .in(testClasses, new BiPredicate<TestClass, TestClass>() { @Override public boolean test(@NonNull TestClass original, @NonNull TestClass collectionItem) { return false; } }) .call() .getValue1(); assertFalse(result); }
public static <U> ObservableTransformer<U, U> retry(final String hint, final int retryCount) { return new ObservableTransformer<U, U>() { @Override public ObservableSource<U> apply(Observable<U> upstream) { return upstream.retry(new BiPredicate<Integer, Throwable>() { @Override public boolean test(Integer integer, Throwable throwable) throws Exception { return retry(hint, retryCount, integer, throwable); } }); } }; }
public static <U> FlowableTransformer<U, U> retry2(final String hint, final int retryCount) { return new FlowableTransformer<U, U>() { @Override public Publisher<U> apply(Flowable<U> upstream) { return upstream.retry(new BiPredicate<Integer, Throwable>() { @Override public boolean test(Integer integer, Throwable throwable) throws Exception { return retry(hint, retryCount, integer, throwable); } }); } }; }
InOperator(Collection<T> collection, BiPredicate<T, T> comparator, InternalConfiguration configuration) { this.collection = new ArrayList<>(); if (collection != null && !collection.isEmpty()) { this.collection.addAll(collection); } this.comparator = comparator; this.configuration = configuration; }
private void retryWithBiPredicate(final boolean flag) { Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { for (int i = 0; i <= 3; i++) { if (i == 2) { e.onError(new Exception("出现错误了!")); } else { e.onNext(i + ""); } Thread.sleep(1000); } e.onComplete(); } }) .retry(new BiPredicate<Integer, Throwable>() { @Override public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception { Log.d(TAG, "retry错误: " + integer + ", " + throwable.toString()); //false 不让重新发射数据了,调用观察者的onError就终止了 //true 被观察者重新发射请求(一直发射知道没有错误) return flag; } }) .subscribeOn( .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { Log.d(TAG, "subscribe time: " + disposable.isDisposed()); } }) .subscribe(getObserver()); }
public static <T, R> BiPredicate<T, R> alwaysTrue() { // TODO make holder return new BiPredicate<T, R>() { @Override public boolean test(T t1, R t2) throws Exception { return true; } }; }
public static <T, R> BiPredicate<T, R> alwaysFalse() { // TODO make holder return new BiPredicate<T, R>() { @Override public boolean test(T t1, R t2) throws Exception { return false; } }; }
public static <T, R> BiPredicate<T, R> throwing() { // TODO make holder return new BiPredicate<T, R>() { @Override public boolean test(T t1, R t2) throws Exception { throw new ThrowingException(); } }; }
private TransformerStateMachine(Callable<? extends State> initialState, Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition, BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, BackpressureStrategy backpressureStrategy, int requestBatchSize) { Preconditions.checkNotNull(initialState); Preconditions.checkNotNull(transition); Preconditions.checkNotNull(completion); Preconditions.checkNotNull(backpressureStrategy); Preconditions.checkArgument(requestBatchSize > 0, "initialRequest must be greater than zero"); this.initialState = initialState; this.transition = transition; this.completion = completion; this.backpressureStrategy = backpressureStrategy; this.requestBatchSize = requestBatchSize; }
public static <State, In, Out> FlowableTransformer<In, Out> create(Callable<? extends State> initialState, Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition, BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, BackpressureStrategy backpressureStrategy, int requestBatchSize) { return new TransformerStateMachine<State, In, Out>(initialState, transition, completion, backpressureStrategy, requestBatchSize); }
private static <State, Out, In> Function<Notification<In>, Flowable<Notification<Out>>> execute( final Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition, final BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, final Mutable<State> state, final BackpressureStrategy backpressureStrategy) { return new Function<Notification<In>, Flowable<Notification<Out>>>() { @Override public Flowable<Notification<Out>> apply(final Notification<In> in) { return Flowable.create(new FlowableOnSubscribe<Notification<Out>>() { @Override public void subscribe(FlowableEmitter<Notification<Out>> emitter) throws Exception { FlowableEmitter<Out> w = wrap(emitter); if (in.isOnNext()) { state.value = transition.apply(state.value, in.getValue(), w); if (!emitter.isCancelled()) emitter.onComplete(); else { // this is a special emission to indicate that // the transition called unsubscribe. It will be // filtered later. emitter.onNext(UnsubscribedNotificationHolder.<Out>unsubscribedNotification()); } } else if (in.isOnComplete()) { if (completion.test(state.value, w) && !emitter.isCancelled()) { w.onComplete(); } } else if (!emitter.isCancelled()) { w.onError(in.getError()); } } }, backpressureStrategy); } }; }
public FlowableCollectWhile(Flowable<T> source, Callable<R> collectionFactory, BiFunction<? super R, ? super T, ? extends R> add, BiPredicate<? super R, ? super T> condition, boolean emitRemainder) { super(); this.source = source; this.collectionFactory = collectionFactory; this.add = add; this.condition = condition; this.emitRemainder = emitRemainder; }
CollectWhileSubscriber(Callable<R> collectionFactory, BiFunction<? super R, ? super T, ? extends R> add, BiPredicate<? super R, ? super T> condition, Subscriber<? super R> child, boolean emitRemainder) { this.collectionFactory = collectionFactory; this.add = add; this.condition = condition; this.child = child; this.emitRemainder = emitRemainder; }
public static <State, In, Out> FlowableTransformer<In, Out> stateMachine(Callable<? extends State> initialState, Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition, BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, BackpressureStrategy backpressureStrategy, int requestBatchSize) { return TransformerStateMachine.create(initialState, transition, completion, backpressureStrategy, requestBatchSize); }
public static <T, R> FlowableTransformer<T, R> collectWhile(final Callable<R> collectionFactory, final BiFunction<? super R, ? super T, ? extends R> add, final BiPredicate<? super R, ? super T> condition, final boolean emitRemainder) { return new FlowableTransformer<T, R>() { @Override public Publisher<R> apply(Flowable<T> source) { return new FlowableCollectWhile<T, R>(source, collectionFactory, add, condition, emitRemainder); } }; }
/** * Returns an observable of the tic tac toe board. First value is provided immediately, * succeeding values are guaranteed to be distinct from previous values. Values are * always provided on the main thread. */ public Observable<Value[][]> grid() { return grid.distinctUntilChanged(new BiPredicate<Value[][], Value[][]>() { @Override public boolean test(Value[][] a, Value[][] b) throws Exception { return Arrays.equals(a, b); } }); }
/** * Returns an observable of the tic tac toe board. First value is provided immediately, * succeeding values are guaranteed to be distinct from previous values. Values are * always provided on the main thread. */ Observable<Value[][]> grid() { return grid.distinctUntilChanged(new BiPredicate<Value[][], Value[][]>() { @Override public boolean test(Value[][] a, Value[][] b) throws Exception { return Arrays.equals(a, b); } }); }
public static <T, R> FlowableTransformer<T, R> collectWhile(final Callable<R> collectionFactory, final BiFunction<? super R, ? super T, ? extends R> add, final BiPredicate<? super R, ? super T> condition) { return collectWhile(collectionFactory, add, condition, true); }
public static <T> FlowableTransformer<T, List<T>> toListWhile( final BiPredicate<? super List<T>, ? super T> condition, boolean emitRemainder) { return collectWhile(ListFactoryHolder.<T>factory(), ListFactoryHolder.<T>add(), condition, emitRemainder); }
public static <T> FlowableTransformer<T, List<T>> toListWhile( final BiPredicate<? super List<T>, ? super T> condition) { return toListWhile(condition, true); }
public static <T> FlowableTransformer<T, List<T>> bufferWhile( final BiPredicate<? super List<T>, ? super T> condition, boolean emitRemainder) { return toListWhile(condition, emitRemainder); }
public static <T> FlowableTransformer<T, List<T>> bufferWhile( final BiPredicate<? super List<T>, ? super T> condition) { return toListWhile(condition); }
@CheckReturnValue @SchedulerSupport("none") public Single<Boolean> contains(Object value, BiPredicate<Object, Object> comparer) { return boxed.contains(value, comparer); }
@CheckReturnValue @SchedulerSupport("none") public Single<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate) { return boxed.retry(predicate); }
@CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport("none") public Flowable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer) { return boxed.distinctUntilChanged(comparer); }
@CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport("none") public Flowable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate) { return boxed.retry(predicate); }
/** * check if the current Object in the {@link Optional} is available in the passed {@link Collection} * * @param collection the {@link Collection} that holds the items * @param comparator the {@link BiPredicate} that will be invoked over every item, the stored * Object will be passed as it's first parameter, and the item in the * {@link Collection} will be passed as the second parameter, if the returned * value is {@code true}, this means that both items are equal, if * the returned item is {@code false}, they do not match * @return a {@link Condition} that will execute it's {@link Condition#then(Consumer)} or * similar methods if the item is available in the passed {@link Collection} */ public Condition<Optional<T>, T> whenIn(Collection<T> collection, BiPredicate<T, T> comparator) { return Condition.createNormal(this, new InOperator<>(collection, comparator, chain.configuration)); }
/** * check if the current Object in the {@link Optional} is NOT available in the * passed {@link Collection} * * @param collection the {@link Collection} that holds the items * @param comparator the {@link BiPredicate} that will be invoked over every item, the stored * Object will be passed as it's first parameter, and the item in the * {@link Collection} will be passed as the second parameter, if the returned * value is {@code true}, this means that both items are equal, if * the returned item is {@code false}, they do not match * @return a {@link Condition} that will execute it's {@link Condition#then(Consumer)} or * similar methods if the item is NOT available in the passed {@link Collection} */ public Condition<Optional<T>, T> whenNotIn(Collection<T> collection, BiPredicate<T, T> comparator) { return Condition.createNegated(this, new InOperator<>(collection, comparator, chain.configuration)); }
/** * check if the current Object in the chain is available in the passed {@link Collection} * * @param collection the {@link Collection} that holds the items * @param comparator the {@link BiPredicate} that will be invoked over every item, the stored * Object will be passed as it's first parameter, and the item in the * {@link Collection} will be passed as the second parameter, if the returned * value is {@code true}, this means that both items are equal, if * the returned item is {@code false}, they do not match * @return a new {@link Chain} holding a {@link Pair}, where {@link Pair#getValue0()} will * return the original Object, and {@link Pair#getValue1()} will return a boolean indicating * weather the the Object was found in the passed {@link Collection} or not * @deprecated use {@link #whenIn(Collection, BiPredicate)} instead */ @Deprecated public Chain<Pair<T, Boolean>> in(Collection<T> collection, BiPredicate<T, T> comparator) { boolean inCollection = new InOperator<>(collection, comparator, configuration).test(item); return new Chain<>(Pair.with(item, inCollection), configuration); }
/** * check if the current Object in the {@link Chain} is available in the passed * {@link Collection} * * @param collection the {@link Collection} that holds the items * @param comparator the {@link BiPredicate} that will be invoked over every item, the stored * Object will be passed as it's first parameter, and the item in the * {@link Collection} will be passed as the second parameter, if the returned * value is {@code true}, this means that both items are equal, if * the returned item is {@code false}, they do not match * @return a {@link Condition} that will execute it's {@link Condition#then(Consumer)} or * similar methods if the item is available in the passed {@link Collection} */ public Condition<Chain<T>, T> whenIn(Collection<T> collection, BiPredicate<T, T> comparator) { return Condition.createNormal(this, new InOperator<>(collection, comparator, configuration)); }
/** * check if the current Object in the {@link Chain} is NOT available in the passed * {@link Collection} * * @param collection the {@link Collection} that holds the items * @param comparator the {@link BiPredicate} that will be invoked over every item, the stored * Object will be passed as it's first parameter, and the item in the * {@link Collection} will be passed as the second parameter, if the returned * value is {@code true}, this means that both items are equal, if * the returned item is {@code false}, they do not match * @return a {@link Condition} that will execute it's {@link Condition#then(Consumer)} or * similar methods if the item is NOT available in the passed {@link Collection} */ public Condition<Chain<T>, T> whenNotIn(Collection<T> collection, BiPredicate<T, T> comparator) { return Condition.createNegated(this, new InOperator<>(collection, comparator, configuration)); }