/**
 * Records the incoming ask list and, for requests addressed to
 * {@link ResourceRequest#ANY}, remembers the container count asked at the map
 * and reduce priorities. A response is then built from
 * {@code containersToComplete} and {@code containersToAllocate}, after which
 * both of those lists are cleared.
 *
 * @param request the allocate request whose ask list is inspected
 * @return the response created before the two container lists are cleared
 * @throws YarnException declared by the overridden method
 * @throws IOException declared by the overridden method
 */
@Override
public AllocateResponse allocate(AllocateRequest request)
    throws YarnException, IOException {
  lastAsk = request.getAskList();
  for (ResourceRequest asked : lastAsk) {
    // Only the ANY-level entries are tracked.
    if (!ResourceRequest.ANY.equals(asked.getResourceName())) {
      continue;
    }
    Priority askedPriority = asked.getPriority();
    if (askedPriority.equals(RMContainerAllocator.PRIORITY_MAP)) {
      lastAnyAskMap = asked.getNumContainers();
    } else if (askedPriority.equals(RMContainerAllocator.PRIORITY_REDUCE)) {
      lastAnyAskReduce = asked.getNumContainers();
    }
  }

  AllocateResponse result = AllocateResponse.newInstance(
      request.getResponseId(),
      containersToComplete,
      containersToAllocate,
      Collections.<NodeReport>emptyList(),
      Resource.newInstance(512000, 1024, 1024),
      null,
      10,
      null,
      Collections.<NMToken>emptyList());

  // Reset the queued containers once the response has been created.
  containersToComplete.clear();
  containersToAllocate.clear();
  return result;
}
/**
 * Adds up {@code capability * numContainers} over every request that is
 * addressed to {@link ResourceRequest#ANY} and asks for a non-zero number of
 * containers.
 *
 * @param requests the requests to total; may be {@code null}
 * @return the accumulated resource, which stays at its
 *         {@code Resource.newInstance(0, 0, 0)} starting value when
 *         {@code requests} is {@code null} or nothing qualifies
 */
