private Job loadJob(HistoryFileInfo fileInfo) { try { Job job = fileInfo.loadJob(); if (LOG.isDebugEnabled()) { LOG.debug("Adding " + job.getID() + " to loaded job cache"); } // We can clobber results here, but that should be OK, because it only // means that we may have two identical copies of the same job floating // around for a while. loadedJobCache.put(job.getID(), job); return job; } catch (IOException e) { throw new YarnRuntimeException( "Could not find/load job: " + fileInfo.getJobId(), e); } }
/**
 * Returns the full job for the given id, using {@code loadedJobCache} when
 * the job is already cached and loading it otherwise.
 *
 * @param jobId id of the job to look up
 * @return the job, or {@code null} when the history file manager has no file
 *         info for the id, or when a cached job's file info is marked deleted
 * @throws YarnRuntimeException wrapping any {@link IOException} raised while
 *         looking up the file info
 */
@Override
public Job getFullJob(JobId jobId) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Looking for Job " + jobId);
  }
  try {
    HistoryFileInfo info = hsManager.getFileInfo(jobId);
    if (info == null) {
      // No file info for this id: drop any cache entry that may remain.
      loadedJobCache.remove(jobId);
      return null;
    }

    Job cached = loadedJobCache.get(jobId);
    if (cached == null) {
      // Cache miss: load the job (loadJob also stores it in the cache).
      return loadJob(info);
    }
    if (info.isDeleted()) {
      // Cached copy refers to a deleted history file; evict it.
      loadedJobCache.remove(jobId);
      return null;
    }
    return cached;
  } catch (IOException ioe) {
    throw new YarnRuntimeException(ioe);
  }
}
/**
 * Builds a map, sorted by job id, holding one {@link PartialJob} for every
 * non-null file info returned by the history file manager.
 *
 * @return sorted map from job id to its partial job
 * @throws YarnRuntimeException wrapping any {@link IOException} raised while
 *         scanning the file infos (the error is also logged at WARN level)
 */
