@Test public void clientCanCancelServerStreamExplicitly() throws InterruptedException { RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel); TestSubscriber<NumberProto.Number> subscription = stub .responsePressure(Single.just(Empty.getDefaultInstance())) .doOnNext(number -> System.out.println(number.getNumber(0))) .doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnComplete(() -> System.out.println("Completed")) .doOnCancel(() -> System.out.println("Client canceled")) .test(); Thread.sleep(250); subscription.dispose(); Thread.sleep(250); subscription.awaitTerminalEvent(3, TimeUnit.SECONDS); // Cancellation may or may not deliver the last generated message due to delays in the gRPC processing thread assertThat(Math.abs(subscription.valueCount() - svc.getLastNumberProduced())).isLessThanOrEqualTo(3); assertThat(svc.wasCanceled()).isTrue(); }
@Test public void clientCanCancelServerStreamImplicitly() throws InterruptedException { RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel); TestSubscriber<NumberProto.Number> subscription = stub .responsePressure(Single.just(Empty.getDefaultInstance())) .doOnNext(number -> System.out.println(number.getNumber(0))) .doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnComplete(() -> System.out.println("Completed")) .doOnCancel(() -> System.out.println("Client canceled")) .take(10) .test(); // Consume some work Thread.sleep(TimeUnit.SECONDS.toMillis(1)); subscription.dispose(); subscription.awaitTerminalEvent(3, TimeUnit.SECONDS); subscription.assertValueCount(10); subscription.assertTerminated(); assertThat(svc.wasCanceled()).isTrue(); }
/**
 * Consumes the whole server stream while invoking {@code waitIfValuesAreEqual} for every element,
 * then checks the stream completed with all elements and that exactly one wait was recorded.
 */
@Test
public void serverToClientBackpressure() throws InterruptedException {
    RxNumbersGrpc.RxNumbersStub rxStub = RxNumbersGrpc.newRxStub(channel);
    Single<Empty> emptyRequest = Single.just(Empty.getDefaultInstance());

    TestSubscriber<NumberProto.Number> subscriber = rxStub.responsePressure(emptyRequest)
            .doOnNext(num -> System.out.println(num.getNumber(0) + " <--"))
            .doOnNext(num -> waitIfValuesAreEqual(num.getNumber(0), 3))
            .test();

    subscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
    subscriber.assertComplete();
    subscriber.assertValueCount(NUMBER_OF_STREAM_ELEMENTS);

    assertThat(numberOfWaits.get()).isEqualTo(1);
}
@Test public void clientCanCancelServerStreamExplicitly() throws InterruptedException { AtomicInteger lastNumberConsumed = new AtomicInteger(Integer.MAX_VALUE); ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel); Flux<NumberProto.Number> test = stub .responsePressure(Mono.just(Empty.getDefaultInstance())) .doOnNext(number -> {lastNumberConsumed.set(number.getNumber(0)); System.out.println("C: " + number.getNumber(0));}) .doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnComplete(() -> System.out.println("Completed")) .doOnCancel(() -> System.out.println("Client canceled")); Disposable subscription = test.publish().connect(); Thread.sleep(1000); subscription.dispose(); Thread.sleep(1000); // Cancellation may or may not deliver the last generated message due to delays in the gRPC processing thread assertThat(Math.abs(lastNumberConsumed.get() - svc.getLastNumberProduced())).isLessThanOrEqualTo(3); assertThat(svc.wasCanceled()).isTrue(); }
/**
 * Limits the stream with {@code take(10)}, connects, waits one second and checks that the
 * service observed a cancellation.
 */
