/**
 * Creates a consumer that appends every accepted {@code long} to the supplied list.
 *
 * @param list destination list; each accepted value is boxed and added to it
 * @return a {@link LongConsumer} whose {@code accept} calls {@code list.add} with the value
 */
public static LongConsumer addLongTo(final List<Long> list) {
    return new LongConsumer() {
        @Override
        public void accept(long value) throws Exception {
            // Explicit boxing; equivalent to the implicit autoboxing of a long argument.
            list.add(Long.valueOf(value));
        }
    };
}
/**
 * Creates a consumer that prints every accepted {@code long} to {@code System.out},
 * preceded by the given prefix.
 *
 * @param prefix text concatenated in front of each printed value
 * @return a {@link LongConsumer} that writes {@code prefix + value} as one line per call
 */
public static LongConsumer printLong(final String prefix) {
    return new LongConsumer() {
        @Override
        public void accept(long value) throws Exception {
            final String line = prefix + value;
            System.out.println(line);
        }
    };
}
/**
 * Decodes the image described by {@code shareImageObject} to a local path and hands that
 * path to the QZone or QQ image-share helper, depending on {@code platform}.
 *
 * <p>The decode is subscribed on {@code Schedulers.io()}; the resulting path (or error) is
 * observed via {@code AndroidSchedulers.mainThread()}.
 *
 * @param platform         target platform; {@code SharePlatform.QZONE} selects the QZone helper,
 *                         any other value selects the QQ helper
 * @param shareImageObject image description passed to {@code ImageDecoder.decode}
 * @param activity         activity used for decoding and sharing; finished on failure
 * @param listener         receives {@code shareRequest()} when the subscriber requests items and
 *                         {@code shareFailure(...)} when the stream signals an error
 */
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
        final Activity activity, final ShareListener listener) {
    // Source: decode once, emit the path, then complete (or forward the failure).
    final FlowableOnSubscribe<String> decodeSource = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Exception {
            try {
                final String decodedPath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(decodedPath);
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    };

    // Tells the listener that a share request has been issued.
    final LongConsumer notifyRequest = new LongConsumer() {
        @Override
        public void accept(long requested) {
            listener.shareRequest();
        }
    };

    // Success path: route the decoded image to the helper for the chosen platform.
    final Consumer<String> onDecoded = new Consumer<String>() {
        @Override
        public void accept(String localPath) {
            if (platform != SharePlatform.QZONE) {
                shareToQQForImage(localPath, activity, listener);
            } else {
                shareToQzoneForImage(localPath, activity, listener);
            }
        }
    };

    // Failure path: close the activity first, then report the wrapped cause.
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) {
            activity.finish();
            listener.shareFailure(new Exception(throwable));
        }
    };

    Flowable.create(decodeSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(notifyRequest)
            .subscribe(onDecoded, onFailure);
}
public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, final long start, final int maxConcurrency) { return Flowable.defer(new Callable<Flowable<T>>() { @Override public Flowable<T> call() throws Exception { // need a ReplaySubject because multiple requests can come // through before concatEager has established subscriptions to // the subject final ReplaySubject<Flowable<T>> subject = ReplaySubject.create(); final AtomicLong position = new AtomicLong(start); LongConsumer request = new LongConsumer() { @Override public void accept(final long n) throws Exception { final long pos = position.getAndAdd(n); if (SubscriptionHelper.validate(n)) { Flowable<T> flowable; try { flowable = fetch.apply(pos, n); } catch (Throwable e) { Exceptions.throwIfFatal(e); subject.onError(e); return; } // reduce allocations by incorporating the onNext // and onComplete actions into the mutable count // object final Count count = new Count(subject, n); flowable = flowable // .doOnNext(count) // .doOnComplete(count); subject.onNext(flowable); } } }; return Flowable // .concatEager(subject.serialize() // .toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128) // .doOnRequest(request); } }); }
/**
 * Shares a web page as a {@code WXMediaMessage} with a compressed thumbnail.
 *
 * <p>The image is decoded and compressed in a source subscribed on {@code Schedulers.io()};
 * the thumbnail bytes (or an error) are observed via {@code AndroidSchedulers.mainThread()},
 * where the message is built and passed to {@code sendMessage}.
 *
 * @param platform         platform identifier forwarded to {@code sendMessage}
 * @param title            message title
 * @param targetUrl        URL placed in the {@code WXWebpageObject}
 * @param summary          message description
 * @param shareImageObject image description passed to {@code ImageDecoder.decode}
 * @param activity         activity used for decoding; finished on failure
 * @param listener         receives {@code shareRequest()} when the subscriber requests items and
 *                         {@code shareFailure(...)} when the stream signals an error
 */
@Override
public void shareMedia(
        final int platform, final String title, final String targetUrl, final String summary,
        final ShareImageObject shareImageObject, final Activity activity,
        final ShareListener listener) {
    Flowable.create(new FlowableOnSubscribe<byte[]>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<byte[]> emitter) throws Exception {
            try {
                String imagePath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE));
                // Terminate the stream after its single item; without this the flow never
                // completes and the subscriber chain (holding activity and listener) is
                // never released.
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long aLong) {
                    listener.shareRequest();
                }
            })
            .subscribe(new Consumer<byte[]>() {
                @Override
                public void accept(byte[] bytes) {
                    WXWebpageObject webpageObject = new WXWebpageObject();
                    webpageObject.webpageUrl = targetUrl;

                    WXMediaMessage message = new WXMediaMessage(webpageObject);
                    message.title = title;
                    message.description = summary;
                    message.thumbData = bytes;

                    sendMessage(platform, message, buildTransaction("webPage"));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) {
                    // Close the activity first, then report the wrapped cause.
                    activity.finish();
                    listener.shareFailure(new Exception(throwable));
                }
            });
}
/**
 * Shares an image as a {@code WXMediaMessage} whose media object is a {@code WXImageObject}.
 *
 * <p>The image is decoded to a file path in a source subscribed on {@code Schedulers.io()};
 * that source emits a pair of (bitmap decoded from the path, compressed thumbnail bytes). The
 * pair (or an error) is observed via {@code AndroidSchedulers.mainThread()}, where the message
 * is built and passed to {@code sendMessage}.
 *
 * @param platform         platform identifier forwarded to {@code sendMessage}
 * @param shareImageObject image description passed to {@code ImageDecoder.decode}
 * @param activity         activity used for decoding; finished on failure
 * @param listener         receives {@code shareRequest()} when the subscriber requests items and
 *                         {@code shareFailure(...)} when the stream signals an error
 */
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
        final Activity activity, final ShareListener listener) {
    Flowable.create(new FlowableOnSubscribe<Pair<Bitmap, byte[]>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Pair<Bitmap, byte[]>> emitter)
                throws Exception {
            try {
                String imagePath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(Pair.create(BitmapFactory.decodeFile(imagePath),
                        ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE)));
                // Terminate the stream after its single item; without this the flow never
                // completes and the subscriber chain (holding activity and listener) is
                // never released.
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long aLong) {
                    listener.shareRequest();
                }
            })
            .subscribe(new Consumer<Pair<Bitmap, byte[]>>() {
                @Override
                public void accept(Pair<Bitmap, byte[]> pair) {
                    WXImageObject imageObject = new WXImageObject(pair.first);

                    WXMediaMessage message = new WXMediaMessage();
                    message.mediaObject = imageObject;
                    message.thumbData = pair.second;

                    sendMessage(platform, message, buildTransaction("image"));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) {
                    // Close the activity first, then report the wrapped cause.
                    activity.finish();
                    listener.shareFailure(new Exception(throwable));
                }
            });
}
/**
 * Builds a {@code WeiboMultiMessage} carrying an image, plus a text object when {@code text}
 * is not empty, and submits it through {@code sendRequest}.
 *
 * <p>The image is decoded and compressed in a source subscribed on {@code Schedulers.io()};
 * the (path, bytes) pair or an error is observed via {@code AndroidSchedulers.mainThread()}.
 *
 * @param shareImageObject image description passed to {@code ImageDecoder.decode}
 * @param text             optional text; skipped when {@code TextUtils.isEmpty(text)} is true
 * @param activity         activity used for decoding and sending; finished on failure
 * @param listener         receives {@code shareRequest()} when the subscriber requests items and
 *                         {@code shareFailure(...)} when the stream signals an error
 */
