/** * Utility method to validate a list resource requests, by insuring that the * requested memory/vcore is non-negative and not greater than max */ public static void normalizeAndValidateRequests(List<ResourceRequest> ask, Resource maximumResource, String queueName, YarnScheduler scheduler, RMContext rmContext) throws InvalidResourceRequestException { QueueInfo queueInfo = null; try { queueInfo = scheduler.getQueueInfo(queueName, false, false); } catch (IOException e) { } for (ResourceRequest resReq : ask) { SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler, rmContext, queueInfo); } }
@Override public QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName()); queueInfo.setCapacity(1.0f); if (clusterResource.getMemory() == 0) { queueInfo.setCurrentCapacity(0.0f); } else { queueInfo.setCurrentCapacity((float) usedResource.getMemory() / clusterResource.getMemory()); } queueInfo.setMaximumCapacity(1.0f); queueInfo.setChildQueues(new ArrayList<QueueInfo>()); queueInfo.setQueueState(QueueState.RUNNING); return queueInfo; }
private static void normalizeNodeLabelExpressionInRequest( ResourceRequest resReq, QueueInfo queueInfo) { String labelExp = resReq.getNodeLabelExpression(); // if queue has default label expression, and RR doesn't have, use the // default label expression of queue if (labelExp == null && queueInfo != null && ResourceRequest.ANY .equals(resReq.getResourceName())) { labelExp = queueInfo.getDefaultNodeLabelExpression(); } // If labelExp still equals to null, set it to be NO_LABEL if (labelExp == null) { labelExp = RMNodeLabelsManager.NO_LABEL; } resReq.setNodeLabelExpression(labelExp); }
public static void normalizeAndValidateRequest(ResourceRequest resReq, Resource maximumResource, String queueName, YarnScheduler scheduler, boolean isRecovery, RMContext rmContext, QueueInfo queueInfo) throws InvalidResourceRequestException { if (queueInfo == null) { try { queueInfo = scheduler.getQueueInfo(queueName, false, false); } catch (IOException e) { // it is possible queue cannot get when queue mapping is set, just ignore // the queueInfo here, and move forward } } SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo); if (!isRecovery) { validateResourceRequest(resReq, maximumResource, queueInfo, rmContext); } }
@Override public synchronized QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { QueueInfo queueInfo = getQueueInfo(); List<QueueInfo> childQueuesInfo = new ArrayList<QueueInfo>(); if (includeChildQueues) { for (CSQueue child : childQueues) { // Get queue information recursively? childQueuesInfo.add( child.getQueueInfo(recursive, recursive)); } } queueInfo.setChildQueues(childQueuesInfo); return queueInfo; }
@Test public void testDefaultNodeLabelExpressionQueueConfig() throws Exception { CapacityScheduler cs = new CapacityScheduler(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); conf.setDefaultNodeLabelExpression("root.a", " x"); conf.setDefaultNodeLabelExpression("root.b", " y "); cs.setConf(new YarnConfiguration()); cs.setRMContext(resourceManager.getRMContext()); cs.init(conf); cs.start(); QueueInfo queueInfoA = cs.getQueueInfo("a", true, false); Assert.assertEquals(queueInfoA.getQueueName(), "a"); Assert.assertEquals(queueInfoA.getDefaultNodeLabelExpression(), "x"); QueueInfo queueInfoB = cs.getQueueInfo("b", true, false); Assert.assertEquals(queueInfoB.getQueueName(), "b"); Assert.assertEquals(queueInfoB.getDefaultNodeLabelExpression(), "y"); }
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) throws IOException { Dispatcher dispatcher = mock(Dispatcher.class); when(rmContext.getDispatcher()).thenReturn(dispatcher); EventHandler eventHandler = mock(EventHandler.class); when(dispatcher.getEventHandler()).thenReturn(eventHandler); QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class); queInfo.setQueueName("testqueue"); when(yarnScheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean())) .thenReturn(queInfo); when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean())) .thenThrow(new IOException("queue does not exist")); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, yarnScheduler); when(rmContext.getRMApps()).thenReturn(apps); when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn( getSchedulerApps(apps)); ResourceScheduler rs = mock(ResourceScheduler.class); when(rmContext.getScheduler()).thenReturn(rs); }
/** * Lists the Queue Information matching the given queue name * * @param queueName * @throws YarnException * @throws IOException */ private int listQueue(String queueName) throws YarnException, IOException { int rc; PrintWriter writer = new PrintWriter( new OutputStreamWriter(sysout, Charset.forName("UTF-8"))); QueueInfo queueInfo = client.getQueueInfo(queueName); if (queueInfo != null) { writer.println("Queue Information : "); printQueueInfo(writer, queueInfo); rc = 0; } else { writer.println("Cannot get queue from RM by queueName = " + queueName + ", please check."); rc = -1; } writer.flush(); return rc; }
@Test public void testGetQueueInfo() throws Exception { QueueCLI cli = createAndGetQueueCLI(); Set<String> nodeLabels = new HashSet<String>(); nodeLabels.add("GPU"); nodeLabels.add("JDK_7"); QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f, null, null, QueueState.RUNNING, nodeLabels, "GPU"); when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo); int result = cli.run(new String[] { "-status", "queueA" }); assertEquals(0, result); verify(client).getQueueInfo("queueA"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintWriter pw = new PrintWriter(baos); pw.println("Queue Information : "); pw.println("Queue Name : " + "queueA"); pw.println("\tState : " + "RUNNING"); pw.println("\tCapacity : " + "40.0%"); pw.println("\tCurrent Capacity : " + "50.0%"); pw.println("\tMaximum Capacity : " + "80.0%"); pw.println("\tDefault Node Label expression : " + "GPU"); pw.println("\tAccessible Node Labels : " + "JDK_7,GPU"); pw.close(); String queueInfoStr = baos.toString("UTF-8"); Assert.assertEquals(queueInfoStr, sysOutStream.toString()); }
@Test public void testGetQueueInfoWithEmptyNodeLabel() throws Exception { QueueCLI cli = createAndGetQueueCLI(); QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f, null, null, QueueState.RUNNING, null, null); when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo); int result = cli.run(new String[] { "-status", "queueA" }); assertEquals(0, result); verify(client).getQueueInfo("queueA"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintWriter pw = new PrintWriter(baos); pw.println("Queue Information : "); pw.println("Queue Name : " + "queueA"); pw.println("\tState : " + "RUNNING"); pw.println("\tCapacity : " + "40.0%"); pw.println("\tCurrent Capacity : " + "50.0%"); pw.println("\tMaximum Capacity : " + "80.0%"); pw.println("\tDefault Node Label expression : "); pw.println("\tAccessible Node Labels : "); pw.close(); String queueInfoStr = baos.toString("UTF-8"); Assert.assertEquals(queueInfoStr, sysOutStream.toString()); }
@Override public void trackQueue(String queueName) { trackedQueues.add(queueName); FifoScheduler fifo = (FifoScheduler) scheduler; // for FifoScheduler, only DEFAULT_QUEUE // here the three parameters doesn't affect results final QueueInfo queue = fifo.getQueueInfo(queueName, false, false); // track currentCapacity, maximumCapacity (always 1.0f) metrics.register("variable.queue." + queueName + ".currentcapacity", new Gauge<Float>() { @Override public Float getValue() { return queue.getCurrentCapacity(); } } ); metrics.register("variable.queue." + queueName + ".", new Gauge<Float>() { @Override public Float getValue() { return queue.getCurrentCapacity(); } } ); }
/** * Utility method to validate a list resource requests, by insuring that the * requested memory/vcore is non-negative and not greater than max */ public static void normalizeAndValidateRequests(List<ResourceRequest> ask, Resource maximumResource, String queueName, YarnScheduler scheduler, RMContext rmContext) throws InvalidResourceRequestException { // Get queue from scheduler QueueInfo queueInfo = null; try { queueInfo = scheduler.getQueueInfo(queueName, false, false); } catch (IOException e) { } for (ResourceRequest resReq : ask) { SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler, rmContext, queueInfo); } }
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) throws IOException { Dispatcher dispatcher = mock(Dispatcher.class); when(rmContext.getDispatcher()).thenReturn(dispatcher); EventHandler eventHandler = mock(EventHandler.class); when(dispatcher.getEventHandler()).thenReturn(eventHandler); QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class); queInfo.setQueueName("testqueue"); when(yarnScheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean())) .thenReturn(queInfo); when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean())) .thenThrow(new IOException("queue does not exist")); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, yarnScheduler); when(rmContext.getRMApps()).thenReturn(apps); when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn( getSchedulerApps(apps)); ResourceScheduler rs = mock(ResourceScheduler.class); when(rmContext.getScheduler()).thenReturn(rs); }
@Test public void testGetQueueInfoWithEmptyNodeLabel() throws Exception { QueueCLI cli = createAndGetQueueCLI(); QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f, null, null, QueueState.RUNNING, null, null, null, true); when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo); int result = cli.run(new String[] { "-status", "queueA" }); assertEquals(0, result); verify(client).getQueueInfo("queueA"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintWriter pw = new PrintWriter(baos); pw.println("Queue Information : "); pw.println("Queue Name : " + "queueA"); pw.println("\tState : " + "RUNNING"); pw.println("\tCapacity : " + "40.0%"); pw.println("\tCurrent Capacity : " + "50.0%"); pw.println("\tMaximum Capacity : " + "80.0%"); pw.println("\tDefault Node Label expression : " + NodeLabel.DEFAULT_NODE_LABEL_PARTITION); pw.println("\tAccessible Node Labels : "); pw.println("\tPreemption : " + "disabled"); pw.close(); String queueInfoStr = baos.toString("UTF-8"); Assert.assertEquals(queueInfoStr, sysOutStream.toString()); }
@Override public QueueInfo getQueueInfo() { if (this.queueInfo != null) { return this.queueInfo; } GetQueueInfoResponseProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasQueueInfo()) { return null; } this.queueInfo = convertFromProtoFormat(p.getQueueInfo()); return this.queueInfo; }
@Override public void setQueueInfo(QueueInfo queueInfo) { maybeInitBuilder(); if(queueInfo == null) { builder.clearQueueInfo(); } this.queueInfo = queueInfo; }
@Override public void setChildQueues(List<QueueInfo> childQueues) { if (childQueues == null) { builder.clearChildQueues(); } this.childQueuesList = childQueues; }
private void initLocalChildQueuesList() { if (this.childQueuesList != null) { return; } QueueInfoProtoOrBuilder p = viaProto ? proto : builder; List<QueueInfoProto> list = p.getChildQueuesList(); childQueuesList = new ArrayList<QueueInfo>(); for (QueueInfoProto a : list) { childQueuesList.add(convertFromProtoFormat(a)); } }
private void addChildQueuesInfoToProto() { maybeInitBuilder(); builder.clearChildQueues(); if (childQueuesList == null) return; Iterable<QueueInfoProto> iterable = new Iterable<QueueInfoProto>() { @Override public Iterator<QueueInfoProto> iterator() { return new Iterator<QueueInfoProto>() { Iterator<QueueInfo> iter = childQueuesList.iterator(); @Override public boolean hasNext() { return iter.hasNext(); } @Override public QueueInfoProto next() { return convertToProtoFormat(iter.next()); } @Override public void remove() { throw new UnsupportedOperationException(); } }; } }; builder.addAllChildQueues(iterable); }
public FifoSchedulerInfo(final ResourceManager rm) { RMContext rmContext = rm.getRMContext(); FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); qName = fs.getQueueInfo("", false, false).getQueueName(); QueueInfo qInfo = fs.getQueueInfo(qName, true, true); this.usedCapacity = qInfo.getCurrentCapacity(); this.capacity = qInfo.getCapacity(); this.minQueueMemoryCapacity = fs.getMinimumResourceCapability().getMemory(); this.maxQueueMemoryCapacity = fs.getMaximumResourceCapability().getMemory(); this.qstate = qInfo.getQueueState(); this.numNodes = rmContext.getRMNodes().size(); this.usedNodeCapacity = 0; this.availNodeCapacity = 0; this.totalNodeCapacity = 0; this.numContainers = 0; for (RMNode ni : rmContext.getRMNodes().values()) { SchedulerNodeReport report = fs.getNodeReport(ni.getNodeID()); this.usedNodeCapacity += report.getUsedResource().getMemory(); this.availNodeCapacity += report.getAvailableResource().getMemory(); this.totalNodeCapacity += ni.getTotalCapability().getMemory(); this.numContainers += fs.getNodeReport(ni.getNodeID()).getNumContainers(); } }
@Override public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) { QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(getQueueName()); if (scheduler.getClusterResource().getMemory() == 0) { queueInfo.setCapacity(0.0f); } else { queueInfo.setCapacity((float) getFairShare().getMemory() / scheduler.getClusterResource().getMemory()); } if (getFairShare().getMemory() == 0) { queueInfo.setCurrentCapacity(0.0f); } else { queueInfo.setCurrentCapacity((float) getResourceUsage().getMemory() / getFairShare().getMemory()); } ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>(); if (includeChildQueues) { Collection<FSQueue> childQueues = getChildQueues(); for (FSQueue child : childQueues) { childQueueInfos.add(child.getQueueInfo(recursive, recursive)); } } queueInfo.setChildQueues(childQueueInfos); queueInfo.setQueueState(QueueState.RUNNING); return queueInfo; }
@Override public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, boolean recursive) throws IOException { if (!queueMgr.exists(queueName)) { throw new IOException("queue " + queueName + " does not exist"); } return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues, recursive); }
public static void normalizeAndvalidateRequest(ResourceRequest resReq, Resource maximumResource, String queueName, YarnScheduler scheduler, RMContext rmContext, QueueInfo queueInfo) throws InvalidResourceRequestException { normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler, false, rmContext, queueInfo); }
protected QueueInfo getQueueInfo() { QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(queueName); queueInfo.setAccessibleNodeLabels(accessibleLabels); queueInfo.setCapacity(queueCapacities.getCapacity()); queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity()); queueInfo.setQueueState(state); queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression); queueInfo.setCurrentCapacity(getUsedCapacity()); return queueInfo; }