private Resource getTotalResource(List<ResourceRequest> requests) {
  Resource sum = Resource.newInstance(0, 0, 0);
  if (requests != null) {
    for (ResourceRequest rr : requests) {
      boolean counts = rr.getNumContainers() != 0
          && rr.getResourceName().equals(ResourceRequest.ANY);
      if (counts) {
        Resource weighted =
            Resources.multiply(rr.getCapability(), rr.getNumContainers());
        Resources.addTo(sum, weighted);
      }
    }
  }
  return sum;
}
private int assignRackLocalContainers(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = application.getResourceRequest(priority, node.getRMNode().getRackName()); if (request != null) { // Don't allocate on this rack if the application doens't need containers ResourceRequest offSwitchRequest = application.getResourceRequest(priority, ResourceRequest.ANY); if (offSwitchRequest.getNumContainers() <= 0) { return 0; } int assignableContainers = Math.min( getMaxAllocatableContainers(application, priority, node, NodeType.RACK_LOCAL), request.getNumContainers()); assignedContainers = assignContainer(node, application, priority, assignableContainers, request, NodeType.RACK_LOCAL); } return assignedContainers; }
protected ApplicationAttemptId createSchedulingRequest(String queueId, String userId, List<ResourceRequest> ask) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { scheduler.addApplicationAttempt(id, false, false); } RMApp rmApp = mock(RMApp.class); RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( new RMAppAttemptMetrics(id,resourceManager.getRMContext())); resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null); return id; }
private LogAggregationContext getLogAggregationContextFromContainerToken( MockRM rm1, MockNM nm1, LogAggregationContext logAggregationContext) throws Exception { RMApp app2 = rm1.submitApp(200, logAggregationContext); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); nm1.nodeHeartbeat(true); // request a container. am2.allocate("127.0.0.1", 512, 1, new ArrayList<ContainerId>()); ContainerId containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2); rm1.waitForState(nm1, containerId, RMContainerState.ALLOCATED); // acquire the container. List<Container> containers = am2.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers(); Assert.assertEquals(containerId, containers.get(0).getId()); // container token is generated. Assert.assertNotNull(containers.get(0).getContainerToken()); ContainerTokenIdentifier token = BuilderUtils.newContainerTokenIdentifier(containers.get(0) .getContainerToken()); return token.getLogAggregationContext(); }
/** * The {@link ResourceScheduler} is allocating data-local resources to the * application. * * @param allocatedContainers * resources allocated to the application */ synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority, ResourceRequest nodeLocalRequest, Container container, List<ResourceRequest> resourceRequests) { // Update future requirements decResourceRequest(node.getNodeName(), priority, nodeLocalRequest); ResourceRequest rackLocalRequest = requests.get(priority).get( node.getRackName()); decResourceRequest(node.getRackName(), priority, rackLocalRequest); ResourceRequest offRackRequest = requests.get(priority).get( ResourceRequest.ANY); decrementOutstanding(offRackRequest); // Update cloned NodeLocal, RackLocal and OffRack requests for recovery resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); resourceRequests.add(cloneResourceRequest(rackLocalRequest)); resourceRequests.add(cloneResourceRequest(offRackRequest)); }
/**
 * Remembers the arguments of the most recent call in the {@code last*}
 * fields and forwards to the superclass, handing it a copy of every ask
 * (built with {@code ResourceRequest.newInstance}) instead of the caller's
 * objects. Also calls {@code SecurityUtil.setTokenServiceUseIp(false)} before
 * delegating.
 *
 * @param applicationAttemptId the attempt making the request
 * @param ask the resource requests; stored as-is in {@code lastAsk}
 * @param release the containers to release
 * @param blacklistAdditions resources to add to the blacklist
 * @param blacklistRemovals resources to remove from the blacklist
 * @return whatever the superclass {@code allocate} returns
 */
@Override
public synchronized Allocation allocate(
    ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
    List<ContainerId> release, List<String> blacklistAdditions,
    List<String> blacklistRemovals) {
  // Copy each request so the superclass never sees the caller's instances.
  List<ResourceRequest> copiedAsk = new ArrayList<ResourceRequest>();
  for (ResourceRequest original : ask) {
    copiedAsk.add(ResourceRequest.newInstance(
        original.getPriority(),
        original.getResourceName(),
        original.getCapability(),
        original.getNumContainers(),
        original.getRelaxLocality()));
  }

  SecurityUtil.setTokenServiceUseIp(false);

  lastAsk = ask;
  lastRelease = release;
  lastBlacklistAdditions = blacklistAdditions;
  lastBlacklistRemovals = blacklistRemovals;

  return super.allocate(applicationAttemptId, copiedAsk, release,
      blacklistAdditions, blacklistRemovals);
}
synchronized public void move(Queue newQueue) { QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics(); for (Map<String, ResourceRequest> asks : requests.values()) { ResourceRequest request = asks.get(ResourceRequest.ANY); if (request != null) { oldMetrics.decrPendingResources(user, request.getNumContainers(), request.getCapability()); newMetrics.incrPendingResources(user, request.getNumContainers(), request.getCapability()); Resource delta = Resources.multiply(request.getCapability(), request.getNumContainers()); // Update Queue queue.decPendingResource(request.getNodeLabelExpression(), delta); newQueue.incPendingResource(request.getNodeLabelExpression(), delta); } } oldMetrics.moveAppFrom(this); newMetrics.moveAppTo(this); activeUsersManager.deactivateApplication(user, applicationId); activeUsersManager = newQueue.getActiveUsersManager(); activeUsersManager.activateApplication(user, applicationId); this.queue = newQueue; this.queueName = newQueue.getQueueName(); }
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { // clear pending resources metrics for the application QueueMetrics metrics = queue.getMetrics(); for (Map<String, ResourceRequest> asks : requests.values()) { ResourceRequest request = asks.get(ResourceRequest.ANY); if (request != null) { metrics.decrPendingResources(user, request.getNumContainers(), request.getCapability()); // Update Queue queue.decPendingResource( request.getNodeLabelExpression(), Resources.multiply(request.getCapability(), request.getNumContainers())); } } metrics.finishAppAttempt(applicationId, pending, user); // Clear requests themselves clearRequests(); }
private static void normalizeNodeLabelExpressionInRequest( ResourceRequest resReq, QueueInfo queueInfo) { String labelExp = resReq.getNodeLabelExpression(); // if queue has default label expression, and RR doesn't have, use the // default label expression of queue if (labelExp == null && queueInfo != null && ResourceRequest.ANY .equals(resReq.getResourceName())) { labelExp = queueInfo.getDefaultNodeLabelExpression(); } // If labelExp still equals to null, set it to be NO_LABEL if (labelExp == null) { labelExp = RMNodeLabelsManager.NO_LABEL; } resReq.setNodeLabelExpression(labelExp); }
@Override public synchronized void removeContainerRequest(T req) { Preconditions.checkArgument(req != null, "Resource request can not be null."); Set<String> allRacks = new HashSet<String>(); if (req.getRacks() != null) { allRacks.addAll(req.getRacks()); } allRacks.addAll(resolveRacks(req.getNodes())); // Update resource requests if (req.getNodes() != null) { for (String node : new HashSet<String>(req.getNodes())) { decResourceRequest(req.getPriority(), node, req.getCapability(), req); } } for (String rack : allRacks) { decResourceRequest(req.getPriority(), rack, req.getCapability(), req); } decResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(), req); }
/**
 * Builds a {@link ResourceRequest} record from plain values.
 *
 * @param resource the resource name to set on the request
 * @param memory the memory to set on the request's capability
 * @param priority the numeric priority
 * @param containers the number of containers
 * @param labelExpression node label expression; only set on the request when
 *          non-null
 * @return the populated request
 * @throws Exception declared by the signature
 */
public ResourceRequest createResourceReq(String resource, int memory,
    int priority, int containers, String labelExpression) throws Exception {
  Priority requestPriority = Records.newRecord(Priority.class);
  requestPriority.setPriority(priority);

  Resource requestCapability = Records.newRecord(Resource.class);
  requestCapability.setMemory(memory);

  ResourceRequest built = Records.newRecord(ResourceRequest.class);
  built.setResourceName(resource);
  built.setNumContainers(containers);
  built.setPriority(requestPriority);
  built.setCapability(requestCapability);
  if (labelExpression != null) {
    built.setNodeLabelExpression(labelExpression);
  }
  return built;
}
protected void addContainerReq(ContainerRequest req) { // Create resource requests for (String host : req.hosts) { // Data-local if (!isNodeBlacklisted(host)) { addResourceRequest(req.priority, host, req.capability, null); } } // Nothing Rack-local for now for (String rack : req.racks) { addResourceRequest(req.priority, rack, req.capability, null); } // Off-switch addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, req.nodeLabelExpression); }
/**
 * Creates an {@link AllocateRequest} record and populates it with the given
 * values.
 *
 * @param responseID value passed to {@code setResponseId}
 * @param appProgress value passed to {@code setProgress}
 * @param resourceAsk value passed to {@code setAskList}
 * @param containersToBeReleased value passed to {@code setReleaseList}
 * @param resourceBlacklistRequest value passed to
 *          {@code setResourceBlacklistRequest}
 * @param increaseRequests value passed to {@code setIncreaseRequests}
 * @return the populated request
 */
@Public
@Stable
public static AllocateRequest newInstance(int responseID, float appProgress,
    List<ResourceRequest> resourceAsk,
    List<ContainerId> containersToBeReleased,
    ResourceBlacklistRequest resourceBlacklistRequest,
    List<ContainerResourceIncreaseRequest> increaseRequests) {
  final AllocateRequest created = Records.newRecord(AllocateRequest.class);

  created.setResponseId(responseID);
  created.setProgress(appProgress);

  created.setAskList(resourceAsk);
  created.setReleaseList(containersToBeReleased);

  created.setResourceBlacklistRequest(resourceBlacklistRequest);
  created.setIncreaseRequests(increaseRequests);

  return created;
}
private void addResourceRequestToAsk(ResourceRequest remoteRequest) { // This code looks weird but is needed because of the following scenario. // A ResourceRequest is removed from the remoteRequestTable. A 0 container // request is added to 'ask' to notify the RM about not needing it any more. // Before the call to allocate, the user now requests more containers. If // the locations of the 0 size request and the new request are the same // (with the difference being only container count), then the set comparator // will consider both to be the same and not add the new request to ask. So // we need to check for the "same" request being present and remove it and // then add it back. The comparator is container count agnostic. // This should happen only rarely but we do need to guard against it. if(ask.contains(remoteRequest)) { ask.remove(remoteRequest); } ask.add(remoteRequest); }
/**
 * Produces an {@link Allocation} that includes the current view of the
 * resources that will be allocated to and preempted from this application.
 *
 * <p>The set of containers marked for preemption is snapshotted and then
 * cleared. Their resources are summed and expressed as a number of
 * {@code minimumAllocation}-sized containers in a single
 * {@link ResourceRequest#ANY} request that is placed in the result.
 *
 * @param rc calculator passed to {@code Resources.divide}
 * @param clusterResource cluster resource passed to {@code Resources.divide}
 * @param minimumAllocation divisor of the division and capability of the
 *          generated request
 * @return an allocation carrying the newly allocated containers, the
 *         headroom, the snapshotted preemption set, the generated request and
 *         the NM tokens
 */
public synchronized Allocation getAllocation(ResourceCalculator rc,
    Resource clusterResource, Resource minimumAllocation) {
  // Snapshot, then reset, the containers marked for preemption.
  Set<ContainerId> snapshot = new HashSet<ContainerId>(containersToPreempt);
  Set<ContainerId> currentContPreemption =
      Collections.unmodifiableSet(snapshot);
  containersToPreempt.clear();

  Resource preemptedTotal = Resource.newInstance(0, 0, 0);
  for (ContainerId preemptedId : currentContPreemption) {
    Resource held =
        liveContainers.get(preemptedId).getContainer().getResource();
    Resources.addTo(preemptedTotal, held);
  }

  // Express the total as a count of minimum-allocation sized containers.
  int containerCount = (int) Math.ceil(Resources.divide(rc, clusterResource,
      preemptedTotal, minimumAllocation));
  ResourceRequest preemptionRequest = ResourceRequest.newInstance(
      Priority.UNDEFINED, ResourceRequest.ANY, minimumAllocation,
      containerCount);

  ContainersAndNMTokensAllocation pulled =
      pullNewlyAllocatedContainersAndNMTokens();
  Resource headroom = getHeadroom();
  setApplicationHeadroomForMetrics(headroom);

  return new Allocation(pulled.getContainerList(), headroom, null,
      currentContPreemption, Collections.singletonList(preemptionRequest),
      pulled.getNMTokenList());
}
/**
 * Re-queues outstanding state for the next allocate call: every request in
 * {@code remoteRequestsTable} is added to the ask, blacklisted nodes are
 * re-added to the blacklist additions (unless blacklisting is being ignored),
 * pending releases are re-added to the release list, and every key of
 * {@code requestLimits} is marked for update.
 */
protected void addOutstandingRequestOnResync() {
  for (Map<String, Map<Resource, ResourceRequest>> byLocation
      : remoteRequestsTable.values()) {
    for (Map<Resource, ResourceRequest> byCapability : byLocation.values()) {
      for (ResourceRequest outstanding : byCapability.values()) {
        addResourceRequestToAsk(outstanding);
      }
    }
  }

  boolean blacklistingActive = !ignoreBlacklisting.get();
  if (blacklistingActive) {
    blacklistAdditions.addAll(blacklistedNodes);
  }

  if (!pendingRelease.isEmpty()) {
    release.addAll(pendingRelease);
  }

  requestLimitsToUpdate.addAll(requestLimits.keySet());
}
/**
 * Asserts that a resource request carries the node label expression expected
 * for its location: none for {@code "map"}, {@code "reduce"} and the default
 * rack, and {@code "ReduceNodes"} or {@code "MapNodes"} for {@code "*"}. Any
 * other location fails the test.
 *
 * @param resourceRequest the request to check
 * @param isReduce selects {@code "ReduceNodes"} (true) or {@code "MapNodes"}
 *          (false) as the expected label for the {@code "*"} request
 */
private void validateLabelsRequests(ResourceRequest resourceRequest,
    boolean isReduce) {
  final String location = resourceRequest.getResourceName();
  switch (location) {
    case "map":
    case "reduce":
    case NetworkTopology.DEFAULT_RACK: {
      Assert.assertNull(resourceRequest.getNodeLabelExpression());
      break;
    }
    case "*": {
      String expectedLabel = isReduce ? "ReduceNodes" : "MapNodes";
      Assert.assertEquals(expectedLabel,
          resourceRequest.getNodeLabelExpression());
      break;
    }
    default: {
      Assert.fail("Invalid resource location " + location);
    }
  }
}
public List<ResourceRequest> createReq(String[] hosts, int memory, int priority, int containers, String labelExpression) throws Exception { List<ResourceRequest> reqs = new ArrayList<ResourceRequest>(); if (hosts != null) { for (String host : hosts) { // only add host/rack request when asked host isn't ANY if (!host.equals(ResourceRequest.ANY)) { ResourceRequest hostReq = createResourceReq(host, memory, priority, containers, labelExpression); reqs.add(hostReq); ResourceRequest rackReq = createResourceReq("/default-rack", memory, priority, containers, labelExpression); reqs.add(rackReq); } } } ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, memory, priority, containers, labelExpression); reqs.add(offRackReq); return reqs; }
/**
 * Populates {@code this.ask} from the ask list of the current proto or
 * builder, converting each entry with {@code convertFromProtoFormat}. Does
 * nothing when {@code this.ask} is already set.
 */
private void initAsks() {
  if (this.ask == null) {
    AllocateRequestProtoOrBuilder source = viaProto ? proto : builder;
    List<ResourceRequestProto> protoAsks = source.getAskList();
    this.ask = new ArrayList<ResourceRequest>();
    for (ResourceRequestProto protoAsk : protoAsks) {
      ResourceRequest converted = convertFromProtoFormat(protoAsk);
      this.ask.add(converted);
    }
  }
}
/**
 * Returns the resource request, converting it from the proto or builder on
 * first access and keeping the converted object in {@code rr}.
 *
 * @return the held or freshly converted request, or {@code null} when none is
 *         held and the proto/builder has no resource
 */
@Override
public synchronized ResourceRequest getResourceRequest() {
  if (rr == null) {
    PreemptionResourceRequestProtoOrBuilder source =
        viaProto ? proto : builder;
    if (source.hasResource()) {
      rr = convertFromProtoFormat(source.getResource());
    }
  }
  return rr;
}
/**
 * Convenience overload that delegates to the six-argument
 * {@code newInstance}, passing {@code null} as the final argument.
 *
 * @param responseID forwarded unchanged
 * @param appProgress forwarded unchanged
 * @param resourceAsk forwarded unchanged
 * @param containersToBeReleased forwarded unchanged
 * @param resourceBlacklistRequest forwarded unchanged
 * @return the request produced by the six-argument overload
 */
@Public
@Stable
public static AllocateRequest newInstance(
    int responseID,
    float appProgress,
    List<ResourceRequest> resourceAsk,
    List<ContainerId> containersToBeReleased,
    ResourceBlacklistRequest resourceBlacklistRequest) {
  return newInstance(
      responseID,
      appProgress,
      resourceAsk,
      containersToBeReleased,
      resourceBlacklistRequest,
      null);
}
@Override public ResourceRequest getAMContainerResourceRequest() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; if (this.amResourceRequest != null) { return amResourceRequest; } // Else via proto if (!p.hasAmContainerResourceRequest()) { return null; } amResourceRequest = convertFromProtoFormat(p.getAmContainerResourceRequest()); return amResourceRequest; }
/**
 * Stores the AM container resource request in {@code amResourceRequest}. A
 * {@code null} request additionally clears the field on the builder.
 *
 * @param request the request to store; may be {@code null}
 */
@Override
public void setAMContainerResourceRequest(ResourceRequest request) {
  maybeInitBuilder();
  final boolean clearing = (request == null);
  if (clearing) {
    builder.clearAmContainerResourceRequest();
  }
  this.amResourceRequest = request;
}
/**
 * Creates a {@link ResourceRequest} record from the given values.
 *
 * @param priority the priority to set
 * @param hostName the value stored as the request's resource name
 * @param capability the capability to set
 * @param numContainers the number of containers to set
 * @return the populated request
 */
public static ResourceRequest newResourceRequest(Priority priority,
    String hostName, Resource capability, int numContainers) {
  final ResourceRequest created =
      recordFactory.newRecordInstance(ResourceRequest.class);
  created.setResourceName(hostName);
  created.setPriority(priority);
  created.setNumContainers(numContainers);
  created.setCapability(capability);
  return created;
}
/**
 * Creates a new {@link ResourceRequest} record populated from an existing
 * one.
 *
 * <p>Priority, resource name, capability and container count are copied, and
 * so are the locality-relaxation flag and the node label expression, so that
 * a strict-locality or labelled request keeps those properties in the copy.
 * The priority and capability objects are shared with {@code r}, not cloned.
 *
 * @param r the request to copy from; must not be {@code null}
 * @return a new request carrying the same values as {@code r}
 */
public static ResourceRequest newResourceRequest(ResourceRequest r) {
  ResourceRequest request = recordFactory
      .newRecordInstance(ResourceRequest.class);
  request.setPriority(r.getPriority());
  request.setResourceName(r.getResourceName());
  request.setCapability(r.getCapability());
  request.setNumContainers(r.getNumContainers());
  // Carry over the remaining request attributes so the copy is faithful.
  request.setRelaxLocality(r.getRelaxLocality());
  request.setNodeLabelExpression(r.getNodeLabelExpression());
  return request;
}
private void applyRequestLimits() { Iterator<ResourceRequest> iter = requestLimits.values().iterator(); while (iter.hasNext()) { ResourceRequest reqLimit = iter.next(); int limit = reqLimit.getNumContainers(); Map<String, Map<Resource, ResourceRequest>> remoteRequests = remoteRequestsTable.get(reqLimit.getPriority()); Map<Resource, ResourceRequest> reqMap = (remoteRequests != null) ? remoteRequests.get(ResourceRequest.ANY) : null; ResourceRequest req = (reqMap != null) ? reqMap.get(reqLimit.getCapability()) : null; if (req == null) { continue; } // update an existing ask or send a new one if updating if (ask.remove(req) || requestLimitsToUpdate.contains(req)) { ResourceRequest newReq = req.getNumContainers() > limit ? reqLimit : req; ask.add(newReq); LOG.info("Applying ask limit of " + newReq.getNumContainers() + " for priority:" + reqLimit.getPriority() + " and capability:" + reqLimit.getCapability()); } if (limit == Integer.MAX_VALUE) { iter.remove(); } } requestLimitsToUpdate.clear(); }
/**
 * Returns the {@code resourceRequests} field, read while holding the read
 * lock.
 *
 * @return the current value of {@code resourceRequests} (the same list
 *         instance, not a copy)
 */
@Override
public List<ResourceRequest> getResourceRequests() {
  // Acquire the lock before entering the try block: if lock() itself were to
  // fail, the finally clause must not attempt to unlock a lock that was never
  // taken.
  readLock.lock();
  try {
    return resourceRequests;
  } finally {
    readLock.unlock();
  }
}
/**
 * Wraps the given asks and releases in an {@link AllocateRequest} (response
 * id 0, progress 0, no blacklist request) and forwards it to
 * {@code allocate(AllocateRequest)}.
 *
 * @param resourceRequest the resource requests to send
 * @param releases the containers to release
 * @return the response of the delegated allocate call
 * @throws Exception if the delegated call fails
 */
public AllocateResponse allocate(List<ResourceRequest> resourceRequest,
    List<ContainerId> releases) throws Exception {
  return allocate(
      AllocateRequest.newInstance(0, 0F, resourceRequest, releases, null));
}
/**
 * Builds a {@link ResourceRequest} record from plain values.
 *
 * @param memory memory passed to {@code Resources.createResource}
 * @param host the value stored as the request's resource name
 * @param priority the numeric priority
 * @param numContainers the number of containers
 * @return the populated request
 */
private ResourceRequest createResourceRequest(int memory, String host,
    int priority, int numContainers) {
  Priority requestPriority = recordFactory.newRecordInstance(Priority.class);
  requestPriority.setPriority(priority);

  ResourceRequest built =
      recordFactory.newRecordInstance(ResourceRequest.class);
  built.setCapability(Resources.createResource(memory));
  built.setResourceName(host);
  built.setNumContainers(numContainers);
  built.setPriority(requestPriority);
  return built;
}
/**
 * Computes an upper bound on the containers that may be assigned at the
 * given locality level. The bound starts at the container count of the
 * application's {@link ResourceRequest#ANY} request (0 if there is none) and,
 * for rack-local or node-local assignment, is further capped by the count of
 * the matching rack or node request when such a request exists.
 *
 * @param application the application whose requests are consulted
 * @param priority the priority of the requests
 * @param node the node supplying the rack name and node address
 * @param type the locality level being assigned
 * @return the computed bound
 */
private int getMaxAllocatableContainers(FiCaSchedulerApp application,
    Priority priority, FiCaSchedulerNode node, NodeType type) {
  ResourceRequest anyRequest =
      application.getResourceRequest(priority, ResourceRequest.ANY);
  int limit = (anyRequest == null) ? 0 : anyRequest.getNumContainers();

  if (type == NodeType.OFF_SWITCH) {
    return limit;
  }

  if (type == NodeType.RACK_LOCAL) {
    ResourceRequest rackRequest = application.getResourceRequest(priority,
        node.getRMNode().getRackName());
    if (rackRequest != null) {
      limit = Math.min(limit, rackRequest.getNumContainers());
    }
  } else if (type == NodeType.NODE_LOCAL) {
    ResourceRequest nodeRequest = application.getResourceRequest(priority,
        node.getRMNode().getNodeAddress());
    if (nodeRequest != null) {
      limit = Math.min(limit, nodeRequest.getNumContainers());
    }
  }
  return limit;
}
@Test public void testBlackListNodes() throws Exception { Configuration conf = new Configuration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); MockRM rm = new MockRM(conf); rm.start(); FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); String host = "127.0.0.1"; RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); fs.handle(new NodeAddedSchedulerEvent(node)); ApplicationId appId = BuilderUtils.newApplicationId(100, 1); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( appId, 1); createMockRMApp(appAttemptId, rm.getRMContext()); SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "default", "user"); fs.handle(appEvent); SchedulerEvent attemptEvent = new AppAttemptAddedSchedulerEvent(appAttemptId, false); fs.handle(attemptEvent); // Verify the blacklist can be updated independent of requesting containers fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), Collections.singletonList(host), null); Assert.assertTrue(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), null, Collections.singletonList(host)); Assert.assertFalse(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); rm.stop(); }
/**
 * Builds a {@link ResourceRequest} record whose capability is created with
 * {@code Resources.createResource(memory, 1)}.
 *
 * @param resourceName the resource name to set
 * @param memory memory for the capability
 * @param numContainers the number of containers
 * @param relaxLocality the relax-locality flag to set
 * @param priority the priority to set
 * @param recordFactory factory used to instantiate the request record
 * @return the populated request
 */
public static ResourceRequest createResourceRequest(String resourceName,
    int memory, int numContainers, boolean relaxLocality, Priority priority,
    RecordFactory recordFactory) {
  final ResourceRequest built =
      recordFactory.newRecordInstance(ResourceRequest.class);
  final Resource requested = Resources.createResource(memory, 1);

  built.setNumContainers(numContainers);
  built.setResourceName(resourceName);
  built.setCapability(requested);
  built.setRelaxLocality(relaxLocality);
  built.setPriority(priority);
  return built;
}
/**
 * Looks up the remote request the client holds for the given container
 * request at {@code location} and asserts its resource name, that it asks for
 * exactly one container, and its relax-locality flag.
 *
 * @param client the client whose {@code remoteRequestsTable} is inspected
 * @param request supplies the priority and capability used for the lookup
 * @param location the resource name to look up and expect
 * @param expectedRelaxLocality the expected relax-locality flag
 */
private void verifyResourceRequest(AMRMClientImpl<ContainerRequest> client,
    ContainerRequest request, String location,
    boolean expectedRelaxLocality) {
  ResourceRequest recorded = client.remoteRequestsTable
      .get(request.getPriority())
      .get(location)
      .get(request.getCapability())
      .remoteRequest;

  assertEquals(location, recorded.getResourceName());
  assertEquals(1, recorded.getNumContainers());
  assertEquals(expectedRelaxLocality, recorded.getRelaxLocality());
}
/**
 * Records a limit for requests of the given priority and capability. A
 * negative limit is stored as {@code Integer.MAX_VALUE}. The limit is marked
 * for update when no limit was stored before or when the stored one was
 * smaller than the new one.
 *
 * @param priority priority the limit applies to
 * @param capability capability the limit applies to
 * @param limit the limit to store; negative is replaced by
 *          {@code Integer.MAX_VALUE}
 */
protected void setRequestLimit(Priority priority, Resource capability,
    int limit) {
  final int effectiveLimit = (limit < 0) ? Integer.MAX_VALUE : limit;

  ResourceRequest limitEntry = ResourceRequest.newInstance(priority,
      ResourceRequest.ANY, capability, effectiveLimit);
  // The entry serves as both key and value of the limits map.
  ResourceRequest previous = requestLimits.put(limitEntry, limitEntry);

  boolean needsUpdate =
      previous == null || previous.getNumContainers() < effectiveLimit;
  if (needsUpdate) {
    requestLimitsToUpdate.add(limitEntry);
  }
}
/**
 * Sends out the request for the AM container: a single
 * {@link ResourceRequest#ANY} request is wrapped in an allocate request and
 * submitted to the RM's application master service as a remote user named
 * after the application attempt and carrying the attempt's AMRM token
 * identifier. A non-null response is put on {@code responseQueue}.
 *
 * @throws YarnException declared by the signature
 * @throws IOException declared by the signature
 * @throws InterruptedException declared by the signature
 */
protected void requestAMContainer()
    throws YarnException, IOException, InterruptedException {
  ResourceRequest amRequest = createResourceRequest(
      BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
          MR_AM_CONTAINER_RESOURCE_VCORES),
      ResourceRequest.ANY, 1, 1);
  List<ResourceRequest> amAsk = new ArrayList<ResourceRequest>();
  amAsk.add(amRequest);

  LOG.debug(MessageFormat.format(
      "Application {0} sends out allocate request for its AM", appId));
  final AllocateRequest request = this.createAllocateRequest(amAsk);

  // Act as the application attempt, authenticated by its AMRM token.
  UserGroupInformation attemptUser =
      UserGroupInformation.createRemoteUser(appAttemptId.toString());
  Token<AMRMTokenIdentifier> amrmToken = rm.getRMContext().getRMApps()
      .get(appAttemptId.getApplicationId())
      .getRMAppAttempt(appAttemptId)
      .getAMRMToken();
  attemptUser.addTokenIdentifier(amrmToken.decodeIdentifier());

  PrivilegedExceptionAction<AllocateResponse> allocateAction =
      new PrivilegedExceptionAction<AllocateResponse>() {
        @Override
        public AllocateResponse run() throws Exception {
          return rm.getApplicationMasterService().allocate(request);
        }
      };
  AllocateResponse response = attemptUser.doAs(allocateAction);

  if (response != null) {
    responseQueue.put(response);
  }
}
/**
 * Builds a {@link ResourceRequest} record from plain values. The node label
 * expression is always set to {@code RMNodeLabelsManager.NO_LABEL}.
 *
 * @param memory first argument to {@code BuilderUtils.newResource}
 * @param vcores second argument to {@code BuilderUtils.newResource}
 * @param gcores third argument to {@code BuilderUtils.newResource}
 * @param host the value stored as the request's resource name
 * @param priority the numeric priority
 * @param numContainers the number of containers
 * @param relaxLocality the relax-locality flag to set
 * @return the populated request
 */
protected ResourceRequest createResourceRequest(int memory, int vcores,
    int gcores, String host, int priority, int numContainers,
    boolean relaxLocality) {
  Priority requestPriority = recordFactory.newRecordInstance(Priority.class);
  requestPriority.setPriority(priority);

  ResourceRequest built =
      recordFactory.newRecordInstance(ResourceRequest.class);
  built.setCapability(BuilderUtils.newResource(memory, vcores, gcores));
  built.setResourceName(host);
  built.setNumContainers(numContainers);
  built.setPriority(requestPriority);
  built.setRelaxLocality(relaxLocality);
  built.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
  return built;
}
/**
 * Collects every resource request stored in {@code requests} into one new
 * list.
 *
 * @return a new list holding all stored requests
 */
public List<ResourceRequest> getAllResourceRequests() {
  List<ResourceRequest> collected = new ArrayList<ResourceRequest>();
  for (Map<String, ResourceRequest> byResourceName : requests.values()) {
    for (ResourceRequest stored : byResourceName.values()) {
      collected.add(stored);
    }
  }
  return collected;
}
/** * Resources have been allocated to this application by the resource * scheduler. Track them. * * @param type * the type of the node * @param node * the nodeinfo of the node * @param priority * the priority of the request. * @param request * the request * @param container * the containers allocated. */ synchronized public List<ResourceRequest> allocate(NodeType type, SchedulerNode node, Priority priority, ResourceRequest request, Container container) { List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>(); if (type == NodeType.NODE_LOCAL) { allocateNodeLocal(node, priority, request, container, resourceRequests); } else if (type == NodeType.RACK_LOCAL) { allocateRackLocal(node, priority, request, container, resourceRequests); } else { allocateOffSwitch(node, priority, request, container, resourceRequests); } QueueMetrics metrics = queue.getMetrics(); if (pending) { // once an allocation is done we assume the application is // running from scheduler's POV. pending = false; metrics.runAppAttempt(applicationId, user); } if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationId=" + applicationId + " container=" + container.getId() + " host=" + container.getNodeId().toString() + " user=" + user + " resource=" + request.getCapability()); } metrics.allocateResources(user, 1, request.getCapability(), true); return resourceRequests; }
@Test public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{ MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); // request a container. am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>()); ContainerId containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); // acquire the container. SecurityUtilTestHelper.setTokenServiceUseIp(true); List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers(); // not able to fetch the container; Assert.assertEquals(0, containers.size()); SecurityUtilTestHelper.setTokenServiceUseIp(false); containers = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers(); // should be able to fetch the container; Assert.assertEquals(1, containers.size()); }