@SuppressWarnings("checkstyle:IllegalCatch") private void verifyActorReady(ActorRef actorRef) { // Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite // in a state yet to receive messages or isn't actually created yet. This seems to happen with // actorSelection so, to alleviate it, we use an actorSelection and send an Identify message with // retries to ensure it's ready. Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS); Throwable lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 10) { try { ActorSelection actorSelection = system.actorSelection(actorRef.path().toString()); Future<Object> future = Patterns.ask(actorSelection, new Identify(""), timeout); ActorIdentity reply = (ActorIdentity)Await.result(future, timeout.duration()); Assert.assertNotNull("Identify returned null", reply.getRef()); return; } catch (Exception | AssertionError e) { Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); lastError = e; } } throw new RuntimeException(lastError); }
/**
 * Checks that looking an actor up by path and sending it an {@link Identify} yields an
 * {@link ActorIdentity} that echoes the correlation id, and whose ref reaches the live actor.
 */
@Test
public void testIdentifying() {
    // Spawn the target under a well-known name; it is expected to reply with "foo".
    getSystem().actorOf(ConstantEcho.props("foo"), "foo");

    // Resolve it by path and ask it to identify itself, tagging the request.
    final ActorSelection fooSelection = getSystem().actorSelection("/user/foo");
    fooSelection.tell(new Identify("identifyFoo"), testActor());

    // Exactly one reply is expected: the identity carrying our correlation id back.
    final ActorIdentity fooIdentity = (ActorIdentity) receiveN(1)[0];
    assertEquals("identifyFoo", fooIdentity.correlationId());

    // The resolved reference must route to the same actor, which answers with its constant.
    fooIdentity.getRef().tell("baz", testActor());
    expectMsgEquals("foo");
}
/**
 * Handles the lookup protocol messages.
 *
 * <ul>
 *   <li>An {@link ActorIdentity} accepted by {@code isTarget} is forwarded to {@code owner}
 *       (keeping the original sender) and marks the lookup as found.</li>
 *   <li>{@code START} resets the found flag and kicks off a {@code LOOKUP} to self.</li>
 *   <li>{@code LOOKUP}, while nothing has been found yet, sends an {@link Identify} to the actor
 *       selection for {@code path} and schedules another {@code LOOKUP} after
 *       {@code RETRY_PERIOD} seconds.</li>
 * </ul>
 *
 * <p>Other messages are ignored.
 */
@Override
public void onReceive(Object msg) throws Exception {
    if (msg instanceof ActorIdentity) {
        final ActorIdentity identity = (ActorIdentity) msg;
        if (isTarget(identity)) {
            this.owner.tell(msg, getSender());
            this.found = true;
        }
    }

    if (msg.equals(START)) {
        // Restart the lookup from scratch.
        this.found = false;
        getSelf().tell(LOOKUP, getSelf());
        return;
    }

    // Only keep probing while the target has not been found yet.
    if (found || !msg.equals(LOOKUP)) {
        return;
    }

    final ActorSelection candidate = getContext().actorSelection(path);
    candidate.tell(new Identify(path), getSelf());

    // Re-send LOOKUP to ourselves later; it becomes a no-op once `found` is set.
    getContext().system().scheduler().scheduleOnce(
        Duration.create(RETRY_PERIOD, TimeUnit.SECONDS),
        getSelf(),
        LOOKUP,
        getContext().dispatcher(),
        getSelf());
}
private <C extends RpcGateway> CompletableFuture<C> connectInternal( final String address, final Class<C> clazz, Function<ActorRef, InvocationHandler> invocationHandlerFactory) { checkState(!stopped, "RpcService is stopped"); LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", address, clazz.getName()); final ActorSelection actorSel = actorSystem.actorSelection(address); final Future<Object> identify = Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds()); final Future<C> resultFuture = identify.map(new Mapper<Object, C>(){ @Override public C checkedApply(Object obj) throws Exception { ActorIdentity actorIdentity = (ActorIdentity) obj; if (actorIdentity.getRef() == null) { throw new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'); } else { ActorRef actorRef = actorIdentity.getRef(); InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader(); @SuppressWarnings("unchecked") C proxy = (C) Proxy.newProxyInstance( classLoader, new Class<?>[]{clazz}, invocationHandler); return proxy; } } }, actorSystem.dispatcher()); return FutureUtils.toJava(resultFuture); }
/**
 * Tests that a failing job recovery won't cause other job recoveries to fail.
 *
 * <p>A mocked {@link SubmittedJobGraphStore} reports two job IDs. Recovering the first one throws,
 * and recovering the second one returns a mocked job graph. After the job manager is granted
 * leadership, only the second job is expected in {@code recoveredJobs}.
 */
@Test
public void testFailingJobRecovery() throws Exception {
    final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
    // Passed to the job manager as its job recovery timeout.
    // NOTE(review): presumably zero so recovery is not delayed - confirm in TestingFailingHAJobManager.
    final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
    // Overall time budget for waiting on the job manager to start.
    Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();
    final Configuration flinkConfiguration = new Configuration();
    UUID leaderSessionID = UUID.randomUUID();
    // Kept outside the try block so the finally clause can stop it.
    ActorRef jobManager = null;
    JobID jobId1 = new JobID();
    JobID jobId2 = new JobID();

    // set HA mode to zookeeper so that we try to recover jobs
    flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

    try {
        final SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class);

        SubmittedJobGraph submittedJobGraph = mock(SubmittedJobGraph.class);
        when(submittedJobGraph.getJobId()).thenReturn(jobId2);

        // The store advertises both jobs as candidates for recovery.
        when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, jobId2));

        // fail the first job recovery
        when(submittedJobGraphStore.recoverJobGraph(eq(jobId1))).thenThrow(new Exception("Test exception"));
        // succeed the second job recovery
        when(submittedJobGraphStore.recoverJobGraph(eq(jobId2))).thenReturn(submittedJobGraph);

        final TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();

        // Collects the IDs of jobs the testing job manager reports as recovered.
        final Collection<JobID> recoveredJobs = new ArrayList<>(2);

        BlobServer blobServer = mock(BlobServer.class);
        // Constructor arguments for TestingFailingHAJobManager are positional - order matters.
        // NOTE(review): FixedDelayRestartStrategyFactory(Int.MaxValue(), 100) presumably means
        // "unlimited attempts, 100 ms delay" - confirm against the factory's signature.
        Props jobManagerProps = Props.create(
            TestingFailingHAJobManager.class,
            flinkConfiguration,
            TestingUtils.defaultExecutor(),
            TestingUtils.defaultExecutor(),
            mock(InstanceManager.class),
            mock(Scheduler.class),
            blobServer,
            new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
            ActorRef.noSender(),
            new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
            timeout,
            myLeaderElectionService,
            submittedJobGraphStore,
            mock(CheckpointRecoveryFactory.class),
            jobRecoveryTimeout,
            UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
            recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
        // NOTE(review): the CallingThreadDispatcher presumably makes the job manager process
        // messages synchronously, which is what lets the assertion below run without waiting.

        jobManager = system.actorOf(jobManagerProps);

        // Use an Identify round-trip as a readiness probe: wait until the job manager answers.
        Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());

        Await.ready(started, deadline.timeLeft());

        // make the job manager the leader --> this triggers the recovery of all jobs
        myLeaderElectionService.isLeader(leaderSessionID);

        // check that we have successfully recovered the second job
        assertThat(recoveredJobs, containsInAnyOrder(jobId2));
    } finally {
        // jobManager may still be null here if actor creation failed.
        TestingUtils.stopActor(jobManager);
    }
}
/**
 * Sends an {@link Identify} (correlated by {@code path}) to the actor selection at {@code path},
 * with this actor as sender. It then schedules a {@link ReceiveTimeout} message to this actor
 * after 3 seconds.
 */
private void sendIdentifyRequest() {
    final Identify identifyRequest = new Identify(path);
    getContext().actorSelection(path).tell(identifyRequest, self());

    // Deliver a ReceiveTimeout to ourselves later so the caller can react if no identity arrives.
    final Object timeoutMessage = ReceiveTimeout.getInstance();
    getContext().system().scheduler().scheduleOnce(
        Duration.create(3, SECONDS),
        self(),
        timeoutMessage,
        getContext().dispatcher(),
        self());
}