Java 类org.apache.hadoop.mapred.FairScheduler.JobInfo 实例源码
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * whose single map has its data on rack2.node2 (tt4) and check that it does
 * not get assigned on the other nodes. A second job with no locality info
 * should get those slots instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      });
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
      "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
      "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
      "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Expected values come
  // first so that a failure message reports expected/actual the right way.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertEquals(false, info1.skippedAtLastHeartbeat);
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
 * Verifies FIFO ordering inside a pool. Two jobs are submitted to the FIFO
 * pool pool_a 200 ms apart and job 2 is then raised to HIGH priority. After
 * another 100 ms we check that all slots are assigned to job 1, even though
 * job 2 has the higher priority and the fair scheduler would otherwise have
 * allocated at least a few slots to job 2.
 */
public void testFifoJobScheduler() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"pool_a\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  // enable fifo
  out.println("<fifo>true</fifo>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Job 1 is submitted first; only job 2 needs a handle, for setPriority.
  submitJob(JobStatus.RUNNING, 10, 10, "pool_a");

  // Advance time 200ms and submit job 2
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  job2.setPriority(JobPriority.HIGH);

  // Advance time 100ms
  advanceTime(100);

  // Assign tasks and check that all slots are given to job1
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
 * This test configures a pool pool_a, and redirects the default pool to it.
 * A job submitted without an explicit pool must end up in pool_a.
 */
public void testPoolRedirect() throws Exception {
  // Set up pools file
  // pool_a has 0 maxTotalInitedTasks, default allows 100.
  // The redirect from default -> pool_a should enforce 0 total inited tasks.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>");
  out.println("<redirect>pool_a</redirect>");
  out.println("</pool>");
  out.println("<pool name=\"pool_a\">");
  out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit a job.
  JobInProgress job1 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  Thread.sleep(1000L); // Give the job initializer time to finish its work

  // Should have gone to pool_a, not default. Expected value goes first so
  // a failure message reports expected/actual the right way round.
  assertEquals("pool_a", info1.poolName);
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
 * Checks that jobs without map tasks still get their reduce share. Two jobs
 * with 0 maps and 10 reduces run in pool_a (weight 2.0) and one job with
 * 10 maps and 10 reduces runs in pool_b (weight 1.0).
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Write the allocation file declaring both pools, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"pool_a\">",
    "<weight>2.0</weight>",
    "</pool>",
    "<pool name=\"pool_b\">",
    "<weight>1.0</weight>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // The three jobs are submitted back to back; the clock is only advanced
  // after the last submission.
  JobInfo noMapsInfoA = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 0, 10, "pool_a"));
  JobInfo noMapsInfoB = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 0, 10, "pool_a"));
  JobInfo withMapsInfo = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 10, 10, "pool_b"));
  advanceTime(10);

  JobInfo[] noMapsInfos = {noMapsInfoA, noMapsInfoB};

  // Weights: the map-less jobs have no map weight but a full reduce weight.
  for (JobInfo info : noMapsInfos) {
    assertEquals(0, info.mapWeight, 0.01);
    assertEquals(1.0, info.reduceWeight, 0.01);
  }
  assertEquals(1.0, withMapsInfo.mapWeight, 0.01);
  assertEquals(1.0, withMapsInfo.reduceWeight, 0.01);

  // Fair shares: every job is expected to get the same reduce share.
  for (JobInfo info : noMapsInfos) {
    assertEquals(0, info.mapFairShare, ALLOW_ERROR);
    assertEquals(1.33, info.reduceFairShare, ALLOW_ERROR);
  }
  assertEquals(4, withMapsInfo.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, withMapsInfo.reduceFairShare, ALLOW_ERROR);
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
 * Verifies how map weights are adjusted for jobs inside a FIFO pool: four
 * jobs submitted in order to pool_a are expected to end up with map weights
 * 8/15, 4/15, 2/15 and 1/15.
 */
public void testPoolFifoWeight() throws Exception {
  // Write an allocation file that marks pool_a as FIFO, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"pool_a\">",
    "<fifo>true</fifo>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit four identical jobs; before job k+1 the clock advances by k ms.
  JobInProgress[] jobs = new JobInProgress[4];
  jobs[0] = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  for (int i = 1; i < jobs.length; i++) {
    advanceTime((long) i);
    jobs[i] = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  }
  scheduler.update();

  // Look up every JobInfo first, then compare against the expected weights.
  JobInfo[] infos = new JobInfo[jobs.length];
  for (int i = 0; i < jobs.length; i++) {
    infos[i] = scheduler.infos.get(jobs[i]);
  }
  final double tolerance = 0.00001;
  double[] expectedMapWeights = {8.0 / 15, 4.0 / 15, 2.0 / 15, 1.0 / 15};
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMapWeights[i], infos[i].mapWeight, tolerance);
  }
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
 * Verifies the min slots of jobs in a FIFO pool: with minMaps and minReduces
 * of 12 on pool_a and four jobs of 5 maps / 5 reduces each, the per-job
 * minimums are expected to be 5, 5, 2 and 0 in submission order.
 */
public void testPoolFifoMin() throws Exception {
  // Write the allocation file for the FIFO pool, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"pool_a\">",
    "<fifo>true</fifo>",
    "<minMaps>12</minMaps>",
    "<minReduces>12</minReduces>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit four identical jobs; before job k+1 the clock advances by k ms.
  JobInProgress[] jobs = new JobInProgress[4];
  jobs[0] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  for (int i = 1; i < jobs.length; i++) {
    advanceTime((long) i);
    jobs[i] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  }
  scheduler.update();

  JobInfo[] infos = new JobInfo[jobs.length];
  for (int i = 0; i < jobs.length; i++) {
    infos[i] = scheduler.infos.get(jobs[i]);
  }

  // Map minimums are checked for all jobs before the reduce minimums.
  int[] expectedMin = {5, 5, 2, 0};
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMin[i], infos[i].minMaps);
  }
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMin[i], infos[i].minReduces);
  }
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
 * Verifies the max slots of jobs in a FIFO pool: with maxMaps and maxReduces
 * of 12 on pool_a and four jobs of 5 maps / 5 reduces each, the per-job
 * maximums are expected to be 5, 5, 2 and 0 in submission order.
 */
public void testPoolFifoMax() throws Exception {
  // Write the allocation file for the FIFO pool, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"pool_a\">",
    "<fifo>true</fifo>",
    "<maxMaps>12</maxMaps>",
    "<maxReduces>12</maxReduces>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit four identical jobs; before job k+1 the clock advances by k ms.
  JobInProgress[] jobs = new JobInProgress[4];
  jobs[0] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  for (int i = 1; i < jobs.length; i++) {
    advanceTime((long) i);
    jobs[i] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  }
  scheduler.update();

  JobInfo[] infos = new JobInfo[jobs.length];
  for (int i = 0; i < jobs.length; i++) {
    infos[i] = scheduler.infos.get(jobs[i]);
  }

  // Map maximums are checked for all jobs before the reduce maximums.
  int[] expectedMax = {5, 5, 2, 0};
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMax[i], infos[i].maxMaps);
  }
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMax[i], infos[i].maxReduces);
  }
}
项目:hadoop-on-lustre
文件:TestFairScheduler.java
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * whose single map has its data on rack2.node2 (tt4) and check that it does
 * not get assigned on the other nodes. A second job with no locality info
 * should get those slots instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      }, true);
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
      "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
      "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
      "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Expected values come
  // first so that a failure message reports expected/actual the right way.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertEquals(false, info1.skippedAtLastHeartbeat);
}
项目:RDFS
文件:TestFairScheduler.java
/**
 * Verifies FIFO ordering inside a pool. Two jobs are submitted to the FIFO
 * pool pool_a 200 ms apart and job 2 is then raised to HIGH priority. After
 * another 100 ms we check that all slots are assigned to job 1, even though
 * job 2 has the higher priority and the fair scheduler would otherwise have
 * allocated at least a few slots to job 2.
 */
public void testFifoJobScheduler() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"pool_a\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  // enable fifo
  out.println("<fifo>true</fifo>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Job 1 is submitted first; only job 2 needs a handle, for setPriority.
  submitJob(JobStatus.RUNNING, 10, 10, "pool_a");

  // Advance time 200ms and submit job 2
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  job2.setPriority(JobPriority.HIGH);

  // Advance time 100ms
  advanceTime(100);

  // Assign tasks and check that all slots are given to job1
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
}
项目:RDFS
文件:TestFairScheduler.java
/**
 * This test configures a pool pool_a, and redirects the default pool to it.
 * A job submitted without an explicit pool must end up in pool_a.
 */
