/**
 * Builds a {@link Queue} for every queue name listed under
 * {@code MAPRED_QUEUE_NAMES_KEY} in the given configuration.
 *
 * Initialization is best-effort: a queue whose ACLs or state cannot be
 * resolved is logged (with the cause) and skipped, and the remaining
 * queues are still created.
 *
 * @param conf configuration to read the queue names, ACLs and states from
 * @return the queues that could be initialized; empty if no queue names
 *         are configured
 */
private List<Queue> createQueues(Configuration conf) {
  String[] queueNameValues = conf.getStrings(MAPRED_QUEUE_NAMES_KEY);
  List<Queue> list = new ArrayList<Queue>();
  if (queueNameValues == null) {
    // getStrings yields null when the property is not set; there is
    // nothing to build in that case.
    return list;
  }
  for (String name : queueNameValues) {
    try {
      Map<String, AccessControlList> acls = getQueueAcls(name, conf);
      QueueState state = getQueueState(name, conf);
      Queue q = new Queue(name, acls, state);
      list.add(q);
    } catch (Throwable t) {
      // Deliberately broad so that one bad queue does not prevent the
      // others from loading; pass the throwable along so the reason for
      // the failure is not lost.
      LOG.warn("Not able to initialize queue " + name, t);
    }
  }
  return list;
}
/**
 * Builds a three-queue fixture: "q1" (state UNDEFINED) with "q1:q2" and
 * "q1:q3" (both RUNNING) added to it through {@code addChild}.
 *
 * @return an array holding "q1" at index 0, followed by "q1:q2" and
 *         "q1:q3"
 */
