/**
 * Builds a {@link MapTaskImpl} backed by mocked collaborators for the
 * recovery tests.
 *
 * @param clusterTimestamp timestamp used to create the ApplicationId
 *        (application id 1) from which the job id (id 1) is derived
 * @param eh event handler handed to the task; callers pass a mock so the
 *        events emitted by the task can be captured
 * @return a new map task constructed with 2 as the partitions argument and
 *         3 as the application attempt id
 */
private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
  ApplicationId appId = ApplicationId.newInstance(clusterTimestamp, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);

  int partitions = 2;
  int appAttemptId = 3;

  Path remoteJobConfFile = mock(Path.class);
  JobConf conf = new JobConf();
  TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
  // mock(Token.class) can only yield a raw Token, so the cast is unavoidably
  // unchecked; the suppression is limited to this one declaration.
  @SuppressWarnings("unchecked")
  Token<JobTokenIdentifier> jobToken =
      (Token<JobTokenIdentifier>) mock(Token.class);
  Credentials credentials = null; // no credentials are supplied to the task
  Clock clock = new SystemClock();
  MRAppMetrics metrics = mock(MRAppMetrics.class);

  // The app context only needs to hand out a (mocked) ClusterInfo.
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  AppContext appContext = mock(AppContext.class);
  when(appContext.getClusterInfo()).thenReturn(clusterInfo);

  TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
  return new MapTaskImpl(jobId, partitions, eh, remoteJobConfFile, conf,
      taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
      appAttemptId, metrics, appContext);
}
/**
 * Creates the {@link MapTaskImpl} under test, with every collaborator
 * replaced by a Mockito mock or a trivial real object.
 *
 * @param clusterTimestamp timestamp used to create the ApplicationId
 *        (application id 1) from which the job id (id 1) is derived
 * @param eh event handler given to the task; the tests pass a mock and later
 *        inspect the events it was asked to handle
 * @return a new map task constructed with 2 as the partitions argument and
 *         3 as the application attempt id
 */
private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
  ApplicationId appId = ApplicationId.newInstance(clusterTimestamp, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);

  int partitions = 2;
  int appAttemptId = 3;

  Path remoteJobConfFile = mock(Path.class);
  JobConf conf = new JobConf();
  TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
  // A mocked Token is necessarily raw, so this cast is unchecked; keep the
  // suppression scoped to the single declaration.
  @SuppressWarnings("unchecked")
  Token<JobTokenIdentifier> jobToken =
      (Token<JobTokenIdentifier>) mock(Token.class);
  Credentials credentials = null; // no credentials are supplied to the task
  Clock clock = new SystemClock();
  MRAppMetrics metrics = mock(MRAppMetrics.class);

  // The app context only needs to hand out a (mocked) ClusterInfo.
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  AppContext appContext = mock(AppContext.class);
  when(appContext.getClusterInfo()).thenReturn(clusterInfo);

  TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
  return new MapTaskImpl(jobId, partitions, eh, remoteJobConfFile, conf,
      taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
      appAttemptId, metrics, appContext);
}
/**
 * Recovers a map task whose recorded status is SUCCEEDED and whose history
 * holds one SUCCEEDED attempt (id 2) and one FAILED attempt (id 1).
 * Expects the task to end up SUCCEEDED, both attempts to keep their recorded
 * states, two map launches and one failed map to be counted.
 */
