/**
 * Dispatches the given action through the asynchronous HTTP client and returns a
 * {@link Single} that carries the parsed response.
 *
 * <p>The HTTP request is issued as soon as this method is called, not when the returned
 * Single is subscribed to; the outcome is held by a {@link SingleSubject}.
 *
 * @param action the action that builds the HTTP request and parses the HTTP response
 * @param <T> the type produced by parsing the response
 * @return a Single emitting the parsed response, or the error raised while executing the
 *         request or parsing its response
 */
@Override
public <T> Single<T> sendRequest(SqsAction<T> action) {
    Request httpRequest = action.toHttpRequest(credentialsProvider.getCredentials());
    SingleSubject<T> resultSubject = SingleSubject.create();

    AsyncCompletionHandler<Response> completionHandler = new AsyncCompletionHandler<Response>() {
        @Override
        public Response onCompleted(Response httpResponse) {
            // Parsing goes through fromCallable so that a parse failure is delivered to the
            // subject as an error.
            Single.fromCallable(() -> action.parseHttpResponse(httpResponse))
                    .subscribe(resultSubject);
            return httpResponse;
        }

        @Override
        public void onThrowable(Throwable throwable) {
            resultSubject.onError(throwable);
        }
    };

    httpClient.executeRequest(httpRequest, completionHandler);
    return resultSubject;
}
@Test public void testShutdownWithPendingPermits() { SingleSubject<List<SqsMessage<String>>> singleSubject = SingleSubject.create(); when(sqsQueueMock.deleteMessage(any(String.class))).thenReturn(Completable.complete()); when(sqsQueueMock.receiveMessages(anyInt(), any(Optional.class))).thenReturn(singleSubject); consumer.setMessageBuffer(messageBufferSmall); consumer.processNextMessage(consumer.getNextMessage()); //handler does not ack here, so permits will be pending forever Completable shutdownCompletable = consumer.shutdownAsync(); singleSubject.onSuccess(Collections.emptyList()); shutdownCompletable.test().assertNotComplete(); }
/**
 * When the handler deletes the message through its acknowledger, shutdown completes and
 * the queue receives exactly one delete call.
 */
@Test
public void testHandlerDeleteAndShutdown() {
    SingleSubject<List<SqsMessage<String>>> pendingReceive = SingleSubject.create();
    when(sqsQueueMock.deleteMessage(any(String.class)))
            .thenReturn(Completable.complete());
    when(sqsQueueMock.receiveMessages(anyInt(), any(Optional.class)))
            .thenReturn(pendingReceive);

    // The handler acknowledges every message by deleting it.
    doAnswer((invocation -> {
        MessageAcknowledger acknowledger = (MessageAcknowledger) invocation.getArgument(1);
        acknowledger.delete();
        return null;
    })).when(consumerHandlerMock).handleMessage(any(), any());

    consumer.setMessageBuffer(messageBufferSmall);
    consumer.processNextMessage(consumer.getNextMessage());

    Completable shutdown = consumer.shutdownAsync();
    pendingReceive.onSuccess(Collections.emptyList());

    shutdown.test().assertComplete();
    verify(sqsQueueMock).deleteMessage(any(String.class));
}
/**
 * When the handler ignores the message through its acknowledger, shutdown completes and
 * the queue never receives a delete call.
 */
@Test
public void testHandlerIgnoreAndShutdown() {
    SingleSubject<List<SqsMessage<String>>> pendingReceive = SingleSubject.create();
    when(sqsQueueMock.deleteMessage(any(String.class)))
            .thenReturn(Completable.complete());
    when(sqsQueueMock.receiveMessages(anyInt(), any(Optional.class)))
            .thenReturn(pendingReceive);

    // The handler acknowledges every message by ignoring it.
    doAnswer((invocation -> {
        MessageAcknowledger acknowledger = (MessageAcknowledger) invocation.getArgument(1);
        acknowledger.ignore();
        return null;
    })).when(consumerHandlerMock).handleMessage(any(), any());

    consumer.setMessageBuffer(messageBufferSmall);
    consumer.processNextMessage(consumer.getNextMessage());

    Completable shutdown = consumer.shutdownAsync();
    pendingReceive.onSuccess(Collections.emptyList());

    shutdown.test().assertComplete();
    verify(sqsQueueMock, never()).deleteMessage(any(String.class));
}
/**
 * Checks that a Single composed with {@code DisposableAttach.to(composite)} registers one
 * entry in the composite, still delivers the upstream value, and is disposed once the
 * composite is disposed.
 */
@Test
public void test() {
    SingleSubject<String> subject = SingleSubject.create();
    Single<String> singleSource = subject.hide();
    // Parameterized instead of raw, so subscribeWith and assertValue are type-checked.
    TestObserver<String> testObserver = new TestObserver<String>();
    CompositeDisposable composite = new CompositeDisposable();

    Disposable disposable = singleSource
            .compose(DisposableAttach.<String>to(composite))
            .subscribeWith(testObserver);

    subject.onSuccess("Foo");
    testObserver.assertValue("Foo");
    // The test expects the attached entry to still be present after the value arrived.
    assertTrue(composite.size() == 1);

    composite.dispose();
    assertTrue(composite.size() == 0);
    assertTrue(composite.isDisposed());
    assertTrue(disposable.isDisposed());
    assertTrue(testObserver.isDisposed());
}
/**
 * Bridges a {@link CompletionStage} into a {@link Single}.
 *
 * <p>The returned Single emits the stage's value when the stage completes with a non-null
 * value, signals the stage's error when it completes exceptionally, and signals
 * {@link NoSuchElementException} when it completes with {@code null}.
 *
 * @param <T> the value type
 * @param future the source CompletionStage instance
 * @return the new Single instance
 */
public static <T> Single<T> fromFuture(CompletionStage<T> future) {
    SingleSubject<T> subject = SingleSubject.create();
    future.whenComplete((value, error) -> {
        if (error != null) {
            subject.onError(error);
            return;
        }
        if (value == null) {
            // A Single cannot emit null, so a null result is reported as an error.
            subject.onError(new NoSuchElementException());
            return;
        }
        subject.onSuccess(value);
    });
    return subject;
}
@Test public void autoDispose_withMaybe_normal() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Got the event source.onSuccess(1); assertThat(o.takeSuccess()).isEqualTo(1); // Nothing more, lifecycle disposed too o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Lifecycle ends lifecycle.onSuccess(2); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); // Event if upstream emits, no one is listening source.onSuccess(2); o.assertNoMoreEvents(); }
@Test public void autoDispose_withProvider() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); MaybeSubject<Integer> scope = MaybeSubject.create(); ScopeProvider provider = makeProvider(scope); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(scope.hasObservers()).isTrue(); source.onSuccess(3); o.takeSuccess(); // All cleaned up o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(scope.hasObservers()).isFalse(); }
@Test public void autoDispose_withProvider_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); MaybeSubject<Integer> scope = MaybeSubject.create(); ScopeProvider provider = makeProvider(scope); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(scope.hasObservers()).isTrue(); // Lifecycle ends scope.onSuccess(3); assertThat(source.hasObservers()).isFalse(); assertThat(scope.hasObservers()).isFalse(); // No one is listening even if upstream finally does emit source.onSuccess(3); o.assertNoMoreEvents(); }
@Test public void autoDispose_withLifecycleProvider() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(1); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onSuccess(3); o.takeSuccess(); // All cleaned up o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withLifecycleProvider_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(1); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Lifecycle ends lifecycle.onNext(3); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); // No one is listening even if upstream finally does emit source.onSuccess(3); o.assertNoMoreEvents(); }
/**
 * With a no-op outside-lifecycle handler installed and a lifecycle that has emitted
 * nothing, subscribing produces no values and no errors, and neither the upstream nor
 * the lifecycle keeps an observer.
 */
