/**
 * Creates an {@link OnCompleteListener} that forwards the outcome of a {@link Task} to a
 * {@link MaybeEmitter}.
 * <p>
 * Exactly one terminal signal is sent: {@code onSuccess} for a non-null result,
 * {@code onComplete} for a successful task whose result is {@code null}, and
 * {@code onError} for an unsuccessful task. Nothing is signalled once the emitter is disposed.
 *
 * @param emit the emitter that receives the task outcome
 * @param <R> the task result type
 * @return the bridging listener
 */
@NonNull
@CheckReturnValue
public static <R> OnCompleteListener<R> listener(@NonNull final MaybeEmitter<R> emit) {
    return new OnCompleteListener<R>() {
        @Override
        public void onComplete(@NonNull final Task<R> task) {
            if (emit.isDisposed()) {
                return;
            }
            if (!task.isSuccessful()) {
                final Exception e = task.getException();
                // Fall back to a generic exception when the task gives no cause.
                emit.onError(e != null ? e : new RuntimeException());
                return;
            }
            final R result = task.getResult();
            if (result != null) {
                emit.onSuccess(result);
            } else {
                // onSuccess and onComplete are mutually exclusive terminal events for a Maybe.
                emit.onComplete();
            }
        }
    };
}
/**
 * Builds an {@link OnCompleteListener} that relays a {@link Task} outcome to a
 * {@link CompletableEmitter}; the task's result value is not read.
 *
 * @param emit emitter to notify; it is left untouched once disposed
 * @param <R> result type of the observed task
 * @return the bridging listener
 */
@NonNull
@CheckReturnValue
public static <R> OnCompleteListener<R> listener(@NonNull final CompletableEmitter emit) {
    return new OnCompleteListener<R>() {
        @Override
        public void onComplete(@NonNull final Task<R> task) {
            if (emit.isDisposed()) {
                return;
            }
            if (task.isSuccessful()) {
                emit.onComplete();
                return;
            }
            final Exception failure = task.getException();
            if (failure == null) {
                // The task gave no cause; signal a generic failure instead.
                emit.onError(new RuntimeException());
            } else {
                emit.onError(failure);
            }
        }
    };
}
/**
 * Builds an {@link OnCompleteListener} that relays a {@link Task} outcome to a
 * {@link SingleEmitter}: the task result goes to {@code onSuccess}, a failure to
 * {@code onError}.
 *
 * @param emit emitter to notify; it is left untouched once disposed
 * @param <R> result type of the observed task
 * @return the bridging listener
 */
@NonNull
@CheckReturnValue
public static <R> OnCompleteListener<R> listener(@NonNull final SingleEmitter<R> emit) {
    return new OnCompleteListener<R>() {
        @Override
        public void onComplete(@NonNull final Task<R> task) {
            if (emit.isDisposed()) {
                return;
            }
            if (task.isSuccessful()) {
                emit.onSuccess(task.getResult());
                return;
            }
            final Exception failure = task.getException();
            if (failure == null) {
                // The task gave no cause; signal a generic failure instead.
                emit.onError(new RuntimeException());
            } else {
                emit.onError(failure);
            }
        }
    };
}
/**
 * Maps each upstream {@link DataSnapshot} to a {@link DataValue}: the value read through
 * {@code typeIndicator} when the snapshot exists, otherwise {@code DataValue.empty()}.
 *
 * @param upstream the snapshot stream to convert
 * @return the converted stream
 */
@Override
@NonNull
@CheckReturnValue
public ObservableSource<DataValue<T>> apply(@NonNull Observable<DataSnapshot> upstream) {
    final Function<DataSnapshot, DataValue<T>> toDataValue =
            new Function<DataSnapshot, DataValue<T>>() {
                @Override
                public DataValue<T> apply(DataSnapshot dataSnapshot) throws Exception {
                    if (!dataSnapshot.exists()) {
                        return DataValue.empty();
                    }
                    return DataValue.of(dataSnapshot.getValue(typeIndicator));
                }
            };
    return upstream.map(toDataValue);
}
/**
 * Maps each upstream {@link DataSnapshot} to a {@link DataValue}: the value read as
 * {@code clazz} when the snapshot exists, otherwise {@code DataValue.empty()}.
 *
 * @param upstream the snapshot stream to convert
 * @return the converted stream
 */