@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
    ReactorNumbersGrpc.ReactorNumbersStub reactorStub = ReactorNumbersGrpc.newReactorStub(channel);

    Flux<NumberProto.Number> firstTen = reactorStub
            .responsePressure(Mono.just(Empty.getDefaultInstance()))
            .doOnNext(n -> System.out.println(n.getNumber(0)))
            .doOnError(t -> System.out.println(t.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .take(10);

    Disposable connection = firstTen.publish().connect();

    Thread.sleep(1000);
    assertThat(svc.wasCanceled()).isTrue();
}
/**
 * Consumes the whole server stream while invoking {@code waitIfValuesAreEqual} for every element,
 * then verifies that all elements arrive, the stream completes within five seconds, and exactly
 * one wait was recorded.
 */
@Test
public void serverToClientBackpressure() throws InterruptedException {
    ReactorNumbersGrpc.ReactorNumbersStub reactorStub = ReactorNumbersGrpc.newReactorStub(channel);
    Mono<Empty> emptyRequest = Mono.just(Empty.getDefaultInstance());

    Flux<NumberProto.Number> numbers = reactorStub.responsePressure(emptyRequest)
            .doOnNext(num -> System.out.println(num.getNumber(0) + " <--"))
            .doOnNext(num -> waitIfValuesAreEqual(num.getNumber(0), 3));

    Duration timeout = Duration.ofSeconds(5);
    StepVerifier.create(numbers)
            .expectNextCount(NUMBER_OF_STREAM_ELEMENTS)
            .expectComplete()
            .verify(timeout);

    assertThat(numberOfWaits.get()).isEqualTo(1);
}
/**
 * Drains every queued span, translates the batch and sends it in a single patch-traces request.
 * Does nothing when the queue is empty. The outcome of the asynchronous call is only logged.
 */
@Override
public void flush() {
    List<Span> drained = new ArrayList<>();
    queue.drain(drained::add);
    if (drained.isEmpty()) {
        return;
    }

    PatchTracesRequest.Builder patch = PatchTracesRequest.newBuilder();
    patch.setProjectId(projectId);
    patch.setTraces(Traces.newBuilder().addAllTraces(translator.translateSpans(drained)));
    PatchTracesRequest request = patch.build();

    ApiFutures.addCallback(
            traceServiceClient.patchTracesCallable().futureCall(request),
            new ApiFutureCallback<Empty>() {
                @Override
                public void onSuccess(Empty result) {
                    logger.info("Successfully reported traces.");
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.warn("Error reporting traces.", t);
                }
            });
}
public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); String abstractName = "mesh://timeService"; // Open a channel to the server Channel channel = ManagedChannelBuilder .forTarget(abstractName) .nameResolverFactory(StaticResolver.factory(new InetSocketAddress(host, port))) .usePlaintext(true) .build(); // Create a CompletableFuture-based stub TimeServiceGrpc8.TimeServiceCompletableFutureStub stub = TimeServiceGrpc8.newCompletableFutureStub(channel); // Call the service CompletableFuture<TimeReply> completableFuture = stub.getTime(Empty.getDefaultInstance()); TimeReply timeReply = completableFuture.get(); // Convert to JDK8 types Instant now = MoreTimestamps.toInstantUtc(timeReply.getTime()); System.out.println("The time is " + now); }
/**
 * Authenticates with only the USER authority and expects {@code getAllAuthorities} to fail with
 * {@code PERMISSION_DENIED}.
 */
@Test
public void getAllAuthoritiesRejected() throws Exception {
    List<SimpleGrantedAuthority> userOnly =
            Collections.singletonList(new SimpleGrantedAuthority(AuthoritiesConstants.USER));
    Authentication userAuthentication =
            new UsernamePasswordAuthenticationToken(DEFAULT_EMAIL, DEFAULT_PASSWORD, userOnly);
    SecurityContextHolder.getContext().setAuthentication(userAuthentication);

    try {
        List<String> collected = new ArrayList<>();
        // Draining the iterator is what surfaces the failure of the call.
        stub.getAllAuthorities(Empty.getDefaultInstance())
                .forEachRemaining(authority -> collected.add(authority.getValue()));
        failBecauseExceptionWasNotThrown(StatusRuntimeException.class);
    } catch (StatusRuntimeException denied) {
        Status.Code actual = denied.getStatus().getCode();
        assertThat(actual).isEqualTo(Status.Code.PERMISSION_DENIED);
    }
}
/**
 * Streams every metric exposed by the registered {@code publicMetrics} as a protobuf
 * {@code Metric}.
 *
 * <p>Integer and long values go to the long field, float and double values to the double field,
 * and any other value is stored as its string representation. The timestamp is set only when the
 * source metric has one.
 */
@Override
public Flux<Metric> getMetrics(Mono<Empty> request) {
    return request
            .flatMapIterable(empty -> publicMetrics)
            .flatMapIterable(PublicMetrics::metrics)
            .map(source -> {
                Metric.Builder proto = Metric.newBuilder().setName(source.getName());

                if (source.getTimestamp() != null) {
                    proto.setTimestamp(ProtobufMappers.dateToTimestamp(source.getTimestamp()));
                }

                Number value = source.getValue();
                boolean integral = value instanceof Long || value instanceof Integer;
                boolean floating = value instanceof Float || value instanceof Double;
                if (integral) {
                    proto.setLongValue(value.longValue());
                } else if (floating) {
                    proto.setDoubleValue(value.doubleValue());
                } else {
                    proto.setStringValue(value.toString());
                }
                return proto.build();
            });
}
/**
 * Tests the regular polling cycle: after a poll that returned one message, the next poll
 * acknowledges successfully and returns nothing, and a further empty poll does not call
 * ackMessages() again, so it is invoked exactly once overall.
 */