private void shareTextOrImage(final ShareImageObject shareImageObject, final String text,
        final Activity activity, final ShareListener listener) {
    // Source: decode to a path, compress it, emit both as one pair, then complete.
    final FlowableOnSubscribe<Pair<String, byte[]>> imageSource =
            new FlowableOnSubscribe<Pair<String, byte[]>>() {
                @Override
                public void subscribe(@NonNull FlowableEmitter<Pair<String, byte[]>> emitter)
                        throws Exception {
                    try {
                        final String path = ImageDecoder.decode(activity, shareImageObject);
                        final byte[] compressed =
                                ImageDecoder.compress2Byte(path, TARGET_SIZE, TARGET_LENGTH);
                        emitter.onNext(Pair.create(path, compressed));
                        emitter.onComplete();
                    } catch (Exception e) {
                        emitter.onError(e);
                    }
                }
            };

    // Tells the listener that a share request has been issued.
    final LongConsumer notifyRequest = new LongConsumer() {
        @Override
        public void accept(long requested) {
            listener.shareRequest();
        }
    };

    // Success path: assemble the multi-message and send it.
    final Consumer<Pair<String, byte[]>> onImageReady = new Consumer<Pair<String, byte[]>>() {
        @Override
        public void accept(Pair<String, byte[]> pathAndData) {
            final ImageObject image = new ImageObject();
            image.imageData = pathAndData.second;
            image.imagePath = pathAndData.first;

            final WeiboMultiMessage multiMessage = new WeiboMultiMessage();
            multiMessage.imageObject = image;

            final boolean hasText = !TextUtils.isEmpty(text);
            if (hasText) {
                final TextObject caption = new TextObject();
                caption.text = text;
                multiMessage.textObject = caption;
            }

            sendRequest(activity, multiMessage);
        }
    };

    // Failure path: close the activity first, then report the wrapped cause.
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) {
            activity.finish();
            listener.shareFailure(new Exception(throwable));
        }
    };

    Flowable.create(imageSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(notifyRequest)
            .subscribe(onImageReady, onFailure);
}
/**
 * Decodes the image described by {@code shareImageObject} to a local path and hands it,
 * together with the title, summary and target URL, to the QZone or QQ media-share helper
 * depending on {@code platform}.
 *
 * <p>The decode is subscribed on {@code Schedulers.io()}; the resulting path (or error) is
 * observed via {@code AndroidSchedulers.mainThread()}.
 *
 * @param platform         target platform; {@code SharePlatform.QZONE} selects the QZone helper,
 *                         any other value selects the QQ helper
 * @param title            title forwarded to the helper
 * @param targetUrl        URL forwarded to the helper
 * @param summary          summary forwarded to the helper
 * @param shareImageObject image description passed to {@code ImageDecoder.decode}
 * @param activity         activity used for decoding and sharing; finished on failure
 * @param listener         receives {@code shareRequest()} when the subscriber requests items and
 *                         {@code shareFailure(...)} when the stream signals an error
 */
