/**
 * Scans a directory for job history files and offers each one to the job
 * list cache; entries already present in the cache are left untouched
 * (the cache is updated through {@code addIfAbsent}).
 *
 * @param path
 *          the directory to scan for history files.
 * @throws IOException
 *           propagated from the directory scan or from building the file
 *           info.
 */
private void addDirectoryToJobListCache(Path path) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + path + " to job list cache.");
  }
  for (FileStatus status : scanDirectoryForHistoryFiles(path, doneDirFc)) {
    Path historyPath = status.getPath();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding in history for " + historyPath);
    }
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyPath.getName());
    String confName =
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId());
    String summaryName =
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId());
    // The conf and summary files are addressed as siblings of the history
    // file.
    Path parentDir = historyPath.getParent();
    Path confPath = new Path(parentDir, confName);
    Path summaryPath = new Path(parentDir, summaryName);
    jobListCache.addIfAbsent(createHistoryFileInfo(historyPath, confPath,
        summaryPath, indexInfo, true));
  }
}
/**
 * Looks through a list of job history file statuses for the one whose file
 * name encodes the requested job id.
 *
 * @param fileStatusList
 *          statuses of the candidate job history files.
 * @param jobId
 *          the job id to look for.
 * @return the file info built for the first matching history file, or
 *         {@code null} when no file in the list matches.
 * @throws IOException
 *           propagated from parsing a file name or building the file info.
 */
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
    JobId jobId) throws IOException {
  for (FileStatus candidate : fileStatusList) {
    Path historyPath = candidate.getPath();
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyPath.getName());
    if (!indexInfo.getJobId().equals(jobId)) {
      continue;
    }
    String confName =
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId());
    String summaryName =
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId());
    // The conf and summary files are addressed as siblings of the history
    // file.
    Path parentDir = historyPath.getParent();
    return createHistoryFileInfo(historyPath, new Path(parentDir, confName),
        new Path(parentDir, summaryName), indexInfo, true);
  }
  return null;
}
/**
 * Scans a directory for job history files and offers each one to the job
 * list cache; entries already present in the cache are left untouched
 * (the cache is updated through {@code addIfAbsent}).
 *
 * @param path
 *          the directory to scan for history files.
 * @throws IOException
 *           propagated from the directory scan or from parsing a file name.
 */
