/**
 * Builds a {@link Container} record bound to {@code hostName} (node port 0),
 * with the next container id from the {@code containerId} counter and a
 * container token whose identifier is the container id string.
 *
 * @param id unused; the container id comes from the incrementing
 *     {@code containerId} field instead
 * @param priority priority to set on the container
 * @param capability resource capability to set on the container
 * @param hostName node host; also used to build the node HTTP address
 * @return the populated container record
 * @throws IOException declared for callers; not thrown by this body
 */
private Container newContainerInstance(int id, Priority priority,
    Resource capability, String hostName) throws IOException {
  NodeId nodeId = NodeId.newInstance(hostName, 0);
  Container container = Records.newRecord(Container.class);
  container.setNodeId(nodeId);
  container.setPriority(priority);
  container.setResource(capability);
  container.setId(ContainerId.newContainerId(appAttemptId, ++containerId));

  // Encode explicitly instead of relying on the platform default charset.
  String node = nodeId.toString();
  Token token = Token.newInstance(
      node.getBytes(java.nio.charset.StandardCharsets.UTF_8), node,
      node.getBytes(java.nio.charset.StandardCharsets.UTF_8), node);

  // Use the container id as the token identifier so each token is distinct.
  // ByteBuffer.wrap leaves position at 0, so the whole identifier remains
  // readable. allocate()+put() would leave position == limit.
  byte[] idBytes = container.getId().toString()
      .getBytes(java.nio.charset.StandardCharsets.UTF_8);
  token.setIdentifier(ByteBuffer.wrap(idBytes));

  container.setContainerToken(token);
  container.setNodeHttpAddress(hostName + ":0");
  return container;
}
@SuppressWarnings("unchecked") private void containerAssigned(Container allocated, ContainerRequest assigned) { // Update resource requests decContainerReq(assigned); // send the container-assigned event to task attempt eventHandler.handle(new TaskAttemptContainerAssignedEvent( assigned.attemptID, allocated, applicationACLs)); assignedRequests.add(allocated, assigned.attemptID); if (LOG.isDebugEnabled()) { LOG.info("Assigned container (" + allocated + ") " + " to task " + assigned.attemptID + " on node " + allocated.getNodeId().toString()); } }
/**
 * A running attempt that unregisters with SUCCEEDED and then sees its AM
 * container complete ends in the finished state with the unregister values.
 */
@Test
public void testSuccessfulFinishingToFinished() {
  Container masterContainer = allocateApplicationAttempt();
  launchApplicationAttempt(masterContainer);
  runApplicationAttempt(masterContainer, "host", 8042, "oldtrackingurl", false);

  final FinalApplicationStatus expectedStatus = FinalApplicationStatus.SUCCEEDED;
  final String expectedUrl = "mytrackingurl";
  final String expectedDiagnostics = "Successful";
  unregisterApplicationAttempt(masterContainer, expectedStatus, expectedUrl,
      expectedDiagnostics);

  // Report the AM container as completed with exit code 0.
  NodeId someNode = NodeId.newInstance("host", 1234);
  applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
      applicationAttempt.getAppAttemptId(),
      BuilderUtils.newContainerStatus(masterContainer.getId(),
          ContainerState.COMPLETE, "", 0),
      someNode));

  testAppAttemptFinishedState(masterContainer, expectedStatus, expectedUrl,
      expectedDiagnostics, 0, false);
}
/**
 * Binds the task status found via {@code locator} to {@code container}.
 * It records the container id, host, resolved external IPv4 address, log HTTP
 * address and GPU attribute, and resets the connection-lost count to 0. It
 * then marks the locator's task role as having changed statuses.
 *
 * @param locator identifies the task whose status is updated
 * @param container container now running that task
 * @throws Exception propagated from the helpers called here
 */
