@Test public void testNMContainerStatus() { ApplicationId appId = ApplicationId.newInstance(123456789, 1); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); ContainerId containerId = ContainerId.newContainerId(attemptId, 1); Resource resource = Resource.newInstance(1000, 200, 300); NMContainerStatus report = NMContainerStatus.newInstance(containerId, ContainerState.COMPLETE, resource, "diagnostics", ContainerExitStatus.ABORTED, Priority.newInstance(10), 1234); NMContainerStatus reportProto = new NMContainerStatusPBImpl( ((NMContainerStatusPBImpl) report).getProto()); Assert.assertEquals("diagnostics", reportProto.getDiagnostics()); Assert.assertEquals(resource, reportProto.getAllocatedResource()); Assert.assertEquals(ContainerExitStatus.ABORTED, reportProto.getContainerExitStatus()); Assert.assertEquals(ContainerState.COMPLETE, reportProto.getContainerState()); Assert.assertEquals(containerId, reportProto.getContainerId()); Assert.assertEquals(Priority.newInstance(10), reportProto.getPriority()); Assert.assertEquals(1234, reportProto.getCreationTime()); }
@Override public ApplicationState transition(ApplicationImpl app, ApplicationEvent event) { ApplicationFinishEvent appEvent = (ApplicationFinishEvent)event; if (app.containers.isEmpty()) { // No container to cleanup. Cleanup app level resources. app.handleAppFinishWithContainersCleanedup(); return ApplicationState.APPLICATION_RESOURCES_CLEANINGUP; } // Send event to ContainersLauncher to finish all the containers of this // application. for (ContainerId containerID : app.containers.keySet()) { app.dispatcher.getEventHandler().handle( new ContainerKillEvent(containerID, ContainerExitStatus.KILLED_AFTER_APP_COMPLETION, "Container killed on application-finish event: " + appEvent.getDiagnostic())); } return ApplicationState.FINISHING_CONTAINERS_WAIT; }
@Test public void testKillOnNew() throws Exception { WrappedContainer wc = null; try { wc = new WrappedContainer(13, 314159265358979L, 4344, "yak"); assertEquals(ContainerState.NEW, wc.c.getContainerState()); int killed = metrics.getKilledContainers(); wc.killContainer(); assertEquals(ContainerState.DONE, wc.c.getContainerState()); assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, wc.c.cloneAndGetContainerStatus().getExitStatus()); assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics() .contains("KillRequest")); assertEquals(killed + 1, metrics.getKilledContainers()); } finally { if (wc != null) { wc.finished(); } } }
@Test public void testKillOnLocalizing() throws Exception { WrappedContainer wc = null; try { wc = new WrappedContainer(14, 314159265358979L, 4344, "yak"); wc.initContainer(); assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState()); wc.killContainer(); assertEquals(ContainerState.KILLING, wc.c.getContainerState()); assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, wc.c.cloneAndGetContainerStatus().getExitStatus()); assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics() .contains("KillRequest")); int killed = metrics.getKilledContainers(); wc.containerResourcesCleanup(); assertEquals(ContainerState.DONE, wc.c.getContainerState()); assertEquals(killed + 1, metrics.getKilledContainers()); } finally { if (wc != null) { wc.finished(); } } }
private static void updateAttemptMetrics(RMContainerImpl container) { // If this is a preempted container, update preemption metrics Resource resource = container.getContainer().getResource(); RMAppAttempt rmAttempt = container.rmContext.getRMApps() .get(container.getApplicationAttemptId().getApplicationId()) .getCurrentAppAttempt(); if (ContainerExitStatus.PREEMPTED == container.finishedStatus .getExitStatus()) { rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource, container); } if (rmAttempt != null) { long usedMillis = container.finishTime - container.creationTime; long memorySeconds = resource.getMemory() * usedMillis / DateUtils.MILLIS_PER_SECOND; long vcoreSeconds = resource.getVirtualCores() * usedMillis / DateUtils.MILLIS_PER_SECOND; long gcoreSeconds = resource.getGpuCores() * usedMillis / DateUtils.MILLIS_PER_SECOND; rmAttempt.getRMAppAttemptMetrics() .updateAggregateAppResourceUsage(memorySeconds,vcoreSeconds, gcoreSeconds); } }
@Test public void testNMContainerStatus() { ApplicationId appId = ApplicationId.newInstance(123456789, 1); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); ContainerId containerId = ContainerId.newContainerId(attemptId, 1); Resource resource = Resource.newInstance(1000, 200); NMContainerStatus report = NMContainerStatus.newInstance(containerId, ContainerState.COMPLETE, resource, "diagnostics", ContainerExitStatus.ABORTED, Priority.newInstance(10), 1234); NMContainerStatus reportProto = new NMContainerStatusPBImpl( ((NMContainerStatusPBImpl) report).getProto()); Assert.assertEquals("diagnostics", reportProto.getDiagnostics()); Assert.assertEquals(resource, reportProto.getAllocatedResource()); Assert.assertEquals(ContainerExitStatus.ABORTED, reportProto.getContainerExitStatus()); Assert.assertEquals(ContainerState.COMPLETE, reportProto.getContainerState()); Assert.assertEquals(containerId, reportProto.getContainerId()); Assert.assertEquals(Priority.newInstance(10), reportProto.getPriority()); Assert.assertEquals(1234, reportProto.getCreationTime()); }
private static void updateAttemptMetrics(RMContainerImpl container) { // If this is a preempted container, update preemption metrics Resource resource = container.getContainer().getResource(); RMAppAttempt rmAttempt = container.rmContext.getRMApps() .get(container.getApplicationAttemptId().getApplicationId()) .getCurrentAppAttempt(); if (rmAttempt != null) { if (ContainerExitStatus.PREEMPTED == container.finishedStatus .getExitStatus()) { rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource, container); } long usedMillis = container.finishTime - container.creationTime; long memorySeconds = resource.getMemory() * usedMillis / DateUtils.MILLIS_PER_SECOND; long vcoreSeconds = resource.getVirtualCores() * usedMillis / DateUtils.MILLIS_PER_SECOND; rmAttempt.getRMAppAttemptMetrics() .updateAggregateAppResourceUsage(memorySeconds,vcoreSeconds); } }
@Override public void transition(RMContainerImpl container, RMContainerEvent event) { RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; //add the suspend time container.suspendTime.add(System.currentTimeMillis()); Resource resource = container.getLastPreemptedResource(); container.finishedStatus = finishedEvent.getRemoteContainerStatus(); container.isSuspending = true; //update preempt metrics RMAppAttempt rmAttempt = container.rmContext.getRMApps() .get(container.getApplicationAttemptId().getApplicationId()) .getCurrentAppAttempt(); if (ContainerExitStatus.PREEMPTED == container.finishedStatus.getExitStatus()) { rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,container); } }
private static void updateAttemptMetrics(RMContainerImpl container) { // If this is a preempted container, update preemption metrics Resource resource = container.getContainer().getResource(); RMAppAttempt rmAttempt = container.rmContext.getRMApps() .get(container.getApplicationAttemptId().getApplicationId()) .getCurrentAppAttempt(); if (ContainerExitStatus.PREEMPTED == container.finishedStatus .getExitStatus()) { rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource, container); } if (rmAttempt != null) { long usedMillis = container.finishTime - container.creationTime; long memorySeconds = resource.getMemory() * usedMillis / DateUtils.MILLIS_PER_SECOND; long vcoreSeconds = resource.getVirtualCores() * usedMillis / DateUtils.MILLIS_PER_SECOND; rmAttempt.getRMAppAttemptMetrics() .updateAggregateAppResourceUsage(memorySeconds,vcoreSeconds); } }
@Override public void onContainersCompleted(List<ContainerStatus> containerStatuses) { LOG.info(containerStatuses.size() + " containers have completed"); for (ContainerStatus status : containerStatuses) { int exitStatus = status.getExitStatus(); if (0 != exitStatus) { // container failed if (ContainerExitStatus.ABORTED != exitStatus) { totalFailures.incrementAndGet(); } else { // container was killed by framework, possibly preempted // we should re-try as the container was lost for some reason } } else { // nothing to do // container completed successfully containerAllocation.get(status.getContainerId()).containerCompleted(status.getContainerId()); LOG.info("Container id = " + status.getContainerId() + " completed successfully"); } } }
@Override public void onContainersCompleted(List<ContainerStatus> containerStatuses) { LOG.info(containerStatuses.size() + " containers have completed"); for (ContainerStatus status : containerStatuses) { int exitStatus = status.getExitStatus(); if (0 != exitStatus) { // container failed if (ContainerExitStatus.ABORTED != exitStatus) { totalFailures.incrementAndGet(); containerAllocation.remove(status.getContainerId()).containerCompleted(status.getContainerId()); } else { // container was killed by framework, possibly preempted // we should re-try as the container was lost for some reason } } else { // nothing to do // container completed successfully LOG.info("Container id = " + status.getContainerId() + " completed successfully"); containerAllocation.remove(status.getContainerId()).containerCompleted(status.getContainerId()); } } }
@Override public void onContainersCompleted(List<ContainerStatus> containerStatuses) { LOG.info(containerStatuses.size() + " containers have completed"); for (ContainerStatus status : containerStatuses) { int exitStatus = status.getExitStatus(); if (0 != exitStatus) { // container failed if (ContainerExitStatus.ABORTED != exitStatus) { totalCompleted.incrementAndGet(); totalFailures.incrementAndGet(); } else { // container was killed by framework, possibly preempted // we should re-try as the container was lost for some reason } } else { // nothing to do // container completed successfully totalCompleted.incrementAndGet(); LOG.info("Container id = " + status.getContainerId() + " completed successfully"); } } }
@Test public void testNMContainerStatus() { ApplicationId appId = ApplicationId.newInstance(123456789, 1); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); ContainerId containerId = ContainerId.newContainerId(attemptId, 1); Resource resource = Resource.newInstance(1000, 200, 300); NMContainerStatus report = NMContainerStatus.newInstance(containerId, 0, ContainerState.COMPLETE, resource, "diagnostics", ContainerExitStatus.ABORTED, Priority.newInstance(10), 1234); NMContainerStatus reportProto = new NMContainerStatusPBImpl( ((NMContainerStatusPBImpl) report).getProto()); Assert.assertEquals("diagnostics", reportProto.getDiagnostics()); Assert.assertEquals(resource, reportProto.getAllocatedResource()); Assert.assertEquals(ContainerExitStatus.ABORTED, reportProto.getContainerExitStatus()); Assert.assertEquals(ContainerState.COMPLETE, reportProto.getContainerState()); Assert.assertEquals(containerId, reportProto.getContainerId()); Assert.assertEquals(Priority.newInstance(10), reportProto.getPriority()); Assert.assertEquals(1234, reportProto.getCreationTime()); }
@Test public void testKillOnNew() throws Exception { WrappedContainer wc = null; try { wc = new WrappedContainer(13, 314159265358979L, 4344, "yak", "yakFolder"); assertEquals(ContainerState.NEW, wc.c.getContainerState()); int killed = metrics.getKilledContainers(); wc.killContainer(); assertEquals(ContainerState.DONE, wc.c.getContainerState()); verifyOutofBandHeartBeat(wc); assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, wc.c.cloneAndGetContainerStatus().getExitStatus()); assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics() .contains("KillRequest")); assertEquals(killed + 1, metrics.getKilledContainers()); } finally { if (wc != null) { wc.finished(); } } }
@Test public void testKillOnLocalizing() throws Exception { WrappedContainer wc = null; try { wc = new WrappedContainer(14, 314159265358979L, 4344, "yak", "yakFolder"); wc.initContainer(); assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState()); wc.killContainer(); assertEquals(ContainerState.KILLING, wc.c.getContainerState()); assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, wc.c.cloneAndGetContainerStatus().getExitStatus()); assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics() .contains("KillRequest")); int killed = metrics.getKilledContainers(); wc.containerResourcesCleanup(); assertEquals(ContainerState.DONE, wc.c.getContainerState()); assertEquals(killed + 1, metrics.getKilledContainers()); } finally { if (wc != null) { wc.finished(); } } }
private void updateSchedulerHealthForCompletedContainer( RMContainer rmContainer, ContainerStatus containerStatus) { // Update SchedulerHealth for released / preempted container SchedulerHealth schedulerHealth = csContext.getSchedulerHealth(); if (null == schedulerHealth) { // Only do update if we have schedulerHealth return; } if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(), rmContainer.getContainerId(), getQueuePath()); schedulerHealth.updateSchedulerPreemptionCounts(1); } else { schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(), rmContainer.getAllocatedNode(), rmContainer.getContainerId(), getQueuePath()); } }
private ImmutableMap.Builder<String, String> buildContainerStatusEventMetadata(ContainerStatus containerStatus) { ImmutableMap.Builder<String, String> eventMetadataBuilder = new ImmutableMap.Builder<>(); eventMetadataBuilder.put(GobblinYarnMetricTagNames.CONTAINER_ID, containerStatus.getContainerId().toString()); eventMetadataBuilder.put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_CONTAINER_STATE, containerStatus.getState().toString()); if (ContainerExitStatus.INVALID != containerStatus.getExitStatus()) { eventMetadataBuilder.put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_EXIT_STATUS, containerStatus.getExitStatus() + ""); } if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) { eventMetadataBuilder.put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_EXIT_DIAGNOSTICS, containerStatus.getDiagnostics()); } return eventMetadataBuilder; }
@Override public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) { // Inform the Containers about completion. AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId()); if (amContainer != null) { String message = null; int exitStatus = containerStatus.getExitStatus(); if (exitStatus == ContainerExitStatus.PREEMPTED) { message = "Container preempted externally. "; } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) { message = "Container disk failed. "; } else { message = "Container failed. "; } if (containerStatus.getDiagnostics() != null) { message += containerStatus.getDiagnostics(); } sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message)); } }
private void handleExtraTAAssign( AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) { this.inError = true; String errorMessage = "AMScheduler Error: Multiple simultaneous " + "taskAttempt allocations to: " + this.getContainerId() + ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() + ". Current state: " + this.getState(); this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId()); this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage); this.sendTerminatingToTaskAttempt(currentTaId, errorMessage); this.registerFailedAttempt(event.getTaskAttemptId()); LOG.warn(errorMessage); this.logStopped(ContainerExitStatus.INVALID); this.sendStopRequestToNM(); this.unregisterFromTAListener(); this.unregisterFromContainerListener(); }
private void testContainerStoppedEvent() throws Exception { ContainerStoppedEvent event = new ContainerStoppedEvent( ContainerId.newInstance(ApplicationAttemptId.newInstance( ApplicationId.newInstance(0, 1), 1), 1001), 100034566, ContainerExitStatus.SUCCESS, ApplicationAttemptId.newInstance( ApplicationId.newInstance(0, 1), 1)); ContainerStoppedEvent deserializedEvent = (ContainerStoppedEvent) testProtoConversion(event); Assert.assertEquals(event.getContainerId(), deserializedEvent.getContainerId()); Assert.assertEquals(event.getStoppedTime(), deserializedEvent.getStoppedTime()); Assert.assertEquals(event.getApplicationAttemptId(), deserializedEvent.getApplicationAttemptId()); logEvents(event, deserializedEvent); }
public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) { // SchedulerId isn't used here since no node updates are sent out // Inform the Containers about completion. AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId()); if (amContainer != null) { String message = "Container completed. "; TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED; int exitStatus = containerStatus.getExitStatus(); if (exitStatus == ContainerExitStatus.PREEMPTED) { message = "Container preempted externally. "; errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION; } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) { message = "Container disk failed. "; errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR; } else if (exitStatus != ContainerExitStatus.SUCCESS){ message = "Container failed, exitCode=" + exitStatus + ". "; } if (containerStatus.getDiagnostics() != null) { message += containerStatus.getDiagnostics(); } sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause)); } }
public void preemptContainer(int schedulerId, ContainerId containerId) { // TODO Why is this making a call back into the scheduler, when the call is originating from there. // An AMContainer instance should already exist if an attempt is being made to preempt it AMContainer amContainer = appContext.getAllContainers().get(containerId); try { taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId); } catch (Exception e) { String msg = "Error in TaskScheduler when preempting container" + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(amContainer.getTaskSchedulerIdentifier(), appContext) + ", containerId=" + containerId; LOG.error(msg, e); sendEvent( new DAGAppMasterEventUserServiceFatalError( DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, msg, e)); } // Inform the Containers about completion. sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID, "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION)); }
private void handleExtraTAAssign( AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) { setError(); String errorMessage = "AMScheduler Error: Multiple simultaneous " + "taskAttempt allocations to: " + this.getContainerId() + ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() + ". Current state: " + this.getState(); this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId()); this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR); this.sendTerminatingToTaskAttempt(currentTaId, errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR); this.registerFailedAttempt(event.getTaskAttemptId()); LOG.warn(errorMessage); this.logStopped(ContainerExitStatus.INVALID); this.sendStopRequestToNM(); this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR, errorMessage); this.unregisterFromContainerListener(); }
@SuppressWarnings("deprecation") private void testContainerStoppedEvent() throws Exception { ContainerStoppedEvent event = new ContainerStoppedEvent( ContainerId.newInstance(ApplicationAttemptId.newInstance( ApplicationId.newInstance(0, 1), 1), 1001), 100034566, ContainerExitStatus.SUCCESS, ApplicationAttemptId.newInstance( ApplicationId.newInstance(0, 1), 1)); ContainerStoppedEvent deserializedEvent = (ContainerStoppedEvent) testProtoConversion(event); Assert.assertEquals(event.getContainerId(), deserializedEvent.getContainerId()); Assert.assertEquals(event.getStoppedTime(), deserializedEvent.getStoppedTime()); Assert.assertEquals(event.getApplicationAttemptId(), deserializedEvent.getApplicationAttemptId()); logEvents(event, deserializedEvent); }
public ContainerInfo(final Context nmContext, final Container container, String requestUri, String pathPrefix) { this.id = container.getContainerId().toString(); this.nodeId = nmContext.getNodeId().toString(); ContainerStatus containerData = container.cloneAndGetContainerStatus(); this.exitCode = containerData.getExitStatus(); this.exitStatus = (this.exitCode == ContainerExitStatus.INVALID) ? "N/A" : String.valueOf(exitCode); this.state = container.getContainerState().toString(); this.diagnostics = containerData.getDiagnostics(); if (this.diagnostics == null || this.diagnostics.isEmpty()) { this.diagnostics = ""; } this.user = container.getUser(); Resource res = container.getResource(); if (res != null) { this.totalMemoryNeededMB = res.getMemory(); this.totalVCoresNeeded = res.getVirtualCores(); } this.containerLogsShortLink = ujoin("containerlogs", this.id, container.getUser()); if (requestUri == null) { requestUri = ""; } if (pathPrefix == null) { pathPrefix = ""; } this.containerLogsLink = join(requestUri, pathPrefix, this.containerLogsShortLink); }
@SuppressWarnings("unchecked") private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, ContainerId containerID) throws YarnException, IOException { String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); LOG.info("Stopping container with container Id: " + containerIDStr); authorizeGetAndStopContainerRequest(containerID, container, true, nmTokenIdentifier); if (container == null) { if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { throw RPCUtil.getRemoteException("Container " + containerIDStr + " is not handled by this NodeManager"); } } else { context.getNMStateStore().storeContainerKilled(containerID); dispatcher.getEventHandler().handle( new ContainerKillEvent(containerID, ContainerExitStatus.KILLED_BY_APPMASTER, "Container killed by the ApplicationMaster.")); NMAuditLogger.logSuccess(container.getUser(), AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID .getApplicationAttemptId().getApplicationId(), containerID); // TODO: Move this code to appropriate place once kill_container is // implemented. nodeStatusUpdater.sendOutofBandHeartBeat(); } }
@SuppressWarnings("unchecked") @Override public void handle(ContainerManagerEvent event) { switch (event.getType()) { case FINISH_APPS: CMgrCompletedAppsEvent appsFinishedEvent = (CMgrCompletedAppsEvent) event; for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) { String diagnostic = ""; if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) { diagnostic = "Application killed on shutdown"; } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) { diagnostic = "Application killed by ResourceManager"; } try { this.context.getNMStateStore().storeFinishedApplication(appID); } catch (IOException e) { LOG.error("Unable to update application state in store", e); } this.dispatcher.getEventHandler().handle( new ApplicationFinishEvent(appID, diagnostic)); } break; case FINISH_CONTAINERS: CMgrCompletedContainersEvent containersFinishedEvent = (CMgrCompletedContainersEvent) event; for (ContainerId container : containersFinishedEvent .getContainersToCleanup()) { this.dispatcher.getEventHandler().handle( new ContainerKillEvent(container, ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, "Container Killed by ResourceManager")); } break; default: throw new YarnRuntimeException( "Got an unknown ContainerManagerEvent type: " + event.getType()); } }
@Override public synchronized void storeContainerLaunched(ContainerId containerId) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); if (rcs.exitCode != ContainerExitStatus.INVALID) { throw new IOException("Container already completed"); } rcs.status = RecoveredContainerStatus.LAUNCHED; }