Java 类org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress 实例源码
项目:mapreduce-fork
文件:TestTrackerReservation.java
/**
 * Test case to check task tracker reservation for a job which
 * has a job blacklisted tracker.
 * <ol>
 * <li>Run a job which fails on one of the tracker.</li>
 * <li>Check if the job succeeds and has no reservation.</li>
 * </ol>
 *
 * @throws Exception
 */
public void testTrackerReservationWithJobBlackListedTracker() throws Exception {
  FakeJobInProgress job = TestTaskTrackerBlacklisting.runBlackListingJob(
      jobTracker, trackers);
  assertEquals("Job should have exactly one blacklisted tracker", 1,
      job.getBlackListedTrackers().size());
  assertTrue("Tracker 1 not blacklisted for the job",
      job.getBlackListedTrackers().contains(
          JobInProgress.convertTrackerNameToHostName(trackers[0])));
  // JUnit convention: expected value first, actual second (original had them
  // swapped, which produces a misleading failure message).
  assertEquals("Job didn't complete successfully",
      JobStatus.SUCCEEDED, job.getStatus().getRunState());
  assertEquals("Reservation for the job not released: Maps",
      0, job.getNumReservedTaskTrackersForMaps());
  assertEquals("Reservation for the job not released : Reduces",
      0, job.getNumReservedTaskTrackersForReduces());
  // Cluster-wide metrics must also show the reservations as released.
  ClusterMetrics metrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match",
      0, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match",
      0, metrics.getReservedReduceSlots());
}
项目:mapreduce-fork
文件:TestSetupTaskScheduling.java
/**
 * Adds a simulated task status report for {@code job} to {@code reports}.
 * MAP/REDUCE reports are freshly scheduled running attempts on the given
 * tracker; TASK_CLEANUP reuses the status of attempt 0 of the first map or
 * first reduce, depending on {@code useMapSlot}.
 *
 * @throws IOException if scheduling a new attempt fails
 */
void addNewTaskStatus(FakeJobInProgress job, TaskType taskType,
    boolean useMapSlot, String tracker, List<TaskStatus> reports)
    throws IOException {
  TaskStatus report;
  switch (taskType) {
    case MAP: {
      TaskAttemptID attempt = job.findMapTask(tracker);
      report = new MapTaskStatus(attempt, 0.01f, 2,
          TaskStatus.State.RUNNING, "", "", tracker,
          TaskStatus.Phase.MAP, new Counters());
      break;
    }
    case TASK_CLEANUP: {
      report = useMapSlot
          ? job.maps[0].taskStatuses.get(
              new TaskAttemptID(job.maps[0].getTIPId(), 0))
          : job.reduces[0].taskStatuses.get(
              new TaskAttemptID(job.reduces[0].getTIPId(), 0));
      break;
    }
    default: {
      TaskAttemptID attempt = job.findReduceTask(tracker);
      report = new ReduceTaskStatus(attempt, 0.01f, 2,
          TaskStatus.State.RUNNING, "", "", tracker,
          TaskStatus.Phase.REDUCE, new Counters());
      break;
    }
  }
  reports.add(report);
}
项目:mapreduce-fork
文件:TestSetupTaskScheduling.java
/**
 * Test that a setup task can be run against a map slot
 * if it is free.
 * @throws IOException
 */
public void testSetupTaskReturnedForFreeMapSlots() throws IOException {
  // Register a job whose setup task is still pending.
  FakeJobInProgress setupJob = createJob(TaskType.JOB_SETUP);
  jobTracker.jobs.put(setupJob.getJobID(), setupJob);
  // Simulate a tracker with no running tasks at all.
  List<TaskStatus> emptyReports = new ArrayList<TaskStatus>();
  TaskTrackerStatus freeTracker =
      createTaskTrackerStatus(trackers[2], emptyReports);
  // The scheduler should hand back exactly one setup task on a map slot.
  List<Task> assigned = jobTracker.getSetupAndCleanupTasks(freeTracker);
  assertEquals(1, assigned.size());
  Task setupTask = assigned.get(0);
  assertTrue(setupTask.isJobSetupTask());
  assertTrue(setupTask.isMapTask());
  jobTracker.jobs.clear();
}
项目:mapreduce-fork
文件:TestSetupTaskScheduling.java
/**
 * Test to check that map slots are counted when returning
 * a setup task.
 * @throws IOException
 */
public void testMapSlotsCountedForSetup() throws IOException {
  // Job whose setup task is pending.
  FakeJobInProgress setupJob = createJob(TaskType.JOB_SETUP);
  jobTracker.jobs.put(setupJob.getJobID(), setupJob);
  // Second job used for slot reservation.
  FakeJobInProgress reservedJob = createJob(null);
  jobTracker.jobs.put(reservedJob.getJobID(), reservedJob);
  // Build a tracker status whose map slot is already occupied.
  List<TaskStatus> statuses = new ArrayList<TaskStatus>();
  addNewTaskStatus(setupJob, TaskType.MAP, true, trackers[0], statuses);
  TaskTrackerStatus busyTracker =
      createTaskTrackerStatus(trackers[0], statuses);
  // A setup task is still returned, but not on the (occupied) map slot.
  List<Task> assigned = jobTracker.getSetupAndCleanupTasks(busyTracker);
  assertEquals(1, assigned.size());
  Task setupTask = assigned.get(0);
  assertTrue(setupTask.isJobSetupTask());
  assertFalse(setupTask.isMapTask());
  jobTracker.jobs.clear();
}
项目:mapreduce-fork
文件:TestSetupTaskScheduling.java
/**
 * Test to check that reduce slots are also counted when returning
 * a setup task.
 * @throws IOException
 */
public void testReduceSlotsCountedForSetup() throws IOException {
  // Job whose setup task is pending.
  FakeJobInProgress setupJob = createJob(TaskType.JOB_SETUP);
  jobTracker.jobs.put(setupJob.getJobID(), setupJob);
  // Second job used for slot reservation.
  FakeJobInProgress reservedJob = createJob(null);
  jobTracker.jobs.put(reservedJob.getJobID(), reservedJob);
  // Free map slots are checked first in the scheduler, so occupy the map
  // slot as well as the reduce slot.
  List<TaskStatus> statuses = new ArrayList<TaskStatus>();
  addNewTaskStatus(reservedJob, TaskType.MAP, true, trackers[1], statuses);
  addNewTaskStatus(reservedJob, TaskType.REDUCE, false, trackers[1], statuses);
  TaskTrackerStatus busyTracker =
      createTaskTrackerStatus(trackers[1], statuses);
  // With both slot types occupied, no setup task may be scheduled.
  List<Task> assigned = jobTracker.getSetupAndCleanupTasks(busyTracker);
  assertNull(assigned);
  jobTracker.jobs.clear();
}
项目:mapreduce-fork
文件:TestSetupTaskScheduling.java
/**
 * Test to check that map slots are counted when returning
 * a taskCleanup task.
 * @throws IOException
 */