@Test
public void testPollInRegularCase() throws Exception {
    task.start(props);

    // First poll: the subscriber delivers a single message.
    ReceivedMessage received =
            createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
    PullResponse singleMessageResponse =
            PullResponse.newBuilder().addReceivedMessages(received).build();
    when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(singleMessageResponse);
    List<SourceRecord> records = task.poll();
    assertEquals(1, records.size());

    // Second poll: the ack succeeds and no new messages are available.
    PullResponse emptyResponse = PullResponse.newBuilder().build();
    ListenableFuture<Empty> ackSucceeded = Futures.immediateFuture(Empty.getDefaultInstance());
    when(subscriber.ackMessages(any(AcknowledgeRequest.class))).thenReturn(ackSucceeded);
    when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(emptyResponse);
    records = task.poll();
    assertEquals(0, records.size());

    // Third poll: still nothing to deliver.
    records = task.poll();
    assertEquals(0, records.size());

    verify(subscriber, times(1)).ackMessages(any(AcknowledgeRequest.class));
}
/**
 * Reschedules every command stored with the {@code SCHEDULED} status for the given tenant.
 *
 * <p>Both the lookup and the rescheduling run through tenant-aware wrappers created for
 * {@code tenantId}.
 *
 * @param tenantId the tenant whose scheduled commands are rescheduled
 */
private void rescheduleForTenant(final TenantId tenantId) {
    final TenantAwareFunction0<Iterator<Command>> findScheduled =
            new TenantAwareFunction0<Iterator<Command>>(tenantId) {
                @Override
                public Iterator<Command> apply() {
                    return commandStore().iterator(SCHEDULED);
                }
            };
    final Iterator<Command> pending = findScheduled.execute(Empty.getDefaultInstance());

    final TenantAwareOperation rescheduleAll = new TenantAwareOperation(tenantId) {
        @Override
        public void run() {
            while (pending.hasNext()) {
                reschedule(pending.next());
            }
        }
    };
    rescheduleAll.execute();
}
/**
 * Closes an intercepted mutateRow call with {@code Status.INTERNAL} and checks that the
 * interceptor recorded exactly one matching completion status.
 */