static JobQueueInfo[] getSimpleQueueHierarchy() {
  JobQueueInfo root = newJobQueueInfo(
      new ArrayList<JobQueueInfo>(), null, "q1", QueueState.UNDEFINED, null);
  JobQueueInfo firstChild = newJobQueueInfo(
      new ArrayList<JobQueueInfo>(), null, "q1:q2", QueueState.RUNNING, null);
  JobQueueInfo secondChild = newJobQueueInfo(
      new ArrayList<JobQueueInfo>(), null, "q1:q3", QueueState.RUNNING, null);

  root.addChild(firstChild);
  root.addChild(secondChild);

  return new JobQueueInfo[] { root, firstChild, secondChild };
}
/** * Test to verify that the refresh of queue properties fails if a new queue is * added. * * @throws Exception */ @Test public void testRefreshWithAddedQueues() throws Exception { JobQueueInfo[] queues = getSimpleQueueHierarchy(); // write the configuration file writeQueueConfigurationFile( QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] }); QueueManager qManager = new QueueManager(); JobQueueInfo newQueue = newJobQueueInfo(new ArrayList<JobQueueInfo>(), null, "q4", QueueState.UNDEFINED, null); queues[0].addChild(newQueue); // Rewrite the configuration file writeQueueConfigurationFile( QUEUES_CONFIG_FILE_PATH, new JobQueueInfo[] { queues[0] }); testRefreshFailureWithChangeOfHierarchy(qManager); }
/** * Creates all given queues as 1st level queues(no nesting) * @param doc the queues config document * @param queueNames the queues to be added to the queues config document * @param submitAcls acl-submit-job acls for each of the queues * @param adminsAcls acl-administer-jobs acls for each of the queues * @throws Exception */ public static void createSimpleDocument(Document doc, String[] queueNames, String[] submitAcls, String[] adminsAcls) throws Exception { Element queues = createQueuesNode(doc); // Create all queues as 1st level queues(no nesting) for (int i = 0; i < queueNames.length; i++) { Element q = createQueue(doc, queueNames[i]); q.appendChild(createState(doc, QueueState.RUNNING.getStateName())); q.appendChild(createAcls(doc, QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, submitAcls[i])); q.appendChild(createAcls(doc, QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, adminsAcls[i])); queues.appendChild(q); } }
void setQueues(Set<String> queueNames) { this.queueNames = queueNames; // sync up queues with the parent class. Queue[] queues = new Queue[queueNames.size()]; int i = 0; for (String queueName : queueNames) { HashMap<String, AccessControlList> aclsMap = new HashMap<String, AccessControlList>(); for (QueueACL qAcl : QueueACL.values()) { String key = toFullPropertyName(queueName, qAcl.getAclName()); aclsMap.put(key, allEnabledAcl); } queues[i++] = new Queue(queueName, aclsMap, QueueState.RUNNING); } super.setQueues(queues); }
/**
 * Tells whether the named leaf queue is currently in the RUNNING state.
 *
 * @param queueName name of the queue
 * @return true if {@code leafQueues} holds a queue under that name and its
 *         state equals RUNNING; false otherwise, including when no such
 *         queue is found
 */
synchronized boolean isRunning(String queueName) {
  Queue leaf = leafQueues.get(queueName);
  return leaf != null && leaf.getState().equals(QueueState.RUNNING);
}
/**
 * Only applicable to leaf level queues.
 * Reads the state of the named queue from the configuration, passing the
 * RUNNING state name as the default value for the lookup.
 *
 * @param name name of the queue
 * @param conf configuration to read the "state" property from
 * @return the state that {@code QueueState.getState} maps the configured
 *         (or default) value to
 */
private QueueState getQueueState(String name, Configuration conf) {
  String stateKey = toFullPropertyName(name, "state");
  String defaultState = QueueState.RUNNING.getStateName();
  String configured = conf.get(stateKey, defaultState);
  return QueueState.getState(configured);
}
/**
 * Creates a {@link JobQueueInfo} and populates it from the arguments.
 *
 * @param children child queues to set on the new info
 * @param props properties to set; skipped when null
 * @param queueName name of the queue
 * @param state queue state; its state name is what gets stored
 * @param schedulingInfo scheduling information to set
 * @return the populated queue info
 */
static JobQueueInfo newJobQueueInfo(List<JobQueueInfo> children,
    Properties props, String queueName, QueueState state,
    String schedulingInfo) {
  JobQueueInfo info = new JobQueueInfo();
  info.setChildren(children);
  // Properties are optional; leave whatever the new object starts with.
  if (null != props) {
    info.setProperties(props);
  }
  info.setQueueName(queueName);
  String stateName = state.getStateName();
  info.setQueueState(stateName);
  info.setSchedulingInfo(schedulingInfo);
  return info;
}
public static void createSimpleDocument(Document doc) throws Exception { Element queues = createQueuesNode(doc); // Create parent level queue q1. Element q1 = createQueue(doc, "q1"); Properties props = new Properties(); props.setProperty("capacity", "10"); props.setProperty("maxCapacity", "35"); q1.appendChild(createProperties(doc, props)); queues.appendChild(q1); // Create another parent level p1 Element p1 = createQueue(doc, "p1"); // append child p11 to p1 p1.appendChild(createQueue(doc, "p11")); Element p12 = createQueue(doc, "p12"); p12.appendChild(createState(doc, QueueState.STOPPED.getStateName())); p12.appendChild(createAcls(doc, QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1")); p12.appendChild(createAcls(doc, QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2")); // append p12 to p1. p1.appendChild(p12); queues.appendChild(p1); }
public static void refreshSimpleDocument(Document doc) throws Exception { Element queues = createQueuesNode(doc); // Create parent level queue q1. Element q1 = createQueue(doc, "q1"); Properties props = new Properties(); props.setProperty("capacity", "70"); props.setProperty("maxCapacity", "35"); q1.appendChild(createProperties(doc, props)); queues.appendChild(q1); // Create another parent level p1 Element p1 = createQueue(doc, "p1"); // append child p11 to p1 Element p11 = createQueue(doc, "p11"); p11.appendChild(createState(doc, QueueState.STOPPED.getStateName())); p1.appendChild(p11); Element p12 = createQueue(doc, "p12"); p12.appendChild(createState(doc, QueueState.RUNNING.getStateName())); p12.appendChild(createAcls(doc, "acl-submit-job", "u3")); p12.appendChild(createAcls(doc, "acl-administer-jobs", "u4")); // append p12 to p1. p1.appendChild(p12); queues.appendChild(p1); }