private void addDirectoryToJobListCache(Path path) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + path + " to job list cache.");
  }
  for (FileStatus status : scanDirectoryForHistoryFiles(path, doneDirFc)) {
    Path historyPath = status.getPath();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding in history for " + historyPath);
    }
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyPath.getName());
    String confName =
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId());
    String summaryName =
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId());
    // The conf and summary files are addressed as siblings of the history
    // file.
    Path parentDir = historyPath.getParent();
    Path confPath = new Path(parentDir, confName);
    Path summaryPath = new Path(parentDir, summaryName);
    jobListCache.addIfAbsent(new HistoryFileInfo(historyPath, confPath,
        summaryPath, indexInfo, true));
  }
}
/**
 * Looks through a list of job history file statuses for the one whose file
 * name encodes the requested job id.
 *
 * @param fileStatusList
 *          statuses of the candidate job history files.
 * @param jobId
 *          the job id to look for.
 * @return the file info built for the first matching history file, or
 *         {@code null} when no file in the list matches.
 * @throws IOException
 *           propagated from parsing a file name.
 */
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
    JobId jobId) throws IOException {
  for (FileStatus candidate : fileStatusList) {
    Path historyPath = candidate.getPath();
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyPath.getName());
    if (!indexInfo.getJobId().equals(jobId)) {
      continue;
    }
    String confName =
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId());
    String summaryName =
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId());
    // The conf and summary files are addressed as siblings of the history
    // file.
    Path parentDir = historyPath.getParent();
    return new HistoryFileInfo(historyPath, new Path(parentDir, confName),
        new Path(parentDir, summaryName), indexInfo, true);
  }
  return null;
}
/** * Clean up older history files. * * @throws IOException * on any error trying to remove the entries. */ @SuppressWarnings("unchecked") void clean() throws IOException { long cutoff = System.currentTimeMillis() - maxHistoryAge; boolean halted = false; List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff); // Sort in ascending order. Relies on YYYY/MM/DD/Serial Collections.sort(serialDirList); for (FileStatus serialDir : serialDirList) { List<FileStatus> historyFileList = scanDirectoryForHistoryFiles( serialDir.getPath(), doneDirFc); for (FileStatus historyFile : historyFileList) { JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile .getPath().getName()); long effectiveTimestamp = getEffectiveTimestamp( jobIndexInfo.getFinishTime(), historyFile); if (effectiveTimestamp <= cutoff) { HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo .getJobId()); if (fileInfo == null) { String confFileName = JobHistoryUtils .getIntermediateConfFileName(jobIndexInfo.getJobId()); fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path( historyFile.getPath().getParent(), confFileName), null, jobIndexInfo, true); } deleteJobFromDone(fileInfo); } else { halted = true; break; } } if (!halted) { deleteDir(serialDir); removeDirectoryFromSerialNumberIndex(serialDir.getPath()); existingDoneSubdirs.remove(serialDir.getPath()); } else { break; // Don't scan any more directories. } } }
/** * Clean up older history files. * * @throws IOException * on any error trying to remove the entries. */ @SuppressWarnings("unchecked") void clean() throws IOException { long cutoff = System.currentTimeMillis() - maxHistoryAge; boolean halted = false; List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff); // Sort in ascending order. Relies on YYYY/MM/DD/Serial Collections.sort(serialDirList); for (FileStatus serialDir : serialDirList) { List<FileStatus> historyFileList = scanDirectoryForHistoryFiles( serialDir.getPath(), doneDirFc); for (FileStatus historyFile : historyFileList) { JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile .getPath().getName()); long effectiveTimestamp = getEffectiveTimestamp( jobIndexInfo.getFinishTime(), historyFile); if (effectiveTimestamp <= cutoff) { HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo .getJobId()); if (fileInfo == null) { String confFileName = JobHistoryUtils .getIntermediateConfFileName(jobIndexInfo.getJobId()); fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path( historyFile.getPath().getParent(), confFileName), null, jobIndexInfo, true); } deleteJobFromDone(fileInfo); } else { halted = true; break; } } if (!halted) { deleteDir(serialDir); removeDirectoryFromSerialNumberIndex(serialDir.getPath()); existingDoneSubdirs.remove(serialDir.getPath()); } else { break; // Don't scan any more directories. } } }
protected void processDoneFiles(JobId jobId) throws IOException { final MetaInfo mi = fileMap.get(jobId); if (mi == null) { throw new IOException("No MetaInfo found for JobId: [" + jobId + "]"); } if (mi.getHistoryFile() == null) { LOG.warn("No file for job-history with " + jobId + " found in cache!"); } if (mi.getConfFile() == null) { LOG.warn("No file for jobconf with " + jobId + " found in cache!"); } Path qualifiedSummaryDoneFile = writeSummaryFile(jobId, mi.getJobSummary()); try { // Move historyFile to Done Folder. Path qualifiedDoneFile = null; if (mi.getHistoryFile() != null) { Path historyFile = mi.getHistoryFile(); Path qualifiedLogFile = stagingDirFS.makeQualified(historyFile); String doneJobHistoryFileName = getTempFileName(FileNameIndexUtils.getDoneFileName(mi .getJobIndexInfo())); qualifiedDoneFile = doneDirFS.makeQualified(new Path(doneDirPrefixPath, doneJobHistoryFileName)); moveToDoneNow(qualifiedLogFile, qualifiedDoneFile); } // Move confFile to Done Folder Path qualifiedConfDoneFile = null; if (mi.getConfFile() != null) { Path confFile = mi.getConfFile(); Path qualifiedConfFile = stagingDirFS.makeQualified(confFile); String doneConfFileName = getTempFileName(JobHistoryUtils .getIntermediateConfFileName(jobId)); qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(doneDirPrefixPath, doneConfFileName)); moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile); } moveTmpToDone(qualifiedSummaryDoneFile); moveTmpToDone(qualifiedConfDoneFile); moveTmpToDone(qualifiedDoneFile); } catch (IOException e) { LOG.error("Error closing writer for JobID: " + jobId); throw e; } }
/** * Clean up older history files. * * @throws IOException * on any error trying to remove the entries. */ @SuppressWarnings("unchecked") void clean() throws IOException { // TODO this should be replaced by something that knows about the directory // structure and will put less of a load on HDFS. long cutoff = System.currentTimeMillis() - maxHistoryAge; boolean halted = false; // TODO Delete YYYY/MM/DD directories. List<FileStatus> serialDirList = findTimestampedDirectories(); // Sort in ascending order. Relies on YYYY/MM/DD/Serial Collections.sort(serialDirList); for (FileStatus serialDir : serialDirList) { List<FileStatus> historyFileList = scanDirectoryForHistoryFiles( serialDir.getPath(), doneDirFc); for (FileStatus historyFile : historyFileList) { JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile .getPath().getName()); long effectiveTimestamp = getEffectiveTimestamp( jobIndexInfo.getFinishTime(), historyFile); if (effectiveTimestamp <= cutoff) { HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo .getJobId()); if (fileInfo == null) { String confFileName = JobHistoryUtils .getIntermediateConfFileName(jobIndexInfo.getJobId()); fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path( historyFile.getPath().getParent(), confFileName), null, jobIndexInfo, true); } deleteJobFromDone(fileInfo); } else { halted = true; break; } } if (!halted) { doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); removeDirectoryFromSerialNumberIndex(serialDir.getPath()); existingDoneSubdirs.remove(serialDir.getPath()); } else { break; // Don't scan any more directories. } } }
@Test(timeout = 30000) public void testHistoryParsingForFailedAttempts() throws Exception { LOG.info("STARTING testHistoryParsingForFailedAttempts"); try { Configuration conf = new Configuration(); conf.setClass( CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this .getClass().getName(), true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); app.waitForState(job, JobState.SUCCEEDED); // make sure all events are flushed app.waitForState(Service.STATE.STOPPED); String jobhistoryDir = JobHistoryUtils .getHistoryIntermediateDoneDirForUser(conf); JobHistory jobHistory = new JobHistory(); jobHistory.init(conf); JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) .getJobIndexInfo(); String jobhistoryFileName = FileNameIndexUtils .getDoneFileName(jobIndexInfo); Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); FSDataInputStream in = null; FileContext fc = null; try { fc = FileContext.getFileContext(conf); in = fc.open(fc.makeQualified(historyFilePath)); } catch (IOException ioe) { LOG.info("Can not open history file: " + historyFilePath, ioe); throw (new Exception("Can not open History File")); } JobHistoryParser parser = new JobHistoryParser(in); JobInfo jobInfo = parser.parse(); Exception parseException = parser.getParseException(); Assert.assertNull("Caught an expected exception " + parseException, parseException); int noOffailedAttempts = 0; Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks(); for (Task task : job.getTasks().values()) { TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID())); for (TaskAttempt taskAttempt : task.getAttempts().values()) { TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get( TypeConverter.fromYarn((taskAttempt.getID()))); // Verify rack-name for all task 
attempts Assert.assertEquals("rack-name is incorrect", taskAttemptInfo.getRackname(), RACK_NAME); if (taskAttemptInfo.getTaskStatus().equals("FAILED")) { noOffailedAttempts++; } } } Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts); } finally { LOG.info("FINISHED testHistoryParsingForFailedAttempts"); } }
@Test(timeout = 60000) public void testCountersForFailedTask() throws Exception { LOG.info("STARTING testCountersForFailedTask"); try { Configuration conf = new Configuration(); conf.setClass( CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this .getClass().getName(), true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); app.waitForState(job, JobState.FAILED); // make sure all events are flushed app.waitForState(Service.STATE.STOPPED); String jobhistoryDir = JobHistoryUtils .getHistoryIntermediateDoneDirForUser(conf); JobHistory jobHistory = new JobHistory(); jobHistory.init(conf); JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) .getJobIndexInfo(); String jobhistoryFileName = FileNameIndexUtils .getDoneFileName(jobIndexInfo); Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); FSDataInputStream in = null; FileContext fc = null; try { fc = FileContext.getFileContext(conf); in = fc.open(fc.makeQualified(historyFilePath)); } catch (IOException ioe) { LOG.info("Can not open history file: " + historyFilePath, ioe); throw (new Exception("Can not open History File")); } JobHistoryParser parser = new JobHistoryParser(in); JobInfo jobInfo = parser.parse(); Exception parseException = parser.getParseException(); Assert.assertNull("Caught an expected exception " + parseException, parseException); for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) { TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey()); CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue()); Assert.assertNotNull("completed task report has null counters", ct .getReport().getCounters()); } } finally { LOG.info("FINISHED testCountersForFailedTask"); } }