@Test
public void testRecoverySuccessAttempt() {
  LOG.info("--- START: testRecoverySuccessAttempt ---");

  final long timestamp = System.currentTimeMillis();
  final EventHandler handler = mock(EventHandler.class);
  final MapTaskImpl task = getMockMapTask(timestamp, handler);
  final TaskId yarnTaskId = task.getID();
  final JobID mrJobId = new JobID(Long.toString(timestamp), 1);
  final TaskID mrTaskId = new TaskID(mrJobId,
      org.apache.hadoop.mapreduce.TaskType.MAP, yarnTaskId.getId());

  // Recorded attempts handed to the task through the mocked TaskInfo.
  final TaskAttemptID succeededId = new TaskAttemptID(mrTaskId, 2);
  final TaskAttemptID failedId = new TaskAttemptID(mrTaskId, 1);
  final Map<TaskAttemptID, TaskAttemptInfo> recordedAttempts =
      new HashMap<TaskAttemptID, TaskAttemptInfo>();
  recordedAttempts.put(succeededId,
      getMockTaskAttemptInfo(succeededId, TaskAttemptState.SUCCEEDED));
  recordedAttempts.put(failedId,
      getMockTaskAttemptInfo(failedId, TaskAttemptState.FAILED));

  final OutputCommitter committer = mock(OutputCommitter.class);
  final TaskInfo recordedTask = mock(TaskInfo.class);
  when(recordedTask.getTaskStatus()).thenReturn("SUCCEEDED");
  when(recordedTask.getTaskId()).thenReturn(mrTaskId);
  when(recordedTask.getAllTaskAttempts()).thenReturn(recordedAttempts);

  // Expected outcome of the recovery.
  final Map<TaskAttemptID, TaskAttemptState> expectedAttemptStates =
      new HashMap<TaskAttemptID, TaskAttemptState>();
  expectedAttemptStates.put(succeededId, TaskAttemptState.SUCCEEDED);
  expectedAttemptStates.put(failedId, TaskAttemptState.FAILED);

  final List<EventType> expectedHistory = new ArrayList<EventType>();
  expectedHistory.add(EventType.TASK_STARTED);
  expectedHistory.add(EventType.MAP_ATTEMPT_STARTED);
  expectedHistory.add(EventType.MAP_ATTEMPT_FINISHED);
  expectedHistory.add(EventType.MAP_ATTEMPT_STARTED);
  expectedHistory.add(EventType.MAP_ATTEMPT_FAILED);
  expectedHistory.add(EventType.TASK_FINISHED);

  // Drive the recovery and capture everything sent to the event handler.
  task.handle(new TaskRecoverEvent(yarnTaskId, recordedTask, committer, true));

  final ArgumentCaptor<Event> captured = ArgumentCaptor.forClass(Event.class);
  verify(handler, atLeast(1)).handle(
      (org.apache.hadoop.yarn.event.Event) captured.capture());

  recoveryChecker(task, TaskState.SUCCEEDED, expectedAttemptStates, captured,
      expectedHistory, 2L, 1L);
}
/**
 * Recovers a map task whose recorded status is FAILED and whose two recorded
 * attempts (ids 2 and 1) are both FAILED. Expects the task to end up FAILED,
 * with two map launches and two failed maps counted.
 */