private void associateTaskWithContainer(TaskStatusLocator locator,
    Container container) throws Exception {
  TaskStatus status = getTaskStatus(locator);
  String idText = container.getId().toString();

  status.setContainerId(idText);
  status.setContainerHost(container.getNodeId().getHost());
  // Resolve from the host that was just stored on the status.
  status.setContainerIp(
      DnsClient.resolveExternalIPv4Address(status.getContainerHost()));
  status.setContainerLogHttpAddress(HadoopUtils.getContainerLogHttpAddress(
      container.getNodeHttpAddress(), idText, conf.getAmUser()));
  status.setContainerConnectionLostCount(0);
  status.setContainerGpus(ResourceDescriptor
      .fromResource(container.getResource())
      .getGpuAttribute());

  taskStatusesesChanged.put(locator.getTaskRoleName(), true);
}
/**
 * Replaces the builder's previous-attempt containers with the proto form of
 * every entry in {@code containersFromPreviousAttempts}.
 */
private void addContainersFromPreviousAttemptToProto() {
  maybeInitBuilder();
  builder.clearContainersFromPreviousAttempts();

  List<ContainerProto> protos = new ArrayList<ContainerProto>();
  for (Container previous : containersFromPreviousAttempts) {
    protos.add(convertToProtoFormat(previous));
  }
  builder.addAllContainersFromPreviousAttempts(protos);
}
/**
 * An AM container that completes while the attempt is still ALLOCATED moves
 * the attempt to FAILED. The diagnostics must carry the exit code.
 */
@Test
public void testAMCrashAtAllocated() {
  Container masterContainer = allocateApplicationAttempt();
  final String diagMessage = "some error";
  final int amExitCode = 123;
  ContainerStatus finishedStatus = BuilderUtils.newContainerStatus(
      masterContainer.getId(), ContainerState.COMPLETE, diagMessage, amExitCode);
  NodeId someNode = NodeId.newInstance("host", 1234);
  applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
      applicationAttempt.getAppAttemptId(), finishedStatus, someNode));
  assertEquals(YarnApplicationAttemptState.ALLOCATED,
      applicationAttempt.createApplicationAttemptState());

  sendAttemptUpdateSavedEvent(applicationAttempt);
  assertEquals(RMAppAttemptState.FAILED,
      applicationAttempt.getAppAttemptState());
  verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
  verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);

  // Only check for the URL in the diagnostics when a tracking URL is set.
  boolean checkUrl = applicationAttempt.getTrackingUrl() != null;
  verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics(),
      amExitCode, checkUrl);
}
/**
 * Creates an {@link AllocateResponse} through {@code recordFactory} and fills
 * in every field from the arguments.
 *
 * @return the populated response
 */