public void testPoolRedirect() throws Exception {
  // Set up pools file
  // pool_a has 0 maxTotalInitedTasks, default allows 100.
  // The redirect from default -> pool_a should enforce 0 total inited tasks.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>");
  out.println("<redirect>pool_a</redirect>");
  out.println("</pool>");
  out.println("<pool name=\"pool_a\">");
  out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit a job.
  JobInProgress job1 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  Thread.sleep(1000L); // Give the job initializer time to finish its work

  // Should have gone to pool_a, not default. Expected value goes first so
  // a failure message reports expected/actual the right way round.
  assertEquals("pool_a", info1.poolName);
}
项目:RDFS
文件:TestFairScheduler.java
/**
 * Checks that jobs without map tasks still get their reduce share. Two jobs
 * with 0 maps and 10 reduces run in pool_a (weight 2.0) and one job with
 * 10 maps and 10 reduces runs in pool_b (weight 1.0).
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Write the allocation file declaring both pools, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"pool_a\">",
    "<weight>2.0</weight>",
    "</pool>",
    "<pool name=\"pool_b\">",
    "<weight>1.0</weight>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // The three jobs are submitted back to back; the clock is only advanced
  // after the last submission.
  JobInfo noMapsInfoA = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 0, 10, "pool_a"));
  JobInfo noMapsInfoB = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 0, 10, "pool_a"));
  JobInfo withMapsInfo = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 10, 10, "pool_b"));
  advanceTime(10);

  JobInfo[] noMapsInfos = {noMapsInfoA, noMapsInfoB};

  // Weights: the map-less jobs have no map weight but a full reduce weight.
  for (JobInfo info : noMapsInfos) {
    assertEquals(0, info.mapWeight, 0.01);
    assertEquals(1.0, info.reduceWeight, 0.01);
  }
  assertEquals(1.0, withMapsInfo.mapWeight, 0.01);
  assertEquals(1.0, withMapsInfo.reduceWeight, 0.01);

  // Fair shares: every job is expected to get the same reduce share.
  for (JobInfo info : noMapsInfos) {
    assertEquals(0, info.mapFairShare, ALLOW_ERROR);
    assertEquals(1.33, info.reduceFairShare, ALLOW_ERROR);
  }
  assertEquals(4, withMapsInfo.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, withMapsInfo.reduceFairShare, ALLOW_ERROR);
}
项目:RDFS
文件:TestFairScheduler.java
/**
 * Verifies how map weights are adjusted for jobs inside a FIFO pool: four
 * jobs submitted in order to pool_a are expected to end up with map weights
 * 8/15, 4/15, 2/15 and 1/15.
 */
public void testPoolFifoWeight() throws Exception {
  // Write an allocation file that marks pool_a as FIFO, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"pool_a\">",
    "<fifo>true</fifo>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit four identical jobs; before job k+1 the clock advances by k ms.
  JobInProgress[] jobs = new JobInProgress[4];
  jobs[0] = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  for (int i = 1; i < jobs.length; i++) {
    advanceTime((long) i);
    jobs[i] = submitJob(JobStatus.RUNNING, 10, 10, "pool_a");
  }
  scheduler.update();

  // Look up every JobInfo first, then compare against the expected weights.
  JobInfo[] infos = new JobInfo[jobs.length];
  for (int i = 0; i < jobs.length; i++) {
    infos[i] = scheduler.infos.get(jobs[i]);
  }
  final double tolerance = 0.00001;
  double[] expectedMapWeights = {8.0 / 15, 4.0 / 15, 2.0 / 15, 1.0 / 15};
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMapWeights[i], infos[i].mapWeight, tolerance);
  }
}
项目:RDFS
文件:TestFairScheduler.java
/**
 * Verifies the min slots of jobs in a FIFO pool: with minMaps and minReduces
 * of 12 on pool_a and four jobs of 5 maps / 5 reduces each, the per-job
 * minimums are expected to be 5, 5, 2 and 0 in submission order.
 */
public void testPoolFifoMin() throws Exception {
  // Write the allocation file for the FIFO pool, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"pool_a\">",
    "<fifo>true</fifo>",
    "<minMaps>12</minMaps>",
    "<minReduces>12</minReduces>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit four identical jobs; before job k+1 the clock advances by k ms.
  JobInProgress[] jobs = new JobInProgress[4];
  jobs[0] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  for (int i = 1; i < jobs.length; i++) {
    advanceTime((long) i);
    jobs[i] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  }
  scheduler.update();

  JobInfo[] infos = new JobInfo[jobs.length];
  for (int i = 0; i < jobs.length; i++) {
    infos[i] = scheduler.infos.get(jobs[i]);
  }

  // Map minimums are checked for all jobs before the reduce minimums.
  int[] expectedMin = {5, 5, 2, 0};
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMin[i], infos[i].minMaps);
  }
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMin[i], infos[i].minReduces);
  }
}
项目:RDFS
文件:TestFairScheduler.java
/**
 * Verifies the max slots of jobs in a FIFO pool: with maxMaps and maxReduces
 * of 12 on pool_a and four jobs of 5 maps / 5 reduces each, the per-job
 * maximums are expected to be 5, 5, 2 and 0 in submission order.
 */