public void testNumSlotsUsedForTaskCleanup() throws IOException {
  // High-RAM job (map/reduce tasks need 2 slots each) carrying a map task's
  // cleanup attempt and a reduce task's cleanup attempt.
  FakeJobInProgress cleanupJob = createJob(TaskType.TASK_CLEANUP);
  jobTracker.jobs.put(cleanupJob.getJobID(), cleanupJob);
  // Dummy tracker status with no running tasks, for getSetupAndCleanupTasks.
  List<TaskStatus> emptyReports = new ArrayList<TaskStatus>();
  TaskTrackerStatus ttStatus =
      createTaskTrackerStatus(trackers[0], emptyReports);
  // Validate the map-cleanup task, then the reduce-cleanup task.
  validateNumSlotsUsedForTaskCleanup(ttStatus);
  validateNumSlotsUsedForTaskCleanup(ttStatus);
  jobTracker.jobs.clear();
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * After the speculative lag elapses, a speculative attempt should be handed
 * out for the slowest remaining reduce, and a subsequent request should
 * speculate the next one.
 * @throws IOException
 */
public void testTaskToSpeculate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(5);
  // Speculation tuning knobs; per the key names these relax the slow-node
  // and unfinished-task gates so only the slow-task threshold matters
  // (TODO confirm exact semantics against JobInProgress).
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWNODE_THRESHOLD, 100f);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // schedule reduces (original comment said "maps"; these are reduce tasks).
  // Tasks 3 and 4 are both placed on trackers[3].
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);
  taskAttemptID[4] = job.findReduceTask(trackers[3]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advance(20000);
  clock.advanceBySpeculativeLag();
  job.refresh(clock.getTime());
  // we should get a speculative task now; expected to be task 2
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 2);
  clock.advance(5000);
  job.finishTask(taskAttemptID[5]);
  job.refresh(clock.getTime());
  // the next speculation target is expected to be task 3
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 3);
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * LATE-style scheduling: the speculative attempt should go to the task
 * expected to finish last (lowest projected completion), not simply the one
 * with the lowest progress rate.
 * @throws IOException
 */
public void testTaskLATEScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(250000);
  // Task 4 starts much later than task 3.
  taskAttemptID[4] = job.findMapTask(trackers[3]);
  clock.advanceBySpeculativeLag();
  // by doing the above clock adjustments, we bring the progress rate of
  // taskID 3 lower than 4. For taskID 3, the rate is 85/317000
  // and for taskID 4, the rate is 20/65000. But when we ask for a spec task
  // now, we should get back taskID 4 (since that is expected to complete
  // later than taskID 3).
  job.refresh(clock.getTime());
  job.progressMade(taskAttemptID[3], 0.85f);
  job.progressMade(taskAttemptID[4], 0.20f);
  taskAttemptID[5] = job.findMapTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 4);
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * A slow-but-progressing task whose projected remaining time is below the
 * configured speculative duration (300s here) must NOT be speculated.
 * @throws IOException
 */
public void testFastTaskScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[2];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(2);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setMapSpeculativeDuration(300L);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // a really fast task #1
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  // task #2 is slow
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  clock.advanceBySpeculativeLag();
  clock.advance(5000);
  // 65 secs have elapsed since task scheduling
  // set progress so that it will complete within 300 seconds
  job.progressMade(taskAttemptID[1], 0.7f);
  // no new map task should be found; original used
  // assertEquals(actual, null) — assertNull states the intent directly.
  job.refresh(clock.getTime());
  assertNull("No speculative attempt expected for a task that will finish "
      + "within the speculative duration", job.findMapTask(trackers[2]));
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * Schedules {@code totalTasks} maps on one tracker, finishes
 * {@code numEarlyComplete} of them early, then counts how many speculative
 * attempts a second tracker can obtain before the scheduler returns null.
 *
 * @param totalTasks       number of map tasks in the job
 * @param numEarlyComplete how many tasks finish before the speculative lag
 * @param slots            total slots configured on the fake JobTracker
 * @return number of speculative map attempts handed out
 * @throws IOException
 */
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
    throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(totalTasks);
  conf.setNumReduceTasks(0);
  jobTracker.setNumSlots(slots);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  int i;
  // Schedule every map on the first tracker.
  for (i = 0; i < totalTasks; i++) {
    taskAttemptID[i] = job.findMapTask(trackers[0]);
  }
  clock.advance(5000);
  // Complete the early finishers.
  for (i = 0; i < numEarlyComplete; i++) {
    job.finishTask(taskAttemptID[i]);
  }
  clock.advanceBySpeculativeLag();
  // The rest are stuck at 85% progress.
  for (i = numEarlyComplete; i < totalTasks; i++) {
    job.progressMade(taskAttemptID[i], 0.85f);
  }
  clock.advance(50000);
  // Drain speculative attempts from a second tracker until none is offered.
  for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
    job.refresh(clock.getTime());
    taskAttemptID[i] = job.findMapTask(trackers[1]);
    clock.advance(2000);
    if (taskAttemptID[i] != null) {
      //add some good progress constantly for the different
      //task-attempts so that
      //the tasktracker doesn't get into the slow trackers category
      job.progressMade(taskAttemptID[i], 0.99f);
    } else {
      break;
    }
  }
  return i;
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * With "mapreduce.job.speculative.using.processing.rate" enabled, the map
 * with the lower bytes-per-time processing rate (task 2) should be chosen
 * for speculation even though task 1 has the lower progress rate.
 * @throws IOException
 */
public void testSlowMapProgressingRate() throws IOException {
  clock.advance(1000);
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(0);
  //use processing rate for speculation
  conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  //schedule maps
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[0]);
  //if consider the progress rate, we should speculate task 1
  //but if consider the processing rate, which is map_input_bytes/time
  //then we should speculate task 2
  job.processingRate(taskAttemptID[1], Task.Counter.MAP_INPUT_BYTES,
      100000000, 0.1f, TaskStatus.Phase.MAP);
  job.processingRate(taskAttemptID[2], Task.Counter.MAP_INPUT_BYTES,
      1000, 0.5f, TaskStatus.Phase.MAP);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[0]);
  assertEquals(taskAttemptID[3].getTaskID().getId(), 2);
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * After the speculative lag elapses, a speculative attempt should be handed
 * out for the slowest remaining reduce, and a subsequent request should
 * speculate the next one.
 * @throws IOException
 */
public void testTaskToSpeculate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(5);
  // Speculation tuning knobs; per the key names these relax the slow-node
  // and unfinished-task gates so only the slow-task threshold matters
  // (TODO confirm exact semantics against JobInProgress).
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWNODE_THRESHOLD, 100f);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // schedule reduces (original comment said "maps"; these are reduce tasks).
  // Tasks 3 and 4 are both placed on trackers[3].
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);
  taskAttemptID[4] = job.findReduceTask(trackers[3]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advance(20000);
  clock.advanceBySpeculativeLag();
  job.refresh(clock.getTime());
  // we should get a speculative task now; expected to be task 2
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 2);
  clock.advance(5000);
  job.finishTask(taskAttemptID[5]);
  job.refresh(clock.getTime());
  // the next speculation target is expected to be task 3
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 3);
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * LATE-style scheduling: the speculative attempt should go to the task
 * expected to finish last (lowest projected completion), not simply the one
 * with the lowest progress rate.
 * @throws IOException
 */
public void testTaskLATEScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(250000);
  // Task 4 starts much later than task 3.
  taskAttemptID[4] = job.findMapTask(trackers[3]);
  clock.advanceBySpeculativeLag();
  // by doing the above clock adjustments, we bring the progress rate of
  // taskID 3 lower than 4. For taskID 3, the rate is 85/317000
  // and for taskID 4, the rate is 20/65000. But when we ask for a spec task
  // now, we should get back taskID 4 (since that is expected to complete
  // later than taskID 3).
  job.refresh(clock.getTime());
  job.progressMade(taskAttemptID[3], 0.85f);
  job.progressMade(taskAttemptID[4], 0.20f);
  taskAttemptID[5] = job.findMapTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 4);
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * A slow-but-progressing task whose projected remaining time is below the
 * configured speculative duration (300s here) must NOT be speculated.
 * @throws IOException
 */