@Override
@NonNull
@CheckReturnValue
public ObservableSource<DataValue<T>> apply(@NonNull Observable<DataSnapshot> upstream) {
    return upstream.map(new Function<DataSnapshot, DataValue<T>>() {
        @Override
        public DataValue<T> apply(DataSnapshot dataSnapshot) throws Exception {
            // A missing node maps to the empty marker rather than a null value.
            if (!dataSnapshot.exists()) {
                return DataValue.empty();
            }
            return DataValue.of(dataSnapshot.getValue(clazz));
        }
    });
}
/**
 * Builds a {@link ValueEventListener} that forwards the received snapshot to
 * {@code onSuccess}, or a cancellation's {@link DatabaseError} (as an exception) to
 * {@code onError}.
 *
 * @param emit emitter to notify; it is left untouched once disposed
 * @return the bridging listener
 */
@NonNull
@CheckReturnValue
public static ValueEventListener listener(@NonNull final SingleEmitter<DataSnapshot> emit) {
    return new ValueEventListener() {
        @Override
        public void onDataChange(DataSnapshot dataSnapshot) {
            if (emit.isDisposed()) {
                return;
            }
            emit.onSuccess(dataSnapshot);
        }

        @Override
        public void onCancelled(DatabaseError e) {
            if (emit.isDisposed()) {
                return;
            }
            emit.onError(e.toException());
        }
    };
}
/**
 * Wraps {@code user.getToken(forceRefresh)} in a {@link Single} and maps the
 * {@link GetTokenResult} to its token string.
 *
 * @param user the user whose token is requested
 * @param forceRefresh passed through unchanged to {@code FirebaseUser#getToken}
 * @return a Single emitting the token string
 */
@CheckReturnValue
@NonNull
public static Single<String> getToken(@NonNull final FirebaseUser user,
        final boolean forceRefresh) {
    final Callable<Task<GetTokenResult>> tokenRequest = new Callable<Task<GetTokenResult>>() {
        @Override
        public Task<GetTokenResult> call() throws Exception {
            return user.getToken(forceRefresh);
        }
    };
    final Function<GetTokenResult, String> extractToken =
            new Function<GetTokenResult, String>() {
                @Override
                public String apply(@NonNull GetTokenResult getTokenResult) throws Exception {
                    return getTokenResult.getToken();
                }
            };
    return RxTask.single(tokenRequest).map(extractToken);
}
/**
 * Wraps {@code instance.fetchProvidersForEmail(email)} in a {@link Single} that emits the
 * provider list of the query result.
 * <p>
 * TODO: Should use Maybe instead of Single
 * TODO: flatten List
 *
 * @param instance the auth instance to query
 * @param email the e-mail address to look up
 * @return a Single emitting the providers, or an empty list if the result's providers are
 *         {@code null}
 */