@Override
public Map<JobId, Job> getAllPartialJobs() {
  LOG.debug("Called getAllPartialJobs()");
  SortedMap<JobId, Job> partialJobs = new TreeMap<JobId, Job>();
  try {
    for (HistoryFileInfo fileInfo : hsManager.getAllFileInfo()) {
      if (fileInfo == null) {
        continue;
      }
      JobId jobId = fileInfo.getJobId();
      partialJobs.put(jobId,
          new PartialJob(fileInfo.getJobIndexInfo(), jobId));
    }
  } catch (IOException ioe) {
    LOG.warn("Error trying to scan for all FileInfos", ioe);
    throw new YarnRuntimeException(ioe);
  }
  return partialJobs;
}
@Test (timeout=100000) public void testCompletedJob() throws Exception { HistoryFileInfo info = mock(HistoryFileInfo.class); when(info.getConfFile()).thenReturn(fullConfPath); //Re-initialize to verify the delayed load. completedJob = new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user", info, jobAclsManager); //Verify tasks loaded based on loadTask parameter. assertEquals(loadTasks, completedJob.tasksLoaded.get()); assertEquals(1, completedJob.getAMInfos().size()); assertEquals(10, completedJob.getCompletedMaps()); assertEquals(1, completedJob.getCompletedReduces()); assertEquals(12, completedJob.getTasks().size()); //Verify tasks loaded at this point. assertEquals(true, completedJob.tasksLoaded.get()); assertEquals(10, completedJob.getTasks(TaskType.MAP).size()); assertEquals(2, completedJob.getTasks(TaskType.REDUCE).size()); assertEquals("user", completedJob.getUserName()); assertEquals(JobState.SUCCEEDED, completedJob.getState()); JobReport jobReport = completedJob.getReport(); assertEquals("user", jobReport.getUser()); assertEquals(JobState.SUCCEEDED, jobReport.getJobState()); }
/**
 * Checks that {@code moveToDone()} does not report a failed move for a
 * {@link HistoryFileInfo} created with only a summary path and job index
 * info (the first two arguments are passed as {@code null}). As the test
 * name indicates, the summary file is expected not to exist.
 */
@Test
public void testHistoryFileInfoSummaryFileNotExist() throws Exception {
  HistoryFileManagerTest manager = new HistoryFileManagerTest();

  String jobName = "job_1410889000000_123456";
  Path summaryPath = new Path(jobName + ".summary");

  JobIndexInfo indexInfo = new JobIndexInfo();
  JobID parsedId = JobID.forName(jobName);
  indexInfo.setJobId(TypeConverter.toYarn(parsedId));

  // Point both history directories at freshly generated random paths.
  Configuration config = dfsCluster.getConfiguration(0);
  String doneDir = "/" + UUID.randomUUID();
  config.set(JHAdminConfig.MR_HISTORY_DONE_DIR, doneDir);
  String intermediateDoneDir = "/" + UUID.randomUUID();
  config.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      intermediateDoneDir);
  manager.serviceInit(config);

  HistoryFileInfo fileInfo = manager.getHistoryFileInfo(null, null,
      summaryPath, indexInfo, false);
  fileInfo.moveToDone();

  Assert.assertFalse(fileInfo.didMoveFail());
}
@Test (timeout=100000) public void testCompletedJob() throws Exception { HistoryFileInfo info = mock(HistoryFileInfo.class); when(info.getConfFile()).thenReturn(fullConfPath); //Re-initialize to verify the delayed load. completedJob = new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user", info, jobAclsManager); //Verify tasks loaded based on loadTask parameter. assertEquals(loadTasks, completedJob.tasksLoaded.get()); assertEquals(1, completedJob.getAMInfos().size()); assertEquals(10, completedJob.getCompletedMaps()); assertEquals(1, completedJob.getCompletedReduces()); assertEquals(12, completedJob.getTasks().size()); //Verify tasks loaded at this point. assertEquals(true, completedJob.tasksLoaded.get()); assertEquals(10, completedJob.getTasks(TaskType.MAP).size()); assertEquals(2, completedJob.getTasks(TaskType.REDUCE).size()); assertEquals("user", completedJob.getUserName()); assertEquals(JobState.SUCCEEDED, completedJob.getState()); JobReport jobReport = completedJob.getReport(); assertEquals("user", jobReport.getUser()); assertEquals(JobState.SUCCEEDED, jobReport.getJobState()); }
/**
 * Checks that the diagnostics in a {@link CompletedJob}'s report equal the
 * error info supplied by the parsed {@link JobInfo}. The history parser is
 * replaced with a mock that returns a spied {@code JobInfo} stubbed with a
 * FAILED status.
 */
@Test (timeout=30000)
public void testCompletedJobWithDiagnostics() throws Exception {
  final String expectedDiagnostics = "Job Diagnostics";

  // Spy on a real JobInfo and stub only the values this test relies on.
  JobInfo spiedInfo = spy(new JobInfo());
  when(spiedInfo.getErrorInfo()).thenReturn(expectedDiagnostics);
  when(spiedInfo.getJobStatus()).thenReturn(JobState.FAILED.toString());
  when(spiedInfo.getAMInfos()).thenReturn(
      Collections.<JobHistoryParser.AMInfo>emptyList());

  // Parser stand-in that hands back the stubbed JobInfo.
  final JobHistoryParser stubParser = mock(JobHistoryParser.class);
  when(stubParser.parse()).thenReturn(spiedInfo);

  HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
  when(fileInfo.getConfFile()).thenReturn(fullConfPath);
  when(fileInfo.getHistoryFile()).thenReturn(fullHistoryPath);

  CompletedJob failedJob = new CompletedJob(conf, jobId, fullHistoryPath,
      loadTasks, "user", fileInfo, jobAclsManager) {
    // Swap in the mock parser in place of the one the job would create.
    @Override
    protected JobHistoryParser createJobHistoryParser(
        Path historyFileAbsolute) throws IOException {
      return stubParser;
    }
  };

  assertEquals(expectedDiagnostics, failedJob.getReport().getDiagnostics());
}