@Override public ListenableFuture<List<DeviceType>> findDevicesByQuery(DeviceTypeSearchQuery query) { ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery()); ListenableFuture<List<DeviceType>> devices = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<DeviceType>>) relations1 -> { EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection(); List<ListenableFuture<DeviceType>> futures = new ArrayList<>(); for (EntityRelation relation : relations1) { EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom(); if (entityId.getEntityType() == ThingType.DEVICE) { futures.add(findDeviceByIdAsync(new DeviceTypeId(entityId.getId()))); } } return Futures.successfulAsList(futures); }); devices = Futures.transform(devices, new Function<List<DeviceType>, List<DeviceType>>() { @Nullable @Override public List<DeviceType> apply(@Nullable List<DeviceType> deviceList) { return deviceList.stream().collect(Collectors.toList()); } }); return devices; }
@Override public ListenableFuture<Boolean> deleteEntityRelations(EntityId entity) { log.trace("Executing deleteEntityRelations [{}]", entity); validate(entity); List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>(); for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) { inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup)); } ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList); ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, new AsyncFunction<List<List<EntityRelation>>, List<Boolean>>() { @Override public ListenableFuture<List<Boolean>> apply(List<List<EntityRelation>> relations) throws Exception { List<ListenableFuture<Boolean>> results = new ArrayList<>(); for (List<EntityRelation> relationList : relations) { relationList.stream().forEach(relation -> results.add(relationDao.deleteRelation(relation))); } return Futures.allAsList(results); } }); ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction()); ListenableFuture<Boolean> outboundFuture = relationDao.deleteOutboundRelations(entity); return Futures.transform(Futures.allAsList(Arrays.asList(inboundFuture, outboundFuture)), getListToBooleanFunction()); }
@Override public ListenableFuture<List<EntityRelationInfo>> findInfoByFrom(EntityId from, RelationTypeGroup typeGroup) { log.trace("Executing findInfoByFrom [{}][{}]", from, typeGroup); validate(from); validateTypeGroup(typeGroup); ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByFrom(from, typeGroup); ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> { List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>(); relations1.stream().forEach(relation -> futures.add(fetchRelationInfoAsync(relation, relation2 -> relation2.getTo(), (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setToName(entityName))) ); return Futures.successfulAsList(futures); }); return relationsInfo; }
@Override public ListenableFuture<List<EntityRelationInfo>> findInfoByTo(EntityId to, RelationTypeGroup typeGroup) { log.trace("Executing findInfoByTo [{}][{}]", to, typeGroup); validate(to); validateTypeGroup(typeGroup); ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByTo(to, typeGroup); ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> { List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>(); relations1.stream().forEach(relation -> futures.add(fetchRelationInfoAsync(relation, relation2 -> relation2.getFrom(), (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setFromName(entityName))) ); return Futures.successfulAsList(futures); }); return relationsInfo; }
@Override public ListenableFuture<List<EntityRelationInfo>> findInfoByQuery(EntityRelationsQuery query) { log.trace("Executing findInfoByQuery [{}]", query); ListenableFuture<List<EntityRelation>> relations = findByQuery(query); EntitySearchDirection direction = query.getParameters().getDirection(); ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> { List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>(); relations1.stream().forEach(relation -> futures.add(fetchRelationInfoAsync(relation, relation2 -> direction == EntitySearchDirection.FROM ? relation2.getTo() : relation2.getFrom(), (EntityRelationInfo relationInfo, String entityName) -> { if (direction == EntitySearchDirection.FROM) { relationInfo.setToName(entityName); } else { relationInfo.setFromName(entityName); } })) ); return Futures.successfulAsList(futures); }); return relationsInfo; }
@Override public ListenableFuture<List<Device>> findDevicesByQuery(DeviceSearchQuery query) { ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery()); ListenableFuture<List<Device>> devices = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Device>>) relations1 -> { EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection(); List<ListenableFuture<Device>> futures = new ArrayList<>(); for (EntityRelation relation : relations1) { EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom(); if (entityId.getEntityType() == ThingType.DEVICE) { futures.add(findDeviceByIdAsync(new DeviceId(entityId.getId()))); } } return Futures.successfulAsList(futures); }); devices = Futures.transform(devices, new Function<List<Device>, List<Device>>() { @Nullable @Override public List<Device> apply(@Nullable List<Device> deviceList) { return deviceList.stream().filter(device -> query.getDeviceTypes().contains(device.getType())).collect(Collectors.toList()); } }); return devices; }
@Override public ListenableFuture<AlarmInfo> findAlarmInfoByIdAsync(AlarmId alarmId) { log.trace("Executing findAlarmInfoByIdAsync [{}]", alarmId); validateId(alarmId, "Incorrect alarmId " + alarmId); return Futures.transform(alarmDao.findAlarmByIdAsync(alarmId.getId()), (AsyncFunction<Alarm, AlarmInfo>) alarm1 -> { AlarmInfo alarmInfo = new AlarmInfo(alarm1); return Futures.transform( entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>) originatorName -> { alarmInfo.setOriginatorName(originatorName); return alarmInfo; } ); }); }
@Override public ListenableFuture<List<AlarmInfo>> findAlarms(AlarmQuery query) { log.trace("Try to find alarms by entity [{}], searchStatus [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(), query.getSearchStatus(), query.getStatus(), query.getPageLink()); EntityId affectedEntity = query.getAffectedEntityId(); String searchStatusName; if (query.getSearchStatus() == null && query.getStatus() == null) { searchStatusName = AlarmSearchStatus.ANY.name(); } else if (query.getSearchStatus() != null) { searchStatusName = query.getSearchStatus().name(); } else { searchStatusName = query.getStatus().name(); } String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName; ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, ThingType.ALARM, query.getPageLink()); return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> { List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size()); for (EntityRelation relation : input) { alarmFutures.add(Futures.transform( findAlarmByIdAsync(relation.getTo().getId()), (Function<Alarm, AlarmInfo>) AlarmInfo::new)); } return Futures.successfulAsList(alarmFutures); }); }
private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { return partitions -> { try { PreparedStatement proto = getFetchStmt(aggregation); List<ResultSetFuture> futures = new ArrayList<>(partitions.size()); for (Long partition : partitions) { log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId()); BoundStatement stmt = proto.bind(); stmt.setString(0, entityId.getEntityType().name()); stmt.setUUID(1, entityId.getId()); stmt.setString(2, key); stmt.setLong(3, partition); stmt.setLong(4, startTs); stmt.setLong(5, endTs); log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId()); futures.add(executeAsyncRead(stmt)); } return Futures.allAsList(futures); } catch (Throwable e) { log.error("Failed to fetch data", e); throw e; } }; }
@Override public ListenableFuture<List<AlarmInfo>> findAlarms(AlarmQuery query) { log.trace("Try to find alarms by entity [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(), query.getStatus(), query.getPageLink()); EntityId affectedEntity = query.getAffectedEntityId(); String searchStatusName; if (query.getSearchStatus() == null && query.getStatus() == null) { searchStatusName = AlarmSearchStatus.ANY.name(); } else if (query.getSearchStatus() != null) { searchStatusName = query.getSearchStatus().name(); } else { searchStatusName = query.getStatus().name(); } String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName; ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, ThingType.ALARM, query.getPageLink()); return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> { List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size()); for (EntityRelation relation : input) { alarmFutures.add(Futures.transform(findAlarmByIdAsync(relation.getTo().getId()), (Function<Alarm, AlarmInfo>) AlarmInfo::new)); } return Futures.successfulAsList(alarmFutures); }); }
@Override public ListenableFuture<Boolean> deleteEntityRelationsAsync(EntityId entity) { log.trace("Executing deleteEntityRelationsAsync [{}]", entity); validate(entity); List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>(); for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) { inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup)); } ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList); ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, new AsyncFunction<List<List<EntityRelation>>, List<Boolean>>() { @Override public ListenableFuture<List<Boolean>> apply(List<List<EntityRelation>> relations) throws Exception { List<ListenableFuture<Boolean>> results = new ArrayList<>(); for (List<EntityRelation> relationList : relations) { relationList.stream().forEach(relation -> results.add(relationDao.deleteRelationAsync(relation))); } return Futures.allAsList(results); } }); ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction()); ListenableFuture<Boolean> outboundFuture = relationDao.deleteOutboundRelationsAsync(entity); return Futures.transform(Futures.allAsList(Arrays.asList(inboundFuture, outboundFuture)), getListToBooleanFunction()); }
@Override public ListenableFuture<List<Device>> findDevicesByQuery(DeviceSearchQuery query) { ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery()); ListenableFuture<List<Device>> devices = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Device>>) relations1 -> { EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection(); List<ListenableFuture<Device>> futures = new ArrayList<>(); for (EntityRelation relation : relations1) { EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom(); if (entityId.getEntityType() == EntityType.DEVICE) { futures.add(findDeviceByIdAsync(new DeviceId(entityId.getId()))); } } return Futures.successfulAsList(futures); }); devices = Futures.transform(devices, new Function<List<Device>, List<Device>>() { @Nullable @Override public List<Device> apply(@Nullable List<Device> deviceList) { return deviceList == null ? Collections.emptyList() : deviceList.stream().filter(device -> query.getDeviceTypes().contains(device.getType())).collect(Collectors.toList()); } }); return devices; }
@Override public ListenableFuture<List<AlarmInfo>> findAlarms(AlarmQuery query) { log.trace("Try to find alarms by entity [{}], searchStatus [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(), query.getSearchStatus(), query.getStatus(), query.getPageLink()); EntityId affectedEntity = query.getAffectedEntityId(); String searchStatusName; if (query.getSearchStatus() == null && query.getStatus() == null) { searchStatusName = AlarmSearchStatus.ANY.name(); } else if (query.getSearchStatus() != null) { searchStatusName = query.getSearchStatus().name(); } else { searchStatusName = query.getStatus().name(); } String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName; ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink()); return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> { List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size()); for (EntityRelation relation : input) { alarmFutures.add(Futures.transform( findAlarmByIdAsync(relation.getTo().getId()), (Function<Alarm, AlarmInfo>) AlarmInfo::new)); } return Futures.successfulAsList(alarmFutures); }); }
private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { return partitions -> { try { PreparedStatement proto = getFetchStmt(aggregation); List<ResultSetFuture> futures = new ArrayList<>(partitions.size()); for (Long partition : partitions) { log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId()); BoundStatement stmt = proto.bind(); stmt.setString(0, entityId.getEntityType().name()); stmt.setUUID(1, entityId.getId()); stmt.setString(2, key); stmt.setLong(3, partition); stmt.setLong(4, startTs); stmt.setLong(5, endTs); log.debug(GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID, stmt, entityId.getEntityType(), entityId.getId()); futures.add(executeAsyncRead(stmt)); } return Futures.allAsList(futures); } catch (Throwable e) { log.error("Failed to fetch data", e); throw e; } }; }
@Override public ListenableFuture<List<AlarmInfo>> findAlarms(AlarmQuery query) { log.trace("Try to find alarms by entity [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(), query.getStatus(), query.getPageLink()); EntityId affectedEntity = query.getAffectedEntityId(); String searchStatusName; if (query.getSearchStatus() == null && query.getStatus() == null) { searchStatusName = AlarmSearchStatus.ANY.name(); } else if (query.getSearchStatus() != null) { searchStatusName = query.getSearchStatus().name(); } else { searchStatusName = query.getStatus().name(); } String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName; ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink()); return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> { List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size()); for (EntityRelation relation : input) { alarmFutures.add(Futures.transform( findAlarmByIdAsync(relation.getTo().getId()), (Function<Alarm, AlarmInfo>) AlarmInfo::new)); } return Futures.successfulAsList(alarmFutures); }); }
@Override public ListenableFuture<List<Asset>> findAssetsByQuery(AssetSearchQuery query) { ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery()); ListenableFuture<List<Asset>> assets = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Asset>>) relations1 -> { EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection(); List<ListenableFuture<Asset>> futures = new ArrayList<>(); for (EntityRelation relation : relations1) { EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom(); if (entityId.getEntityType() == EntityType.ASSET) { futures.add(findAssetByIdAsync(new AssetId(entityId.getId()))); } } return Futures.successfulAsList(futures); }); assets = Futures.transform(assets, (Function<List<Asset>, List<Asset>>)assetList -> assetList == null ? Collections.emptyList() : assetList.stream().filter(asset -> query.getAssetTypes().contains(asset.getType())).collect(Collectors.toList()) ); return assets; }
private AsyncFunction<List<Object>, V> transformFunction(final T targetInstance) { return new AsyncFunction<List<Object>, V>() { // safe unchecked: type checks was done during introspection @SuppressWarnings("unchecked") @Override public ListenableFuture<V> apply(List<Object> input) throws Exception { @Nullable Object result = invoker.invoke(method, targetInstance, input.toArray()); if (isVoid(method)) { result = true; } if (result == null) { throw new NullPointerException( String.format("Method @%s %s should not return null", Eventually.Provides.class.getSimpleName(), source)); } if (result instanceof ListenableFuture<?>) { return (ListenableFuture<V>) result; } return Futures.immediateFuture((V) result); } }; }
@Override public Future<RpcResult<Void>> unregisterEndpoint(UnregisterEndpointInput input) { final RpcResult<Void> result = RpcResultBuilder.<Void>success().build(); if ( input == null) { return Futures.immediateFailedCheckedFuture(new IllegalArgumentException("Endpoint can not be empty!")); } final List<Uuid> toBeDeletedList = input.getIds(); if ( toBeDeletedList == null || toBeDeletedList.isEmpty()) { return Futures.immediateFuture(result); } ReadWriteTransaction trans = dataBroker.newReadWriteTransaction(); for (Uuid ep : toBeDeletedList) { InstanceIdentifier<Endpoint> eppath = Constants.DOM_ENDPOINTS_PATH .child(Endpoint.class, new EndpointKey(ep)); trans.delete(LogicalDatastoreType.OPERATIONAL, eppath); } CheckedFuture<Void,TransactionCommitFailedException> future = trans.submit(); return Futures.transformAsync(future, (AsyncFunction<Void, RpcResult<Void>>) input1 -> Futures.immediateFuture(result), executor); }
public ListenableFuture<byte[]> getContent() { Request request = getResolver().createRequest(); request.setVerb(HttpVerb.GET); OrcURL url = request.getUrl(); url.appendPathComponent("$value"); ListenableFuture<OrcResponse> future = oDataExecute(request); return Futures.transform(future, new AsyncFunction<OrcResponse, byte[]>() { @Override public ListenableFuture<byte[]> apply(OrcResponse response) throws Exception { SettableFuture<byte[]> result = SettableFuture.create(); result.set(response.getPayload()); return result; } }); }
public ListenableFuture<InputStream> getStreamedContent() { Request request = getResolver().createRequest(); request.setVerb(HttpVerb.GET); request.addOption(Request.MUST_STREAM_RESPONSE_CONTENT, "true"); OrcURL url = request.getUrl(); url.appendPathComponent("$value"); ListenableFuture<OrcResponse> future = oDataExecute(request); return Futures.transform(future, new AsyncFunction<OrcResponse, InputStream>() { @Override public ListenableFuture<InputStream> apply(OrcResponse response) throws Exception { SettableFuture<InputStream> result = SettableFuture.create(); result.set(new MediaEntityInputStream(response.openStreamedResponse(), response)); return result; } }); }
/** * Apply string listenable future. * * @param future the future * @return the listenable future */ public static <TEntity> ListenableFuture<TEntity> transformToEntityListenableFuture( ListenableFuture<String> future, final Class<TEntity> clazz, final DependencyResolver resolver) { return Futures.transform(future, new AsyncFunction<String, TEntity>() { @Override public ListenableFuture<TEntity> apply(String payload) throws Exception { final SettableFuture<TEntity> result = SettableFuture.create(); TEntity entity = null; try { resolver.getLogger().log("Entity Deserialization Started", LogLevel.VERBOSE); entity = resolver.getJsonSerializer().deserialize(payload, clazz); resolver.getLogger().log("Entity Deserialization Finished", LogLevel.VERBOSE); } catch (Throwable throwable) { result.setException(throwable); } result.set(entity); return result; } ; }); }
/** * Add list result callback. * * @param future the future */ public static <TEntity> ListenableFuture<List<TEntity>> transformToEntityListListenableFuture( ListenableFuture<String> future, final Class<TEntity> clazz, final DependencyResolver resolver) { return Futures.transform(future, new AsyncFunction<String, List<TEntity>>() { @Override public ListenableFuture<List<TEntity>> apply(String payload) throws Exception { SettableFuture<List<TEntity>> result = SettableFuture.create(); List<TEntity> list; try { resolver.getLogger().log("Entity collection Deserialization Started", LogLevel.VERBOSE); list = resolver.getJsonSerializer().deserializeList(payload, clazz); resolver.getLogger().log("Entity collection Deserialization Finished", LogLevel.VERBOSE); result.set(list); } catch (Throwable t) { result.setException(t); } return result; } }); }
public ListenableFuture<IData> chain(String pool, ListenableFuture<IData> future, String service, IData input, String ref, boolean merge, boolean interruptable) throws ThreadException { if (future == null) { return null; } ISThreadPoolExecutor ex = getExecutor(pool); if (ex instanceof VolatileISThreadPoolExecutor) { VolatileISThreadPoolExecutor vol = (VolatileISThreadPoolExecutor) ex; int priority = vol.getPriority(ref); AsyncFunction<IData, IData> callback = new ReactiveAsyncFunction(ex, service, input, priority, merge, Service.getSession().getSessionID(), interruptable); return Futures.transform(future, callback); } else { throw new IllegalStateException( "The creation of ServiceThread using a priority reference is only possible within volatile thread pool"); } }
/** * Transform the input futures into a single future, using the provided * transform function. The transformation follows the same semantics as as * {@link Futures#transformAsync(ListenableFuture, AsyncFunction)} and the input * futures are combined using {@link Futures#allAsList}. * * @param a a ListenableFuture to combine * @param b a ListenableFuture to combine * @param c a ListenableFuture to combine * @param d a ListenableFuture to combine * @param e a ListenableFuture to combine * @param function the implementation of the transform * @return a ListenableFuture holding the result of function.apply() */ public static <Z, A, B, C, D, E> ListenableFuture<Z> asyncTransform5( ListenableFuture<A> a, ListenableFuture<B> b, ListenableFuture<C> c, ListenableFuture<D> d, ListenableFuture<E> e, final AsyncFunction5<Z, ? super A, ? super B, ? super C, ? super D, ? super E> function) { return transform(Arrays.asList(a, b, c, d, e), new AsyncFunction<List<Object>, Z>() { @Override public ListenableFuture<Z> apply(List<Object> results) throws Exception { return function.apply( (A) results.get(0), (B) results.get(1), (C) results.get(2), (D) results.get(3), (E) results.get(4)); } }); }
/** * Transform the input futures into a single future, using the provided * transform function. The transformation follows the same semantics as as * {@link Futures#transformAsync(ListenableFuture, AsyncFunction)} and the input * futures are combined using {@link Futures#allAsList}. * * @param a a ListenableFuture to combine * @param b a ListenableFuture to combine * @param c a ListenableFuture to combine * @param d a ListenableFuture to combine * @param e a ListenableFuture to combine * @param f a ListenableFuture to combine * @param function the implementation of the transform * @return a ListenableFuture holding the result of function.apply() */ public static <Z, A, B, C, D, E, F> ListenableFuture<Z> asyncTransform6( ListenableFuture<A> a, ListenableFuture<B> b, ListenableFuture<C> c, ListenableFuture<D> d, ListenableFuture<E> e, ListenableFuture<F> f, final AsyncFunction6<Z, ? super A, ? super B, ? super C, ? super D, ? super E, ? super F> function) { return transform(Arrays.asList(a, b, c, d, e, f), new AsyncFunction<List<Object>, Z>() { @Override public ListenableFuture<Z> apply(List<Object> results) throws Exception { return function.apply( (A) results.get(0), (B) results.get(1), (C) results.get(2), (D) results.get(3), (E) results.get(4), (F) results.get(5)); } }); }
@Before public void setUp() throws Exception { //noinspection unchecked graphBuilder = mock(GraphBuilder.class); when(graphBuilder.getFallback()) .thenReturn(Optional.<AsyncFunction<Throwable, String>>absent()); Map<Input<?>, Object> emptyMap = Collections.emptyMap(); traverseState = new TraverseState(emptyMap, MoreExecutors.sameThreadExecutor(), true); List<? extends NodeInfo> currentNodeParameters = ImmutableList.of(); currentNodeInfo = new FakeNodeInfo("the node", currentNodeParameters); List<ListenableFuture<?>> currentNodeValues = ImmutableList.of(); currentCall = new TraverseState.FutureCallInformation(currentNodeInfo, currentNodeValues); currentCallInfo = new CallInfo(currentNodeInfo, NO_PARAMS); fallback = new NodeExecutionFallback<String>(graphBuilder, currentCall, traverseState); }
@Test public void shouldApplyFallbackToAnyException() throws Exception { AsyncFunction<Throwable, String> function = new AsyncFunction<Throwable, String>() { @Override public ListenableFuture<String> apply(Throwable input) throws Exception { return immediateFuture("all is well, nothing to see here"); } }; when(graphBuilder.getFallback()).thenReturn(Optional.of(function)); Throwable expected = new GraphExecutionException(null, currentCallInfo, NO_CALLS); ListenableFuture<String> future = fallback.create(expected); assertThat(future.get(), equalTo("all is well, nothing to see here")); }
public ListenableFuture<Void> executeAllAsync() { Callable<ListenableFuture<Void>> c = new Callable<ListenableFuture<Void>>() { @Override public ListenableFuture<Void> call() throws Exception { return executeAll(); } }; ListenableFuture<ListenableFuture<Void>> f = executor.submit(c); AsyncFunction<ListenableFuture<Void>, Void> function = new AsyncFunction<ListenableFuture<Void>, Void>() { @Override public ListenableFuture<Void> apply(ListenableFuture<Void> input) throws Exception { return input; } }; return Futures.transformAsync(f, function, executor); }
@Test public void testReplaceFailed() throws ExecutionException, InterruptedException { final String ok = "OK"; Will<String> okWillFallback = Wills.<String>failedWill(new RuntimeException()).replaceFailed(new AsyncFunction<Throwable, String>() { @Override public ListenableFuture<String> apply(@Nullable Throwable input) throws Exception { return Futures.immediateFuture(ok); } }); SmartAssert.assertSoft(okWillFallback.obtain(), is(ok), "Failed Will is not with Fallback"); Will<String> okWill = Wills.<String>failedWill(new RuntimeException()).replaceFailed(Wills.of(ok)); SmartAssert.assertSoft(okWill.obtain(), is(ok), "Failed Will is not replaced with Will"); Will<String> okWillListenableFuture = Wills.<String>failedWill(new RuntimeException()).replaceFailed(Futures.immediateFuture(ok)); SmartAssert.assertSoft(okWillListenableFuture.obtain(), is(ok), "Failed Will is not replaced with ListenableFuture"); }
public FunFuture<T> completeOrRecoverWith( ListenableFuture<T> future, AsyncFunction<? super Exception, ? extends T> exceptionHandler ) { if (attachFutureCompletion(future)) { Futures.addCallback(future, new FutureCallback<T>() { @Override public void onSuccess(@Nullable T result) { setSuccess(result); } @Override public void onFailure(Throwable t) { Exception cause = unwrapExecutionException(t); completeWithResultOf(() -> exceptionHandler.apply(cause)); } }); } return this; }
public static ListenableFuture<List<AbsoluteZNodePath>> deleteChildren( final ZNodePath parent, final ClientExecutor<? super Records.Request, ? extends Operation.ProtocolResponse<?>, ?> client) { return Futures.transform( GetChildren.create( parent, client, SettableFuturePromise.<List<AbsoluteZNodePath>>create()), new AsyncFunction<List<AbsoluteZNodePath>, List<AbsoluteZNodePath>>() { @Override public ListenableFuture<List<AbsoluteZNodePath>> apply( List<AbsoluteZNodePath> children) throws Exception { ImmutableList.Builder<ListenableFuture<AbsoluteZNodePath>> deletes = ImmutableList.builder(); for (AbsoluteZNodePath child: children) { deletes.add(DeleteSubtree.deleteAll(child, client)); } return Futures.allAsList(deletes.build()); } }); }
private AsyncFunction<Resolved<Topic>, ContextualQueryResult<Topic, Content>> resolveContentToContextualQuery(ContextualQuery<Topic, Content> query) { return resolved -> { com.google.common.base.Optional<Topic> possibleTopic = resolved.getResources().first(); if (!possibleTopic.isPresent()) { throw new NotFoundException(query.getContextQuery().getOnlyId()); } final Topic topic = possibleTopic.get(); final QueryContext context = query.getContext(); if (!context.getApplication().getConfiguration().isReadEnabled(topic.getSource())) { throw new ForbiddenException(topic.getId()); } return Futures.transform( resolveContent(queryIndex(query.getResourceQuery()), query.getContext()), toContextualQuery(topic, context) ); }; }