@CheckReturnValue
@NonNull
public static Single<List<String>> fetchProvidersForEmail(
        @NonNull final FirebaseAuth instance,
        @NonNull final String email) {
    final Callable<Task<ProviderQueryResult>> lookup =
            new Callable<Task<ProviderQueryResult>>() {
                @Override
                public Task<ProviderQueryResult> call() throws Exception {
                    return instance.fetchProvidersForEmail(email);
                }
            };
    return RxTask.single(lookup).map(new Function<ProviderQueryResult, List<String>>() {
        @Override
        public List<String> apply(@NonNull ProviderQueryResult providerQueryResult)
                throws Exception {
            final List<String> found = providerQueryResult.getProviders();
            if (found != null) {
                return found;
            }
            return Collections.emptyList();
        }
    });
}
@CheckReturnValue @NonNull public static <R> Observable<R> observable(@NonNull final Task<R> task) { return Observable.create(emitter -> { task.continueWith(t -> { if (emitter.isDisposed()) return null; if (t.isCancelled()) { // NOTICE: doOnUnsubscribe(() -> Observable.just(query) in outside emitter.onComplete(); } else if (t.isFaulted()) { Throwable error = t.getError(); emitter.onError(error); } else { R r = t.getResult(); if (r != null) emitter.onNext(r); emitter.onComplete(); } return null; }); }); // TODO .doOnUnsubscribe(() -> task.setCancelled()); }
/** * Please don't put the Task<Void> as parameter * @param task * @param <R> * @return */ @CheckReturnValue @NonNull public static <R> Single<R> single(@NonNull final Task<R> task) { return Single.create(emitter -> { task.continueWith(t -> { if (emitter.isDisposed()) return null; if (t.isCancelled()) { emitter.onError(new RuntimeException("Cancelled task")); } else if (t.isFaulted()) { Throwable error = t.getError(); emitter.onError(error); } else { R r = t.getResult(); emitter.onSuccess(r); } return null; }); }); // TODO .doOnUnsubscribe(() -> task.setCancelled()); }
/** * Limit 10000 by skip */ @NonNull @CheckReturnValue public static <R extends ParseObject> Observable<R> all(@NonNull final ParseQuery<R> query, int count) { final int limit = 1000; // limit limitation query.setSkip(0); query.setLimit(limit); Observable<R> find = find(query); for (int i = limit; i < count; i+= limit) { if (i >= 10000) break; // skip limitation query.setSkip(i); query.setLimit(limit); find.concatWith(find(query)); } return find.distinct(o -> o.getObjectId()); }
/**
 * Builds a transformer that turns every item of the source observable into
 * {@link Permission} results for the given permissions.
 * <p>
 * The permissions are validated with {@code checkPermissions} up front; the actual work is
 * delegated to {@code request}, which asks the user for any permission that still needs to
 * be requested.
 *
 * @param permissions the permissions to ensure
 * @param <T> the item type of the triggering observable
 * @return the transformer
 */
@NonNull
@CheckReturnValue
private <T> ObservableTransformer<T, Permission> ensureEach(
        @NonNull final String... permissions) {
    checkPermissions(permissions);
    final ObservableTransformer<T, Permission> transformer =
            new ObservableTransformer<T, Permission>() {
                @Override
                @NonNull
                @CheckReturnValue
                public ObservableSource<Permission> apply(final Observable<T> upstream) {
                    return request(upstream, permissions);
                }
            };
    return transformer;
}
@NonNull @CheckReturnValue @SuppressWarnings("checkstyle:overloadmethodsdeclarationorder") Observable<Permission> request(final Observable<?> trigger, @NonNull final String... permissions) { return Observable.merge(trigger, pending(permissions)) .flatMap(new Function<Object, Observable<Permission>>() { @Override @NonNull @CheckReturnValue public Observable<Permission> apply(final Object o) throws Exception { // NOPMD return requestOnM(permissions); } }); }
/**
 * Emits {@code TRIGGER} once when every given permission has an entry in
 * {@code currentPermissionRequests}; otherwise returns an empty observable.
 *
 * @param permissions the permissions to look up
 * @return a single-item observable if all permissions are tracked, else an empty one
 */
@NonNull
@CheckReturnValue
private Observable<?> pending(@NonNull final String... permissions) {
    boolean allTracked = true;
    // Stop scanning at the first permission that has no tracked request.
    for (int i = 0; i < permissions.length && allTracked; i++) {
        allTracked = currentPermissionRequests.containsKey(permissions[i]);
    }
    if (allTracked) {
        return Observable.just(TRIGGER);
    }
    return Observable.empty();
}
@NonNull @CheckReturnValue @TargetApi(M) Observable<Permission> requestOnM(@NonNull final String... permissions) { final List<Observable<Permission>> list = new ArrayList<>(permissions.length); final List<String> unrequestedPermissions = new ArrayList<>(); // In case of multiple permissions, we create an observable for each of them. // At the end, the observables are combined to have a unique response. for (final String permission : permissions) { if (isGranted(permission)) { list.add(Observable.just(Permission.granted(permission))); } else if (isRevokedByPolicy(permission)) { list.add(Observable.just(Permission.revokedByPolicy(permission))); } else { PublishSubject<Permission> subject = currentPermissionRequests.get(permission); // Create a new subject if not exists if (subject == null) { unrequestedPermissions.add(permission); subject = PublishSubject.create(); currentPermissionRequests.put(permission, subject); } list.add(subject); } } if (!unrequestedPermissions.isEmpty()) { final String[] permissionsToRequest = unrequestedPermissions.toArray(new String[0]); startShadowActivity(permissionsToRequest); } return Observable.concat(Observable.fromIterable(list)); }
/**
 * Obtains a {@link Task} from {@code callable} via {@code Single.fromCallable} and adapts it
 * with {@code single(Task)}.
 *
 * @param callable supplies the task to observe
 * @param <R> the task result type
 * @return a Single mirroring the outcome of the supplied task
 */
@CheckReturnValue
@NonNull
public static <R> Single<R> single(@NonNull final Callable<Task<R>> callable) {
    final Single<Task<R>> suppliedTask = Single.fromCallable(callable);
    return suppliedTask.flatMap(new Function<Task<R>, SingleSource<? extends R>>() {
        @Override
        public SingleSource<? extends R> apply(Task<R> task) throws Exception {
            return single(task);
        }
    });
}
/**
 * Adapts a {@link Task} into a {@link Single} by registering a completion listener on the
 * task for each subscriber.
 *
 * @param task the task to observe
 * @param <R> the task result type
 * @return a Single mirroring the task outcome
 */
@CheckReturnValue
@NonNull
public static <R> Single<R> single(@NonNull final Task<R> task) {
    final SingleOnSubscribe<R> source = new SingleOnSubscribe<R>() {
        @Override
        public void subscribe(@NonNull final SingleEmitter<R> emit) throws Exception {
            task.addOnCompleteListener(listener(emit));
        }
    };
    return Single.create(source);
}
/**
 * Obtains a {@link Task} from {@code callable} via {@code Single.fromCallable} and adapts it
 * with {@code completes(Task)}.
 *
 * @param callable supplies the task to observe
 * @param <R> the task result type
 * @return a Completable mirroring the outcome of the supplied task
 */
@CheckReturnValue
@NonNull
public static <R> Completable completes(@NonNull final Callable<Task<R>> callable) {
    final Function<Task<R>, Completable> toCompletable = new Function<Task<R>, Completable>() {
        @Override
        public Completable apply(Task<R> task) throws Exception {
            return completes(task);
        }
    };
    return Single.fromCallable(callable).flatMapCompletable(toCompletable);
}
/**
 * Adapts a {@link Task} into a {@link Completable} by registering a completion listener on
 * the task for each subscriber.
 *
 * @param task the task to observe
 * @param <R> the task result type, usually {@code Void}
 * @return a Completable mirroring the task outcome
 */
@CheckReturnValue
@NonNull
public static <R> Completable completes(@NonNull final Task<R> task) {
    final CompletableOnSubscribe source = new CompletableOnSubscribe() {
        @Override
        public void subscribe(@NonNull final CompletableEmitter emit) throws Exception {
            // The explicit type witness is needed: R cannot be inferred from the emitter.
            task.addOnCompleteListener(RxTask.<R>listener(emit));
        }
    };
    return Completable.create(source);
}
/**
 * Obtains a {@link Task} from {@code callable} via {@code Single.fromCallable} and adapts it
 * with {@code maybe(Task)}.
 *
 * @param callable supplies the task to observe
 * @param <R> the task result type
 * @return a Maybe mirroring the outcome of the supplied task
 */
@CheckReturnValue
@NonNull
public static <R> Maybe<R> maybe(@NonNull final Callable<Task<R>> callable) {
    final Single<Task<R>> suppliedTask = Single.fromCallable(callable);
    return suppliedTask.flatMapMaybe(new Function<Task<R>, MaybeSource<? extends R>>() {
        @Override
        public MaybeSource<? extends R> apply(Task<R> task) throws Exception {
            return maybe(task);
        }
    });
}
/**
 * Adapts a {@link Task} into a {@link Maybe} by registering a completion listener on the
 * task for each subscriber.
 *
 * @param task the task to observe
 * @param <R> the task result type
 * @return a Maybe mirroring the task outcome
 */
@CheckReturnValue
@NonNull
public static <R> Maybe<R> maybe(@NonNull final Task<R> task) {
    final MaybeOnSubscribe<R> source = new MaybeOnSubscribe<R>() {
        @Override
        public void subscribe(@NonNull final MaybeEmitter<R> emit) throws Exception {
            task.addOnCompleteListener(listener(emit));
        }
    };
    return Maybe.create(source);
}
/**
 * Streams the messages arriving on {@code topic}.
 * <p>
 * On subscription the client subscribes to the topic with QoS 0 and every arriving message
 * is emitted while the emitter is not disposed. If the client is not connected when this
 * method is called, the stream is chained behind {@code reconnect(client)}.
 * <p>
 * NOTE(review): the previous comment read "Auto close client", but nothing in this method
 * closes the client or unsubscribes from the topic on dispose - confirm the intended lifecycle.
 *
 * @param client the MQTT client to subscribe with
 * @param topic the topic to listen on
 * @return an Observable of the messages received on the topic
 */
@NonNull
@CheckReturnValue
public static Observable<MqttMessage> remessage(@NonNull final MqttAndroidClient client,
        @NonNull final String topic) {
    final Observable<MqttMessage> messages =
            Observable.create(new ObservableOnSubscribe<MqttMessage>() {
                @Override
                public void subscribe(@NonNull final ObservableEmitter<MqttMessage> emitter)
                        throws Exception {
                    client.subscribe(topic, 0, new IMqttMessageListener() {
                        @Override
                        public void messageArrived(String topic2,
                                @NonNull final MqttMessage message) throws Exception {
                            if (emitter.isDisposed()) {
                                return;
                            }
                            emitter.onNext(message);
                        }
                    });
                }
            });

    // The connection state is sampled here, when the Observable is built.
    if (client.isConnected()) {
        return messages;
    }
    return reconnect(client).flatMapObservable(
            new Function<IMqttToken, ObservableSource<MqttMessage>>() {
                @Override
                public ObservableSource<MqttMessage> apply(IMqttToken token) throws Exception {
                    return messages;
                }
            });
}
/**
 * Connects the client using a freshly created, unmodified {@link MqttConnectOptions}.
 *
 * @param client the client to connect
 * @param disconnectedBufferOptions passed through to the three-argument {@code connect}
 * @return the result of the delegated {@code connect} call
 */
@NonNull
@CheckReturnValue
public static Maybe<IMqttToken> connect(
        @NonNull final MqttAndroidClient client,
        @NonNull final DisconnectedBufferOptions disconnectedBufferOptions) {
    final MqttConnectOptions defaultConnectOptions = new MqttConnectOptions();
    return connect(client, defaultConnectOptions, disconnectedBufferOptions);
}
/**
 * Creates the default {@link DisconnectedBufferOptions}: buffering enabled, buffer size 100,
 * no buffer persistence, and oldest messages not deleted.
 *
 * @return a new, pre-configured options instance
 */
@NonNull
@CheckReturnValue
public static DisconnectedBufferOptions defaultDisconnectedBufferOptions() {
    final DisconnectedBufferOptions defaults = new DisconnectedBufferOptions();

    defaults.setBufferEnabled(true);
    defaults.setBufferSize(100);
    defaults.setPersistBuffer(false);
    defaults.setDeleteOldestMessages(false);

    return defaults;
}
/**
 * Converts the upstream {@link DataSnapshot} into its value read as {@code clazz}; signals
 * {@link NoSuchElementException} when the snapshot does not exist.
 *
 * @param upstream the snapshot source to convert
 * @return the converted source
 */
@Override
@NonNull
@CheckReturnValue
public SingleSource<T> apply(@NonNull Single<DataSnapshot> upstream) {
    return upstream.flatMap(new Function<DataSnapshot, SingleSource<? extends T>>() {
        @Override
        public SingleSource<? extends T> apply(@NonNull DataSnapshot dataSnapshot) {
            if (!dataSnapshot.exists()) {
                return Single.error(new NoSuchElementException());
            }
            return Single.just(dataSnapshot.getValue(clazz));
        }
    });
}
/**
 * Converts the upstream {@link DataSnapshot} into its value read through
 * {@code typeIndicator}; signals {@link NoSuchElementException} when the snapshot does not
 * exist.
 *
 * @param upstream the snapshot source to convert
 * @return the converted source
 */
@Override
@NonNull
@CheckReturnValue
public SingleSource<T> apply(@NonNull Single<DataSnapshot> upstream) {
    final Function<DataSnapshot, SingleSource<? extends T>> extractValue =
            new Function<DataSnapshot, SingleSource<? extends T>>() {
                @Override
                public SingleSource<? extends T> apply(@NonNull DataSnapshot dataSnapshot) {
                    if (!dataSnapshot.exists()) {
                        return Single.error(new NoSuchElementException());
                    }
                    return Single.just(dataSnapshot.getValue(typeIndicator));
                }
            };
    return upstream.flatMap(extractValue);
}
/**
 * Streams every data change reported for {@code query}.
 * <p>
 * Each subscription adds its own {@link ValueEventListener} to the query; the listener is
 * removed again when the subscription is cancelled. A cancellation reported by the database
 * is signalled as an error.
 *
 * @param query the query to listen on
 * @return an Observable of snapshots
 */
@NonNull
@CheckReturnValue
public static Observable<DataSnapshot> changes(@NonNull final Query query) {
    final ObservableOnSubscribe<DataSnapshot> source =
            new ObservableOnSubscribe<DataSnapshot>() {
                @Override
                public void subscribe(@NonNull final ObservableEmitter<DataSnapshot> emit)
                        throws Exception {
                    final ValueEventListener forwarder = new ValueEventListener() {
                        @Override
                        public void onDataChange(DataSnapshot dataSnapshot) {
                            if (emit.isDisposed()) {
                                return;
                            }
                            emit.onNext(dataSnapshot);
                        }

                        @Override
                        public void onCancelled(DatabaseError e) {
                            if (emit.isDisposed()) {
                                return;
                            }
                            emit.onError(e.toException());
                        }
                    };
                    // Register the cleanup before attaching the listener.
                    emit.setCancellable(new Cancellable() {
                        @Override
                        public void cancel() throws Exception {
                            query.removeEventListener(forwarder);
                        }
                    });
                    query.addValueEventListener(forwarder);
                }
            };
    return Observable.create(source);
}
/**
 * Runs a transaction on {@code ref} for each subscriber and reports its outcome as a
 * {@link Single}.
 *
 * @param ref the location to run the transaction on
 * @param function computes the transaction result from the current {@link MutableData}
 * @return a Single driven by the transaction handler built from {@code function}
 */
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> transaction(
        @NonNull final DatabaseReference ref,
        @NonNull final Function<MutableData, Transaction.Result> function) {
    final SingleOnSubscribe<DataSnapshot> source = new SingleOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(@NonNull final SingleEmitter<DataSnapshot> emit)
                throws Exception {
            final Transaction.Handler handler = transaction(emit, function);
            ref.runTransaction(handler);
        }
    };
    return Single.create(source);
}
/**
 * Runs a transaction on {@code ref} for each subscriber and reports its outcome as a
 * {@link Single}.
 *
 * @param ref the location to run the transaction on
 * @param function computes the transaction result from the current {@link MutableData}
 * @param fireLocalEvents passed through unchanged to {@code DatabaseReference#runTransaction}
 * @return a Single driven by the transaction handler built from {@code function}
 */
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> transaction(
        @NonNull final DatabaseReference ref,
        @NonNull final Function<MutableData, Transaction.Result> function,
        final boolean fireLocalEvents) {
    final SingleOnSubscribe<DataSnapshot> source = new SingleOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(@NonNull final SingleEmitter<DataSnapshot> emit)
                throws Exception {
            final Transaction.Handler handler = transaction(emit, function);
            ref.runTransaction(handler, fireLocalEvents);
        }
    };
    return Single.create(source);
}
/**
 * Builds a {@link Transaction.Handler} that delegates the transaction step to
 * {@code function} and reports completion to {@code emitter}.
 * <p>
 * Any exception thrown by {@code function} is rethrown wrapped in a {@link RuntimeException}.
 * On completion the snapshot is emitted when no error was reported, otherwise the error is
 * signalled; the {@code committed} flag is not inspected.
 *
 * @param emitter receives the completion result; it is left untouched once disposed
 * @param function computes the transaction result from the current {@link MutableData}
 * @return the handler
 */
@NonNull
@CheckReturnValue
public static Transaction.Handler transaction(
        @NonNull final SingleEmitter<DataSnapshot> emitter,
        @NonNull final Function<MutableData, Transaction.Result> function) {
    return new Transaction.Handler() {
        @Override
        public Transaction.Result doTransaction(MutableData mutableData) {
            final Transaction.Result outcome;
            try {
                outcome = function.apply(mutableData);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return outcome;
        }

        @Override
        public void onComplete(@Nullable DatabaseError databaseError, boolean committed,
                @NonNull DataSnapshot dataSnapshot) {
            if (emitter.isDisposed()) {
                return;
            }
            if (databaseError != null) {
                emitter.onError(databaseError.toException());
                return;
            }
            emitter.onSuccess(dataSnapshot);
        }
    };
}
/**
 * Streams the data changes of {@code ref}, each converted to a {@link DataValue} of
 * {@code clazz} by {@code TransformerOfClazz}.
 *
 * @param ref the location to observe
 * @param clazz the class the snapshot values are read as
 * @param <T> the value type
 * @return an Observable of converted values
 */
@NonNull
@CheckReturnValue
public static <T> Observable<DataValue<T>> dataChangesOf(
        @NonNull final DatabaseReference ref,
        @NonNull final Class<T> clazz) {
    final TransformerOfClazz<T> converter = new TransformerOfClazz<>(clazz);
    return dataChanges(ref).compose(converter);
}
/**
 * Streams the data changes of {@code query}, each converted to a {@link DataValue} of
 * {@code clazz} by {@code TransformerOfClazz}.
 *
 * @param query the query to observe
 * @param clazz the class the snapshot values are read as
 * @param <T> the value type
 * @return an Observable of converted values
 */
@NonNull
@CheckReturnValue
public static <T> Observable<DataValue<T>> dataChangesOf(
        @NonNull final Query query,
        @NonNull final Class<T> clazz) {
    final TransformerOfClazz<T> converter = new TransformerOfClazz<>(clazz);
    return dataChanges(query).compose(converter);
}
/**
 * Streams the data changes of {@code ref}, each converted to a {@link DataValue} through
 * {@code typeIndicator} by {@code TransformerOfGenericTypeIndicator}.
 *
 * @param ref the location to observe
 * @param typeIndicator describes the generic type the snapshot values are read as
 * @param <T> the value type
 * @return an Observable of converted values
 */
@NonNull
@CheckReturnValue
public static <T> Observable<DataValue<T>> dataChangesOf(
        @NonNull DatabaseReference ref,
        @NonNull GenericTypeIndicator<T> typeIndicator) {
    final TransformerOfGenericTypeIndicator<T> converter =
            new TransformerOfGenericTypeIndicator<T>(typeIndicator);
    return dataChanges(ref).compose(converter);
}
/**
 * Streams the data changes of {@code query}, each converted to a {@link DataValue} through
 * {@code typeIndicator} by {@code TransformerOfGenericTypeIndicator}.
 *
 * @param query the query to observe
 * @param typeIndicator describes the generic type the snapshot values are read as
 * @param <T> the value type
 * @return an Observable of converted values
 */
@NonNull
@CheckReturnValue
public static <T> Observable<DataValue<T>> dataChangesOf(
        @NonNull Query query,
        @NonNull GenericTypeIndicator<T> typeIndicator) {
    final TransformerOfGenericTypeIndicator<T> converter =
            new TransformerOfGenericTypeIndicator<T>(typeIndicator);
    return dataChanges(query).compose(converter);
}
/**
 * Reads the data of {@code ref} and converts it to {@code clazz} with
 * {@code SingleTransformerOfClazz}.
 *
 * @param ref the location to read
 * @param clazz the class the snapshot value is read as
 * @param <T> the value type
 * @return a Single of the converted value
 */
@NonNull
@CheckReturnValue
public static <T> Single<T> dataOf(
        @NonNull DatabaseReference ref,
        @NonNull Class<T> clazz) {
    final SingleTransformerOfClazz<T> converter = new SingleTransformerOfClazz<>(clazz);
    return data(ref).compose(converter);
}
/**
 * Reads the data of {@code query} and converts it to {@code clazz} with
 * {@code SingleTransformerOfClazz}.
 *
 * @param query the query to read
 * @param clazz the class the snapshot value is read as
 * @param <T> the value type
 * @return a Single of the converted value
 */
@NonNull
@CheckReturnValue
public static <T> Single<T> dataOf(
        @NonNull Query query,
        @NonNull Class<T> clazz) {
    final SingleTransformerOfClazz<T> converter = new SingleTransformerOfClazz<T>(clazz);
    return data(query).compose(converter);
}
/**
 * Reads the data of {@code query} and converts it through {@code typeIndicator} with
 * {@code SingleTransformerOfGenericTypeIndicator}.
 *
 * @param query the query to read
 * @param typeIndicator describes the generic type the snapshot value is read as
 * @param <T> the value type
 * @return a Single of the converted value
 */
@NonNull
@CheckReturnValue
public static <T> Single<T> dataOf(
        @NonNull Query query,
        @NonNull GenericTypeIndicator<T> typeIndicator) {
    final SingleTransformerOfGenericTypeIndicator<T> converter =
            new SingleTransformerOfGenericTypeIndicator<T>(typeIndicator);
    return data(query).compose(converter);
}
/**
 * Wraps {@code ref.removeValue()} in a {@link Completable}; the call is made through a
 * {@link Callable} handed to {@code RxTask.completes}.
 *
 * @param ref the location whose value is removed
 * @return a Completable mirroring the removal task
 */
@NonNull
@CheckReturnValue
public static Completable removeValue(@NonNull final DatabaseReference ref) {
    final Callable<Task<Void>> removal = new Callable<Task<Void>>() {
        @Override
        public Task<Void> call() throws Exception {
            return ref.removeValue();
        }
    };
    return RxTask.completes(removal);
}
/**
 * Wraps {@code ref.setPriority(priority)} in a {@link Completable}; the call is made through
 * a {@link Callable} handed to {@code RxTask.completes}.
 *
 * @param ref the location whose priority is set
 * @param priority the priority value, passed through unchanged
 * @return a Completable mirroring the update task
 */
@NonNull
@CheckReturnValue
public static Completable setPriority(
        @NonNull final DatabaseReference ref,
        @NonNull final Object priority) {
    final Callable<Task<Void>> update = new Callable<Task<Void>>() {
        @Override
        public Task<Void> call() throws Exception {
            return ref.setPriority(priority);
        }
    };
    return RxTask.completes(update);
}
/**
 * Wraps {@code ref.setValue(value)} in a {@link Completable}; the call is made through a
 * {@link Callable} handed to {@code RxTask.completes}.
 *
 * @param ref the location to write to
 * @param value the value to store, passed through unchanged
 * @param <T> the value type
 * @return a Completable mirroring the write task
 */
@NonNull
@CheckReturnValue
public static <T> Completable setValue(
        @NonNull final DatabaseReference ref,
        @NonNull final T value) {
    final Callable<Task<Void>> write = new Callable<Task<Void>>() {
        @Override
        public Task<Void> call() throws Exception {
            return ref.setValue(value);
        }
    };
    return RxTask.completes(write);
}