public void testPoolFifoMax() throws Exception {
  // Write the allocation file for the FIFO pool, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"pool_a\">",
    "<fifo>true</fifo>",
    "<maxMaps>12</maxMaps>",
    "<maxReduces>12</maxReduces>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit four identical jobs; before job k+1 the clock advances by k ms.
  JobInProgress[] jobs = new JobInProgress[4];
  jobs[0] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  for (int i = 1; i < jobs.length; i++) {
    advanceTime((long) i);
    jobs[i] = submitJob(JobStatus.RUNNING, 5, 5, "pool_a");
  }
  scheduler.update();

  JobInfo[] infos = new JobInfo[jobs.length];
  for (int i = 0; i < jobs.length; i++) {
    infos[i] = scheduler.infos.get(jobs[i]);
  }

  // Map maximums are checked for all jobs before the reduce maximums.
  int[] expectedMax = {5, 5, 2, 0};
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMax[i], infos[i].maxMaps);
  }
  for (int i = 0; i < infos.length; i++) {
    assertEquals(expectedMax[i], infos[i].maxReduces);
  }
}
项目:hadoop-0.20
文件:TestFairScheduler.java
/**
 * Checks that jobs without map tasks still get their reduce share. Two jobs
 * with 0 maps and 10 reduces run in poolA (weight 2.0) and one job with
 * 10 maps and 10 reduces runs in poolB (weight 1.0).
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Write the allocation file declaring both pools, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"poolA\">",
    "<weight>2.0</weight>",
    "</pool>",
    "<pool name=\"poolB\">",
    "<weight>1.0</weight>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // The three jobs are submitted back to back; the clock is only advanced
  // after the last submission.
  JobInfo noMapsInfoA = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 0, 10, "poolA"));
  JobInfo noMapsInfoB = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 0, 10, "poolA"));
  JobInfo withMapsInfo = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 10, 10, "poolB"));
  advanceTime(10);

  JobInfo[] noMapsInfos = {noMapsInfoA, noMapsInfoB};

  // Weights: the map-less jobs have no map weight but a full reduce weight.
  for (JobInfo info : noMapsInfos) {
    assertEquals(0, info.mapWeight, 0.01);
    assertEquals(1.0, info.reduceWeight, 0.01);
  }
  assertEquals(1.0, withMapsInfo.mapWeight, 0.01);
  assertEquals(1.0, withMapsInfo.reduceWeight, 0.01);

  // Fair shares: every job is expected to get the same reduce share.
  for (JobInfo info : noMapsInfos) {
    assertEquals(0, info.mapFairShare, 0.01);
    assertEquals(1.33, info.reduceFairShare, 0.01);
  }
  assertEquals(4, withMapsInfo.mapFairShare, 0.01);
  assertEquals(1.33, withMapsInfo.reduceFairShare, 0.01);
}
项目:hanoi-hadoop-2.0.0-cdh
文件:TestFairScheduler.java
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * whose single map has its data on rack2.node2 (tt4) and check that it does
 * not get assigned on the other nodes. A second job with no locality info
 * should get those slots instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      });
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
      "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
      "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
      "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Expected values come
  // first so that a failure message reports expected/actual the right way.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertEquals(false, info1.skippedAtLastHeartbeat);
}
项目:mapreduce-fork
文件:TestFairScheduler.java
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * whose single map has its data on rack2.node2 (tt4) and check that it does
 * not get assigned on the other nodes. A second job with no locality info
 * should get those slots instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      }, true);
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
      "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
      "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
      "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Expected values come
  // first so that a failure message reports expected/actual the right way.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertEquals(false, info1.skippedAtLastHeartbeat);
}
项目:hortonworks-extension
文件:TestFairScheduler.java
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * whose single map has its data on rack2.node2 (tt4) and check that it does
 * not get assigned on the other nodes. A second job with no locality info
 * should get those slots instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      }, true);
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
      "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
      "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
      "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Expected values come
  // first so that a failure message reports expected/actual the right way.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertEquals(false, info1.skippedAtLastHeartbeat);
}
项目:hortonworks-extension
文件:TestFairScheduler.java
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * whose single map has its data on rack2.node2 (tt4) and check that it does
 * not get assigned on the other nodes. A second job with no locality info
 * should get those slots instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      }, true);
  JobInfo info1 = scheduler.infos.get(job1);

  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  submitJob(JobStatus.RUNNING, 10, 0);

  // Assign tasks on nodes 1-3 and check that j2 gets them
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
      "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
      "attempt_test_0002_m_000005_0 on tt3");

  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
      "attempt_test_0002_m_000006_0 on tt4");

  // Check that delay scheduling info is properly set. Expected values come
  // first so that a failure message reports expected/actual the right way.
  assertEquals(LocalityLevel.NODE, info1.lastMapLocalityLevel);
  assertEquals(0, info1.timeWaitedForLocalMap);
  assertEquals(false, info1.skippedAtLastHeartbeat);
}
项目:hadoop-gpu
文件:TestFairScheduler.java
/**
 * Checks that jobs without map tasks still get their reduce share. Two jobs
 * with 0 maps and 10 reduces run in poolA (weight 2.0) and one job with
 * 10 maps and 10 reduces runs in poolB (weight 1.0).
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Write the allocation file declaring both pools, then reload it.
  String[] allocLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<pool name=\"poolA\">",
    "<weight>2.0</weight>",
    "</pool>",
    "<pool name=\"poolB\">",
    "<weight>1.0</weight>",
    "</pool>",
    "</allocations>",
  };
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocLines) {
    allocWriter.println(line);
  }
  allocWriter.close();
  scheduler.getPoolManager().reloadAllocs();

  // The three jobs are submitted back to back; the clock is only advanced
  // after the last submission.
  JobInfo noMapsInfoA = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 0, 10, "poolA"));
  JobInfo noMapsInfoB = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 0, 10, "poolA"));
  JobInfo withMapsInfo = scheduler.infos.get(
      submitJob(JobStatus.RUNNING, 10, 10, "poolB"));
  advanceTime(10);

  JobInfo[] noMapsInfos = {noMapsInfoA, noMapsInfoB};

  // Weights: the map-less jobs have no map weight but a full reduce weight.
  for (JobInfo info : noMapsInfos) {
    assertEquals(0, info.mapWeight, 0.01);
    assertEquals(1.0, info.reduceWeight, 0.01);
  }
  assertEquals(1.0, withMapsInfo.mapWeight, 0.01);
  assertEquals(1.0, withMapsInfo.reduceWeight, 0.01);

  // Fair shares: every job is expected to get the same reduce share.
  for (JobInfo info : noMapsInfos) {
    assertEquals(0, info.mapFairShare, 0.01);
    assertEquals(1.33, info.reduceFairShare, 0.01);
  }
  assertEquals(4, withMapsInfo.mapFairShare, 0.01);
  assertEquals(1.33, withMapsInfo.reduceFairShare, 0.01);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:JobSchedulable.java
/**
 * Tells whether this schedulable's job can currently be scheduled.
 *
 * @return true only if the scheduler has a JobInfo for the job, that info
 *         is flagged runnable, and the job's run state is RUNNING
 */