@Test
public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
    AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() {
        @Override
        public void accept(OutsideLifecycleException e) {
            // Intentionally empty: the exception is swallowed.
        }
    });

    // The lifecycle has no current event.
    BehaviorSubject<Integer> lifecycleEvents = BehaviorSubject.create();
    LifecycleScopeProvider<Integer> scopeProvider = TestUtil.makeLifecycleProvider(lifecycleEvents);
    SingleSubject<Integer> upstream = SingleSubject.create();

    TestObserver<Integer> observer = upstream
            .as(AutoDispose.<Integer>autoDisposable(scopeProvider))
            .test();

    assertThat(upstream.hasObservers()).isFalse();
    assertThat(lifecycleEvents.hasObservers()).isFalse();
    observer.assertNoValues();
    observer.assertNoErrors();
}
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Noop } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); SingleSubject<Integer> source = SingleSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); SingleSubject<Integer> source = SingleSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); o.assertNoValues(); o.assertError(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); }
/**
 * Creates an acknowledger for one received message and starts a timer that calls
 * {@code ignore()} once the time remaining until {@code expirationTime} has elapsed.
 *
 * @param sqsQueue the queue stored for this acknowledger
 * @param receiptId the receipt id stored for this acknowledger
 * @param expirationTime the instant at which the timer fires
 */