@Test
public void testRecoveryAllFailAttempts() {
  LOG.info("--- START: testRecoveryAllFailAttempts ---");

  final long timestamp = System.currentTimeMillis();
  final EventHandler handler = mock(EventHandler.class);
  final MapTaskImpl task = getMockMapTask(timestamp, handler);
  final TaskId yarnTaskId = task.getID();
  final JobID mrJobId = new JobID(Long.toString(timestamp), 1);
  final TaskID mrTaskId = new TaskID(mrJobId,
      org.apache.hadoop.mapreduce.TaskType.MAP, yarnTaskId.getId());

  // Both recorded attempts failed.
  final TaskAttemptID firstFailedId = new TaskAttemptID(mrTaskId, 2);
  final TaskAttemptID secondFailedId = new TaskAttemptID(mrTaskId, 1);
  final Map<TaskAttemptID, TaskAttemptInfo> recordedAttempts =
      new HashMap<TaskAttemptID, TaskAttemptInfo>();
  recordedAttempts.put(firstFailedId,
      getMockTaskAttemptInfo(firstFailedId, TaskAttemptState.FAILED));
  recordedAttempts.put(secondFailedId,
      getMockTaskAttemptInfo(secondFailedId, TaskAttemptState.FAILED));

  final OutputCommitter committer = mock(OutputCommitter.class);
  final TaskInfo recordedTask = mock(TaskInfo.class);
  when(recordedTask.getTaskStatus()).thenReturn("FAILED");
  when(recordedTask.getTaskId()).thenReturn(mrTaskId);
  when(recordedTask.getAllTaskAttempts()).thenReturn(recordedAttempts);

  // Expected outcome of the recovery.
  final Map<TaskAttemptID, TaskAttemptState> expectedAttemptStates =
      new HashMap<TaskAttemptID, TaskAttemptState>();
  expectedAttemptStates.put(firstFailedId, TaskAttemptState.FAILED);
  expectedAttemptStates.put(secondFailedId, TaskAttemptState.FAILED);

  final List<EventType> expectedHistory = new ArrayList<EventType>();
  expectedHistory.add(EventType.TASK_STARTED);
  for (int attempt = 0; attempt < 2; attempt++) {
    expectedHistory.add(EventType.MAP_ATTEMPT_STARTED);
    expectedHistory.add(EventType.MAP_ATTEMPT_FAILED);
  }
  expectedHistory.add(EventType.TASK_FAILED);

  // Drive the recovery and capture everything sent to the event handler.
  task.handle(new TaskRecoverEvent(yarnTaskId, recordedTask, committer, true));

  final ArgumentCaptor<Event> captured = ArgumentCaptor.forClass(Event.class);
  verify(handler, atLeast(1)).handle(
      (org.apache.hadoop.yarn.event.Event) captured.capture());

  recoveryChecker(task, TaskState.FAILED, expectedAttemptStates, captured,
      expectedHistory, 2L, 2L);
}
@Test public void testRecoveryTaskSuccessAllAttemptsFail() { LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---"); long clusterTimestamp = System.currentTimeMillis(); EventHandler mockEventHandler = mock(EventHandler.class); MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp, mockEventHandler); TaskId taskId = recoverMapTask.getID(); JobID jobID = new JobID(Long.toString(clusterTimestamp), 1); TaskID taskID = new TaskID(jobID, org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId()); //Mock up the TaskAttempts Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts = new HashMap<TaskAttemptID, TaskAttemptInfo>(); TaskAttemptID taId1 = new TaskAttemptID(taskID, 2); TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1, TaskAttemptState.FAILED); mockTaskAttempts.put(taId1, mockTAinfo1); TaskAttemptID taId2 = new TaskAttemptID(taskID, 1); TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2, TaskAttemptState.FAILED); mockTaskAttempts.put(taId2, mockTAinfo2); OutputCommitter mockCommitter = mock (OutputCommitter.class); TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED"); when(mockTaskInfo.getTaskId()).thenReturn(taskID); when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts); recoverMapTask.handle( new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true)); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(mockEventHandler,atLeast(1)).handle( (org.apache.hadoop.yarn.event.Event) arg.capture()); Map<TaskAttemptID, TaskAttemptState> finalAttemptStates = new HashMap<TaskAttemptID, TaskAttemptState>(); finalAttemptStates.put(taId1, TaskAttemptState.FAILED); finalAttemptStates.put(taId2, TaskAttemptState.FAILED); // check for one new attempt launched since successful attempt not found TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000); finalAttemptStates.put(taId3, TaskAttemptState.NEW); List<EventType> jobHistoryEvents = new 
ArrayList<EventType>(); jobHistoryEvents.add(EventType.TASK_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates, arg, jobHistoryEvents, 2L, 2L); }
/**
 * Recovers a map task whose recorded status is SUCCEEDED and whose two
 * recorded attempts (ids 2 and 1) are both SUCCEEDED. Expects the task to end
 * up SUCCEEDED, both attempts to stay SUCCEEDED, two map launches and no
 * failed maps to be counted.
 */
