@Override public Receive createReceive() { return receiveBuilder(). match(CreateGuest.class, createGuest -> { final ActorRef guest = createGuest(createGuest.favoriteCoffee, createGuest.caffeineLimit); addGuestToBookkeeper(guest); context().watch(guest); }). match(ApproveCoffee.class, this::coffeeApproved, approveCoffee -> barista.forward(new Barista.PrepareCoffee(approveCoffee.coffee, approveCoffee.guest), context()) ). match(ApproveCoffee.class, approveCoffee -> { log().info("Sorry, {}, but you have reached your limit.", approveCoffee.guest.path().name()); context().stop(approveCoffee.guest); }). match(Terminated.class, terminated -> { log().info("Thanks, {}, for being our guest!", terminated.getActor()); removeGuestFromBookkeeper(terminated.getActor()); }).build(); }
public CoffeeHouse(int caffeineLimit) { log().debug("CoffeeHouse Open"); this.caffeineLimit = caffeineLimit; receive(ReceiveBuilder. match(CreateGuest.class, createGuest -> { final ActorRef guest = createGuest(createGuest.favoriteCoffee); // final ActorRef guest = createGuest(createGuest.favoriteCoffee, createGuest.caffeineLimit); addGuestToBookkeeper(guest); context().watch(guest); }). match(ApproveCoffee.class, this::coffeeApproved, approveCoffee -> barista.forward(new Barista.PrepareCoffee(approveCoffee.coffee, approveCoffee.guest), context()) ). match(ApproveCoffee.class, approveCoffee -> { log().info("Sorry, {}, but you have reached your limit.", approveCoffee.guest.path().name()); context().stop(approveCoffee.guest); }). match(Terminated.class, terminated -> { log().info("Thanks, {}, for being our guest!", terminated.getActor()); removeGuestFromBookkeeper(terminated.getActor()); }). matchAny(this::unhandled).build() ); }
@Override public Receive createReceive() { return receiveBuilder(). match(CreateGuest.class, createGuest -> { final ActorRef guest = createGuest(createGuest.favoriteCoffee); addGuestToBookkeeper(guest); context().watch(guest); }). match(ApproveCoffee.class, this::coffeeApproved, approveCoffee -> barista.forward(new Barista.PrepareCoffee(approveCoffee.coffee, approveCoffee.guest), context()) ). match(ApproveCoffee.class, approveCoffee -> { log().info("Sorry, {}, but you have reached your limit.", approveCoffee.guest.path().name()); context().stop(approveCoffee.guest); }). match(Terminated.class, terminated -> { log().info("Thanks, {}, for being our guest!", terminated.getActor()); removeGuestFromBookkeeper(terminated.getActor()); }).build(); }
@Test public void testShardTransactionInactivity() { datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout( 500, TimeUnit.MILLISECONDS).build(); new JavaTestKit(getSystem()) { { final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), "testShardTransactionInactivity"); watch(transaction); expectMsgClass(duration("3 seconds"), Terminated.class); } }; }
@Override public void onReceive(Object msg) throws Exception { if (msg instanceof SessionAwareMsg) { forwardToSessionActor((SessionAwareMsg) msg); } else if (msg instanceof SessionTerminationMsg) { onSessionTermination((SessionTerminationMsg) msg); } else if (msg instanceof Terminated) { onTermination((Terminated) msg); } else if (msg instanceof SessionTimeoutMsg) { onSessionTimeout((SessionTimeoutMsg) msg); } else if (msg instanceof SessionCtrlMsg) { onSessionCtrlMsg((SessionCtrlMsg) msg); } else if (msg instanceof ClusterEventMsg) { broadcast(msg); } }
@Override public void onReceive(Object msg) throws Exception { logger.debug("Received message: {}", msg); if (msg instanceof ToDeviceActorMsg) { processDeviceMsg((ToDeviceActorMsg) msg); } else if (msg instanceof ToAssetActorMsg) { processAssetMsg((ToAssetActorMsg) msg); } else if (msg instanceof ToPluginActorMsg) { onToPluginMsg((ToPluginActorMsg) msg); } else if (msg instanceof ToRuleActorMsg) { onToRuleMsg((ToRuleActorMsg) msg); } else if (msg instanceof ToDeviceActorNotificationMsg) { onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg); } else if (msg instanceof Terminated) { processTermination((Terminated) msg); } else if (msg instanceof ClusterEventMsg) { broadcast(msg); } else if (msg instanceof ComponentLifecycleMsg) { onComponentLifecycleMsg((ComponentLifecycleMsg) msg); } else if (msg instanceof PluginTerminationMsg) { onPluginTerminated((PluginTerminationMsg) msg); } else { logger.warning("Unknown message: {}!", msg); } }
private void reLaunchActor(Terminated message) { //TODO: test this ActorRef bundleActor = this.getContext().actorOf( springExtension.props(ApplicationConfiguration.JETTY_SERVLET_BUNDLE_APP_SUPERVISOR_ACTOR)); ApplicationOptions config = ApplicationConfiguration.CONFIGURATIONS.get(((Terminated) message).getActor()); getContext().unwatch(((Terminated) message).getActor()); if (config != null) { BundleMessageRequest request = new BundleMessageRequest(); request.setBundleRequestType(BundleRequestType.RE_HYDRATE); request.setApplicationOptions(config); bundleActor.tell(request, this.getSelf()); getContext().watch(bundleActor); } if (LOG.isDebugEnabled()) LOG.debug("Restarted actor {}", ((Terminated) message).actor()); }
@Override public Receive createReceive() { return receiveBuilder() .matchEquals("instrument", message -> periodicInstrumentation()) .match(PeriodicData.class, this::executePeriodicData) .match(Connect.class, this::executeConnect) .match(LogLine.class, this::executeLogLine) .match(MetricsListRequest.class, ignored -> executeMetricsListRequest()) .match(LogsListRequest.class, ignored -> executeLogsListRequest()) .match(LogFileAppeared.class, this::executeLogAdded) .match(LogFileDisappeared.class, this::executeLogRemoved) .match(Terminated.class, this::executeQuit) .matchAny(message -> { _metrics.incrementCounter(UNKNOWN_COUNTER); LOGGER.warn() .setMessage("Unsupported message") .addData("actor", self()) .addData("data", message) .log(); unhandled(message); }) .build(); }
private void shutdownAkka() { LOGGER.info().setMessage("Stopping akka").log(); if (_actorSystem != null) { final Future<Terminated> terminate = _actorSystem.terminate(); try { Await.result(terminate, Duration.create(30, TimeUnit.SECONDS)); // CHECKSTYLE.OFF: IllegalCatch - Await.result throws Exception } catch (final Exception e) { // CHECKSTYLE.ON: IllegalCatch LOGGER.warn() .setMessage("Exception while shutting down actor system") .setThrowable(e) .log(); } _actorSystem = null; } }
@Override public void onReceive(Object message) throws InterpreterException { if (message instanceof InterpreterInterface.SubmoduleOutPortHasSignal) { moduleOutPortHasSignal(((InterpreterInterface.SubmoduleOutPortHasSignal) message).getOutPortId()); } else if (message instanceof InstanceProvider) { setInstanceProvider((InstanceProvider) message); } else if (message instanceof RuntimeContext) { setRuntimeContext((RuntimeContext) message); } else if (message == TopLevelInterpreterActorInterface.Start.INSTANCE) { startRunning(); } else if (message instanceof Props) { start((Props) message); } else if (message instanceof Terminated) { childActorTerminated(((Terminated) message).actor()); } else if (message instanceof Status.Failure) { failure(((Status.Failure) message).cause()); } else { super.onReceive(message); } }
@Override public void onReceive(Object message) { if (message instanceof MasterInterpreterActorInterface.CreateExecution) { createExecution((MasterInterpreterActorInterface.CreateExecution) message); } else if (message instanceof MasterInterpreterActorInterface.StartExecution) { startExecution(((MasterInterpreterActorInterface.StartExecution) message).getExecutionId()); } else if (message instanceof MasterInterpreterActorInterface.CancelWorkflow) { MasterInterpreterActorInterface.CancelWorkflow arguments = (MasterInterpreterActorInterface.CancelWorkflow) message; cancel(arguments.getExecutionId(), arguments.getThrowable()); } else if (message instanceof Terminated) { terminated(((Terminated) message).getActor()); } else { unhandled(message); } }
@Override public void onReceive(Object message) throws InterpreterException { if (message instanceof IterationStatus) { IterationStatus arguments = (IterationStatus) message; iterationStatus(arguments.iteration, arguments.hasAllInputs); } else if (message instanceof FoundIterationInStagingArea) { foundIterationInStagingArea(((FoundIterationInStagingArea) message).iteration); } else if (message instanceof CopyInputsForIteration) { copyInputsForIteration(((CopyInputsForIteration) message).iteration); } else if (message instanceof CopiedInPortToFirstIteration) { copiedInPortToFirstIteration(((CopiedInPortToFirstIteration) message).inPortId); } else if (message instanceof Terminated) { childActorTerminated(((Terminated) message).getActor()); } else if (message instanceof FinishedLastIteration) { finished(((FinishedLastIteration) message).iteration); } else if (message == LocalMessages.PREPARE_TO_TERMINATE) { prepareToTerminate(); } else { super.onReceive(message); } }
@Override public void onReceive(Object message) { if (message instanceof Terminated) { try { Terminated term = (Terminated) message; String name = term.actor().path().toSerializationFormat(); if (log != null) { log.error("Actor " + name + " terminated, stopping process..."); // give the log some time to reach disk try { Thread.sleep(100); } catch (InterruptedException e) { // not really a problem if we don't sleep... } } } finally { System.exit(exitCode); } } }
@Test public void test_sup_strategy_stop() throws Throwable { javactor = new HandleRuntimeExceptionTestJavactor(); final Props props = Props.create(new MyCreator(javactor)); final ActorRef target = system.actorOf(props); new JavaTestKit(system) { { final JavaTestKit probe = new JavaTestKit(system); Duration timeout = Duration.create("5 second"); final ActorRef child = (ActorRef) Await.result( Patterns.ask(target, BoomActor.class, 5000), timeout); probe.watch(child); target.tell(SupervisorDirective.STOP, probe.getRef()); probe.expectMsgClass(Object.class); target.tell(new TestMsg(), null); final Terminated msg = probe.expectMsgClass(Terminated.class); assertEquals(msg.getActor(), child); } }; }
@Test public void test_sup_strategy_on_exception_method_param_supertype_of_thrown_exception() throws Throwable { javactor = new HandleThrowableTestjavactor(); final Props props = Props.create(new MyCreator(javactor)); final ActorRef target = system.actorOf(props); new JavaTestKit(system) { { final JavaTestKit probe = new JavaTestKit(system); Duration timeout = Duration.create("5 second"); final ActorRef child = (ActorRef) Await.result( Patterns.ask(target, BoomActor.class, 5000), timeout); probe.watch(child); target.tell(SupervisorDirective.STOP, probe.getRef()); probe.expectMsgClass(Object.class); target.tell(new TestMsg(), null); final Terminated msg = probe.expectMsgClass(Terminated.class); assertEquals(msg.getActor(), child); } }; }
@Override public void onReceive(Object message) throws Exception { LOG.debug("[{}] Received: {}", userId, message); if (message instanceof EndpointUserConnectMessage) { processEndpointConnectMessage((EndpointUserConnectMessage) message); } else if (message instanceof EndpointUserDisconnectMessage) { processEndpointDisconnectMessage((EndpointUserDisconnectMessage) message); } else if (message instanceof EndpointEventSendMessage) { processEndpointEventSendMessage((EndpointEventSendMessage) message); } else if (message instanceof RemoteEndpointEventMessage) { processRemoteEndpointEventMessage((RemoteEndpointEventMessage) message); } else if (message instanceof EndpointEventTimeoutMessage) { processEndpointEventTimeoutMessage((EndpointEventTimeoutMessage) message); } else if (message instanceof EndpointEventDeliveryMessage) { processEndpointEventDeliveryMessage((EndpointEventDeliveryMessage) message); } else if (message instanceof RouteInfoMessage) { processRouteInfoMessage((RouteInfoMessage) message); } else if (message instanceof UserRouteInfoMessage) { processUserRouteInfoMessage((UserRouteInfoMessage) message); } else if (message instanceof Terminated) { processTerminationMessage((Terminated) message); } else if (message instanceof ClusterUpdateMessage) { messageProcessor.processClusterUpdate(context()); } }
/** * Process termination. * * @param message the message */ private void processTermination(Terminated message) { ActorRef terminated = message.actor(); if (terminated instanceof LocalActorRef) { LocalActorRef localActor = (LocalActorRef) terminated; String name = localActor.path().name(); if (applications.remove(name) != null) { LOG.debug("[{}] removed application: {}", tenantId, localActor); } else if (localUsers.remove(name) != null) { LOG.debug("[{}] removed local user: {}", tenantId, localActor); } else if (globalUsers.remove(name) != null) { LOG.debug("[{}] removed global user: {}", tenantId, localActor); } } else { LOG.warn("remove commands for remote actors are not supported yet!"); } }
@Override public void onReceive(Object msg) throws Exception { if (msg instanceof Connect) { InetSocketAddress address = ((Connect) msg).getAddress(); log.debug("connecting to " + address); ActorRef tcp = Tcp.get(getContext().system()).manager(); tcp.tell(TcpMessage.connect(address), getSelf()); } else if (msg instanceof CommandFailed) { log.error(msg.toString()); app.tell(new ConnectFailed((CommandFailed) msg), getSelf()); } else if (msg instanceof Connected) { log.info("connected"); ActorRef listener = getContext().actorOf(ClientListener.props(config), nameGenerator.getName(ClientListener.class)); listener.tell(msg, getSender()); getContext().watch(listener); } else if(msg instanceof Terminated) { log.warning("connection closed"); app.tell(new ConnectionClosed(), getSelf()); } else { unhandled(msg); } }
@Override public void onReceive(Object msg) throws Exception { if(msg instanceof Connected) { log.debug("connected"); ActorRef actors = getContext().actorOf(ClientActors.props(config), "clientActors"); Config sslConfig = ConfigUtils.getOptionalConfig(config, "ssl"); ActorRef messageProtocolHandler = getContext().actorOf(MessageProtocolHandler.props(false, sslConfig, getSender(), actors), "messages"); getContext().watch(messageProtocolHandler); actors.tell(new ListenerInit(messageProtocolHandler), getSelf()); } else if(msg instanceof Terminated) { getContext().stop(getSelf()); } else { unhandled(msg); } }
@Override public final void onReceive(Object msg) throws Exception { if(msg instanceof Commit) { handleCommit(); } else if(msg instanceof Rollback) { handleRollback(); } else if(msg instanceof Query) { handleQuery((Query)msg); } else if(msg instanceof StreamingQuery) { handleStreamingQuery((StreamingQuery)msg); } else if(msg instanceof ReceiveTimeout) { handleTimeout(); } else if(msg instanceof Terminated) { handleTerminated((Terminated)msg); } else { unhandled(msg); } }
private void handleTerminated(Terminated msg) { ActorRef actor = msg.getActor(); if(cursors.remove(actor)) { log.debug("cursor terminated"); if(cursors.isEmpty()) { log.debug("no cursors left"); getContext().setReceiveTimeout(receiveTimeout); } else { log.debug("pending cursors"); } } else { log.error("unknown actor terminated: " + actor); } }
@Override public void onReceive(Object msg) throws Exception { if(msg instanceof Terminated) { log.debug("terminated"); terminated = (Terminated)msg; sendTerminated(); } else if(msg instanceof GetTerminated) { log.debug("terminated requested"); sender = getSender(); sendTerminated(); } else if(msg instanceof Watch) { log.debug("watch"); getContext().watch(((Watch)msg).actor); getSender().tell(new Ack(), getSelf()); } else { unhandled(msg); } }
@Test public void testTransactionCrash() throws Exception { ActorRef transaction = f.ask(database, new StartTransaction(), TransactionCreated.class).get().getActor(); Throwable t = f.ask(transaction, new SqlQuery("invalid sql", SqlQueryType.QUERY), Failure.class).get().getCause(); assertNotNull(t); assertTrue(t instanceof SQLException); f.ask(transaction, new SqlQuery("select 42", SqlQueryType.QUERY)).get(); // unclear if this should also result in Failure (it doesn't in h2) ActorRef deadWatch = actorSystem.actorOf(AnyRecorder.props()); f.ask(deadWatch, new Watch(transaction), Watching.class).get(); f.ask(transaction, new Rollback(), Ack.class).get(); f.ask(deadWatch, new Wait(1), Waited.class).get(); f.ask(deadWatch, new GetRecording(), Recording.class).get() .assertHasNext() .assertNext(Terminated.class, terminated -> { assertEquals(transaction, terminated.getActor()); }) .assertNotHasNext(); }
@Override public void onReceive(Object msg) throws Exception { if(msg instanceof ImportJobInfo) { handleImportJob((ImportJobInfo)msg); } if(msg instanceof RemoveJobInfo) { handleRemoveJob((RemoveJobInfo)msg); } else if(msg instanceof SessionStarted) { handleSessionStarted((SessionStarted)msg); } else if(msg instanceof SessionFinished) { handleSessionFinished((SessionFinished)msg); } else if(msg instanceof GetActiveJobs) { handleGetActiveJobs((GetActiveJobs)msg); } else if(msg instanceof GetDataSource) { handleGetDataSource((GetDataSource)msg); } else if(msg instanceof Progress) { handleProgress((Progress)msg); } else if(msg instanceof Terminated) { handleTerminated((Terminated)msg); } else { unhandled(msg); } }
private void handleTerminated(Terminated msg) { ActorRef session = msg.getActor(); log.debug("terminated: {}", session); ImportJobInfo importJob = sessions.inverse().get(session); if(importJob == null) { log.error("session unknown"); } else { log.debug("aborting job"); ActorRef jobContext = jobContexts.get(importJob); if(jobContext == null) { log.error("job context not found"); } else { jobContext.tell(new UpdateJobState(JobState.ABORTED), getSelf()); } log.debug("finishing session"); getSelf().tell(new SessionFinished(importJob), getSelf()); } }
@Override public void onReceive(Object msg) throws Exception { log.debug("message: " + msg); if(msg instanceof DataSourceConnected) { handleDataSourceConnected((DataSourceConnected)msg); } else if (msg instanceof Terminated) { handleTerminated((Terminated)msg); } else if (msg instanceof HarvestJobInfo) { handleHarvestJob((HarvestJobInfo)msg); } else if(msg instanceof GetActiveDataSources) { handleGetActiveDataSources(); } else if(msg instanceof GetDataSource) { handleGetDataSource((GetDataSource)msg); } else if(msg instanceof GetActiveJobs) { handleGetActiveJobs(); } else if(msg instanceof StartHarvesting) { handleStartHarvesting((StartHarvesting)msg); } else if(msg instanceof RetryHarvest) { handleRetryHarvest((RetryHarvest)msg); } else { unhandled(msg); } }
@Override public void onReceive(Object msg) throws Exception { if(msg instanceof Connected) { log.debug("client connected"); ActorRef actors = getContext().actorOf(ServerActors.props(harvesterName, harvester, harvesterConfig), "serverActors"); Config sslConfig = ConfigUtils.getOptionalConfig(harvesterConfig, "ssl"); ActorRef messageProtocolHandler = getContext().actorOf(MessageProtocolHandler.props(true, sslConfig, getSender(), actors), "messages"); getContext().watch(messageProtocolHandler); actors.tell(new ListenerInit(messageProtocolHandler), getSelf()); } else if(msg instanceof Terminated) { getContext().stop(getSelf()); } else { unhandled(msg); } }
private Procedure<Object> provisioning(ServiceJobInfo serviceJob, ActorRef initiator, ActorRef watching, Set<EnsureTarget> targets) { return new Procedure<Object>() { Set<JobState> state = new HashSet<>(); @Override public void apply(Object msg) throws Exception { if(msg instanceof UpdateServiceInfo) { handleUpdateServiceInfo((UpdateServiceInfo)msg); } else if(msg instanceof Terminated) { log.error("actor terminated unexpectedly"); jobFailed(initiator); getContext().become(receive()); } else { elseProvisioning(msg, serviceJob, initiator, Optional.of(watching), targets, state); } } }; }
private Procedure<Object> waiting(Set<ActorRef> ensureServices) { return new Procedure<Object>() { @Override public void apply(Object msg) throws Exception { if(msg instanceof Terminated) { ActorRef actor = ((Terminated)msg).getActor(); if(ensureServices.contains(actor)) { ensureServices.remove(actor); if(ensureServices.isEmpty()) { log.debug("all ensure services are terminated"); getContext().stop(getSelf()); } } else { log.error("unknown actor terminated: {}", actor); } } else { unhandled(msg); } } }; }
@Test public void testEmptyService() throws Exception { Service service = mock(Service.class); when(service.getId()).thenReturn("service0"); when(service.getName()).thenReturn("serviceName0"); when(service.getRootId()).thenReturn("root"); when(service.getLayers()).thenReturn(Collections.emptyList()); f.ask(geoServerService, new Ensure(service, new End()), Ack.class).get(); f.ask(recorder, new Wait(3), Waited.class).get(); f.ask(recorder, new GetRecording(), Recording.class).get() .assertNext(EnsureWorkspace.class, workspace -> { assertEquals("serviceName0", workspace.getWorkspaceId()); }) .assertNext(FinishEnsure.class) .assertNext(Terminated.class) .assertNotHasNext(); }
@Override public void onReceive(Object message) throws Exception { if (message instanceof Terminated) { // Handle remote actor termination this.onTerminated((Terminated) message); } else if (message instanceof ActorIdentity) { // Handle remote actor reconnection this.onIdentity((ActorIdentity) message); } // Handle all other events. Transition<?> transition = this.getTransition(this.state, message); if (transition != null) { transition.apply(this, message); } else { unhandled(message); } }
public Drone(DroneConfig config) { this.config = config; this.queen = registerRemoteActor(config.queenPath); this.monitor = registerRemoteActor(config.monitorPath); this.state = State.DISCONNECTED; // Create supervised actors. this.dataFetcher = this.getContext().actorOf(DataFetcher.makeProps(config.trainingSet)); config.trainingSet.reset(); // Define the state machine this.addTransition(State.DISCONNECTED, MsgConnectAndStart.class, new Transition<>(State.CONNECTING, CONNECT)); this.addTransition(State.CONNECTING, ActorIdentity.class, new Transition<>(State.STARTING, GET_INITIAL_MODEL, IS_QUEEN_IDENTITY)); this.addTransition(State.STARTING, MsgGetInitialModel.class, new Transition<>(State.STARTING, GET_INITIAL_MODEL)); this.addTransition(State.STARTING, MsgModel.class, new Transition<>(State.ACTIVE, START_TRAINING)); this.addTransition(State.STARTING, MsgStop.class, new Transition<>(State.STOPPED)); this.addTransition(State.ACTIVE, MsgUpdateDone.class, new Transition<>(State.ACTIVE, NEXT_UPDATE)); this.addTransition(State.ACTIVE, MsgStop.class, new Transition<>(State.STOPPED)); this.addTransition(State.STOPPED, MsgConnectAndStart.class, new Transition<>(State.STARTING, GET_INITIAL_MODEL)); this.addTransition(State.STOPPED, MsgReset.class, new Stay<>(RESET_DATASET)); this.addTransition(Terminated.class, new Transition<>(State.CONNECTING, IS_QUEEN_TERMINATED)); }
@Override public void onReceive(Object message) throws Exception { if (message instanceof Terminated) { final Terminated t = (Terminated) message; if (monitoredActors.containsKey(t.getActor())) { log.info("Received Worker Actor Termination Message -> {}", t .getActor().path()); log.info("Sending message to Supervisor"); monitoredActors.get(t.getActor()).tell(new DeadWorker()); } } else if (message instanceof RegisterWorker) { RegisterWorker msg = (RegisterWorker) message; getContext().watch(msg.worker); monitoredActors.put(msg.worker, msg.supervisor); } else { unhandled(message); } }
@Test public void stopTest() throws Exception { ActorRef workerActor1 = supervisor.underlyingActor().workerActor1; ActorRef workerActor2 = supervisor.underlyingActor().workerActor2; TestProbe probe1 = new TestProbe(_system); TestProbe probe2 = new TestProbe(_system); probe1.watch(workerActor1); probe2.watch(workerActor2); supervisor.tell(String.valueOf("Do Something")); probe1.expectMsgClass(Terminated.class); probe2.expectMsgClass(Terminated.class); }
@Test public void testSupervisorStrategy2() throws Exception { ActorRef supervisorActorRef2 = _system.actorOf(new Props( SupervisorActor.class), "supervisor2"); final TestProbe probe = new TestProbe(_system); // register the BoomActor with the Supervisor final ActorRef child = (ActorRef) Await.result( ask(supervisorActorRef2, new Props(BoomActor.class), 5000), Duration.parse("5 second")); probe.watch(child); // second check child.tell("do something"); probe.expectMsg(new Terminated(child)); }
@Override public Receive createReceive() { return receiveBuilder() .match(ApiRequest.class, this::onRequest) .match(ApiRequestForward.class, this::onForward) .match(MemberOfflineEvent.class, this::onOffline) .match(MemberOnlineEvent.class, this::onOnline) .match(BackendFindEvent.class, e -> getSender().tell(findFrontendSocket, getSelf())) .match(ClusterEvent.CurrentClusterState.class, state -> { for (Member member : state.getMembers()) { if (member.status().equals(MemberStatus.up())) { register(member); } else if (member.status().equals(MemberStatus.removed())) { remove(member); } } }) .match(ClusterEvent.MemberUp.class, mUp -> register(mUp.member())) .match(ClusterEvent.MemberRemoved.class, mRem -> remove(mRem.member())) .match(Terminated.class, t -> { logger.info("Frontend {} left.", getSender()); // getContext().unwatch(getSender()); // ClusterChildNodeSystem.INSTANCE.removeRouterFronted(); }) .build(); }
@Override public Receive createReceive() { return receiveBuilder(). match(CreateGuest.class, createGuest -> { final ActorRef guest = createGuest(createGuest.favoriteCoffee); addGuestToBookkeeper(guest); context().watch(guest); }). match(ApproveCoffee.class, this::coffeeApproved, approveCoffee -> barista.forward(new Barista.PrepareCoffee(approveCoffee.coffee, approveCoffee.guest), context()) ). match(ApproveCoffee.class, approveCoffee -> { log().info("Sorry, {}, but you have reached your limit.", approveCoffee.guest.path().name()); context().stop(approveCoffee.guest); }). //=========================================================================== // ANSWER //=========================================================================== // todo When the `Guest` terminates, remove the `Guest` from caffeineLimit bookkeeping. match(Terminated.class, terminated -> { //=========================================================================== // ANSWER //=========================================================================== // todo Log "Thanks {guest}, for being our guest!" at `info`. log().info("Thanks, {}, for being our guest!", terminated.getActor()); removeGuestFromBookkeeper(terminated.getActor()); }).build(); }
protected void killActor(TestActorRef<TestRaftActor> actor) { JavaTestKit testkit = new JavaTestKit(getSystem()); testkit.watch(actor); actor.tell(PoisonPill.getInstance(), null); testkit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class); testkit.unwatch(actor); }
@Override public void onReceive(Object message) throws Exception { if (message instanceof Terminated) { Terminated terminated = (Terminated) message; LOG.debug("Actor terminated : {}", terminated.actor()); } else if (message instanceof Monitor) { Monitor monitor = (Monitor) message; getContext().watch(monitor.getActorRef()); } }