private boolean isRunnable() {
  JobInfo jobInfo = scheduler.getJobInfo(job);
  // The run state is read before the checks below, as in the original order.
  int currentState = job.getStatus().getRunState();
  if (jobInfo == null) {
    return false;
  }
  if (!jobInfo.runnable) {
    return false;
  }
  return currentState == JobStatus.RUNNING;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:PoolSchedulable.java
/**
 * Adds the given job's schedulable to this pool schedulable: the map
 * schedulable when this instance's task type is MAP, otherwise the reduce
 * schedulable.
 *
 * @param job the job to add; the JobInfo returned by the scheduler for it
 *            is dereferenced without a null check
 */
public void addJob(JobInProgress job) {
  JobInfo jobInfo = scheduler.getJobInfo(job);
  if (taskType == TaskType.MAP) {
    jobScheds.add(jobInfo.mapSchedulable);
  } else {
    jobScheds.add(jobInfo.reduceSchedulable);
  }
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * This test contains two jobs with fewer required tasks than there are slots.
 * We check that all tasks are assigned, but job 1 gets them first because it
 * was submitted earlier.
 */
public void testSmallJobs() throws IOException {
  // Fair shares are doubles; compare them with a tolerance rather than
  // relying on exact (boxed) equality.
  final double delta = 0.01;

  JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
  JobInfo info1 = scheduler.infos.get(job1);

  // Check scheduler variables
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2, info1.mapSchedulable.getDemand());
  assertEquals(1, info1.reduceSchedulable.getDemand());
  assertEquals(2.0, info1.mapSchedulable.getFairShare(), delta);
  assertEquals(1.0, info1.reduceSchedulable.getFairShare(), delta);
  verifyMetrics();

  // Advance time before submitting another job j2, to make j1 run before j2
  // deterministically.
  advanceTime(100);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
  JobInfo info2 = scheduler.infos.get(job2);

  // Check scheduler variables
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2, info1.mapSchedulable.getDemand());
  assertEquals(1, info1.reduceSchedulable.getDemand());
  assertEquals(2.0, info1.mapSchedulable.getFairShare(), delta);
  assertEquals(1.0, info1.reduceSchedulable.getFairShare(), delta);
  assertEquals(0, info2.mapSchedulable.getRunningTasks());
  assertEquals(0, info2.reduceSchedulable.getRunningTasks());
  assertEquals(1, info2.mapSchedulable.getDemand());
  assertEquals(2, info2.reduceSchedulable.getDemand());
  assertEquals(1.0, info2.mapSchedulable.getFairShare(), delta);
  assertEquals(2.0, info2.reduceSchedulable.getFairShare(), delta);
  verifyMetrics();

  // Assign tasks and check the order in which the two jobs fill the slots;
  // once every task is placed, tt2 must get nothing more.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  assertNull(scheduler.assignTasks(tracker("tt2")));

  // Check that the scheduler has started counting the tasks as running
  // as soon as it launched them.
  assertEquals(2, info1.mapSchedulable.getRunningTasks());
  assertEquals(1, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2, info1.mapSchedulable.getDemand());
  assertEquals(1, info1.reduceSchedulable.getDemand());
  assertEquals(1, info2.mapSchedulable.getRunningTasks());
  assertEquals(2, info2.reduceSchedulable.getRunningTasks());
  assertEquals(1, info2.mapSchedulable.getDemand());
  assertEquals(2, info2.reduceSchedulable.getDemand());
  verifyMetrics();
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * Same scenario as testSmallJobs, except that assignMultiple is turned on
 * so that one heartbeat may hand out several tasks.
 */
public void testSmallJobsWithAssignMultiple() throws IOException {
  setUpCluster(1, 2, true);

  JobInProgress firstJob = submitJob(JobStatus.RUNNING, 2, 1);
  JobInfo firstInfo = scheduler.infos.get(firstJob);

  // Alone in the cluster, job 1's fair share equals its demand.
  assertEquals(0, firstInfo.mapSchedulable.getRunningTasks());
  assertEquals(0, firstInfo.reduceSchedulable.getRunningTasks());
  assertEquals(2, firstInfo.mapSchedulable.getDemand());
  assertEquals(1, firstInfo.reduceSchedulable.getDemand());
  assertEquals(2.0, firstInfo.mapSchedulable.getFairShare());
  assertEquals(1.0, firstInfo.reduceSchedulable.getFairShare());
  verifyMetrics();

  // Let some time pass before job 2 arrives so that job 1 is
  // deterministically ordered ahead of it.
  advanceTime(100);
  JobInProgress secondJob = submitJob(JobStatus.RUNNING, 1, 2);
  JobInfo secondInfo = scheduler.infos.get(secondJob);

  // Job 1 is unchanged; job 2 also receives a share equal to its demand.
  assertEquals(0, firstInfo.mapSchedulable.getRunningTasks());
  assertEquals(0, firstInfo.reduceSchedulable.getRunningTasks());
  assertEquals(2, firstInfo.mapSchedulable.getDemand());
  assertEquals(1, firstInfo.reduceSchedulable.getDemand());
  assertEquals(2.0, firstInfo.mapSchedulable.getFairShare());
  assertEquals(1.0, firstInfo.reduceSchedulable.getFairShare());
  assertEquals(0, secondInfo.mapSchedulable.getRunningTasks());
  assertEquals(0, secondInfo.reduceSchedulable.getRunningTasks());
  assertEquals(1, secondInfo.mapSchedulable.getDemand());
  assertEquals(2, secondInfo.reduceSchedulable.getDemand());
  assertEquals(1.0, secondInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, secondInfo.reduceSchedulable.getFairShare());
  verifyMetrics();

  // Each heartbeat fills the tracker; the jobs still take turns.
  checkAssignment("tt1",
      "attempt_test_0001_m_000000_0 on tt1",
      "attempt_test_0001_r_000000_0 on tt1",
      "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2",
      "attempt_test_0001_m_000001_0 on tt2",
      "attempt_test_0002_r_000001_0 on tt2");
  // Both jobs are out of tasks now, so nothing more can be assigned.
  assertNull(scheduler.assignTasks(tracker("tt2")));

  // Launched tasks must be counted as running immediately.
  assertEquals(2, firstInfo.mapSchedulable.getRunningTasks());
  assertEquals(1, firstInfo.reduceSchedulable.getRunningTasks());
  assertEquals(2, firstInfo.mapSchedulable.getDemand());
  assertEquals(1, firstInfo.reduceSchedulable.getDemand());
  assertEquals(1, secondInfo.mapSchedulable.getRunningTasks());
  assertEquals(2, secondInfo.reduceSchedulable.getRunningTasks());
  assertEquals(1, secondInfo.mapSchedulable.getDemand());
  assertEquals(2, secondInfo.reduceSchedulable.getDemand());
  verifyMetrics();
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * We submit two jobs to a cluster of 3 nodes such that the second one has
 * 2x the priority of the first (it is raised to HIGH), wait for 100 ms, and
 * check that the fair shares and assignments follow the priorities: the
 * high-priority job gets 4 tasks of each type while the normal-priority job
 * gets 2.
 *
 * @throws IOException declared for the job-submission and task-assignment
 *         helpers used by this test
 */
public void testJobsWithPriorities() throws IOException {
  setUpCluster(1, 3, false);
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info2 = scheduler.infos.get(job2);
  // The priority is changed after submission, so the scheduler has to be
  // updated explicitly before the new shares can be observed.
  job2.setPriority(JobPriority.HIGH);
  scheduler.update();

  // Check scheduler variables: job 2's share is twice job 1's.
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(10, info1.mapSchedulable.getDemand());
  assertEquals(10, info1.reduceSchedulable.getDemand());
  assertEquals(2.0, info1.mapSchedulable.getFairShare(), 0.1);
  assertEquals(2.0, info1.reduceSchedulable.getFairShare(), 0.1);
  assertEquals(0, info2.mapSchedulable.getRunningTasks());
  assertEquals(0, info2.reduceSchedulable.getRunningTasks());
  assertEquals(10, info2.mapSchedulable.getDemand());
  assertEquals(10, info2.reduceSchedulable.getDemand());
  assertEquals(4.0, info2.mapSchedulable.getFairShare(), 0.1);
  assertEquals(4.0, info2.reduceSchedulable.getFairShare(), 0.1);

  // Advance time
  advanceTime(100);

  // Assign tasks and check that j2 gets 2x more tasks than j1. In addition,
  // whenever the jobs' runningTasks/weight ratios are tied, j1 should get
  // the new task first because it started first; thus the tasks of each
  // type should be handed out alternately to 1, 2, 2, 1, 2, 2, etc.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
  checkAssignment("tt3", "attempt_test_0002_r_000002_0 on tt3");
  checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
  checkAssignment("tt3", "attempt_test_0002_r_000003_0 on tt3");
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * Three large jobs are submitted:
 * - job1 in the default pool, at time 0
 * - job2 in poolA, with an allocation of 1 map / 2 reduces, at time 200
 * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 300
 *
 * All slots are then assigned. Maps should go to job2, job3, job3, job1,
 * because jobs 2 and 3 have guaranteed map slots (1 and 2 respectively);
 * job2 precedes job3 while both are at 0 slots because it started earlier.
 * Likewise, reduces should go to job2, job3, job2, job1.
 */
public void testLargeJobsWithPools() throws Exception {
  // Write the allocation file.
  PrintWriter allocs = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocs.println("<?xml version=\"1.0\"?>");
  allocs.println("<allocations>");
  // poolA: at least 1 map and 2 reduces
  allocs.println("<pool name=\"poolA\">");
  allocs.println("<minMaps>1</minMaps>");
  allocs.println("<minReduces>2</minReduces>");
  allocs.println("</pool>");
  // poolB: at least 2 maps and 1 reduce
  allocs.println("<pool name=\"poolB\">");
  allocs.println("<minMaps>2</minMaps>");
  allocs.println("<minReduces>1</minReduces>");
  allocs.println("</pool>");
  allocs.println("</allocations>");
  allocs.close();
  scheduler.getPoolManager().reloadAllocs();

  Pool defaultPool = scheduler.getPoolManager().getPool("default");
  Pool poolA = scheduler.getPoolManager().getPool("poolA");
  Pool poolB = scheduler.getPoolManager().getPool("poolB");

  JobInProgress defaultJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo defaultInfo = scheduler.infos.get(defaultJob);

  // While it is the only job, job 1 is entitled to 4 slots of each type.
  assertEquals(0, defaultInfo.mapSchedulable.getRunningTasks());
  assertEquals(0, defaultInfo.reduceSchedulable.getRunningTasks());
  assertEquals(10, defaultInfo.mapSchedulable.getDemand());
  assertEquals(10, defaultInfo.reduceSchedulable.getDemand());
  assertEquals(4.0, defaultInfo.mapSchedulable.getFairShare());
  assertEquals(4.0, defaultInfo.reduceSchedulable.getFairShare());

  // Job 2 arrives 200ms later, job 3 another 100ms after that.
  advanceTime(200);
  JobInProgress poolAJob = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo poolAInfo = scheduler.infos.get(poolAJob);
  advanceTime(100);
  JobInProgress poolBJob = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo poolBInfo = scheduler.infos.get(poolBJob);

  // Min shares come from the allocation file; fair shares follow them.
  assertEquals(0, defaultPool.getMapSchedulable().getMinShare());
  assertEquals(0, defaultPool.getReduceSchedulable().getMinShare());
  assertEquals(1.0, defaultInfo.mapSchedulable.getFairShare());
  assertEquals(1.0, defaultInfo.reduceSchedulable.getFairShare());
  assertEquals(1, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(1.0, poolAInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, poolAInfo.reduceSchedulable.getFairShare());
  assertEquals(2, poolB.getMapSchedulable().getMinShare());
  assertEquals(1, poolB.getReduceSchedulable().getMinShare());
  assertEquals(2.0, poolBInfo.mapSchedulable.getFairShare());
  assertEquals(1.0, poolBInfo.reduceSchedulable.getFairShare());

  // Another 100ms passes.
  advanceTime(100);

  // Jobs in pools below their min share are served first.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * Three large jobs are submitted:
 * - job1 in the default pool, at time 0
 * - job2 in poolA, with an allocation of 2 maps / 2 reduces, at time 200
 * - job3 in poolA, with an allocation of 2 maps / 2 reduces, at time 300
 *
 * Tasks are then assigned. The first two tasks of each type should go to
 * job2 and job3, since their pool has an allocation guarantee; the next two
 * slots of each type should go to job 1, because the pool is no longer
 * needy by then.
 */
public void testLargeJobsWithExcessCapacity() throws Exception {
  // Write the allocation file.
  PrintWriter allocs = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocs.println("<?xml version=\"1.0\"?>");
  allocs.println("<allocations>");
  // poolA: at least 2 maps and 2 reduces
  allocs.println("<pool name=\"poolA\">");
  allocs.println("<minMaps>2</minMaps>");
  allocs.println("<minReduces>2</minReduces>");
  allocs.println("</pool>");
  allocs.println("</allocations>");
  allocs.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool poolA = scheduler.getPoolManager().getPool("poolA");

  JobInProgress defaultJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo defaultInfo = scheduler.infos.get(defaultJob);

  // While it is the only job, job 1 is entitled to 4 slots of each type.
  assertEquals(0, defaultInfo.mapSchedulable.getRunningTasks());
  assertEquals(0, defaultInfo.reduceSchedulable.getRunningTasks());
  assertEquals(10, defaultInfo.mapSchedulable.getDemand());
  assertEquals(10, defaultInfo.reduceSchedulable.getDemand());
  assertEquals(4.0, defaultInfo.mapSchedulable.getFairShare());
  assertEquals(4.0, defaultInfo.reduceSchedulable.getFairShare());

  // Job 2 arrives in poolA 200ms later.
  advanceTime(200);
  JobInProgress firstPoolJob = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo firstPoolInfo = scheduler.infos.get(firstPoolJob);

  // The slots are now split evenly between job 1 and job 2.
  assertEquals(2, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0, defaultInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, defaultInfo.reduceSchedulable.getFairShare());
  assertEquals(2.0, firstPoolInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, firstPoolInfo.reduceSchedulable.getFairShare());

  // Job 3 arrives in poolA 100ms after job 2.
  advanceTime(100);
  JobInProgress secondPoolJob = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo secondPoolInfo = scheduler.infos.get(secondPoolJob);

  // poolA's share is now divided between its two jobs; job 1 keeps its own.
  assertEquals(2, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0, defaultInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, defaultInfo.reduceSchedulable.getFairShare());
  assertEquals(1.0, firstPoolInfo.mapSchedulable.getFairShare());
  assertEquals(1.0, firstPoolInfo.reduceSchedulable.getFairShare());
  assertEquals(1.0, secondPoolInfo.mapSchedulable.getFairShare());
  assertEquals(1.0, secondPoolInfo.reduceSchedulable.getFairShare());

  // Let time pass.
  advanceTime(100);

  // Needy jobs are served first; afterwards job 1 receives two tasks of
  // each type thanks to its larger share.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * Variant of testLargeJobsWithExcessCapacity in which multiple tasks may be
 * assigned per heartbeat. The outcome should be the same as in
 * testLargeJobsWithExcessCapacity.
 */
public void testLargeJobsWithExcessCapacityAndAssignMultiple()
    throws Exception {
  setUpCluster(1, 2, true);

  // Write the allocation file.
  PrintWriter allocs = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocs.println("<?xml version=\"1.0\"?>");
  allocs.println("<allocations>");
  // poolA: at least 2 maps and 2 reduces
  allocs.println("<pool name=\"poolA\">");
  allocs.println("<minMaps>2</minMaps>");
  allocs.println("<minReduces>2</minReduces>");
  allocs.println("</pool>");
  allocs.println("</allocations>");
  allocs.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool poolA = scheduler.getPoolManager().getPool("poolA");

  JobInProgress defaultJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo defaultInfo = scheduler.infos.get(defaultJob);

  // While it is the only job, job 1 is entitled to 4 slots of each type.
  assertEquals(0, defaultInfo.mapSchedulable.getRunningTasks());
  assertEquals(0, defaultInfo.reduceSchedulable.getRunningTasks());
  assertEquals(10, defaultInfo.mapSchedulable.getDemand());
  assertEquals(10, defaultInfo.reduceSchedulable.getDemand());
  assertEquals(4.0, defaultInfo.mapSchedulable.getFairShare());
  assertEquals(4.0, defaultInfo.reduceSchedulable.getFairShare());

  // Job 2 arrives in poolA 200ms later.
  advanceTime(200);
  JobInProgress firstPoolJob = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo firstPoolInfo = scheduler.infos.get(firstPoolJob);

  // The slots are now split evenly between job 1 and job 2.
  assertEquals(2, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0, defaultInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, defaultInfo.reduceSchedulable.getFairShare());
  assertEquals(2.0, firstPoolInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, firstPoolInfo.reduceSchedulable.getFairShare());

  // Job 3 arrives in poolA 100ms after job 2.
  advanceTime(100);
  JobInProgress secondPoolJob = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo secondPoolInfo = scheduler.infos.get(secondPoolJob);

  // poolA's share is now divided between its two jobs; job 1 keeps its own.
  assertEquals(2, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0, defaultInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, defaultInfo.reduceSchedulable.getFairShare());
  assertEquals(1.0, firstPoolInfo.mapSchedulable.getFairShare());
  assertEquals(1.0, firstPoolInfo.reduceSchedulable.getFairShare());
  assertEquals(1.0, secondPoolInfo.mapSchedulable.getFairShare());
  assertEquals(1.0, secondPoolInfo.reduceSchedulable.getFairShare());

  // Let time pass.
  advanceTime(100);

  // Needy jobs are served first; afterwards job 1 receives two tasks of
  // each type thanks to its larger share.
  checkAssignment("tt1",
      "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_r_000000_0 on tt1",
      "attempt_test_0003_m_000000_0 on tt1",
      "attempt_test_0003_r_000000_0 on tt1");
  checkAssignment("tt2",
      "attempt_test_0001_m_000000_0 on tt2",
      "attempt_test_0001_r_000000_0 on tt2",
      "attempt_test_0001_m_000001_0 on tt2",
      "attempt_test_0001_r_000001_0 on tt2");
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * Two jobs are submitted at time 0:
 * - job1 in the default pool
 * - job2, with 1 map and 1 reduce, in poolA, whose allocation is 4 maps
 *   and 4 reduces
 *
 * When the slots are handed out, job2 must receive only 1 task of each
 * type despite its pool's larger allocation.
 */
public void testSmallJobInLargePool() throws Exception {
  // Write the allocation file.
  PrintWriter allocs = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocs.println("<?xml version=\"1.0\"?>");
  allocs.println("<allocations>");
  // poolA: at least 4 maps and 4 reduces
  allocs.println("<pool name=\"poolA\">");
  allocs.println("<minMaps>4</minMaps>");
  allocs.println("<minReduces>4</minReduces>");
  allocs.println("</pool>");
  allocs.println("</allocations>");
  allocs.close();
  scheduler.getPoolManager().reloadAllocs();

  JobInProgress largeJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo largeInfo = scheduler.infos.get(largeJob);
  JobInProgress smallJob = submitJob(JobStatus.RUNNING, 1, 1, "poolA");
  JobInfo smallInfo = scheduler.infos.get(smallJob);

  // The small job's share is capped at its demand; the rest goes to job 1.
  assertEquals(0, largeInfo.mapSchedulable.getRunningTasks());
  assertEquals(0, largeInfo.reduceSchedulable.getRunningTasks());
  assertEquals(10, largeInfo.mapSchedulable.getDemand());
  assertEquals(10, largeInfo.reduceSchedulable.getDemand());
  assertEquals(3.0, largeInfo.mapSchedulable.getFairShare());
  assertEquals(3.0, largeInfo.reduceSchedulable.getFairShare());
  assertEquals(0, smallInfo.mapSchedulable.getRunningTasks());
  assertEquals(0, smallInfo.reduceSchedulable.getRunningTasks());
  assertEquals(1, smallInfo.mapSchedulable.getDemand());
  assertEquals(1, smallInfo.reduceSchedulable.getDemand());
  assertEquals(1.0, smallInfo.mapSchedulable.getFairShare());
  assertEquals(1.0, smallInfo.reduceSchedulable.getFairShare());

  // The needy job is served first, then job 1 takes the remaining slots.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * Four jobs are submitted to the default pool, whose maxRunningJobs limit
 * is set to two. Only the first two jobs should be scheduled, each with
 * half of the total slots.
 */
public void testPoolMaxJobs() throws Exception {
  // Write the allocation file.
  PrintWriter allocs = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocs.println("<?xml version=\"1.0\"?>");
  allocs.println("<allocations>");
  allocs.println("<pool name=\"default\">");
  allocs.println("<maxRunningJobs>2</maxRunningJobs>");
  allocs.println("</pool>");
  allocs.println("</allocations>");
  allocs.close();
  scheduler.getPoolManager().reloadAllocs();

  // Time is advanced between submissions so that every job has a distinct
  // submit time.
  JobInProgress firstJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo firstInfo = scheduler.infos.get(firstJob);
  advanceTime(10);
  JobInProgress secondJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo secondInfo = scheduler.infos.get(secondJob);
  advanceTime(10);
  JobInProgress thirdJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo thirdInfo = scheduler.infos.get(thirdJob);
  advanceTime(10);
  JobInProgress fourthJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo fourthInfo = scheduler.infos.get(fourthJob);

  // Only the first two jobs have a non-zero fair share.
  assertEquals(2.0, firstInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, firstInfo.reduceSchedulable.getFairShare());
  assertEquals(2.0, secondInfo.mapSchedulable.getFairShare());
  assertEquals(2.0, secondInfo.reduceSchedulable.getFairShare());
  assertEquals(0.0, thirdInfo.mapSchedulable.getFairShare());
  assertEquals(0.0, thirdInfo.reduceSchedulable.getFairShare());
  assertEquals(0.0, fourthInfo.mapSchedulable.getFairShare());
  assertEquals(0.0, fourthInfo.reduceSchedulable.getFairShare());

  // Every slot must go to job 1 or job 2.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * Two jobs owned by "user1" and two owned by "user2" are submitted to the
 * default pool, with user1's running-job limit set to 1. One job of user1
 * and both jobs of user2 should be scheduled.
 */
public void testUserMaxJobs() throws Exception {
  // Write the allocation file.
  PrintWriter allocs = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocs.println("<?xml version=\"1.0\"?>");
  allocs.println("<allocations>");
  allocs.println("<user name=\"user1\">");
  allocs.println("<maxRunningJobs>1</maxRunningJobs>");
  allocs.println("</user>");
  allocs.println("</allocations>");
  allocs.close();
  scheduler.getPoolManager().reloadAllocs();

  // Time is advanced between submissions so that every job has a distinct
  // submit time. The owner is set on each job's conf right after submission.
  JobInProgress userOneFirst = submitJob(JobStatus.RUNNING, 10, 10);
  userOneFirst.getJobConf().set("user.name", "user1");
  JobInfo userOneFirstInfo = scheduler.infos.get(userOneFirst);
  advanceTime(10);
  JobInProgress userOneSecond = submitJob(JobStatus.RUNNING, 10, 10);
  userOneSecond.getJobConf().set("user.name", "user1");
  JobInfo userOneSecondInfo = scheduler.infos.get(userOneSecond);
  advanceTime(10);
  JobInProgress userTwoFirst = submitJob(JobStatus.RUNNING, 10, 10);
  userTwoFirst.getJobConf().set("user.name", "user2");
  JobInfo userTwoFirstInfo = scheduler.infos.get(userTwoFirst);
  advanceTime(10);
  JobInProgress userTwoSecond = submitJob(JobStatus.RUNNING, 10, 10);
  userTwoSecond.getJobConf().set("user.name", "user2");
  JobInfo userTwoSecondInfo = scheduler.infos.get(userTwoSecond);

  // Job 2 (user1's second job) gets nothing; the others share the slots.
  assertEquals(1.33, userOneFirstInfo.mapSchedulable.getFairShare(), 0.1);
  assertEquals(1.33, userOneFirstInfo.reduceSchedulable.getFairShare(), 0.1);
  assertEquals(0.0, userOneSecondInfo.mapSchedulable.getFairShare());
  assertEquals(0.0, userOneSecondInfo.reduceSchedulable.getFairShare());
  assertEquals(1.33, userTwoFirstInfo.mapSchedulable.getFairShare(), 0.1);
  assertEquals(1.33, userTwoFirstInfo.reduceSchedulable.getFairShare(), 0.1);
  assertEquals(1.33, userTwoSecondInfo.mapSchedulable.getFairShare(), 0.1);
  assertEquals(1.33, userTwoSecondInfo.reduceSchedulable.getFairShare(), 0.1);

  // Slots must be given to jobs 1, 3 and 4 only.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * Jobs are submitted to three pools: poolA with a weight of 2.0, poolB with
 * a weight of 0.5, and the default pool, which should have a weight of 1.0.
 * The map and reduce fair shares must be divided accordingly. A second job
 * is then submitted to poolB, and each of poolB's jobs should get half of
 * the pool (weight of 0.25).
 */
public void testPoolWeights() throws Exception {
  // Write the allocation file.
  PrintWriter allocs = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocs.println("<?xml version=\"1.0\"?>");
  allocs.println("<allocations>");
  allocs.println("<pool name=\"poolA\">");
  allocs.println("<weight>2.0</weight>");
  allocs.println("</pool>");
  allocs.println("<pool name=\"poolB\">");
  allocs.println("<weight>0.5</weight>");
  allocs.println("</pool>");
  allocs.println("</allocations>");
  allocs.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit one job per pool; time is advanced only after all three are in.
  JobInProgress defaultJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo defaultInfo = scheduler.infos.get(defaultJob);
  JobInProgress poolAJob = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo poolAInfo = scheduler.infos.get(poolAJob);
  JobInProgress poolBJob = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo poolBInfo = scheduler.infos.get(poolBJob);
  advanceTime(10);

  // Shares are in the ratio 1 : 2 : 0.5.
  assertEquals(1.14, defaultInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.14, defaultInfo.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(2.28, poolAInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(2.28, poolAInfo.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0.57, poolBInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(0.57, poolBInfo.reduceSchedulable.getFairShare(), 0.01);

  // A second poolB job splits poolB's share; the other pools are unaffected.
  JobInProgress secondPoolBJob = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo secondPoolBInfo = scheduler.infos.get(secondPoolBJob);
  advanceTime(10);
  assertEquals(1.14, defaultInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.14, defaultInfo.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(2.28, poolAInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(2.28, poolAInfo.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0.28, poolBInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(0.28, poolBInfo.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0.28, secondPoolBInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(0.28, secondPoolBInfo.reduceSchedulable.getFairShare(), 0.01);
  verifyMetrics();
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * Jobs are submitted to two pools, poolA and poolB. None of poolA's jobs
 * have any maps, but that must not affect their reduce share.
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Write the allocation file.
  PrintWriter allocs = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocs.println("<?xml version=\"1.0\"?>");
  allocs.println("<allocations>");
  allocs.println("<pool name=\"poolA\">");
  allocs.println("<weight>2.0</weight>");
  allocs.println("</pool>");
  allocs.println("<pool name=\"poolB\">");
  allocs.println("<weight>1.0</weight>");
  allocs.println("</pool>");
  allocs.println("</allocations>");
  allocs.close();
  scheduler.getPoolManager().reloadAllocs();

  // Two map-less jobs go to poolA and one regular job to poolB; time is
  // advanced only after all three are in.
  JobInProgress firstReduceOnly = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo firstReduceOnlyInfo = scheduler.infos.get(firstReduceOnly);
  JobInProgress secondReduceOnly = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo secondReduceOnlyInfo = scheduler.infos.get(secondReduceOnly);
  JobInProgress fullJob = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo fullInfo = scheduler.infos.get(fullJob);
  advanceTime(10);

  // Only fair shares are asserted here; per-job map/reduce weights are not
  // checked. The poolB job gets every map slot, while reduces are split
  // evenly across the three jobs.
  assertEquals(0, firstReduceOnlyInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.33, firstReduceOnlyInfo.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0, secondReduceOnlyInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.33, secondReduceOnlyInfo.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(4, fullInfo.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.33, fullInfo.reduceSchedulable.getFairShare(), 0.01);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestFairScheduler.java
/**
 * This test submits a job to pool1, which has a min share of 2 slots of each
 * type with a min share preemption timeout of 5s, and assigns all slots
 * (4 maps and 4 reduces) to it. A second job is then submitted to the default
 * pool, with a fair share preemption timeout of 5s configured. After 6 more
 * seconds the default pool is starved of its fair share (2 slots of each
 * type), and we test that the scheduler does not kill more than 2 tasks of
 * each type.
 */
public void testFairSharePreemptionWithShortTimeout() throws Exception {
// Enable preemption in scheduler
scheduler.preemptionEnabled = true;
// Set up pools file
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<fairSharePreemptionTimeout>5</fairSharePreemptionTimeout>");
out.println("<pool name=\"pool1\">");
out.println("<minMaps>2</minMaps>");
out.println("<minReduces>2</minReduces>");
out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
out.println("</pool>");
out.println("</allocations>");
out.close();
scheduler.getPoolManager().reloadAllocs();
Pool pool1 = scheduler.getPoolManager().getPool("pool1");
Pool defaultPool = scheduler.getPoolManager().getPool("default");
// Submit job 1 and assign all slots to it. Sleep a bit before assigning
// tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10, "pool1");
JobInfo info1 = scheduler.infos.get(job1);
checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
advanceTime(100);
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
// Let ten seconds pass; job 1 is still alone, so it holds all 4 slots of
// each type and its fair share is 4.
advanceTime(10000);
assertEquals(4, info1.mapSchedulable.getRunningTasks());
assertEquals(4, info1.reduceSchedulable.getRunningTasks());
assertEquals(4.0, info1.mapSchedulable.getFairShare());
assertEquals(4.0, info1.reduceSchedulable.getFairShare());
// Ten seconds after job 1 filled the cluster, submit job 2 to the default
// pool. It is only submitted for its effect on the scheduler; the local is
// not read again.
JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "default");
// Advance time by 6 seconds without updating the scheduler.
// This simulates the time gap between update and task preemption.
// Note that the clock is advanced directly here instead of calling
// advanceTime(...) as above.
clock.advance(6000);
// Job 1 still runs everything, but its fair share has dropped to 2.
assertEquals(4, info1.mapSchedulable.getRunningTasks());
assertEquals(4, info1.reduceSchedulable.getRunningTasks());
assertEquals(2.0, info1.mapSchedulable.getFairShare());
assertEquals(2.0, info1.reduceSchedulable.getFairShare());
// pool1 has nothing to reclaim; the default pool is owed 2 slots of each
// type.
assertEquals(0, scheduler.tasksToPreempt(pool1.getMapSchedulable(),
clock.getTime()));
assertEquals(0, scheduler.tasksToPreempt(pool1.getReduceSchedulable(),
clock.getTime()));
assertEquals(2, scheduler.tasksToPreempt(defaultPool.getMapSchedulable(),
clock.getTime()));
assertEquals(2, scheduler.tasksToPreempt(defaultPool.getReduceSchedulable(),
clock.getTime()));
// Test that the tasks actually get preempted and we can assign new ones
scheduler.preemptTasksIfNecessary();
scheduler.update();
// Exactly 2 tasks of each type were taken away from job 1.
assertEquals(2, job1.runningMaps());
assertEquals(2, job1.runningReduces());
// The freed slots are on tt2 and are handed to job 2.
checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
// Neither tracker has anything further to assign.
assertNull(scheduler.assignTasks(tracker("tt1")));
assertNull(scheduler.assignTasks(tracker("tt2")));
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
* This test contains two jobs with fewer required tasks than there are slots.
* We check that all tasks are assigned, but job 1 gets them first because it
* was submitted earlier.
*/
public void testSmallJobs() throws IOException {
  // The earlier job asks for 2 maps and 1 reduce.
  JobInProgress earlierJob = submitJob(JobStatus.RUNNING, 2, 1);
  JobInfo earlierInfo = scheduler.infos.get(earlierJob);

  // Immediately after submission: nothing running, full demand outstanding,
  // no deficit accumulated, and fair shares of 2.0 maps / 1.0 reduces.
  assertEquals(0, earlierInfo.runningMaps);
  assertEquals(0, earlierInfo.runningReduces);
  assertEquals(2, earlierInfo.neededMaps);
  assertEquals(1, earlierInfo.neededReduces);
  assertEquals(0, earlierInfo.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, earlierInfo.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(2.0, earlierInfo.mapFairShare, ALLOW_ERROR);
  assertEquals(1.0, earlierInfo.reduceFairShare, ALLOW_ERROR);

  // Move the clock forward by 100 ms so the later job is submitted strictly
  // after the earlier one, which makes the scheduling order deterministic.
  advanceTime(100);

  // The later job asks for 1 map and 2 reduces.
  JobInProgress laterJob = submitJob(JobStatus.RUNNING, 1, 2);
  JobInfo laterInfo = scheduler.infos.get(laterJob);

  // The earlier job is still idle but has waited 100 ms, so its deficits are
  // (2 slots) * (100 ms) = 200 for maps and (1 slot) * (100 ms) = 100 for
  // reduces. Its fair shares are still 2.0 maps / 1.0 reduces.
  assertEquals(0, earlierInfo.runningMaps);
  assertEquals(0, earlierInfo.runningReduces);
  assertEquals(2, earlierInfo.neededMaps);
  assertEquals(1, earlierInfo.neededReduces);
  assertEquals(200, earlierInfo.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(100, earlierInfo.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(2.0, earlierInfo.mapFairShare, ALLOW_ERROR);
  assertEquals(1.0, earlierInfo.reduceFairShare, ALLOW_ERROR);

  // The later job has just arrived: no running tasks, no deficit, and fair
  // shares of 1.0 maps / 2.0 reduces.
  assertEquals(0, laterInfo.runningMaps);
  assertEquals(0, laterInfo.runningReduces);
  assertEquals(1, laterInfo.neededMaps);
  assertEquals(2, laterInfo.neededReduces);
  assertEquals(0, laterInfo.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, laterInfo.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(1.0, laterInfo.mapFairShare, ALLOW_ERROR);
  assertEquals(2.0, laterInfo.reduceFairShare, ALLOW_ERROR);

  // Hand out slots: the earlier job's tasks are expected first, followed by
  // the later job's tasks. Afterwards tt2 has nothing more to assign.
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  assertNull(scheduler.assignTasks(tracker("tt2")));

  // Launched tasks must be counted as running right away, leaving no
  // outstanding demand for either job.
  assertEquals(2, earlierInfo.runningMaps);
  assertEquals(1, earlierInfo.runningReduces);
  assertEquals(0, earlierInfo.neededMaps);
  assertEquals(0, earlierInfo.neededReduces);
  assertEquals(1, laterInfo.runningMaps);
  assertEquals(2, laterInfo.runningReduces);
  assertEquals(0, laterInfo.neededMaps);
  assertEquals(0, laterInfo.neededReduces);
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
* We submit two jobs such that one has 2x the priority of the other, wait
* for 100 ms, and check that the weights/deficits are okay and that the
* tasks all go to the high-priority job.
*/
public void testJobsWithPriorities() throws IOException {
  // Two identical jobs (10 maps, 10 reduces each); the second one is then
  // raised to HIGH priority and the scheduler is refreshed.
  JobInProgress normalJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo normalInfo = scheduler.infos.get(normalJob);
  JobInProgress highJob = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo highInfo = scheduler.infos.get(highJob);
  highJob.setPriority(JobPriority.HIGH);
  scheduler.update();

  // Normal-priority job: idle, full demand, no deficit, fair share 1.33.
  assertEquals(0, normalInfo.runningMaps);
  assertEquals(0, normalInfo.runningReduces);
  assertEquals(10, normalInfo.neededMaps);
  assertEquals(10, normalInfo.neededReduces);
  assertEquals(0, normalInfo.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, normalInfo.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(1.33, normalInfo.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, normalInfo.reduceFairShare, ALLOW_ERROR);

  // High-priority job: idle, full demand, no deficit, fair share 2.66
  // (twice the share of the normal-priority job).
  assertEquals(0, highInfo.runningMaps);
  assertEquals(0, highInfo.runningReduces);
  assertEquals(10, highInfo.neededMaps);
  assertEquals(10, highInfo.neededReduces);
  assertEquals(0, highInfo.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, highInfo.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(2.66, highInfo.mapFairShare, ALLOW_ERROR);
  assertEquals(2.66, highInfo.reduceFairShare, ALLOW_ERROR);

  // After 100 ms of waiting the expected deficits are 133 for the
  // normal-priority job and 266 for the high-priority job (within 1.0).
  advanceTime(100);
  assertEquals(133, normalInfo.mapDeficit, 1.0);
  assertEquals(133, normalInfo.reduceDeficit, 1.0);
  assertEquals(266, highInfo.mapDeficit, 1.0);
  assertEquals(266, highInfo.reduceDeficit, 1.0);

  // Hand out every slot on both trackers: all eight attempts checked here
  // belong to the high-priority job (job id 0002).
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
* This test starts by submitting two jobs at time 0:
* - job1 in the default pool
* - job2, with 1 map and 1 reduce, in pool_a, which has an alloc of 4
* maps and 4 reduces
*
* When we assign the slots, job2 should only get 1 of each type of task.
*
* The assertions below expect a fair share of 1.0 map and 1.0 reduce slot for
* job 2, matching the single task of each type that it needs, and 3.0 of each
* for job 1. (TODO: This may not be a good policy.)
*/
public void testSmallJobInLargePool() throws Exception {
  // Write an allocation file that gives pool_a a minimum of 4 maps and
  // 4 reduces. The writer is closed in a finally block so the file handle is
  // always released.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  try {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    // Give pool A a minimum of 4 maps, 4 reduces
    out.println("<pool name=\"pool_a\">");
    out.println("<minMaps>4</minMaps>");
    out.println("<minReduces>4</minReduces>");
    out.println("</pool>");
    out.println("</allocations>");
  } finally {
    out.close();
  }
  // PrintWriter never throws IOException; it only records an error flag.
  // Fail here rather than continue with a missing or truncated file.
  assertFalse("Failed to write allocation file " + ALLOC_FILE,
      out.checkError());
  scheduler.getPoolManager().reloadAllocs();

  // job1 goes to the default pool; job2 (1 map, 1 reduce) goes to pool_a.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 1, "pool_a");
  JobInfo info2 = scheduler.infos.get(job2);

  // Check scheduler variables: job1 is expected to have a fair share of 3.0
  // and job2 a fair share of 1.0 for both maps and reduces.
  assertEquals(0, info1.runningMaps);
  assertEquals(0, info1.runningReduces);
  assertEquals(10, info1.neededMaps);
  assertEquals(10, info1.neededReduces);
  assertEquals(0, info1.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, info1.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(3.0, info1.mapFairShare, ALLOW_ERROR);
  assertEquals(3.0, info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0, info2.runningMaps);
  assertEquals(0, info2.runningReduces);
  assertEquals(1, info2.neededMaps);
  assertEquals(1, info2.neededReduces);
  assertEquals(0, info2.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, info2.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(1.0, info2.mapFairShare, ALLOW_ERROR);
  assertEquals(1.0, info2.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that slots are first given to needy jobs: job2
  // gets exactly one map and one reduce, everything else goes to job1.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
* This test starts by submitting four jobs in the default pool. However, the
* maxRunningJobs limit for this pool has been set to two. We should see only
* the first two jobs get scheduled, each with half the total slots.
*/
public void testPoolMaxJobs() throws Exception {
  // Write an allocation file that limits the default pool to two running
  // jobs. The writer is closed in a finally block so the file handle is
  // always released.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  try {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<pool name=\"default\">");
    out.println("<maxRunningJobs>2</maxRunningJobs>");
    out.println("</pool>");
    out.println("</allocations>");
  } finally {
    out.close();
  }
  // PrintWriter never throws IOException; it only records an error flag.
  // Fail here rather than continue with a missing or truncated file.
  assertFalse("Failed to write allocation file " + ALLOC_FILE,
      out.checkError());
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times.
  JobInProgress job1 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  JobInProgress job2 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  JobInProgress job3 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  JobInProgress job4 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info4 = scheduler.infos.get(job4);
  Thread.sleep(1000L); // Give the job initializer time to finish its work

  // Only two of the jobs should be initialized.
  assertTrue(((FakeJobInProgress)job1).isInitialized());
  assertTrue(((FakeJobInProgress)job2).isInitialized());
  assertFalse(((FakeJobInProgress)job3).isInitialized());
  assertFalse(((FakeJobInProgress)job4).isInitialized());

  // Check scheduler variables: the two initialized jobs split the slots
  // evenly, the other two get no fair share.
  assertEquals(2.0, info1.mapFairShare, ALLOW_ERROR);
  assertEquals(2.0, info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(2.0, info2.mapFairShare, ALLOW_ERROR);
  assertEquals(2.0, info2.reduceFairShare, ALLOW_ERROR);
  assertEquals(0.0, info3.mapFairShare, ALLOW_ERROR);
  assertEquals(0.0, info3.reduceFairShare, ALLOW_ERROR);
  assertEquals(0.0, info4.mapFairShare, ALLOW_ERROR);
  assertEquals(0.0, info4.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that slots go to jobs 1 and 2 only
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
* This test configures a pool pool_a and submits a job to it both
* before and after pool_a is blacklisted.
*/
public void testpool_blacklisted() throws Exception {
  // First allocation file: pool_a exists and is not blacklisted. The writer
  // is closed in a finally block so the file handle is always released.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  try {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<pool name=\"default\">");
    out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>");
    out.println("</pool>");
    out.println("<pool name=\"pool_a\">");
    out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>");
    out.println("</pool>");
    out.println("</allocations>");
  } finally {
    out.close();
  }
  // PrintWriter never throws IOException; it only records an error flag.
  // Fail here rather than continue with a missing or truncated file.
  assertFalse("Failed to write allocation file " + ALLOC_FILE,
      out.checkError());
  scheduler.getPoolManager().reloadAllocs();

  // Submit a job to pool_a while it is not blacklisted
  JobInProgress job1 =
      submitJobNoInitialization(JobStatus.PREP, 10, 10, "pool_a");
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  Thread.sleep(1000L); // Give the job initializer time to finish its work

  // Second allocation file: same pools, but pool_a is now blacklisted.
  out = new PrintWriter(new FileWriter(ALLOC_FILE));
  try {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<pool name=\"default\">");
    out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>");
    out.println("</pool>");
    out.println("<pool name=\"pool_a\">");
    out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>");
    out.println("<blacklisted/>");
    out.println("</pool>");
    out.println("</allocations>");
  } finally {
    out.close();
  }
  assertFalse("Failed to write allocation file " + ALLOC_FILE,
      out.checkError());
  scheduler.getPoolManager().reloadAllocs();

  // Submit a job to the newly blacklisted pool_a
  JobInProgress job2 =
      submitJobNoInitialization(JobStatus.PREP, 10, 10, "pool_a");
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  Thread.sleep(1000L); // Give the job initializer time to finish its work

  // assertEquals takes (expected, actual); keeping that order makes a
  // failure message report the right value as "expected".
  // pool_a was not blacklisted when job1 was submitted, so it stays in pool_a
  assertEquals("pool_a", info1.poolName);
  // pool_a was blacklisted when job2 was submitted, so it goes to default
  assertEquals("default", info2.poolName);
}
项目:hadoop-EAR
文件:TestFairScheduler.java
/**
* This test starts by submitting two jobs by user "user1" to the default
* pool, and two jobs by "user2". We set user1's job limit to 1. We should
* see one job from user1 and two from user2.
*/
public void testUserMaxJobs() throws Exception {
  // Write an allocation file that limits user1 to one running job. The
  // writer is closed in a finally block so the file handle is always
  // released.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  try {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<user name=\"user1\">");
    out.println("<maxRunningJobs>1</maxRunningJobs>");
    out.println("</user>");
    out.println("</allocations>");
  } finally {
    out.close();
  }
  // PrintWriter never throws IOException; it only records an error flag.
  // Fail here rather than continue with a missing or truncated file.
  assertFalse("Failed to write allocation file " + ALLOC_FILE,
      out.checkError());
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times. Jobs 1 and 2 belong to user1, jobs 3
  // and 4 to user2.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  job1.getJobConf().set("user.name", "user1");
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  job2.getJobConf().set("user.name", "user1");
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
  job3.getJobConf().set("user.name", "user2");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
  job4.getJobConf().set("user.name", "user2");
  JobInfo info4 = scheduler.infos.get(job4);

  // Check scheduler variables: user1's second job (job2) gets no fair
  // share, and the other three jobs split the slots evenly.
  assertEquals(1.33, info1.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0.0, info2.mapFairShare, ALLOW_ERROR);
  assertEquals(0.0, info2.reduceFairShare, ALLOW_ERROR);
  assertEquals(1.33, info3.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, info3.reduceFairShare, ALLOW_ERROR);
  assertEquals(1.33, info4.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, info4.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that slots go first to jobs 1 and 3
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0003_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0003_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
}