public void testFastTaskScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[2];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(2);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setMapSpeculativeDuration(300L);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // a really fast task #1
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  // task #2 is slow
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  clock.advanceBySpeculativeLag();
  clock.advance(5000);
  // 65 secs have elapsed since task scheduling
  // set progress so that it will complete within 300 seconds
  job.progressMade(taskAttemptID[1], 0.7f);
  // no new map task should be found; original used
  // assertEquals(actual, null) — assertNull states the intent directly.
  job.refresh(clock.getTime());
  assertNull("No speculative attempt expected for a task that will finish "
      + "within the speculative duration", job.findMapTask(trackers[2]));
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * Schedules {@code totalTasks} maps on one tracker, finishes
 * {@code numEarlyComplete} of them early, then counts how many speculative
 * attempts a second tracker can obtain before the scheduler returns null.
 *
 * @param totalTasks       number of map tasks in the job
 * @param numEarlyComplete how many tasks finish before the speculative lag
 * @param slots            total slots configured on the fake JobTracker
 * @return number of speculative map attempts handed out
 * @throws IOException
 */
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
    throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(totalTasks);
  conf.setNumReduceTasks(0);
  jobTracker.setNumSlots(slots);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  int i;
  // Schedule every map on the first tracker.
  for (i = 0; i < totalTasks; i++) {
    taskAttemptID[i] = job.findMapTask(trackers[0]);
  }
  clock.advance(5000);
  // Complete the early finishers.
  for (i = 0; i < numEarlyComplete; i++) {
    job.finishTask(taskAttemptID[i]);
  }
  clock.advanceBySpeculativeLag();
  // The rest are stuck at 85% progress.
  for (i = numEarlyComplete; i < totalTasks; i++) {
    job.progressMade(taskAttemptID[i], 0.85f);
  }
  clock.advance(50000);
  // Drain speculative attempts from a second tracker until none is offered.
  for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
    job.refresh(clock.getTime());
    taskAttemptID[i] = job.findMapTask(trackers[1]);
    clock.advance(2000);
    if (taskAttemptID[i] != null) {
      //add some good progress constantly for the different
      //task-attempts so that
      //the tasktracker doesn't get into the slow trackers category
      job.progressMade(taskAttemptID[i], 0.99f);
    } else {
      break;
    }
  }
  return i;
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * With "mapreduce.job.speculative.using.processing.rate" enabled, the map
 * with the lower bytes-per-time processing rate (task 2) should be chosen
 * for speculation even though task 1 has the lower progress rate.
 * @throws IOException
 */
public void testSlowMapProgressingRate() throws IOException {
  clock.advance(1000);
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(0);
  //use processing rate for speculation
  conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  //schedule maps
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[0]);
  //if consider the progress rate, we should speculate task 1
  //but if consider the processing rate, which is map_input_bytes/time
  //then we should speculate task 2
  job.processingRate(taskAttemptID[1], Task.Counter.MAP_INPUT_BYTES,
      100000000, 0.1f, TaskStatus.Phase.MAP);
  job.processingRate(taskAttemptID[2], Task.Counter.MAP_INPUT_BYTES,
      1000, 0.5f, TaskStatus.Phase.MAP);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[0]);
  assertEquals(taskAttemptID[3].getTaskID().getId(), 2);
}
项目:mapreduce-fork
文件:TestLostTracker.java
/**
 * A tracker that stops heartbeating is declared lost; the map it completed
 * must be re-scheduled on another tracker under the same task ID.
 * @throws IOException
 */
public void testLostTracker() throws IOException {
  // Tracker 0 contacts JT
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
  TaskAttemptID[] tid = new TaskAttemptID[2];
  JobConf conf = new JobConf();
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // Tracker 0 gets the map task
  tid[0] = job.findMapTask(trackers[0]);
  job.finishTask(tid[0]);
  // Advance clock. Tracker 0 would have got lost
  // (8s presumably exceeds the tracker-expiry interval set up elsewhere in
  // this test class — confirm against the fixture's configuration).
  clock.advance(8 * 1000);
  jobTracker.checkExpiredTrackers();
  // Tracker 1 establishes contact with JT
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
  // Tracker1 should get assigned the lost map task
  tid[1] = job.findMapTask(trackers[1]);
  assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]);
  assertEquals("Task ID of reassigned map task does not match",
      tid[0].getTaskID().toString(), tid[1].getTaskID().toString());
  job.finishTask(tid[1]);
}
项目:mapreduce-fork
文件:TestSpeculativeExecution.java
/**
 * After the speculative lag elapses, speculative attempts should be handed
 * out for the slowest remaining reduces; the fake JobTracker instrumentation
 * counters must reflect the speculated maps/reduces.
 * @throws IOException
 */
public void testTaskToSpeculate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(5);
  conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // schedule reduces (original comment said "maps"; these are reduce tasks).
  // Tasks 3 and 4 are both placed on trackers[3].
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);
  taskAttemptID[4] = job.findReduceTask(trackers[3]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advance(20000);
  clock.advanceBySpeculativeLag();
  // we should get a speculative task now; expected to be task 2
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 2);
  clock.advance(5000);
  job.finishTask(taskAttemptID[5]);
  // the next speculation target is expected to be task 3
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 3);
  // Verify total speculative tasks by jobtracker instrumentation
  // (counts presumably accumulate across tests in this class — the map
  // count of 1 is not produced by this method; confirm test ordering).
  assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
  assertEquals("Total speculative reduces", 3,
      fakeInst.numSpeculativeReduces);
  LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
  LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
项目:mapreduce-fork
文件:TestSpeculativeExecution.java
/**
 * LATE-style scheduling: the speculative attempt should go to the task
 * expected to finish last, not simply the one with the lowest progress
 * rate; instrumentation totals are checked afterwards.
 * @throws IOException
 */
public void testTaskLATEScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(250000);
  // Task 4 starts much later than task 3.
  taskAttemptID[4] = job.findMapTask(trackers[3]);
  clock.advanceBySpeculativeLag();
  // by doing the above clock adjustments, we bring the progress rate of
  // taskID 3 lower than 4. For taskID 3, the rate is 85/317000
  // and for taskID 4, the rate is 20/65000. But when we ask for a spec task
  // now, we should get back taskID 4 (since that is expected to complete
  // later than taskID 3).
  job.progressMade(taskAttemptID[3], 0.85f);
  job.progressMade(taskAttemptID[4], 0.20f);
  taskAttemptID[5] = job.findMapTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 4);
  // Verify total speculative tasks by jobtracker instrumentation
  // (counts presumably accumulate across tests in this class; confirm
  // test ordering).
  assertEquals("Total speculative maps", 2, fakeInst.numSpeculativeMaps);
  assertEquals("Total speculative reduces", 3,
      fakeInst.numSpeculativeReduces);
  LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
  LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