@Override
public void shareMedia(final int platform, final String title, final String targetUrl,
        final String summary, final ShareImageObject shareImageObject, final Activity activity,
        final ShareListener listener) {
    // Source: decode once, emit the path, then complete (or forward the failure).
    final FlowableOnSubscribe<String> decodeSource = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Exception {
            try {
                final String decodedPath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(decodedPath);
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    };

    // Tells the listener that a share request has been issued.
    final LongConsumer notifyRequest = new LongConsumer() {
        @Override
        public void accept(long requested) {
            listener.shareRequest();
        }
    };

    // Success path: route to the helper for the chosen platform.
    // NOTE(review): the two helpers receive targetUrl and summary in different positions;
    // the order is kept exactly as before, presumably matching their signatures. Confirm.
    final Consumer<String> onDecoded = new Consumer<String>() {
        @Override
        public void accept(String imagePath) {
            if (platform != SharePlatform.QZONE) {
                shareToQQForMedia(title, summary, targetUrl, imagePath, activity, listener);
            } else {
                shareToQZoneForMedia(title, targetUrl, summary, imagePath, activity, listener);
            }
        }
    };

    // Failure path: close the activity first, then report the wrapped cause.
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) {
            activity.finish();
            listener.shareFailure(new Exception(throwable));
        }
    };

    Flowable.create(decodeSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(notifyRequest)
            .subscribe(onDecoded, onFailure);
}
/**
 * Forwards to {@code boxed.doOnLifecycle} with the same three callbacks.
 *
 * @param onSubscribe callback passed through as the subscription handler
 * @param onRequest   callback passed through as the request handler
 * @param onCancel    callback passed through as the cancel handler
 * @return the flowable returned by the wrapped instance
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport("none")
public Flowable<T> doOnLifecycle(Consumer<? super Subscription> onSubscribe,
        LongConsumer onRequest, Action onCancel) {
    final Flowable<T> delegated = boxed.doOnLifecycle(onSubscribe, onRequest, onCancel);
    return delegated;
}
/**
 * Forwards to {@code boxed.doOnRequest} with the same callback.
 *
 * @param onRequest callback passed through as the request handler
 * @return the flowable returned by the wrapped instance
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport("none")
public Flowable<T> doOnRequest(LongConsumer onRequest) {
    final Flowable<T> delegated = boxed.doOnRequest(onRequest);
    return delegated;
}