public MessageAcknowledger(SqsQueue<T> sqsQueue, String receiptId, Instant expirationTime) {
    this.sqsQueue = sqsQueue;
    this.receiptId = receiptId;
    this.expirationTime = expirationTime;
    this.ackModeSingle = SingleSubject.create();
    this.ackingComplete = CompletableSubject.create();

    // Time left between now and expiry, measured once at construction.
    long millisUntilExpiry = Duration.between(Instant.now(), expirationTime).toMillis();
    Completable.timer(millisUntilExpiry, TimeUnit.MILLISECONDS)
            .subscribe(this::ignore);
}
/**
 * Converts the body with {@code inverseMap} and publishes it through the delegate.
 *
 * <p>The publish starts immediately because this method subscribes before returning;
 * the result is held by a {@link SingleSubject}. An exception thrown by the conversion
 * is delivered through the returned Single, since it happens inside the deferred source.
 *
 * @param body the message body to convert and publish
 * @param maybeDelay the optional delay passed through to the delegate
 * @return a Single carrying the delegate's publish result
 */
@Override
public Single<String> publishMessage(U body, Optional<Duration> maybeDelay) {
    SingleSubject<String> hotResult = SingleSubject.create();
    Single.defer(() -> delegate.publishMessage(inverseMap.apply(body), maybeDelay))
            .subscribe(hotResult);
    return hotResult;
}
@Override public <T> Single<T> sendRequest(SqsAction<T> request) { return Single.defer(() -> delegate.sendRequest(request)) .retry((errCount, error) -> { if (errCount > retryCount || request.isBatchAction()) { return false; } if (error instanceof AmazonSQSException) { return ((AmazonSQSException) error).getErrorType() == AmazonServiceException.ErrorType.Service; } return true; }).subscribeWith(SingleSubject.create());//convert to Hot single }
/**
 * Subscribing with {@code ScopeProvider.UNBOUND} yields no values and no errors on the
 * observer, and the test rule records no errors either.
 */
@Test
public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() {
    TestObserver<Object> observer = SingleSubject.create()
            .as(AutoDispose.autoDisposable(ScopeProvider.UNBOUND))
            .test();

    observer.assertNoValues();
    observer.assertNoErrors();

    rule.assertNoErrors();
}
/**
 * A stream bound to {@code ScopeProvider.UNBOUND} still delivers the upstream value.
 */
@Test
public void unbound_shouldStillPassValues() {
    SingleSubject<Integer> upstream = SingleSubject.create();
    TestObserver<Integer> observer = upstream
            .as(AutoDispose.<Integer>autoDisposable(ScopeProvider.UNBOUND))
            .test();

    upstream.onSuccess(1);

    observer.assertValue(1);
}
@Override public Single<TransportResponse> callUnary(TransportRequest req) { // TODO: This will establish a new connection for each request. We should pool Channels. // Maybe we should just use AsyncHttpClient for this. SingleSubject<TransportResponse> responseSubject = SingleSubject.create(); ChannelFuture channelFuture = bootstrap .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new HttpClientCodec()) .addLast(new HttpTransportEncoder<>(requestEncoderConfig)) .addLast(new HttpTransportDecoder<>(ErrorResponseDecoder.CONFIGURATION)) .addLast(new HttpTransportDecoder<>(TransportResponseDecoder.CONFIGURATION)) .addLast(Channels.channelReader(responseSubject, TransportResponse.class)) .addLast(new TransportErrorObserver(responseSubject)); } }) .connect(url.getHost(), url.getPort() == -1 ? url.getDefaultPort() : url.getPort()); Channel channel = channelFuture.channel(); channelFuture.addListener( future -> { if (future.isSuccess()) { channel.writeAndFlush(req); } else if (!future.isCancelled()) { responseSubject.onError(future.cause()); } }); return responseSubject; }
/**
 * Opens a connection to the configured URL, writes the request once connected, and
 * returns a Single that the {@code WriteAck} pipeline handler feeds.
 *
 * <p>A failed (non-cancelled) connection attempt is reported as an error on the returned
 * Single.
 *
 * @param req the request to write to the channel
 * @return a Single backed by a {@link SingleSubject} carrying the acknowledgement
 */
@Override
public Single<Ack> callOneway(TransportRequest req) {
    // NOTE(review): handler(...) is set on the bootstrap field on every call; if that
    // bootstrap is shared across concurrent calls this may race. Confirm.
    SingleSubject<Ack> ackSubject = SingleSubject.create();

    ChannelInitializer<SocketChannel> pipelineInitializer =
        new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    .addLast(new HttpClientCodec())
                    .addLast(new WriteAck<>(ackSubject, LastHttpContent.class))
                    .addLast(new HttpTransportEncoder<>(requestEncoderConfig));
            }
        };

    // Use the URL's default port when none is given explicitly.
    String host = url.getHost();
    int port = url.getPort() == -1 ? url.getDefaultPort() : url.getPort();

    ChannelFuture connectFuture = bootstrap.handler(pipelineInitializer).connect(host, port);
    Channel channel = connectFuture.channel();
    connectFuture.addListener(
        future -> {
            if (future.isSuccess()) {
                channel.writeAndFlush(req);
            } else if (!future.isCancelled()) {
                ackSubject.onError(future.cause());
            }
        });

    return ackSubject;
}
@Override public Single<String> publishMessage(T body, Optional<Duration> maybeDelay) { return Single.defer(() -> delegate.publishMessage(map.apply(body), maybeDelay)) .subscribeWith(SingleSubject.create());//makes it hot }
/**
 * Default value for the result subject: a newly created {@link SingleSubject}.
 *
 * @return a new SingleSubject each time this method is invoked
 */
@Default
public SingleSubject<String> getResultSubject() {
    final SingleSubject<String> freshSubject = SingleSubject.create();
    return freshSubject;
}
@Override public Single<String> publishMessage(T body, Optional<Duration> maybeDelay) { return Single.defer(() -> delegate.publishMessage(body, maybeDelay)) .retry(this::shouldRetry) .subscribeWith(SingleSubject.create());//convert to Hot single }
/**
 * Races an upstream {@code onSuccess} against a cancel of the observer, 1000 times.
 *
 * <p>The observer's {@code onSuccess} copies slot 0 of a two-element array into slot 1.
 * The cancelling task nulls slot 0 after cancelling, so the final assertion fails only
 * if {@code onSuccess} ran after that task had cleared slot 0.
 */
@Test
public void race() throws Exception {
    Worker emitterWorker = Schedulers.newThread().createWorker();
    try {
        for (int round = 0; round < 1000; round++) {
            Integer[] slots = { 0, 0 };

            TestObserver<Integer> observer = new TestObserver<Integer>() {
                @Override
                public void onSuccess(Integer v) {
                    // Record what slot 0 held at the moment of delivery.
                    slots[1] = slots[0];
                    super.onSuccess(v);
                }
            };

            SingleSubject<Integer> subject = SingleSubject.create();

            subject.observeOn(Schedulers.single())
                    .onTerminateDetach()
                    .subscribe(observer);

            // Both tasks spin on this counter so that they start at about the same time.
            AtomicInteger startGate = new AtomicInteger(2);
            CountDownLatch bothDone = new CountDownLatch(2);

            emitterWorker.schedule(() -> {
                if (startGate.decrementAndGet() != 0) {
                    while (startGate.get() != 0) { }
                }
                subject.onSuccess(1);
                bothDone.countDown();
            });

            Schedulers.single().scheduleDirect(() -> {
                if (startGate.decrementAndGet() != 0) {
                    while (startGate.get() != 0) { }
                }
                observer.cancel();
                slots[0] = null;
                bothDone.countDown();
            });

            bothDone.await();

            Assert.assertNotNull(slots[1]);
        }
    } finally {
        emitterWorker.dispose();
    }
}