项目:mapreduce-fork
文件:TestSpeculativeExecution.java
/**
 * Schedules {@code totalTasks} maps on one tracker, finishes
 * {@code numEarlyComplete} of them early, then counts how many speculative
 * attempts a second tracker can obtain before the scheduler returns null.
 *
 * @param totalTasks       number of map tasks in the job
 * @param numEarlyComplete how many tasks finish before the speculative lag
 * @param slots            total slots configured on the fake JobTracker
 * @return number of speculative map attempts handed out
 * @throws IOException
 */
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
    throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(totalTasks);
  conf.setNumReduceTasks(0);
  jobTracker.setNumSlots(slots);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  int i;
  // Schedule every map on the first tracker.
  for (i = 0; i < totalTasks; i++) {
    taskAttemptID[i] = job.findMapTask(trackers[0]);
  }
  clock.advance(5000);
  // Complete the early finishers.
  for (i = 0; i < numEarlyComplete; i++) {
    job.finishTask(taskAttemptID[i]);
  }
  clock.advanceBySpeculativeLag();
  // The rest are stuck at 85% progress.
  for (i = numEarlyComplete; i < totalTasks; i++) {
    job.progressMade(taskAttemptID[i], 0.85f);
  }
  clock.advance(50000);
  // Drain speculative attempts from a second tracker until none is offered.
  for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
    taskAttemptID[i] = job.findMapTask(trackers[1]);
    clock.advance(2000);
    if (taskAttemptID[i] != null) {
      //add some good progress constantly for the different
      //task-attempts so that
      //the tasktracker doesn't get into the slow trackers category
      job.progressMade(taskAttemptID[i], 0.99f);
    } else {
      break;
    }
  }
  return i;
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * Checks that runningMaps()/runningReduces() are decremented when a running
 * attempt fails while a speculative attempt for the same job is active.
 * Fix: both assertEquals calls had expected and actual swapped (JUnit takes
 * the expected value first), which makes failure messages misleading.
 * @throws IOException
 */
public void testRunningTaskCountWithSpeculation() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(3);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // Check for runningMap counts first: schedule maps.
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advanceBySpeculativeLag();
  // we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  job.refresh(clock.getTime());
  int oldRunningMap = job.runningMaps();
  LOG.info("No of running maps before fail was " + oldRunningMap);
  job.failTask(taskAttemptID[2]);
  job.refresh(clock.getTime());
  assertEquals(
      "Running maps count should be updated from " + oldRunningMap + " to " +
          (oldRunningMap - 1), oldRunningMap - 1, job.runningMaps());
  LOG.info(" Job running maps after fail " + job.runningMaps());
  clock.advance(5000);
  job.finishTask(taskAttemptID[3]);
  // Check for runningReduce count.
  taskAttemptID[4] = job.findReduceTask(trackers[0]);
  taskAttemptID[5] = job.findReduceTask(trackers[1]);
  taskAttemptID[6] = job.findReduceTask(trackers[2]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[4]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[5]);
  job.refresh(clock.getTime());
  clock.advanceBySpeculativeLag();
  taskAttemptID[7] = job.findReduceTask(trackers[4]);
  job.refresh(clock.getTime());
  int oldRunningReduces = job.runningReduces();
  job.failTask(taskAttemptID[6]);
  job.refresh(clock.getTime());
  LOG.info(
      " No of running Reduces before fail " + oldRunningReduces);
  LOG.info(
      " No of runing reduces after fail " + job.runningReduces());
  assertEquals(
      "Running reduces count should be updated from " + oldRunningReduces +
          " to " + (oldRunningReduces - 1), oldRunningReduces - 1,
      job.runningReduces());
  job.finishTask(taskAttemptID[7]);
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * Trackers that finish their tasks far more slowly than their peers should
 * be flagged by isSlowTracker(); fast trackers should not.
 * Fix: replaced assertEquals(actual, booleanLiteral) — which also had the
 * expected/actual order inverted — with assertFalse/assertTrue.
 * @throws IOException
 */
public void testIsSlowTracker() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(10);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // schedule three tasks on each of three trackers
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[0]);
  taskAttemptID[2] = job.findMapTask(trackers[0]);
  taskAttemptID[3] = job.findMapTask(trackers[1]);
  taskAttemptID[4] = job.findMapTask(trackers[1]);
  taskAttemptID[5] = job.findMapTask(trackers[1]);
  taskAttemptID[6] = job.findMapTask(trackers[2]);
  taskAttemptID[7] = job.findMapTask(trackers[2]);
  taskAttemptID[8] = job.findMapTask(trackers[2]);
  clock.advance(1000);
  // Some tasks finish in 1 second (on trackers[0])
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(1000);
  // Some tasks finish in 2 second (on trackers[1])
  job.finishTask(taskAttemptID[3]);
  job.finishTask(taskAttemptID[4]);
  job.finishTask(taskAttemptID[5]);
  assertFalse("Tracker "+ trackers[0] + " expected to be not slow ",
      job.isSlowTracker(trackers[0]));
  clock.advance(100000);
  // After a long time, some tasks finished on trackers[2]
  job.finishTask(taskAttemptID[6]);
  job.finishTask(taskAttemptID[7]);
  job.finishTask(taskAttemptID[8]);
  job.refresh(clock.getTime());
  assertTrue("Tracker "+ trackers[2] + " expected to be slow ",
      job.isSlowTracker(trackers[2]));
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * Tests speculation when the progress stddev exceeds the mean: the
 * SPECULATIVE_STDDEVMEANRATIO_MAX cap should still allow a speculative
 * attempt to be scheduled.
 * Fix: replaced {@code if (x == null) Assert.fail()} with
 * {@code Assert.assertNotNull}, and corrected the comment — the original
 * said "no new map task should be found" while the check requires one.
 * @throws IOException
 */