@Test
public void testRecoveryTaskSuccessAllAttemptsSucceed() {
  // The START marker names this test (it previously carried the name of
  // testRecoveryTaskSuccessAllAttemptsFail, which mislabelled the log).
  LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsSucceed ---");

  long clusterTimestamp = System.currentTimeMillis();
  EventHandler mockEventHandler = mock(EventHandler.class);
  MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
      mockEventHandler);

  TaskId taskId = recoverMapTask.getID();
  JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
  TaskID taskID = new TaskID(jobID,
      org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());

  // Mock up the TaskAttempts: both recorded attempts succeeded.
  Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
      new HashMap<TaskAttemptID, TaskAttemptInfo>();

  TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
  TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
      TaskAttemptState.SUCCEEDED);
  mockTaskAttempts.put(taId1, mockTAinfo1);

  TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
  TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
      TaskAttemptState.SUCCEEDED);
  mockTaskAttempts.put(taId2, mockTAinfo2);

  OutputCommitter mockCommitter = mock(OutputCommitter.class);
  TaskInfo mockTaskInfo = mock(TaskInfo.class);
  when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
  when(mockTaskInfo.getTaskId()).thenReturn(taskID);
  when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);

  // Drive the recovery and capture everything sent to the event handler.
  recoverMapTask.handle(
      new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));

  ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
  verify(mockEventHandler, atLeast(1)).handle(
      (org.apache.hadoop.yarn.event.Event) arg.capture());

  Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
      new HashMap<TaskAttemptID, TaskAttemptState>();
  finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
  finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);

  // Must stay a mutable list: recoveryChecker removes entries as it matches.
  List<EventType> jobHistoryEvents = new ArrayList<EventType>();
  jobHistoryEvents.add(EventType.TASK_STARTED);
  jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
  jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
  jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
  jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
  jobHistoryEvents.add(EventType.TASK_FINISHED);

  recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
      arg, jobHistoryEvents, 2L, 0L);
}
/**
 * Recovers a map task whose recorded status is KILLED and whose two recorded
 * attempts (ids 2 and 1) are both KILLED. Expects the task to end up KILLED,
 * the job history to close with TASK_FAILED, and two map launches and no
 * failed maps to be counted.
 */
@Test
public void testRecoveryAllAttemptsKilled() {
  LOG.info("--- START: testRecoveryAllAttemptsKilled ---");

  final long timestamp = System.currentTimeMillis();
  final EventHandler handler = mock(EventHandler.class);
  final MapTaskImpl task = getMockMapTask(timestamp, handler);
  final TaskId yarnTaskId = task.getID();
  final JobID mrJobId = new JobID(Long.toString(timestamp), 1);
  final TaskID mrTaskId = new TaskID(mrJobId,
      org.apache.hadoop.mapreduce.TaskType.MAP, yarnTaskId.getId());

  // Both recorded attempts were killed.
  final TaskAttemptID firstKilledId = new TaskAttemptID(mrTaskId, 2);
  final TaskAttemptID secondKilledId = new TaskAttemptID(mrTaskId, 1);
  final Map<TaskAttemptID, TaskAttemptInfo> recordedAttempts =
      new HashMap<TaskAttemptID, TaskAttemptInfo>();
  recordedAttempts.put(firstKilledId,
      getMockTaskAttemptInfo(firstKilledId, TaskAttemptState.KILLED));
  recordedAttempts.put(secondKilledId,
      getMockTaskAttemptInfo(secondKilledId, TaskAttemptState.KILLED));

  final OutputCommitter committer = mock(OutputCommitter.class);
  final TaskInfo recordedTask = mock(TaskInfo.class);
  when(recordedTask.getTaskStatus()).thenReturn("KILLED");
  when(recordedTask.getTaskId()).thenReturn(mrTaskId);
  when(recordedTask.getAllTaskAttempts()).thenReturn(recordedAttempts);

  // Expected outcome of the recovery.
  final Map<TaskAttemptID, TaskAttemptState> expectedAttemptStates =
      new HashMap<TaskAttemptID, TaskAttemptState>();
  expectedAttemptStates.put(firstKilledId, TaskAttemptState.KILLED);
  expectedAttemptStates.put(secondKilledId, TaskAttemptState.KILLED);

  final List<EventType> expectedHistory = new ArrayList<EventType>();
  expectedHistory.add(EventType.TASK_STARTED);
  for (int attempt = 0; attempt < 2; attempt++) {
    expectedHistory.add(EventType.MAP_ATTEMPT_STARTED);
    expectedHistory.add(EventType.MAP_ATTEMPT_KILLED);
  }
  expectedHistory.add(EventType.TASK_FAILED);

  // Drive the recovery and capture everything sent to the event handler.
  task.handle(new TaskRecoverEvent(yarnTaskId, recordedTask, committer, true));

  final ArgumentCaptor<Event> captured = ArgumentCaptor.forClass(Event.class);
  verify(handler, atLeast(1)).handle(
      (org.apache.hadoop.yarn.event.Event) captured.capture());

  recoveryChecker(task, TaskState.KILLED, expectedAttemptStates, captured,
      expectedHistory, 2L, 0L);
}
/**
 * Verifies the result of a task recovery: the task's final state, the state
 * of every recovered attempt, the ordered job history events, the counter
 * updates, and the state carried by any JobTaskEvent.
 *
 * @param checkTask the task that handled the recovery event
 * @param finalState expected state of the task; also the state every captured
 *        JobTaskEvent must carry
 * @param finalAttemptStates expected state per attempt id; its size must
 *        equal the number of attempts the task reports
 * @param arg captor holding the events passed to the mocked event handler
 * @param expectedJobHistoryEvents expected history event types in emission
 *        order; entries are removed as they are matched, so the list must be
 *        mutable and is consumed by this call
 * @param expectedMapLaunches expected sum of TOTAL_LAUNCHED_MAPS increments
 * @param expectedFailedMaps expected sum of NUM_FAILED_MAPS increments
 */
