/** * Tests that the query actor will be stopped when the MetricRegistry is shut down. */ @Test public void testQueryActorShutdown() throws Exception { final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); registry.startQueryService(actorSystem, null); ActorRef queryServiceActor = registry.getQueryService(); registry.shutdown(); try { Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout); fail("The query actor should be terminated resulting in a ActorNotFound exception."); } catch (ActorNotFound e) { // we expect the query actor to be shut down } }
/** * Tests that the query actor will be stopped when the MetricRegistry is shut down. */ @Test public void testQueryActorShutdown() throws Exception { final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); registry.startQueryService(actorSystem, null); ActorRef queryServiceActor = registry.getQueryService(); registry.shutdown(); try { Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout); fail("The query actor should be terminated resulting in a ActorNotFound exception."); } catch (ActorNotFound e) { // we expect the query actor to be shut down } }
public Future<Object> queryStateFuture(final QueryState<K> queryState) { LOG.debug("Try to get ActorRef future for key {}.", queryState.getKey()); Future<ActorRef> actorRefFuture = getActorRefFuture(queryState.getKey()); @SuppressWarnings("unchecked") Future<Object> result = actorRefFuture.flatMap(new Mapper<ActorRef, Future<Object>>() { public Future<Object> apply(ActorRef actorRef) { LOG.debug("Ask response actor for state for key {}.", queryState.getKey()); return Patterns.ask(actorRef, queryState, new Timeout(askTimeout)); } }, executor).recoverWith(new Recover<Future<Object>>() { @Override public Future<Object> recover(final Throwable failure) throws Throwable { if (failure instanceof WrongKeyPartitionException || failure instanceof ActorNotFound) { // wait askTimeout because we communicated with the wrong actor. This usually // indicates that not all actors have registered at the registry. return Patterns.after( askTimeout, getContext().system().scheduler(), executor, new Callable<Future<Object>>() { @Override public Future<Object> call() throws Exception { refreshCache(); return Futures.failed(failure); } }); } else if (failure instanceof AskTimeoutException) { LOG.debug("Ask timed out.", failure); handleAskTimeout(); return Futures.failed(failure); } else { LOG.debug("State query failed with.", failure); refreshCache(); return Futures.failed(failure); } } }, executor); return result; }