public void testTaskSpeculationStddevCap() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setFloat(JobInProgress.SPECULATIVE_STDDEVMEANRATIO_MAX, 0.33f);
  conf.setNumMapTasks(7);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // all but one tasks start off
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[0]);
  taskAttemptID[4] = job.findMapTask(trackers[1]);
  taskAttemptID[5] = job.findMapTask(trackers[2]);
  // 3 tasks finish really fast in 15s
  clock.advance (15 * 1000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  // advance to 600s and schedule last mapper
  clock.advance (585 * 1000);
  taskAttemptID[6] = job.findMapTask(trackers[0]);
  // advance to 700s and report progress
  clock.advance (10 * 60 * 1000);
  // set progress rates
  job.progressMade(taskAttemptID[3], 0.2f);
  job.progressMade(taskAttemptID[4], 0.5f);
  job.progressMade(taskAttemptID[5], 0.6f);
  job.progressMade(taskAttemptID[6], 0.02f);
  // the progress has been set in such a way that
  // stddev > mean. now we depend on stddev capping
  // for speculation.
  job.refresh(clock.getTime());
  // a speculative map attempt IS expected here thanks to the stddev cap
  taskAttemptID[7] = job.findMapTask(trackers[1]);
  Assert.assertNotNull("expected a speculative map attempt",
      taskAttemptID[7]);
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * When only one map (or reduce) remains unfinished after the speculative
 * lag, that last task should still be eligible for speculation.
 * @throws Exception
 */
public void testSpeculateLastTask() throws Exception {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(3);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  clock.advanceBySpeculativeLag();
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  // Speculate last unfinished map task
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  Assert.assertNotNull(taskAttemptID[3]);
  job.finishTask(taskAttemptID[2]);
  job.finishTask(taskAttemptID[3]);
  taskAttemptID[4] = job.findReduceTask(trackers[0]);
  taskAttemptID[5] = job.findReduceTask(trackers[1]);
  taskAttemptID[6] = job.findReduceTask(trackers[2]);
  clock.advanceBySpeculativeLag();
  job.finishTask(taskAttemptID[4]);
  job.finishTask(taskAttemptID[5]);
  // Speculate last unfinished reduce task
  job.refresh(clock.getTime());
  taskAttemptID[7] = job.findReduceTask(trackers[3]);
  Assert.assertNotNull(taskAttemptID[7]);
  job.finishTask(taskAttemptID[6]);
  job.finishTask(taskAttemptID[7]);
}
项目:hadoop-EAR
文件:TestSpeculativeExecution.java
/**
 * Checks that when processing-rate based speculation is enabled, the
 * reduce with the slowest REDUCE-phase processing rate (task id 1) is
 * the one that gets speculated, even though its overall progress is the
 * same as tasks 2 and 3.
 *
 * @throws IOException on failures in the fake job plumbing
 */
public void testSlowReduceProgressingRate() throws IOException {
TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
JobConf conf = new JobConf();
conf.setSpeculativeExecution(true);
conf.setNumMapTasks(4);
conf.setNumReduceTasks(4);
//use processing rate for speculation
conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
//schedule reduces
taskAttemptID[0] = job.findReduceTask(trackers[0]);
taskAttemptID[1] = job.findReduceTask(trackers[1]);
taskAttemptID[2] = job.findReduceTask(trackers[2]);
taskAttemptID[3] = job.findReduceTask(trackers[3]);
clock.advance(1000);
//task 0 just starts copying, while task 1, 2, 3 are already in the reducing
//phase. If we compared the progress rate, then we should speculate 0.
//However, by comparing the processing rate in the copy phase, among all 4
//tasks, task 0 is fast, and we should not speculate it.
//for task 1, 2, 3, they are all in the reducing phase, with same progress,
//however, task 1 has smaller processing rate(the statistics of the reduce
//phase for all the tasks will also include statistics for task 0, whose
//processing rate is 0)
job.finishCopy(taskAttemptID[1], clock.getTime(), 10000);
job.finishCopy(taskAttemptID[2], clock.getTime(), 10000);
job.finishCopy(taskAttemptID[3], clock.getTime(), 10000);
clock.advance(1000);
job.finishSort(taskAttemptID[1], clock.getTime());
job.finishSort(taskAttemptID[2], clock.getTime());
job.finishSort(taskAttemptID[3], clock.getTime());
job.processingRate(taskAttemptID[0], Task.Counter.REDUCE_SHUFFLE_BYTES,
100000000, 0.1f, TaskStatus.Phase.SHUFFLE);
job.processingRate(taskAttemptID[1], Task.Counter.REDUCE_INPUT_BYTES,
1000, 0.8f, TaskStatus.Phase.REDUCE);
job.processingRate(taskAttemptID[2], Task.Counter.REDUCE_INPUT_BYTES,
100000000, 0.8f, TaskStatus.Phase.REDUCE);
job.processingRate(taskAttemptID[3], Task.Counter.REDUCE_INPUT_BYTES,
100000000, 0.8f, TaskStatus.Phase.REDUCE);
clock.advanceBySpeculativeLag();
//we should get a speculative task now
job.refresh(clock.getTime());
taskAttemptID[4] = job.findReduceTask(trackers[4]);
// JUnit convention: expected value first (was reversed in the original).
// The speculated attempt must belong to the slow reduce, task id 1.
assertEquals("Wrong reduce task speculated",
1, taskAttemptID[4].getTaskID().getId());
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * Verifies that runningMaps()/runningReduces() counters are decremented
 * when a running attempt fails while a speculative attempt is active.
 *
 * @throws IOException on failures in the fake job plumbing
 */
public void testRunningTaskCountWithSpeculation() throws IOException {
TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
JobConf conf = new JobConf();
conf.setSpeculativeExecution(true);
conf.setNumMapTasks(3);
conf.setNumReduceTasks(3);
conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
//Check for runningMap counts first
//schedule maps
taskAttemptID[0] = job.findMapTask(trackers[0]);
taskAttemptID[1] = job.findMapTask(trackers[1]);
taskAttemptID[2] = job.findMapTask(trackers[2]);
clock.advance(5000);
job.finishTask(taskAttemptID[0]);
clock.advance(1000);
job.finishTask(taskAttemptID[1]);
clock.advanceBySpeculativeLag();
//we should get a speculative task now
job.refresh(clock.getTime());
taskAttemptID[3] = job.findMapTask(trackers[3]);
job.refresh(clock.getTime());
int oldRunningMap = job.runningMaps();
LOG.info("No of running maps before fail was " + oldRunningMap);
job.failTask(taskAttemptID[2]);
job.refresh(clock.getTime());
// JUnit convention: expected value first (was reversed in the original)
assertEquals(
"Running maps count should be updated from " + oldRunningMap + " to " +
(oldRunningMap - 1), oldRunningMap - 1, job.runningMaps());
LOG.info(" Job running maps after fail " + job.runningMaps());
clock.advance(5000);
job.finishTask(taskAttemptID[3]);
//check for runningReduce count.
taskAttemptID[4] = job.findReduceTask(trackers[0]);
taskAttemptID[5] = job.findReduceTask(trackers[1]);
taskAttemptID[6] = job.findReduceTask(trackers[2]);
clock.advance(5000);
job.finishTask(taskAttemptID[4]);
clock.advance(1000);
job.finishTask(taskAttemptID[5]);
job.refresh(clock.getTime());
clock.advanceBySpeculativeLag();
taskAttemptID[7] = job.findReduceTask(trackers[4]);
job.refresh(clock.getTime());
int oldRunningReduces = job.runningReduces();
job.failTask(taskAttemptID[6]);
job.refresh(clock.getTime());
LOG.info(
" No of running Reduces before fail " + oldRunningReduces);
LOG.info(
" No of runing reduces after fail " + job.runningReduces());
// JUnit convention: expected value first (was reversed in the original)
assertEquals(
"Running reduces count should be updated from " + oldRunningReduces +
" to " + (oldRunningReduces - 1), oldRunningReduces - 1,
job.runningReduces());
job.finishTask(taskAttemptID[7]);
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * Verifies isSlowTracker(): a tracker whose attempts complete much more
 * slowly than the others (trackers[2]) is reported slow, while the fastest
 * tracker (trackers[0]) is not.
 *
 * @throws IOException on failures in the fake job plumbing
 */
public void testIsSlowTracker() throws IOException {
TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
JobConf conf = new JobConf();
conf.setSpeculativeExecution(true);
conf.setNumMapTasks(10);
conf.setNumReduceTasks(0);
conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
//schedule some tasks
taskAttemptID[0] = job.findMapTask(trackers[0]);
taskAttemptID[1] = job.findMapTask(trackers[0]);
taskAttemptID[2] = job.findMapTask(trackers[0]);
taskAttemptID[3] = job.findMapTask(trackers[1]);
taskAttemptID[4] = job.findMapTask(trackers[1]);
taskAttemptID[5] = job.findMapTask(trackers[1]);
taskAttemptID[6] = job.findMapTask(trackers[2]);
taskAttemptID[7] = job.findMapTask(trackers[2]);
taskAttemptID[8] = job.findMapTask(trackers[2]);
clock.advance(1000);
//Some tasks finish in 1 second (on trackers[0])
job.finishTask(taskAttemptID[0]);
job.finishTask(taskAttemptID[1]);
job.finishTask(taskAttemptID[2]);
clock.advance(1000);
//Some tasks finish in 2 second (on trackers[1])
job.finishTask(taskAttemptID[3]);
job.finishTask(taskAttemptID[4]);
job.finishTask(taskAttemptID[5]);
// assertFalse/assertTrue replace reversed assertEquals(actual, boolean)
assertFalse("Tracker "+ trackers[0] + " expected to be not slow ",
job.isSlowTracker(trackers[0]));
clock.advance(100000);
//After a long time, some tasks finished on trackers[2]
job.finishTask(taskAttemptID[6]);
job.finishTask(taskAttemptID[7]);
job.finishTask(taskAttemptID[8]);
job.refresh(clock.getTime());
assertTrue("Tracker "+ trackers[2] + " expected to be slow ",
job.isSlowTracker(trackers[2]));
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * Tests the stddev cap on task-duration estimates: progress values are
 * chosen so that stddev > mean, and speculation of the straggler must
 * still occur because the stddev used in the estimate is capped at
 * SPECULATIVE_STDDEVMEANRATIO_MAX times the mean.
 *
 * @throws IOException on failures in the fake job plumbing
 */
public void testTaskSpeculationStddevCap() throws IOException {
TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
JobConf conf = new JobConf();
conf.setSpeculativeExecution(true);
conf.setFloat(JobInProgress.SPECULATIVE_STDDEVMEANRATIO_MAX, 0.33f);
conf.setNumMapTasks(7);
conf.setNumReduceTasks(0);
conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
// all but one tasks start off
taskAttemptID[0] = job.findMapTask(trackers[0]);
taskAttemptID[1] = job.findMapTask(trackers[1]);
taskAttemptID[2] = job.findMapTask(trackers[2]);
taskAttemptID[3] = job.findMapTask(trackers[0]);
taskAttemptID[4] = job.findMapTask(trackers[1]);
taskAttemptID[5] = job.findMapTask(trackers[2]);
// 3 tasks finish really fast in 15s
clock.advance (15 * 1000);
job.finishTask(taskAttemptID[0]);
job.finishTask(taskAttemptID[1]);
job.finishTask(taskAttemptID[2]);
// advance to 600s and schedule last mapper
clock.advance (585 * 1000);
taskAttemptID[6] = job.findMapTask(trackers[0]);
// advance another 10 minutes and report progress
// (original comment said "700s" but 10*60*1000 ms is 600s — TODO confirm
// the intended time point)
clock.advance (10 * 60 * 1000);
// set progress rates
job.progressMade(taskAttemptID[3], 0.2f);
job.progressMade(taskAttemptID[4], 0.5f);
job.progressMade(taskAttemptID[5], 0.6f);
job.progressMade(taskAttemptID[6], 0.02f);
// the progress has been set in such a way that stddev > mean; we depend
// on stddev capping for speculation to kick in for the straggler.
job.refresh(clock.getTime());
taskAttemptID[7] = job.findMapTask(trackers[1]);
// The original guarded Assert.fail() was commented "no new map task
// should be found" yet failed when NONE was found; assertNotNull states
// the code's actual intent directly: a speculative attempt must exist.
Assert.assertNotNull("A speculative map attempt should have been scheduled",
taskAttemptID[7]);
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * The last remaining map and the last remaining reduce should each get a
 * speculative attempt once the speculative lag has passed.
 */
public void testSpeculateLastTask() throws Exception {
TaskAttemptID[] attempts = new TaskAttemptID[8];
JobConf jobConf = new JobConf();
jobConf.setSpeculativeExecution(true);
jobConf.setNumMapTasks(3);
jobConf.setNumReduceTasks(3);
FakeJobInProgress job = new FakeJobInProgress(jobConf, jobTracker);
job.initTasks();
// launch the three maps, one per tracker
for (int i = 0; i < 3; i++) {
attempts[i] = job.findMapTask(trackers[i]);
}
clock.advanceBySpeculativeLag();
job.finishTask(attempts[0]);
job.finishTask(attempts[1]);
// the single unfinished map should now be speculated
job.refresh(clock.getTime());
attempts[3] = job.findMapTask(trackers[3]);
Assert.assertNotNull(attempts[3]);
job.finishTask(attempts[2]);
job.finishTask(attempts[3]);
// launch the three reduces, one per tracker
for (int i = 0; i < 3; i++) {
attempts[4 + i] = job.findReduceTask(trackers[i]);
}
clock.advanceBySpeculativeLag();
job.finishTask(attempts[4]);
job.finishTask(attempts[5]);
// the single unfinished reduce should now be speculated
job.refresh(clock.getTime());
attempts[7] = job.findReduceTask(trackers[3]);
Assert.assertNotNull(attempts[7]);
job.finishTask(attempts[6]);
job.finishTask(attempts[7]);
}
项目:RDFS
文件:TestSpeculativeExecution.java
/**
 * Checks that when processing-rate based speculation is enabled, the
 * reduce with the slowest REDUCE-phase processing rate (task id 1) is
 * the one that gets speculated, even though its overall progress is the
 * same as tasks 2 and 3.
 *
 * @throws IOException on failures in the fake job plumbing
 */
public void testSlowReduceProgressingRate() throws IOException {
TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
JobConf conf = new JobConf();
conf.setSpeculativeExecution(true);
conf.setNumMapTasks(4);
conf.setNumReduceTasks(4);
//use processing rate for speculation
conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
//schedule reduces
taskAttemptID[0] = job.findReduceTask(trackers[0]);
taskAttemptID[1] = job.findReduceTask(trackers[1]);
taskAttemptID[2] = job.findReduceTask(trackers[2]);
taskAttemptID[3] = job.findReduceTask(trackers[3]);
clock.advance(1000);
//task 0 just starts copying, while task 1, 2, 3 are already in the reducing
//phase. If we compared the progress rate, then we should speculate 0.
//However, by comparing the processing rate in the copy phase, among all 4
//tasks, task 0 is fast, and we should not speculate it.
//for task 1, 2, 3, they are all in the reducing phase, with same progress,
//however, task 1 has smaller processing rate(the statistics of the reduce
//phase for all the tasks will also include statistics for task 0, whose
//processing rate is 0)
job.finishCopy(taskAttemptID[1], clock.getTime(), 10000);
job.finishCopy(taskAttemptID[2], clock.getTime(), 10000);
job.finishCopy(taskAttemptID[3], clock.getTime(), 10000);
clock.advance(1000);
job.finishSort(taskAttemptID[1], clock.getTime());
job.finishSort(taskAttemptID[2], clock.getTime());
job.finishSort(taskAttemptID[3], clock.getTime());
job.processingRate(taskAttemptID[0], Task.Counter.REDUCE_SHUFFLE_BYTES,
100000000, 0.1f, TaskStatus.Phase.SHUFFLE);
job.processingRate(taskAttemptID[1], Task.Counter.REDUCE_INPUT_BYTES,
1000, 0.8f, TaskStatus.Phase.REDUCE);
job.processingRate(taskAttemptID[2], Task.Counter.REDUCE_INPUT_BYTES,
100000000, 0.8f, TaskStatus.Phase.REDUCE);
job.processingRate(taskAttemptID[3], Task.Counter.REDUCE_INPUT_BYTES,
100000000, 0.8f, TaskStatus.Phase.REDUCE);
clock.advanceBySpeculativeLag();
//we should get a speculative task now
job.refresh(clock.getTime());
taskAttemptID[4] = job.findReduceTask(trackers[4]);
// JUnit convention: expected value first (was reversed in the original).
// The speculated attempt must belong to the slow reduce, task id 1.
assertEquals("Wrong reduce task speculated",
1, taskAttemptID[4].getTaskID().getId());
}
项目:mapreduce-fork
文件:TestTrackerReservation.java
/**
 * Test case for task tracker reservation.
 * <ol>
 * <li>Run a cluster with 3 trackers.</li>
 * <li>Submit a job which reserves all the slots in two
 * trackers.</li>
 * <li>Run the job on another tracker which has
 * no reservations</li>
 * <li>Finish the job and observe the reservations are
 * successfully canceled</li>
 * </ol>
 *
 * @throws Exception
 */
public void testTaskTrackerReservation() throws Exception {
JobConf conf = new JobConf();
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
conf.setSpeculativeExecution(false);
conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
//Set task tracker objects for reservation.
TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
TaskTracker tt3 = jobTracker.getTaskTracker(trackers[2]);
TaskTrackerStatus status1 = new TaskTrackerStatus(
trackers[0],JobInProgress.convertTrackerNameToHostName(
trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
TaskTrackerStatus status2 = new TaskTrackerStatus(
trackers[1],JobInProgress.convertTrackerNameToHostName(
trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
// Fixed copy-paste bug: status3 belongs to tt3 (trackers[2]) but the
// original built it from trackers[1].
TaskTrackerStatus status3 = new TaskTrackerStatus(
trackers[2],JobInProgress.convertTrackerNameToHostName(
trackers[2]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
tt1.setStatus(status1);
tt2.setStatus(status2);
tt3.setStatus(status3);
FakeJobInProgress fjob = new FakeJobInProgress(conf, jobTracker);
fjob.setClusterSize(3);
fjob.initTasks();
// reserve all slots on trackers 0 and 2; tracker 1 stays unreserved
tt1.reserveSlots(TaskType.MAP, fjob, 2);
tt1.reserveSlots(TaskType.REDUCE, fjob, 2);
tt3.reserveSlots(TaskType.MAP, fjob, 2);
tt3.reserveSlots(TaskType.REDUCE, fjob, 2);
assertEquals("Trackers not reserved for the job : maps",
2, fjob.getNumReservedTaskTrackersForMaps());
assertEquals("Trackers not reserved for the job : reduces",
2, fjob.getNumReservedTaskTrackersForReduces());
ClusterMetrics metrics = jobTracker.getClusterMetrics();
assertEquals("reserved map slots do not match",
4, metrics.getReservedMapSlots());
assertEquals("reserved reduce slots do not match",
4, metrics.getReservedReduceSlots());
// run the whole job on the unreserved tracker 1
TaskAttemptID mTid = fjob.findMapTask(trackers[1]);
TaskAttemptID rTid = fjob.findReduceTask(trackers[1]);
fjob.finishTask(mTid);
fjob.finishTask(rTid);
// JUnit convention: expected value first (was reversed in the original);
// assertion message was also garbled ("didnt complete successfully complete")
assertEquals("Job did not complete successfully",
JobStatus.SUCCEEDED, fjob.getStatus().getRunState());
assertEquals("Reservation for the job not released: Maps",
0, fjob.getNumReservedTaskTrackersForMaps());
assertEquals("Reservation for the job not released : Reduces",
0, fjob.getNumReservedTaskTrackersForReduces());
metrics = jobTracker.getClusterMetrics();
assertEquals("reserved map slots do not match",
0, metrics.getReservedMapSlots());
assertEquals("reserved reduce slots do not match",
0, metrics.getReservedReduceSlots());
}
项目:mapreduce-fork
文件:TestLostTracker.java
/**
 * Test whether a lost tracker's in-progress work is reassigned and that it
 * does not count as blacklisted after it is lost.
 */
public void testLostTrackerBeforeBlacklisting() throws Exception {
FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
TaskAttemptID[] tid = new TaskAttemptID[3];
JobConf conf = new JobConf();
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
conf.set(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, "1");
conf.set(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
job.setClusterSize(4);
// Tracker 0 gets the map task
tid[0] = job.findMapTask(trackers[0]);
job.finishTask(tid[0]);
// validate the total tracker count
assertEquals("Active tracker count mismatch",
1, jobTracker.getClusterStatus(false).getTaskTrackers());
// lose the tracker by letting its heartbeat expire
clock.advance(1100);
jobTracker.checkExpiredTrackers();
assertFalse("Tracker 0 not lost",
jobTracker.getClusterStatus(false).getActiveTrackerNames()
.contains(trackers[0]));
// validate the total tracker count
assertEquals("Active tracker count mismatch",
0, jobTracker.getClusterStatus(false).getTaskTrackers());
// Tracker 1 establishes contact with JT
FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
// Tracker1 should get assigned the lost map task
tid[1] = job.findMapTask(trackers[1]);
assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]);
// same TaskID, new attempt — the lost tracker's completed map is redone
assertEquals("Task ID of reassigned map task does not match",
tid[0].getTaskID().toString(), tid[1].getTaskID().toString());
// finish the map task
job.finishTask(tid[1]);
// finish the reduce task
tid[2] = job.findReduceTask(trackers[1]);
job.finishTask(tid[2]);
// check if job is successful
assertEquals("Job not successful",
JobStatus.SUCCEEDED, job.getStatus().getRunState());
// check if the tracker is lost
// validate the total tracker count
assertEquals("Active tracker count mismatch",
1, jobTracker.getClusterStatus(false).getTaskTrackers());
// validate blacklisted count: the tracker was lost before it could be
// blacklisted, so no blacklist entry remains
assertEquals("Blacklisted tracker count mismatch",
0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
}
项目:mapreduce-fork
文件:TestLostTracker.java
/**
 * Test whether the tracker gets lost after it is blacklisted, and that the
 * blacklisted count is cleared once the tracker expires.
 */
public void testLostTrackerAfterBlacklisting() throws Exception {
FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
clock.advance(600);
TaskAttemptID[] tid = new TaskAttemptID[2];
JobConf conf = new JobConf();
conf.setNumMapTasks(1);
conf.setNumReduceTasks(0);
conf.set(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, "1");
conf.set(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
job.setClusterSize(4);
// check if the tracker count is correct
assertEquals("Active tracker count mismatch",
1, jobTracker.taskTrackers().size());
// Tracker 0 gets the map task
tid[0] = job.findMapTask(trackers[0]);
// Fail the task
job.failTask(tid[0]);
// Tracker 1 establishes contact with JT
FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
// check if the tracker count is correct
assertEquals("Active tracker count mismatch",
2, jobTracker.taskTrackers().size());
// Tracker 1 gets the map task
tid[1] = job.findMapTask(trackers[1]);
// Finish the task and also the job
job.finishTask(tid[1]);
// check if job is successful
assertEquals("Job not successful",
JobStatus.SUCCEEDED, job.getStatus().getRunState());
// check if tracker 0 got blacklisted (the assertion below inspects
// trackers[0]; the original comment wrongly said "trackers 1")
assertTrue("Tracker 0 not blacklisted",
jobTracker.getBlacklistedTrackers()[0].getTaskTrackerName()
.equals(trackers[0]));
// check if the tracker count is correct
assertEquals("Active tracker count mismatch",
2, jobTracker.taskTrackers().size());
// validate blacklisted count
assertEquals("Blacklisted tracker count mismatch",
1, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
// Advance clock. Tracker 0 should be lost
clock.advance(500);
jobTracker.checkExpiredTrackers();
// check if the task tracker is lost
assertFalse("Tracker 0 not lost",
jobTracker.getClusterStatus(false).getActiveTrackerNames()
.contains(trackers[0]));
// check if the lost tracker has removed from the jobtracker
assertEquals("Active tracker count mismatch",
1, jobTracker.taskTrackers().size());
// validate blacklisted count: losing the blacklisted tracker clears it
assertEquals("Blacklisted tracker count mismatch",
0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
}
项目:mapreduce-fork
文件:TestSpeculativeExecution.java
/**
 * Verifies that runningMaps()/runningReduces() counters are decremented
 * when a running attempt fails while a speculative attempt is active,
 * and that jobtracker instrumentation counts the speculative attempts.
 *
 * @throws IOException on failures in the fake job plumbing
 */
public void testRunningTaskCountWithSpeculation() throws IOException {
TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
JobConf conf = new JobConf();
conf.setSpeculativeExecution(true);
conf.setNumMapTasks(3);
conf.setNumReduceTasks(3);
conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
//Check for runningMap counts first
//schedule maps
taskAttemptID[0] = job.findMapTask(trackers[0]);
taskAttemptID[1] = job.findMapTask(trackers[1]);
taskAttemptID[2] = job.findMapTask(trackers[2]);
clock.advance(5000);
job.finishTask(taskAttemptID[0]);
clock.advance(1000);
job.finishTask(taskAttemptID[1]);
clock.advanceBySpeculativeLag();
//we should get a speculative task now
taskAttemptID[3] = job.findMapTask(trackers[3]);
int oldRunningMap = job.runningMaps();
LOG.info("No of running maps before fail was " + oldRunningMap);
job.failTask(taskAttemptID[2]);
// JUnit convention: expected value first (was reversed in the original)
assertEquals(
"Running maps count should be updated from " + oldRunningMap + " to " +
(oldRunningMap - 1), oldRunningMap - 1, job.runningMaps());
LOG.info(" Job running maps after fail " + job.runningMaps());
clock.advance(5000);
job.finishTask(taskAttemptID[3]);
//check for runningReduce count.
taskAttemptID[4] = job.findReduceTask(trackers[0]);
taskAttemptID[5] = job.findReduceTask(trackers[1]);
taskAttemptID[6] = job.findReduceTask(trackers[2]);
clock.advance(5000);
job.finishTask(taskAttemptID[4]);
clock.advance(1000);
job.finishTask(taskAttemptID[5]);
clock.advanceBySpeculativeLag();
taskAttemptID[7] = job.findReduceTask(trackers[4]);
int oldRunningReduces = job.runningReduces();
job.failTask(taskAttemptID[6]);
LOG.info(
" No of running Reduces before fail " + oldRunningReduces);
LOG.info(
" No of runing reduces after fail " + job.runningReduces());
// JUnit convention: expected value first (was reversed in the original)
assertEquals(
"Running reduces count should be updated from " + oldRunningReduces +
" to " + (oldRunningReduces - 1), oldRunningReduces - 1,
job.runningReduces());
// Verify total speculative tasks by jobtracker instrumentation
assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
assertEquals("Total speculative reduces", 1,
fakeInst.numSpeculativeReduces);
LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
job.finishTask(taskAttemptID[7]);
}
项目:mapreduce-fork
文件:TestSpeculativeExecution.java
/**
 * Verifies isSlowTracker(): a tracker whose attempts complete much more
 * slowly than the others (trackers[2]) is reported slow, while the fastest
 * tracker (trackers[0]) is not. Also checks the jobtracker instrumentation
 * counters for speculative attempts.
 *
 * @throws IOException on failures in the fake job plumbing
 */
public void testIsSlowTracker() throws IOException {
TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
JobConf conf = new JobConf();
conf.setSpeculativeExecution(true);
conf.setNumMapTasks(10);
conf.setNumReduceTasks(0);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
//schedule some tasks
taskAttemptID[0] = job.findMapTask(trackers[0]);
taskAttemptID[1] = job.findMapTask(trackers[0]);
taskAttemptID[2] = job.findMapTask(trackers[0]);
taskAttemptID[3] = job.findMapTask(trackers[1]);
taskAttemptID[4] = job.findMapTask(trackers[1]);
taskAttemptID[5] = job.findMapTask(trackers[1]);
taskAttemptID[6] = job.findMapTask(trackers[2]);
taskAttemptID[7] = job.findMapTask(trackers[2]);
taskAttemptID[8] = job.findMapTask(trackers[2]);
clock.advance(1000);
//Some tasks finish in 1 second (on trackers[0])
job.finishTask(taskAttemptID[0]);
job.finishTask(taskAttemptID[1]);
job.finishTask(taskAttemptID[2]);
clock.advance(1000);
//Some tasks finish in 2 second (on trackers[1])
job.finishTask(taskAttemptID[3]);
job.finishTask(taskAttemptID[4]);
job.finishTask(taskAttemptID[5]);
// assertFalse/assertTrue replace reversed assertEquals(actual, boolean)
assertFalse("Tracker "+ trackers[0] + " expected to be not slow ",
job.isSlowTracker(trackers[0]));
clock.advance(100000);
//After a long time, some tasks finished on trackers[2]
job.finishTask(taskAttemptID[6]);
job.finishTask(taskAttemptID[7]);
job.finishTask(taskAttemptID[8]);
assertTrue("Tracker "+ trackers[2] + " expected to be slow ",
job.isSlowTracker(trackers[2]));
// Verify total speculative tasks by jobtracker instrumentation
assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
assertEquals("Total speculative reduces", 1,
fakeInst.numSpeculativeReduces);
LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
项目:mapreduce-fork
文件:TestClusterStatus.java
/**
 * Verifies reserved map/reduce slot accounting in ClusterMetrics:
 * reservations made via heartbeats are counted, re-reservation adds to the
 * count, and finishing the job releases all reservations.
 *
 * @throws Exception on failures in the fake cluster plumbing
 */
public void testReservedSlots() throws Exception {
Configuration conf = mr.createJobConf();
conf.setInt(JobContext.NUM_MAPS, 1);
Job job = Job.getInstance(cluster, conf);
job.setNumReduceTasks(1);
job.setSpeculativeExecution(false);
job.setJobSetupCleanupNeeded(false);
//Set task tracker objects for reservation.
TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
TaskTrackerStatus status1 = new TaskTrackerStatus(
trackers[0],JobInProgress.convertTrackerNameToHostName(
trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
TaskTrackerStatus status2 = new TaskTrackerStatus(
trackers[1],JobInProgress.convertTrackerNameToHostName(
trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
tt1.setStatus(status1);
tt2.setStatus(status2);
fakeJob = new FakeJobInProgress(new JobConf(job.getConfiguration()),
jobTracker);
fakeJob.setClusterSize(3);
fakeJob.initTasks();
// heartbeats with the reservation flag set reserve 1 map + 1 reduce slot
// per tracker
FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
true, trackers[0], responseId);
FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
true, trackers[1], responseId);
responseId++;
ClusterMetrics metrics = cluster.getClusterStatus();
assertEquals("reserved map slots do not match",
2, metrics.getReservedMapSlots());
assertEquals("reserved reduce slots do not match",
2, metrics.getReservedReduceSlots());
// redo to test re-reservations.
FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
true, trackers[0], responseId);
FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
true, trackers[1], responseId);
responseId++;
metrics = cluster.getClusterStatus();
assertEquals("reserved map slots do not match",
4, metrics.getReservedMapSlots());
assertEquals("reserved reduce slots do not match",
4, metrics.getReservedReduceSlots());
TaskAttemptID mTid = fakeJob.findMapTask(trackers[1]);
TaskAttemptID rTid = fakeJob.findReduceTask(trackers[1]);
fakeJob.finishTask(mTid);
fakeJob.finishTask(rTid);
// JUnit convention: expected value first (was reversed in the original);
// assertion message was also garbled ("didnt complete successfully complete")
assertEquals("Job did not complete successfully",
JobStatus.SUCCEEDED, fakeJob.getStatus().getRunState());
metrics = cluster.getClusterStatus();
assertEquals("reserved map slots do not match",
0, metrics.getReservedMapSlots());
assertEquals("reserved reduce slots do not match",
0, metrics.getReservedReduceSlots());
}