private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
    ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
    long expectedMapLaunches, long expectedFailedMaps) {

  assertEquals("Final State of Task", finalState, checkTask.getState());

  Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
      checkTask.getAttempts();
  assertEquals("Expected Number of Task Attempts",
      finalAttemptStates.size(), recoveredAttempts.size());
  for (Map.Entry<TaskAttemptID, TaskAttemptState> expected
      : finalAttemptStates.entrySet()) {
    TaskAttempt recovered =
        recoveredAttempts.get(TypeConverter.toYarn(expected.getKey()));
    // Fail with a readable message instead of a NullPointerException when an
    // expected attempt was not recovered at all.
    assertTrue("Missing recovered Task Attempt " + expected.getKey(),
        recovered != null);
    assertEquals("Expected Task Attempt State",
        expected.getValue(), recovered.getState());
  }

  // Captured values are deliberately held as Object and classified with
  // instanceof, so no cast to the captor's declared element type is made.
  Iterator<Event> ie = arg.getAllValues().iterator();
  int eventNum = 0;
  long totalLaunchedMaps = 0;
  long totalFailedMaps = 0;
  boolean jobTaskEventReceived = false;

  while (ie.hasNext()) {
    Object current = ie.next();
    ++eventNum;
    LOG.info(eventNum + " " + current.getClass().getName());
    if (current instanceof JobHistoryEvent) {
      JobHistoryEvent jhe = (JobHistoryEvent) current;
      EventType actualType = jhe.getHistoryEvent().getEventType();
      // An extra history event used to surface as IndexOutOfBoundsException;
      // report it as an assertion failure naming the offending event.
      assertTrue("Unexpected extra JobHistoryEvent " + actualType,
          !expectedJobHistoryEvents.isEmpty());
      EventType expectedType = expectedJobHistoryEvents.remove(0);
      LOG.info(expectedType + " " + actualType + " " + jhe.getJobID());
      assertEquals(expectedType, actualType);
    } else if (current instanceof JobCounterUpdateEvent) {
      JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
      // Only the first counter update of each event is inspected.
      Enum<?> counterKey = jcue.getCounterUpdates().get(0).getCounterKey();
      long increment = jcue.getCounterUpdates().get(0).getIncrementValue();
      LOG.info("JobCounterUpdateEvent " + counterKey + " " + increment);
      if (counterKey == JobCounter.NUM_FAILED_MAPS) {
        totalFailedMaps += increment;
      } else if (counterKey == JobCounter.TOTAL_LAUNCHED_MAPS) {
        totalLaunchedMaps += increment;
      }
    } else if (current instanceof JobTaskEvent) {
      JobTaskEvent jte = (JobTaskEvent) current;
      // Expected value goes first so a failure message reads correctly.
      assertEquals("State reported by JobTaskEvent",
          finalState, jte.getState());
      jobTaskEventReceived = true;
    }
  }

  // A task that is still RUNNING is not required to have sent a JobTaskEvent.
  assertTrue("Expected a JobTaskEvent unless the task is RUNNING",
      jobTaskEventReceived || (finalState == TaskState.RUNNING));
  assertEquals("Did not process all expected JobHistoryEvents", 0,
      expectedJobHistoryEvents.size());
  assertEquals("Expected Map Launches", expectedMapLaunches,
      totalLaunchedMaps);
  assertEquals("Expected Failed Maps", expectedFailedMaps, totalFailedMaps);
}