public static AllocateResponse newAllocateResponse(int responseId,
    List<ContainerStatus> completedContainers,
    List<Container> allocatedContainers, List<NodeReport> updatedNodes,
    Resource availResources, AMCommand command, int numClusterNodes,
    PreemptionMessage preempt) {
  final AllocateResponse allocateResponse =
      recordFactory.newRecordInstance(AllocateResponse.class);
  allocateResponse.setNumClusterNodes(numClusterNodes);
  allocateResponse.setResponseId(responseId);
  allocateResponse.setCompletedContainersStatuses(completedContainers);
  allocateResponse.setAllocatedContainers(allocatedContainers);
  allocateResponse.setUpdatedNodes(updatedNodes);
  allocateResponse.setAvailableResources(availResources);
  allocateResponse.setAMCommand(command);
  allocateResponse.setPreemptionMessage(preempt);
  return allocateResponse;
}
/** * handle method for AMRMClientAsync.CallbackHandler container allocation * * @param containers */ private synchronized void onContainersAllocated(List<Container> containers) { if (this.startAbort) { this.freeUnusedContainers(containers); return; } Collection<Container> freelist = new java.util.LinkedList<Container>(); for (Container c : containers) { if(blackList.contains(c.getNodeHttpAddress())){ //launchDummyTask(c); continue; } TaskRecord task; task = pendingTasks.poll(); if (task == null) { freelist.add(c); continue; } this.launchTask(c, task); } this.freeUnusedContainers(freelist); }
/**
 * NMClientAsync callback for a container that started successfully.
 *
 * <p>For a container known to {@code containers}, it requests the
 * container's status. When a timeline client is configured, it also publishes
 * a container-start event. Unknown container ids are ignored, because
 * publishing dereferences the container record.
 *
 * @param containerId id of the started container
 * @param allServiceResponse auxiliary-service responses (not read here)
 */
@Override
public void onContainerStarted(ContainerId containerId,
    Map<String, ByteBuffer> allServiceResponse) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Succeeded to start Container " + containerId);
  }
  Container container = containers.get(containerId);
  if (container == null) {
    // Nothing to query or publish: publishContainerStartEvent would throw a
    // NullPointerException on a null container.
    return;
  }
  applicationMaster.nmClientAsync.getContainerStatusAsync(containerId,
      container.getNodeId());
  if (applicationMaster.timelineClient != null) {
    ApplicationMaster.publishContainerStartEvent(
        applicationMaster.timelineClient, container,
        applicationMaster.domainId, applicationMaster.appSubmitterUgi);
  }
}
/**
 * Asynchronously starts {@code ./launcher.py} in {@code container}.
 * Stdout and stderr are redirected into the container log directory. The
 * launch context uses the tokens from {@code setupTokens()} and the
 * {@code workerResources} local resources.
 *
 * @param container container to launch the task in
 */
private synchronized void launchDummyTask(Container container) {
  ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
  String launcher = "./launcher.py";
  String cmd = launcher
      + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
      + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
  ctx.setCommands(Collections.singletonList(cmd));
  ctx.setTokens(setupTokens());
  ctx.setLocalResources(this.workerResources);
  // The method is already synchronized on this, so no inner
  // synchronized block is needed.
  this.nmClient.startContainerAsync(container, ctx);
}
/** * {@link RMAppAttemptState#RUNNING} */ private void testAppAttemptRunningState(Container container, String host, int rpcPort, String trackingUrl, boolean unmanagedAM) { assertEquals(RMAppAttemptState.RUNNING, applicationAttempt.getAppAttemptState()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(host, applicationAttempt.getHost()); assertEquals(rpcPort, applicationAttempt.getRpcPort()); verifyUrl(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); if (unmanagedAM) { verifyUrl(trackingUrl, applicationAttempt.getTrackingUrl()); } else { assertEquals(getProxyUrl(applicationAttempt), applicationAttempt.getTrackingUrl()); } // TODO - need to add more checks relevant to this state }
/** A launch failure while ALLOCATED moves the attempt to FAILED. */
@Test
public void testAllocatedToFailed() {
  Container masterContainer = allocateApplicationAttempt();
  final String launchError = "Launch Failed";
  applicationAttempt.handle(new RMAppAttemptLaunchFailedEvent(
      applicationAttempt.getAppAttemptId(), launchError));
  assertEquals(YarnApplicationAttemptState.ALLOCATED,
      applicationAttempt.createApplicationAttemptState());
  testAppAttemptFailedState(masterContainer, launchError);
}
/** * The {@link ResourceScheduler} is allocating data-local resources to the * application. * * @param allocatedContainers * resources allocated to the application */ synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority, ResourceRequest nodeLocalRequest, Container container, List<ResourceRequest> resourceRequests) { // Update future requirements decResourceRequest(node.getNodeName(), priority, nodeLocalRequest); ResourceRequest rackLocalRequest = requests.get(priority).get( node.getRackName()); decResourceRequest(node.getRackName(), priority, rackLocalRequest); ResourceRequest offRackRequest = requests.get(priority).get( ResourceRequest.ANY); decrementOutstanding(offRackRequest); // Update cloned NodeLocal, RackLocal and OffRack requests for recovery resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); resourceRequests.add(cloneResourceRequest(rackLocalRequest)); resourceRequests.add(cloneResourceRequest(offRackRequest)); }
/**
 * Creates an {@link AllocateResponse} record and populates it from the
 * arguments, including the NM tokens.
 *
 * @return the populated response
 */
@Public
@Stable
public static AllocateResponse newInstance(int responseId,
    List<ContainerStatus> completedContainers,
    List<Container> allocatedContainers, List<NodeReport> updatedNodes,
    Resource availResources, AMCommand command, int numClusterNodes,
    PreemptionMessage preempt, List<NMToken> nmTokens) {
  final AllocateResponse allocateResponse =
      Records.newRecord(AllocateResponse.class);
  allocateResponse.setNumClusterNodes(numClusterNodes);
  allocateResponse.setResponseId(responseId);
  allocateResponse.setCompletedContainersStatuses(completedContainers);
  allocateResponse.setAllocatedContainers(allocatedContainers);
  allocateResponse.setUpdatedNodes(updatedNodes);
  allocateResponse.setAvailableResources(availResources);
  allocateResponse.setAMCommand(command);
  allocateResponse.setPreemptionMessage(preempt);
  allocateResponse.setNMTokens(nmTokens);
  return allocateResponse;
}
@SuppressWarnings("unchecked") private ContainerRequest assignToFailedMap(Container allocated) { //try to assign to earlierFailedMaps if present ContainerRequest assigned = null; while (assigned == null && earlierFailedMaps.size() > 0 && canAssignMaps()) { TaskAttemptId tId = earlierFailedMaps.removeFirst(); if (maps.containsKey(tId)) { assigned = maps.remove(tId); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1); eventHandler.handle(jce); LOG.info("Assigned from earlierFailedMaps"); break; } } return assigned; }
/**
 * Creates an {@link ApplicationAttemptStateData} record from the given
 * attempt fields. A {@code null} {@code diagnostics} is stored as the empty
 * string.
 *
 * @return the populated state record
 */
public static ApplicationAttemptStateData newInstance(
    ApplicationAttemptId attemptId, Container container,
    Credentials attemptTokens, long startTime, RMAppAttemptState finalState,
    String finalTrackingUrl, String diagnostics,
    FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
    long finishTime, long memorySeconds, long vcoreSeconds,
    long gcoreSeconds) {
  final ApplicationAttemptStateData data =
      Records.newRecord(ApplicationAttemptStateData.class);
  data.setAttemptId(attemptId);
  data.setMasterContainer(container);
  data.setAppAttemptTokens(attemptTokens);
  data.setState(finalState);
  data.setFinalTrackingUrl(finalTrackingUrl);
  String storedDiagnostics;
  if (diagnostics != null) {
    storedDiagnostics = diagnostics;
  } else {
    storedDiagnostics = "";
  }
  data.setDiagnostics(storedDiagnostics);
  data.setStartTime(startTime);
  data.setFinalApplicationStatus(amUnregisteredFinalStatus);
  data.setAMContainerExitStatus(exitStatus);
  data.setFinishTime(finishTime);
  data.setMemorySeconds(memorySeconds);
  data.setVcoreSeconds(vcoreSeconds);
  data.setGcoreSeconds(gcoreSeconds);
  return data;
}
/**
 * Test RM allocate handler. It asserts the request carries the expected
 * response id, advances the id, and returns an empty response. The response
 * includes the AM token converted to a YARN Token record when one is set.
 */
@Override
public AllocateResponse allocate(AllocateRequest request)
    throws YarnException, IOException {
  Assert.assertEquals("response ID mismatch", responseId,
      request.getResponseId());
  responseId++;

  org.apache.hadoop.yarn.api.records.Token yarnToken;
  if (amToken == null) {
    yarnToken = null;
  } else {
    yarnToken = org.apache.hadoop.yarn.api.records.Token.newInstance(
        amToken.getIdentifier(), amToken.getKind().toString(),
        amToken.getPassword(), amToken.getService().toString());
  }

  return AllocateResponse.newInstance(responseId,
      Collections.<ContainerStatus>emptyList(),
      Collections.<Container>emptyList(),
      Collections.<NodeReport>emptyList(),
      Resources.none(), null, 1, null,
      Collections.<NMToken>emptyList(), yarnToken,
      Collections.<ContainerResourceIncrease>emptyList(),
      Collections.<ContainerResourceDecrease>emptyList());
}
/**
 * Creates a {@link RegisterApplicationMasterResponse} record populated from
 * the arguments.
 *
 * <p>NOTE(review): {@code minCapability} is accepted for signature
 * compatibility but is not stored on the response.
 *
 * @return the populated response
 */
@Private
@Unstable
public static RegisterApplicationMasterResponse newInstance(
    Resource minCapability, Resource maxCapability,
    Map<ApplicationAccessType, String> acls, ByteBuffer key,
    List<Container> containersFromPreviousAttempt, String queue,
    List<NMToken> nmTokensFromPreviousAttempts) {
  final RegisterApplicationMasterResponse registration =
      Records.newRecord(RegisterApplicationMasterResponse.class);
  registration.setMaximumResourceCapability(maxCapability);
  registration.setApplicationACLs(acls);
  registration.setClientToAMTokenMasterKey(key);
  registration.setContainersFromPreviousAttempts(containersFromPreviousAttempt);
  registration.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts);
  registration.setQueue(queue);
  return registration;
}
/**
 * Builds a Mockito mock of {@link RMAppAttempt} with fixed host, RPC port,
 * diagnostics, tracking URL and final status. Its master container mock
 * reports container id 1 of {@code appAttemptId}.
 */
private static RMAppAttempt createRMAppAttempt(
    ApplicationAttemptId appAttemptId) {
  RMAppAttempt attempt = mock(RMAppAttempt.class);
  when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
  when(attempt.getHost()).thenReturn("test host");
  when(attempt.getRpcPort()).thenReturn(-100);

  Container masterContainer = mock(Container.class);
  ContainerId masterId = ContainerId.newContainerId(appAttemptId, 1);
  when(masterContainer.getId()).thenReturn(masterId);
  when(attempt.getMasterContainer()).thenReturn(masterContainer);

  when(attempt.getDiagnostics()).thenReturn("test diagnostics info");
  when(attempt.getTrackingUrl()).thenReturn("test url");
  when(attempt.getFinalApplicationStatus())
      .thenReturn(FinalApplicationStatus.UNDEFINED);
  return attempt;
}
public synchronized void assign(List<Container> containers) throws IOException, YarnException { int numContainers = containers.size(); // Schedule in priority order for (Priority priority : requests.keySet()) { assign(priority, NodeType.NODE_LOCAL, containers); assign(priority, NodeType.RACK_LOCAL, containers); assign(priority, NodeType.OFF_SWITCH, containers); if (containers.isEmpty()) { break; } } int assignedContainers = numContainers - containers.size(); LOG.info("Application " + applicationId + " assigned " + assignedContainers + "/" + numContainers); }
/**
 * After the state-store save completes, asserts that the attempt is in
 * {@link RMAppAttemptState#KILLED}. It checks the given diagnostics and AM
 * container, zero progress, no finished containers or ran nodes, and no
 * final status. It also checks that state was not transferred from a
 * previous attempt.
 */
private void testAppAttemptKilledState(Container amContainer,
    String diagnostics) {
  sendAttemptUpdateSavedEvent(applicationAttempt);
  assertEquals(RMAppAttemptState.KILLED,
      applicationAttempt.getAppAttemptState());
  assertEquals(diagnostics, applicationAttempt.getDiagnostics());

  // Nothing should have run or finished.
  assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
  assertEquals(amContainer, applicationAttempt.getMasterContainer());
  assertEquals(0.0, (double) applicationAttempt.getProgress(), 0.0001);
  assertEquals(0, application.getRanNodes().size());
  assertNull(applicationAttempt.getFinalApplicationStatus());

  verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
  verifyAttemptFinalStateSaved();
  assertFalse(transferStateFromPreviousAttempt);
  verifyApplicationAttemptFinished(RMAppAttemptState.KILLED);
}
/**
 * Returns the recorded status of each requested container.
 *
 * <p>Some ids are skipped instead of causing a {@link NullPointerException}:
 * ids whose application has no container list, ids that match no container
 * in that list, and containers whose recorded status is missing or has a
 * null state.
 *
 * @param request ids of the containers to report on
 * @return a response with the statuses found and no failed requests
 */
@Override
public synchronized GetContainerStatusesResponse getContainerStatuses(
    GetContainerStatusesRequest request) throws YarnException {
  List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
  for (ContainerId containerId : request.getContainerIds()) {
    List<Container> appContainers = containers.get(
        containerId.getApplicationAttemptId().getApplicationId());
    if (appContainers == null) {
      // No containers recorded for this application.
      continue;
    }

    Container container = null;
    for (Container c : appContainers) {
      if (c.getId().equals(containerId)) {
        container = c;
      }
    }
    if (container == null) {
      continue;
    }

    ContainerStatus status = containerStatusMap.get(container);
    if (status != null && status.getState() != null) {
      statuses.add(status);
    }
  }
  return GetContainerStatusesResponse.newInstance(statuses, null);
}
/** * free the containers that have not yet been launched * * @param containers */ private synchronized void freeUnusedContainers( Collection<Container> containers) { if(containers.size() == 0) return; for(Container c : containers){ //launchDummyTask(c); } }
/** A running attempt can unregister with SUCCEEDED and a new tracking URL. */
@Test
public void testUnregisterToSuccessfulFinishing() {
  Container masterContainer = allocateApplicationAttempt();
  launchApplicationAttempt(masterContainer);
  runApplicationAttempt(masterContainer, "host", 8042, "oldtrackingurl", false);

  final FinalApplicationStatus status = FinalApplicationStatus.SUCCEEDED;
  unregisterApplicationAttempt(masterContainer, status, "mytrackingurl",
      "Successful");
}
@Test public void testContainerTokenGeneratedOnPullRequest() throws Exception { MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); // request a container. am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>()); ContainerId containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); RMContainer container = rm1.getResourceScheduler().getRMContainer(containerId2); // no container token is generated. Assert.assertEquals(containerId2, container.getContainerId()); Assert.assertNull(container.getContainer().getContainerToken()); // acquire the container. List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers(); Assert.assertEquals(containerId2, containers.get(0).getId()); // container token is generated. Assert.assertNotNull(containers.get(0).getContainerToken()); rm1.stop(); }
@SuppressWarnings("deprecation") @Test public void testContainersCleanupForLastAttempt() { // create a failed attempt. applicationAttempt = new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext, scheduler, masterService, submissionContext, new Configuration(), true, BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, submissionContext.getResource(), 1)); when(submissionContext.getKeepContainersAcrossApplicationAttempts()) .thenReturn(true); when(submissionContext.getMaxAppAttempts()).thenReturn(1); Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); ContainerStatus cs1 = ContainerStatus.newInstance(amContainer.getId(), ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( appAttemptId, cs1, anyNodeId)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertFalse(transferStateFromPreviousAttempt); verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); }
/**
 * Returns the container allocated for this worker attempt, read while
 * holding {@code readLock}.
 *
 * @return the container allocated for this worker attempt
 */
public Container getContainer() {
  // Acquire before entering try: if lock() fails, the finally block must not
  // unlock a lock this thread never held.
  readLock.lock();
  try {
    return container;
  } finally {
    readLock.unlock();
  }
}
@Test public void testRunningToKilled() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); applicationAttempt.handle( new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.KILL)); // ignored ContainerFinished and Expire at FinalSaving if we were supposed // to Killed state. assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyAMHostAndPortInvalidated(); verifyApplicationAttemptFinished(RMAppAttemptState.KILLED); }
/**
 * Takes over containers still running from earlier attempts.
 *
 * <p>It derives {@code appAttemptId} from this process's CONTAINER_ID
 * environment variable. Each previous container is recorded as launched, and
 * the allocated-container counter is increased by their count.
 *
 * @param response registration response listing the previous containers
 */
private void setupPreviousRunningContainers(
    RegisterApplicationMasterResponse response) {
  String ownContainerId = System.getenv(Environment.CONTAINER_ID.name());
  appAttemptId = ContainerId.fromString(ownContainerId)
      .getApplicationAttemptId();

  List<Container> survivors = response.getContainersFromPreviousAttempts();
  LOG.info(appAttemptId + " received " + survivors.size()
      + " previous attempts' running containers on AM registration.");
  for (Container survivor : survivors) {
    launchedContainers.add(survivor.getId());
  }
  allocatedContainerNum.addAndGet(survivors.size());
}
/**
 * AMRMClientAsync callback. It adds the newly allocated containers to the
 * pending list and starts all containers once the running total reaches
 * {@code args.totalContainerNum}.
 *
 * @param allocatedContainers containers newly allocated by the RM
 */
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  // Test the value returned by addAndGet instead of re-reading the counter.
  // A separate get() could observe another thread's increment, so two
  // callbacks might both see the target (or both miss it).
  boolean reachedTotal =
      allocatedContainerNum.addAndGet(allocatedContainers.size())
          == args.totalContainerNum;
  ApplicationMaster.this.allocatedContainers.addAll(allocatedContainers);
  if (reachedTotal) {
    startAllContainers();
  }
}
/** Killing an ALLOCATED attempt ends in KILLED with empty diagnostics. */
@Test
public void testAllocatedToKilled() {
  Container masterContainer = allocateApplicationAttempt();
  RMAppAttemptEvent kill = new RMAppAttemptEvent(
      applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.KILL);
  applicationAttempt.handle(kill);
  assertEquals(YarnApplicationAttemptState.ALLOCATED,
      applicationAttempt.createApplicationAttemptState());
  testAppAttemptKilledState(masterContainer, EMPTY_DIAGNOSTICS);
}
/**
 * Replaces the cached previous-attempt containers with a copy of
 * {@code containers}. A {@code null} argument is ignored and leaves the
 * current value unchanged.
 */
@Override
public void setContainersFromPreviousAttempts(final List<Container> containers) {
  if (containers != null) {
    // Copy so later changes to the caller's list are not seen here.
    this.containersFromPreviousAttempts = new ArrayList<Container>(containers);
  }
}
@Override public synchronized void setAllocatedContainers( final List<Container> containers) { if (containers == null) return; // this looks like a bug because it results in append and not set initLocalNewContainerList(); allocatedContainers.addAll(containers); }
/**
 * Orders containers by id, then by node id, then by resource.
 */
@Override
public int compareTo(Container other) {
  int byId = this.getId().compareTo(other.getId());
  if (byId != 0) {
    return byId;
  }
  int byNode = this.getNodeId().compareTo(other.getNodeId());
  if (byNode != 0) {
    return byNode;
  }
  return this.getResource().compareTo(other.getResource());
}
@Override public void onContainersAllocated(List<Container> allocatedContainers) { LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory() + ", containerResourceVirtualCores" + allocatedContainer.getResource().getVirtualCores() + ", containerResourceGpuCores" + allocatedContainer.getResource().getGpuCores()); // + ", containerToken" // +allocatedContainer.getContainerToken().getIdentifier().toString()); LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer, containerListener); Thread launchThread = new Thread(runnableLaunchContainer); // launch and start the container on a separate thread to keep // the main thread unblocked // as all containers may not be allocated at one go. launchThreads.add(launchThread); launchThread.start(); } }
static LeafQueue stubLeafQueue(LeafQueue queue) { // Mock some methods for ease in these unit tests // 1. LeafQueue.createContainer to return dummy containers doAnswer(new Answer<Container>() { @Override public Container answer(InvocationOnMock invocation) throws Throwable { final FiCaSchedulerApp application = (FiCaSchedulerApp) (invocation .getArguments()[0]); final ContainerId containerId = TestUtils .getMockContainerId(application); Container container = TestUtils.getMockContainer(containerId, ((FiCaSchedulerNode) (invocation.getArguments()[1])).getNodeID(), (Resource) (invocation.getArguments()[2]), ((Priority) invocation.getArguments()[3])); return container; } }).when(queue).createContainer(any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(Resource.class), any(Priority.class)); // 2. Stub out LeafQueue.parent.completedContainer CSQueue parent = queue.getParent(); doNothing().when(parent).completedContainer(any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), any(RMContainerEventType.class), any(CSQueue.class), anyBoolean()); return queue; }
private Container mockContainer(int i) { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); ContainerId containerId = ContainerId.newContainerId(attemptId, i); nodeId = NodeId.newInstance("localhost", 0); // Create an empty record containerToken = recordFactory.newRecordInstance(Token.class); return Container.newInstance(containerId, nodeId, null, null, null, containerToken); }
@Test(timeout = 10000) public void testLaunchedAtFinalSaving() { Container amContainer = allocateApplicationAttempt(); // ALLOCATED->FINAL_SAVING applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt .getAppAttemptId(), RMAppAttemptEventType.KILL)); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); // verify for both launched and launch_failed transitions in final_saving applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt .getAppAttemptId(), RMAppAttemptEventType.LAUNCHED)); applicationAttempt.handle(new RMAppAttemptLaunchFailedEvent( applicationAttempt.getAppAttemptId(), "Launch Failed")); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); testAppAttemptKilledState(amContainer, EMPTY_DIAGNOSTICS); // verify for both launched and launch_failed transitions in killed applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt .getAppAttemptId(), RMAppAttemptEventType.LAUNCHED)); applicationAttempt.handle(new RMAppAttemptLaunchFailedEvent( applicationAttempt.getAppAttemptId(), "Launch Failed")); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); }
/**
 * Publishes a DS_CONTAINER_START timeline event for {@code container} as
 * {@code ugi}. The event records the container's node and resources.
 * Failures are logged, not thrown. If the publishing thread was interrupted,
 * its interrupt flag is restored.
 *
 * @param timelineClient client used to put the entity
 * @param container started container; must be non-null
 * @param domainId timeline domain for the entity
 * @param ugi user the put is performed as
 */
private static void publishContainerStartEvent(
    final TimelineClient timelineClient, Container container, String domainId,
    UserGroupInformation ugi) {
  final TimelineEntity entity = new TimelineEntity();
  entity.setEntityId(container.getId().toString());
  entity.setEntityType(DSEntity.DS_CONTAINER.toString());
  entity.setDomainId(domainId);
  entity.addPrimaryFilter("user", ugi.getShortUserName());

  TimelineEvent event = new TimelineEvent();
  event.setTimestamp(System.currentTimeMillis());
  event.setEventType(DSEvent.DS_CONTAINER_START.toString());
  event.addEventInfo("Node", container.getNodeId().toString());
  event.addEventInfo("Resources", container.getResource().toString());
  entity.addEvent(event);

  try {
    ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
      @Override
      public TimelinePutResponse run() throws Exception {
        return timelineClient.putEntities(entity);
      }
    });
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Publishing is best-effort, but the interrupt must not be lost.
      Thread.currentThread().interrupt();
    }
    // Unwrap UndeclaredThrowableException so the underlying cause is logged.
    Throwable reported =
        e instanceof UndeclaredThrowableException ? e.getCause() : e;
    LOG.error("Container start event could not be published for "
        + container.getId().toString(), reported);
  }
}
/** * The {@link ResourceScheduler} is allocating data-local resources to the * application. * * @param allocatedContainers * resources allocated to the application */ synchronized private void allocateRackLocal(SchedulerNode node, Priority priority, ResourceRequest rackLocalRequest, Container container, List<ResourceRequest> resourceRequests) { // Update future requirements decResourceRequest(node.getRackName(), priority, rackLocalRequest); ResourceRequest offRackRequest = requests.get(priority).get( ResourceRequest.ANY); decrementOutstanding(offRackRequest); // Update cloned RackLocal and OffRack requests for recovery resourceRequests.add(cloneResourceRequest(rackLocalRequest)); resourceRequests.add(cloneResourceRequest(offRackRequest)); }