@Test
public void callCompletionStatusesAreRecorded() throws InterruptedException {
    CallCompletionStatusInterceptor statusInterceptor = new CallCompletionStatusInterceptor();
    when(channelStub.newCall(BigtableServiceGrpc.CONFIG.mutateRow)).thenReturn(callStub);

    CompletionStatusGatheringCall<MutateRowRequest, Empty> gatheringCall =
            statusInterceptor.interceptCall(BigtableServiceGrpc.CONFIG.mutateRow, channelStub);
    Listener<Empty> gatheringListener =
            gatheringCall.createGatheringListener(responseListenerStub);

    gatheringListener.onClose(Status.INTERNAL, new Metadata.Trailers());

    CallCompletionStatusInterceptor.CallCompletionStatus expected =
            new CallCompletionStatusInterceptor.CallCompletionStatus(
                    BigtableServiceGrpc.CONFIG.mutateRow, Status.INTERNAL);
    Assert.assertEquals(1, statusInterceptor.getCallCompletionStatuses().count(expected));
}
@Test public void successfulCallsAreNotRetried() { RetryListener<MutateRowRequest, Empty> listener = new RetryListener<>( mockRetryingCall, request, new Headers.Headers(), true, // always retriable for testing mockResponseListener); listener.onHeaders(new Headers.Headers()); listener.onPayload(response); listener.onClose(Status.OK, new Trailers.Trailers()); // Validate that the listener did not attempt to start a new call on the channel: verifyNoMoreInteractions(mockRetryingCall); // Verify that the mockResponseListener was informed of the payload and closed: verify(mockResponseListener, times(1)).onPayload(eq(response)); verify(mockResponseListener, times(1)).onClose(eq(Status.OK), any(Trailers.Trailers.class)); }
@Test public void failuresAfterHeadersAreReceivedIsNotRetried() { Headers requestHeaders = new Headers.Headers(); RetryListener<MutateRowRequest, Empty> listener = new RetryListener<>( mockRetryingCall, request, requestHeaders, true, // always retriable for testing mockResponseListener); Headers responseHeaders = new Headers.Headers(); listener.onHeaders(responseHeaders); listener.onPayload(response); listener.onClose(Status.INTERNAL, new Trailers.Trailers()); // Validate that the listener did not attempt to start a new call on the channel: verifyNoMoreInteractions(mockRetryingCall); // Verify that the mockResponseListener was informed of the payload and closed: verify(mockResponseListener, times(1)).onHeaders(eq(responseHeaders)); verify(mockResponseListener, times(1)).onPayload(eq(response)); verify(mockResponseListener, times(1)).onClose( eq(Status.INTERNAL), any(Trailers.Trailers.class)); }
/**
 * Deletes the requested columns of one row from the HBase table named in the request.
 *
 * <p>The success response is sent only after the table has been closed. Previously the response
 * was completed inside the try-with-resources block, so an {@link IOException} raised while
 * closing the table would have triggered {@code onError} on an already completed observer.
 *
 * @param del      the table name, row id and column qualifiers to delete
 * @param response receives a single {@code Empty} on success, or the {@link IOException} on failure
 */
@Override
public void deletes(Delete del, StreamObserver<Empty> response) {
    TableName tn = TableName.valueOf(del.getTable());
    try (Table table = hbase.getTable(tn)) {
        org.apache.hadoop.hbase.client.Delete delete =
                new org.apache.hadoop.hbase.client.Delete(del.getId().toByteArray());
        for (ColumnQualifier col : del.getColumnList()) {
            delete.addColumn(col.getCf().toByteArray(), col.getQualifier().toByteArray());
        }
        LOG.info("deleting for id: {}.", del.getId().toStringUtf8());
        table.delete(delete);
    } catch (IOException ex) {
        LOG.error("error deleting from hbase.", ex);
        response.onError(ex);
        return;
    }
    // Reached only when both the delete and the implicit close succeeded.
    response.onNext(Empty.getDefaultInstance());
    response.onCompleted();
}
/**
 * Blocks the handling thread for the requested number of milliseconds, then replies with
 * {@code Empty}.
 *
 * <p>If the sleep is interrupted, the interrupt flag is restored and the call is terminated with
 * {@code onError} only. The observer is never completed after an error, because a
 * {@code StreamObserver} accepts no further calls once it has been terminated.
 *
 * @param request          carries the number of milliseconds to block
 * @param responseObserver receives one {@code Empty} followed by completion, or the
 *                         {@link InterruptedException} if the sleep was interrupted
 */
@Override
public void blockForMillis(BlockForMillisRequest request, StreamObserver<Empty> responseObserver) {
    logger.info("blocking for millis [" + request.getMillis() + "] request received at "
            + SimpleServiceImpl.DATE_FORMAT.format(new Date(System.currentTimeMillis())));
    try {
        Thread.sleep(request.getMillis());
    } catch (InterruptedException e) {
        // Preserve the interruption for whoever owns this thread.
        Thread.currentThread().interrupt();
        responseObserver.onError(e);
        return;
    }
    responseObserver.onNext(Empty.getDefaultInstance());
    responseObserver.onCompleted();
}
/**
 * Logs the time the request was received and immediately replies with {@code Empty}.
 *
 * @param request          ignored
 * @param responseObserver receives one {@code Empty} followed by completion
 */
@Override
public void noop(Empty request, StreamObserver<Empty> responseObserver) {
    Date receivedAt = new Date(System.currentTimeMillis());
    logger.info("no-op request received at " + DATE_FORMAT.format(receivedAt));

    Empty reply = Empty.getDefaultInstance();
    responseObserver.onNext(reply);
    responseObserver.onCompleted();
}
@Override public Flowable<NumberProto.Number> responsePressure(Single<Empty> request) { // Produce a very long sequence return Flowable .fromIterable(IntStream.range(0, NUMBER_OF_STREAM_ELEMENTS)::iterator) .delay(10, TimeUnit.MILLISECONDS) .doOnNext(i -> lastNumberProduced.set(i)) .map(CancellationPropagationIntegrationTest::protoNum) .doOnCancel(() -> { wasCanceled.set(true); System.out.println("Server canceled"); }); }
@Override public Flux<NumberProto.Number> responsePressure(Mono<Empty> request) { // Produce a very long sequence return Flux .fromIterable(IntStream.range(0, NUMBER_OF_STREAM_ELEMENTS)::iterator) .delayElements(Duration.ofMillis(SEQUENCE_DELAY_MILLIS)) .doOnNext(i -> lastNumberProduced.set(i)) .map(CancellationPropagationIntegrationTest::protoNum) .doOnCancel(() -> { wasCanceled.set(true); System.out.println("Server canceled"); }); }
/**
 * Removes the shelf with the requested id.
 *
 * <p>On success the call is now answered with a single {@code Empty} and completed. Previously
 * the observer was never notified, so the caller had no way to see the delete finish.
 *
 * @param request          carries the id of the shelf to delete
 * @param responseObserver receives one {@code Empty} followed by completion on success
 * @throws RuntimeException if no shelf with the requested id exists
 */
@Override
public synchronized void deleteShelf(
        DeleteShelfRequest request, StreamObserver<Empty> responseObserver) {
    if (shelfsById.remove(request.getId()) == null) {
        throw new RuntimeException(String.format("Shelf with id=%s not found", request.getId()));
    }
    responseObserver.onNext(Empty.getDefaultInstance());
    responseObserver.onCompleted();
}
/**
 * Removes the book with the requested id.
 *
 * <p>On success the call is now answered with a single {@code Empty} and completed. Previously
 * the observer was never notified, so the caller had no way to see the delete finish.
 *
 * @param request          carries the id of the book to delete
 * @param responseObserver receives one {@code Empty} followed by completion on success
 * @throws RuntimeException if no book with the requested id exists
 */
@Override
public synchronized void deleteBook(
        DeleteBookRequest request, StreamObserver<Empty> responseObserver) {
    if (booksById.remove(request.getId()) == null) {
        throw new RuntimeException(String.format("Book with id=%s not found", request.getId()));
    }
    responseObserver.onNext(Empty.getDefaultInstance());
    responseObserver.onCompleted();
}
@Override public void getTime(Empty request, StreamObserver<TimeReply> responseObserver) { // JDK8 type Instant now = Instant.now(); logger.info("Reporting the time " + now); // Protobuf type Timestamp protoNow = MoreTimestamps.fromInstantUtc(now); TimeReply reply = TimeReply.newBuilder().setTime(protoNow).build(); // Respond responseObserver.onNext(reply); responseObserver.onCompleted(); }
/**
 * Health status endpoint. Reports "Error" when the password dictionary is empty and "Running"
 * otherwise, together with the number of loaded passwords.
 *
 * @param request Empty request
 * @param responseObserver Receives one health status message followed by completion
 */
@Override
public void getPasswordsServiceHealthStatus(Empty request, StreamObserver<PasswordsServiceHealthStatus> responseObserver) {
    String state;
    if (passwordReader.getDict().size() == 0) {
        state = "Error";
    } else {
        state = "Running";
    }

    PasswordsServiceHealthStatus.Builder health = PasswordsServiceHealthStatus.newBuilder();
    health.setStatus(state);
    health.setTotalPasswordsLoaded(passwordReader.getDict().size());

    responseObserver.onNext(health.build());
    responseObserver.onCompleted();
}
/**
 * Calls the passwords service health endpoint with a blocking stub, applying the configured
 * deadline and interceptor.
 *
 * @return a DTO holding the reported status and the number of loaded passwords
 */
public PassServiceHealthDto getHealthStatus() {
    ManagedChannel healthChannel = healthServiceManagedChannelSupplier.get();

    PasswordsServiceHealthStatus remoteStatus = PasswordsServiceHealthServiceGrpc
            .newBlockingStub(healthChannel)
            .withDeadlineAfter(grpcTimeout, TimeUnit.MILLISECONDS)
            .withInterceptors(interceptor)
            .getPasswordsServiceHealthStatus(Empty.getDefaultInstance());

    String state = remoteStatus.getStatus();
    return PassServiceHealthDto.builder()
            .status(state)
            .dictSize(remoteStatus.getTotalPasswordsLoaded())
            .build();
}
/**
 * Cancels the named operation on the instance that owns it.
 *
 * <p>A unary call has to deliver one response message before it is completed, so an
 * {@code Empty} is now sent after the cancellation. Previously the observer was completed
 * without any message.
 *
 * @param request          carries the name of the operation to cancel
 * @param responseObserver receives one {@code Empty} followed by completion, or a status error
 *                         when no instance is found for the operation name
 */
@Override
public void cancelOperation(
        CancelOperationRequest request, StreamObserver<Empty> responseObserver) {
    Instance instance;
    try {
        instance = instances.getFromOperationName(request.getName());
    } catch (InstanceNotFoundException ex) {
        responseObserver.onError(BuildFarmInstances.toStatusException(ex));
        return;
    }

    instance.cancelOperation(request.getName());
    responseObserver.onNext(Empty.getDefaultInstance());
    responseObserver.onCompleted();
}
/**
 * Reports the active profiles of the environment and, when available, the ribbon environment
 * derived from them.
 *
 * @param request ignored apart from triggering the response
 */
@Override
public Mono<ProfileInfo> getActiveProfiles(Mono<Empty> request) {
    return request.map(ignored -> {
        ProfileInfo.Builder info = ProfileInfo.newBuilder();

        String[] profiles = DefaultProfileUtil.getActiveProfiles(env);
        if (profiles != null) {
            info.addAllActiveProfiles(Arrays.asList(profiles));
        }

        // The helper is called even when no profiles were found.
        String ribbon = getRibbonEnv(profiles);
        if (ribbon != null) {
            info.setRibbonEnv(ribbon);
        }

        return info.build();
    });
}
/**
 * Reports the overall health status together with one entry per registered health indicator.
 *
 * <p>The per-indicator entries are computed when this method is called; the overall status is
 * evaluated when the request is processed.
 *
 * @param request ignored apart from triggering the response
 */
@Override
public Mono<Health> getHealth(Mono<Empty> request) {
    Map<String, HealthIndicator> indicatorProtos = new HashMap<>();
    this.healthIndicators.forEach((name, source) -> {
        indicatorProtos.put(name, healthIndicatorToHealthIndicatorProto(source));
    });

    return request.map(ignored -> {
        String overall = this.healthIndicator.health().getStatus().toString();
        return Health.newBuilder()
                .setStatus(Status.valueOf(overall))
                .putAllHealthIndicators(indicatorProtos)
                .build();
    });
}
/**
 * Asserts that calling {@code getLoggers} on the stub fails with the given status code.
 *
 * @param stub the blocking stub to call
 * @param code the status code the call is expected to fail with
 */
private static void assertGetLoggersReturnsCode(LoggersServiceGrpc.LoggersServiceBlockingStub stub, Status.Code code) {
    try {
        // Drain the iterator so that a failing call actually surfaces its exception.
        stub.getLoggers(Empty.getDefaultInstance()).forEachRemaining(ignored -> { });
        failBecauseExceptionWasNotThrown(StatusRuntimeException.class);
    } catch (StatusRuntimeException failure) {
        Status.Code actual = failure.getStatus().getCode();
        assertThat(actual).isEqualTo(code);
    }
}
/**
 * Fetches the configuration properties report and checks that the entry for
 * "configurationPropertiesReportEndpoint" deserializes to an endpoint with the expected id.
 */
@Test
public void getConfigurationProperties() throws IOException {
    ConfigurationPropertiesReport report =
            stub.getConfigurationProperties(Empty.newBuilder().build());

    String endpointJson = report.getConfigurationPropertiesMap()
            .get("configurationPropertiesReportEndpoint")
            .getProperties();

    ObjectMapper json = new ObjectMapper();
    ConfigurationPropertiesReportEndpoint parsedEndpoint =
            json.readValue(endpointJson, ConfigurationPropertiesReportEndpoint.class);

    assertThat(parsedEndpoint.getId())
            .isEqualTo(this.configurationPropertiesReportEndpoint.getId());
}
@Test public void testEnvironment() throws IOException { Environment Environment = stub.getEnv(Empty.newBuilder().build()); ObjectMapper mapper = new ObjectMapper(); TypeReference<HashMap<String,Object>> typeRef = new TypeReference<HashMap<String,Object>>() {}; // String value should represent a Json map HashMap<String,Object> env = mapper.readValue(Environment.getValue(), typeRef); assertThat(env).isNotEmpty(); }
/**
 * Returns the result of invoking the endpoint, serialized as a JSON string.
 *
 * <p>A serialization failure is reported as an {@code INTERNAL} status carrying the original
 * exception as its cause.
 *
 * @param request ignored apart from triggering the response
 */
@Override
public Mono<Environment> getEnv(Mono<Empty> request) {
    return request.map(ignored -> {
        ObjectMapper json = new ObjectMapper();
        String serialized;
        try {
            serialized = json.writeValueAsString(endpoint.invoke());
        } catch (JsonProcessingException e) {
            throw Status.INTERNAL.withCause(e).asRuntimeException();
        }
        return Environment.newBuilder().setValue(serialized).build();
    });
}
/**
 * Deletes the user whose login is carried by the request and replies with {@code Empty}.
 *
 * @param request wraps the login of the user to delete
 */
@Override
public Mono<Empty> deleteUser(Mono<StringValue> request) {
    return request
            .map(wrapped -> wrapped.getValue())
            .doOnSuccess(userLogin -> log.debug("gRPC request to delete User: {}", userLogin))
            .doOnSuccess(userLogin -> userService.deleteUser(userLogin))
            .map(deleted -> Empty.newBuilder().build());
}
/**
 * Streams every authority known to the user service.
 *
 * <p>The call is rejected with {@code PERMISSION_DENIED} unless the current user has the ADMIN
 * role. The debug message previously read "gat all authorities"; the typo is corrected.
 *
 * @param request ignored apart from triggering the response
 * @return one {@code StringValue} per authority
 */
@Override
public Flux<StringValue> getAllAuthorities(Mono<Empty> request) {
    return request
            .doOnSuccess(e -> log.debug("gRPC request to get all authorities"))
            // Filtering out the request empties the Mono, which the next step turns into an error.
            .filter(e -> SecurityUtils.isCurrentUserInRole(AuthoritiesConstants.ADMIN))
            .switchIfEmpty(Mono.error(Status.PERMISSION_DENIED.asRuntimeException()))
            .flatMapIterable(e -> userService.getAuthorities())
            .map(authority -> StringValue.newBuilder().setValue(authority).build());
}
/**
 * Returns the name of the currently authenticated principal, or a {@code StringValue} with no
 * value set when the security context holds no authentication.
 *
 * @param request ignored apart from triggering the response
 */
@Override
public Mono<StringValue> isAuthenticated(Mono<Empty> request) {
    return request.map(ignored -> {
        log.debug("gRPC request to check if the current user is authenticated");
        Authentication current = SecurityContextHolder.getContext().getAuthentication();

        StringValue.Builder reply = StringValue.newBuilder();
        if (current == null) {
            return reply.build();
        }
        return reply.setValue(current.getName()).build();
    });
}
/**
 * Registers a new account from the given user message and replies with {@code Empty}.
 *
 * <p>Processing steps, in order:
 * <ol>
 *   <li>log the requested login;</li>
 *   <li>reject with {@code INVALID_ARGUMENT} ("Incorrect password") when
 *       {@code checkPasswordLength} returns false for the password;</li>
 *   <li>reject with {@code ALREADY_EXISTS} when the lower-cased login or the email
 *       (case-insensitive lookup) is already present in the user repository;</li>
 *   <li>map the message to a user DTO and call {@code userService.registerUser} with it and the
 *       password;</li>
 *   <li>pass the registered user to {@code mailService.sendCreationEmail}.</li>
 * </ol>
 *
 * <p>A {@code ConstraintViolationException} raised during registration is reported as
 * {@code INVALID_ARGUMENT} ("Invalid user"). This source is a template: the additional
 * {@code TransactionSystemException} handler is generated only when the database type is SQL.
 *
 * @param request the account to register
 */
@Override public Mono<Empty> registerAccount(Mono<UserProto> request) { return request .doOnSuccess(userProto -> log.debug("gRPC request to register account {}", userProto.getLogin())) .filter(userProto -> checkPasswordLength(userProto.getPassword())) .switchIfEmpty(Mono.error(Status.INVALID_ARGUMENT.withDescription("Incorrect password").asRuntimeException())) .filter(userProto -> !userRepository.findOneByLogin(userProto.getLogin().toLowerCase()).isPresent()) .switchIfEmpty(Mono.error(Status.ALREADY_EXISTS.withDescription("Login already in use").asRuntimeException())) .filter(userProto -> !userRepository.findOneByEmailIgnoreCase(userProto.getEmail()).isPresent()) .switchIfEmpty(Mono.error(Status.ALREADY_EXISTS.withDescription("Email already in use").asRuntimeException())) .map(userProto -> Pair.of(userProtoMapper.userProtoToUserDTO(userProto), userProto.getPassword())) .map(pair -> { try { return userService.registerUser(pair.getFirst(), pair.getSecond()); <%_ if (databaseType === 'sql') { _%> } catch (TransactionSystemException e) { if (e.getOriginalException().getCause() instanceof ConstraintViolationException) { log.info("Invalid user", e); throw Status.INVALID_ARGUMENT.withDescription("Invalid user").asRuntimeException(); } else { throw e; } <%_ } _%> } catch (ConstraintViolationException e) { log.error("Invalid user", e); throw Status.INVALID_ARGUMENT.withDescription("Invalid user").asRuntimeException(); } }) .doOnSuccess(mailService::sendCreationEmail) .map(u -> Empty.newBuilder().build()); }
@Override public Mono<Empty> saveAccount(Mono<UserProto> request) { String currentLogin = SecurityUtils.getCurrentUserLogin().orElseThrow(Status.INTERNAL::asRuntimeException); return request .filter(user -> !userRepository.findOneByEmailIgnoreCase(user.getEmail()) .map(User::getLogin) .map(login -> !login.equalsIgnoreCase(currentLogin)) .isPresent() ) .switchIfEmpty(Mono.error(Status.ALREADY_EXISTS.withDescription("Email already in use").asRuntimeException())) .filter(user -> userRepository.findOneByLogin(currentLogin).isPresent()) .switchIfEmpty(Mono.error(Status.INTERNAL.asRuntimeException())) .doOnSuccess(user -> { try { userService.updateUser( user.getFirstName().isEmpty() ? null : user.getFirstName(), user.getLastName().isEmpty() ? null : user.getLastName(), user.getEmail().isEmpty() ? null : user.getEmail(), user.getLangKey().isEmpty() ? null : user.getLangKey()<% if (databaseType === 'mongodb' || databaseType === 'sql') { %>, user.getImageUrl().isEmpty() ? null : user.getImageUrl()<% } %> ); <%_ if (databaseType === 'sql') { _%> } catch (TransactionSystemException e) { if (e.getOriginalException().getCause() instanceof ConstraintViolationException) { log.info("Invalid user", e); throw Status.INVALID_ARGUMENT.withDescription("Invalid user").asRuntimeException(); } else { throw e; } <%_ } _%> } catch (ConstraintViolationException e) { log.error("Invalid user", e); throw Status.INVALID_ARGUMENT.withDescription("Invalid user").asRuntimeException(); } }) .map(u -> Empty.newBuilder().build()); }
/**
 * Changes the password via the user service and replies with {@code Empty}.
 *
 * <p>A password for which {@code checkPasswordLength} returns false is rejected with
 * {@code INVALID_ARGUMENT} ("Incorrect password").
 *
 * @param request wraps the new password
 */
@Override
public Mono<Empty> changePassword(Mono<StringValue> request) {
    return request
            .map(wrapped -> wrapped.getValue())
            .filter(candidate -> AccountService.checkPasswordLength(candidate))
            .switchIfEmpty(Mono.error(
                    Status.INVALID_ARGUMENT.withDescription("Incorrect password").asRuntimeException()))
            .doOnSuccess(accepted -> userService.changePassword(accepted))
            .map(changed -> Empty.newBuilder().build());
}
/**
 * Streams the persistent tokens of the currently logged-in user.
 *
 * <p>Fails with an {@code INTERNAL} status when no current login is available or no user is
 * found for it.
 *
 * @param request ignored apart from triggering the response
 */
@Override
public Flux<PersistentToken> getCurrentSessions(Mono<Empty> request) {
    return request
            .map(ignored -> {
                return SecurityUtils.getCurrentUserLogin()
                        .flatMap(userRepository::findOneByLogin)
                        .orElseThrow(Status.INTERNAL::asRuntimeException);
            })
            .flatMapIterable(owner -> persistentTokenRepository.findByUser(owner))
            .map(token -> ProtobufMappers.persistentTokenToPersistentTokenProto(token));
}
/**
 * Deletes the current user's persistent tokens whose series equals the requested one and replies
 * with {@code Empty}.
 *
 * <p>When no current login is available or no user is found for it, nothing is deleted.
 *
 * @param request wraps the series of the session to invalidate
 */
@Override
public Mono<Empty> invalidateSession(Mono<StringValue> request) {
    return request
            .map(wrapped -> wrapped.getValue())
            .doOnSuccess(series -> {
                SecurityUtils.getCurrentUserLogin()
                        .flatMap(userRepository::findOneByLogin)
                        .map(persistentTokenRepository::findByUser)
                        .orElse(new ArrayList<>())
                        .stream()
                        .filter(token -> StringUtils.equals(token.getSeries(), series))
                        .forEach(persistentTokenRepository::delete);
            })
            .map(done -> Empty.newBuilder().build());
}