/**
 * Update resource information counters
 */
void updateResourceCounters() {
  // Update generic resource counters
  updateHeapUsageCounter();

  if (resourceCalculator == null) {
    return;
  }
  ProcResourceValues res = resourceCalculator.getProcResourceValues();
  long cpuTime = res.getCumulativeCpuTime();
  long pMem = res.getPhysicalMemorySize();
  long vMem = res.getVirtualMemorySize();
  // Remove the CPU time consumed previously by JVM reuse
  cpuTime -= initCpuCumulativeTime;
  counters.findCounter(Counter.CPU_MILLISECONDS).setValue(cpuTime);
  counters.findCounter(Counter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
  counters.findCounter(Counter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
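// The subtraction of initCpuCumulativeTime above matters under JVM reuse:
// getCumulativeCpuTime() covers the whole process, so each task must record
// a baseline when it starts. A minimal sketch of that pattern follows; the
// class and method names here are illustrative assumptions, and only
// initCpuCumulativeTime comes from the code above.
class CpuBaselineSketch {
  /** Cumulative process CPU observed when this task started in the reused JVM. */
  private final long initCpuCumulativeTime;

  CpuBaselineSketch(long cumulativeCpuAtTaskStart) {
    this.initCpuCumulativeTime = cumulativeCpuAtTaskStart;
  }

  /** CPU attributable to this task alone: process total minus the baseline. */
  long taskCpuMillis(long cumulativeCpuNow) {
    return cumulativeCpuNow - initCpuCumulativeTime;
  }
}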
protected ProcResourceValues sortReduceParts() {
  long sortStartMilli = System.currentTimeMillis();
  ProcResourceValues sortStartProcVals = task.getCurrentProcResourceValues();
  long sortStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  // sort
  for (int i = 0; i < reducePartitions.length; i++) {
    reducePartitions[i].groupOrSort();
  }
  long sortEndMilli = System.currentTimeMillis();
  ProcResourceValues sortEndProcVals = task.getCurrentProcResourceValues();
  long sortEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incCountersPerSort(sortStartProcVals,
      sortEndProcVals, sortEndMilli - sortStartMilli);
  mapSpillSortCounter.incJVMCPUPerSort(sortStart, sortEnd);
  return sortEndProcVals;
}
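// jmxThreadInfoTracker is not shown in this section; getTaskCPUTime("MAIN_TASK")
// presumably reads a named thread's CPU time through JMX. Below is a minimal,
// self-contained sketch of per-thread CPU sampling via java.lang.management;
// the class and method names are illustrative assumptions, not the real tracker.
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

class ThreadCpuSketch {
  private static final ThreadMXBean THREADS =
      ManagementFactory.getThreadMXBean();

  /** CPU time of the calling thread in milliseconds, or 0 if unsupported. */
  static long currentThreadCpuMillis() {
    if (!THREADS.isCurrentThreadCpuTimeSupported()) {
      return 0;
    }
    // getCurrentThreadCpuTime() reports nanoseconds; convert to millis so
    // the value is comparable to the wall-clock deltas used above.
    return THREADS.getCurrentThreadCpuTime() / 1000000L;
  }
}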
public synchronized void flush() throws IOException, ClassNotFoundException,
    InterruptedException {
  if (numSpills > 0 && lastSpillInMem) {
    // If there is already at least one spill, we can try to hold this
    // last spill in memory.
    sortReduceParts();
    for (int i = 0; i < partitions; i++) {
      this.inMemorySegments[i] =
          new Segment<K, V>(this.reducePartitions[i].getIReader(), true);
    }
    hasInMemorySpill = true;
  } else {
    sortAndSpill();
  }
  long mergeStartMilli = System.currentTimeMillis();
  ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
  long mergeStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mergeParts();
  long mergeEndMilli = System.currentTimeMillis();
  ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
  long mergeEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
      mergeEndMilli - mergeStartMilli);
  mapSpillSortCounter.incJVMCPUMerge(mergeStart, mergeEnd);
}
public synchronized void flush() throws IOException, ClassNotFoundException,
    InterruptedException {
  if (numSpills > 0 && lastSpillInMem) {
    // If there is already at least one spill, we can try to hold this
    // last spill in memory.
    sortReduceParts();
    for (int i = 0; i < partitions; i++) {
      this.inMemorySegments[i] =
          new Segment<K, V>(this.reducePartitions[i].getIReader(), true);
    }
    hasInMemorySpill = true;
  } else {
    sortAndSpill();
  }
  long mergeStartMilli = System.currentTimeMillis();
  ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
  mergeParts();
  long mergeEndMilli = System.currentTimeMillis();
  ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
  mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
      mergeEndMilli - mergeStartMilli);
}
public void incCountersPerSpill(ProcResourceValues spillStartProcVals,
    ProcResourceValues spillEndProcVals, long wallClockVal,
    long spillBytesVal) {
  numSpillsVal += 1;
  long cpuUsedBySpill = getCPUVal(spillStartProcVals, spillEndProcVals);
  mapSpillCPUVal += cpuUsedBySpill;
  mapSpillWallClockVal += wallClockVal;
  mapSpillBytesVal += spillBytesVal;
}
public void incMergeCounters(ProcResourceValues mergeStartProcVals,
    ProcResourceValues mergeEndProcVals, long wallClockVal) {
  long cpuUsedByMerge = getCPUVal(mergeStartProcVals, mergeEndProcVals);
  mapMergeCPUVal += cpuUsedByMerge;
  mapMergeWallClockVal += wallClockVal;
}
private long getCPUVal(ProcResourceValues startProcVals,
    ProcResourceValues endProcVals) {
  long cpuUsed = 0;
  if (startProcVals != null && endProcVals != null) {
    long cpuStartVal = startProcVals.getCumulativeCpuTime();
    long cpuEndVal = endProcVals.getCumulativeCpuTime();
    if (cpuEndVal > cpuStartVal) {
      cpuUsed = cpuEndVal - cpuStartVal;
    }
  }
  return cpuUsed;
}
private void setCPUCounter(ProcResourceValues startProcVals,
    ProcResourceValues endProcVals,
    org.apache.hadoop.mapred.Counters.Counter counter) {
  long cpuUsed = 0;
  if (startProcVals != null && endProcVals != null) {
    long cpuStartVal = startProcVals.getCumulativeCpuTime();
    long cpuEndVal = endProcVals.getCumulativeCpuTime();
    if (cpuEndVal > cpuStartVal) {
      cpuUsed = cpuEndVal - cpuStartVal;
    }
  }
  counter.setValue(cpuUsed);
}
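// getCPUVal and setCPUCounter above duplicate the same guarded subtraction.
// A sketch of the shared core they both compute; consolidating it like this
// is a hypothetical refactor, not part of the original code.
private static long guardedCpuDelta(ProcResourceValues start,
    ProcResourceValues end) {
  // Missing samples or a non-advancing cumulative clock yield 0 rather
  // than a negative or bogus delta.
  if (start == null || end == null) {
    return 0;
  }
  long delta = end.getCumulativeCpuTime() - start.getCumulativeCpuTime();
  return delta > 0 ? delta : 0;
}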
protected ProcResourceValues sortReduceParts() {
  long sortStartMilli = System.currentTimeMillis();
  ProcResourceValues sortStartProcVals = task.getCurrentProcResourceValues();
  // sort
  for (int i = 0; i < reducePartitions.length; i++) {
    reducePartitions[i].groupOrSort();
  }
  long sortEndMilli = System.currentTimeMillis();
  ProcResourceValues sortEndProcVals = task.getCurrentProcResourceValues();
  mapSpillSortCounter.incCountersPerSort(sortStartProcVals,
      sortEndProcVals, sortEndMilli - sortStartMilli);
  return sortEndProcVals;
}
public void incCountersPerSort(ProcResourceValues sortStartProcVals,
    ProcResourceValues sortEndProcVals, long wallClockVal) {
  long cpuUsedBySort = getCPUVal(sortStartProcVals, sortEndProcVals);
  mapMemSortCPUVal += cpuUsedBySort;
  mapMemSortWallClockVal += wallClockVal;
}
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final K key, final V value,
    int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    long spillStartMilli = System.currentTimeMillis();
    ProcResourceValues spillStartProcVals = getCurrentProcResourceValues();
    long spillStart = jmxThreadInfoTracker.getCurrentThreadCPUTime();
    long spillBytes = 0;
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, size);
    out = rfs.create(filename);
    // we don't run the combiner for a single record
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec,
            spilledRecordsCounter);
        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();
        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillBytes += writer.getCompressedLength();
        spillRec.putIndex(rec, i);
        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    long spillEndMilli = System.currentTimeMillis();
    ProcResourceValues spillEndProcVals = getCurrentProcResourceValues();
    long spillEnd = jmxThreadInfoTracker.getCurrentThreadCPUTime();
    spillSortCounters.incCountersPerSpill(spillStartProcVals,
        spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
    spillSortCounters.incJVMCPUPerSpill(spillStart, spillEnd);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
@Override
public void sortAndSpill() throws IOException {
  ProcResourceValues sortEndProcVals = sortReduceParts();
  long sortEndMilli = System.currentTimeMillis();
  long spillStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  // spill
  FSDataOutputStream out = null;
  long spillBytes = 0;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = task.mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, this.memoryBlockAllocator.getEstimatedSize());
    out = rfs.create(filename);
    for (int i = 0; i < partitions; ++i) {
      IndexRecord rec = reducePartitions[i].spill(job, out, keyClass,
          valClass, codec, task.spilledRecordsCounter);
      // record offsets
      spillBytes += rec.partLength;
      spillRec.putIndex(rec, i);
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = task.mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
  long spillEndMilli = System.currentTimeMillis();
  ProcResourceValues spillEndProcVals = task.getCurrentProcResourceValues();
  long spillEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incCountersPerSpill(sortEndProcVals, spillEndProcVals,
      spillEndMilli - sortEndMilli, spillBytes);
  mapSpillSortCounter.incJVMCPUPerSpill(spillStart, spillEnd);
}
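// sortAndSpill() above brackets each phase with three clocks (wall-clock,
// process-level ProcResourceValues, thread-level JMX CPU), and the spill
// interval deliberately begins at sortEndMilli, where the sort interval
// ended, so sort cost already counted in sortReduceParts() is never
// double-counted. A self-contained sketch of that non-overlapping
// bracketing; the class and method names are illustrative assumptions.
class PhaseTimerSketch {
  private long phaseBoundary = System.currentTimeMillis();

  /**
   * Closes the current phase and returns its wall-clock millis; the next
   * phase starts exactly at the boundary, so intervals never overlap.
   */
  long closePhase() {
    long now = System.currentTimeMillis();
    long elapsed = now - phaseBoundary;
    phaseBoundary = now;
    return elapsed;
  }
}
// Usage, mirroring sortAndSpill():
//   PhaseTimerSketch t = new PhaseTimerSketch();
//   /* sort  */ long sortMillis  = t.closePhase();
//   /* spill */ long spillMillis = t.closePhase();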
public void spillSingleRecord(K key, V value, int part) throws IOException {
  ProcResourceValues spillStartProcVals = task.getCurrentProcResourceValues();
  long spillStartMilli = System.currentTimeMillis();
  long spillStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  // spill
  FSDataOutputStream out = null;
  long spillBytes = 0;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = task.mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, key.getLength() + value.getLength());
    out = rfs.create(filename);
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec,
            task.spilledRecordsCounter);
        if (i == part) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();
        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillBytes += writer.getCompressedLength();
        spillRec.putIndex(rec, i);
        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = task.mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill big record " + numBigRecordsSpills);
    ++numBigRecordsSpills;
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
  long spillEndMilli = System.currentTimeMillis();
  ProcResourceValues spillEndProcVals = task.getCurrentProcResourceValues();
  mapSpillSortCounter.incCountersPerSpill(spillStartProcVals,
      spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
  long spillEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incJVMCPUPerSpill(spillStart, spillEnd);
  mapSpillSortCounter.incSpillSingleRecord();
}
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final K key, final V value,
    int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    long spillStartMilli = System.currentTimeMillis();
    ProcResourceValues spillStartProcVals = getCurrentProcResourceValues();
    long spillBytes = 0;
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, size);
    out = rfs.create(filename);
    // we don't run the combiner for a single record
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec,
            spilledRecordsCounter);
        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();
        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillBytes += writer.getCompressedLength();
        spillRec.putIndex(rec, i);
        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    long spillEndMilli = System.currentTimeMillis();
    ProcResourceValues spillEndProcVals = getCurrentProcResourceValues();
    spillSortCounters.incCountersPerSpill(spillStartProcVals,
        spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
@Override
public void sortAndSpill() throws IOException {
  ProcResourceValues sortEndProcVals = sortReduceParts();
  long sortEndMilli = System.currentTimeMillis();
  // spill
  FSDataOutputStream out = null;
  long spillBytes = 0;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = task.mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, this.memoryBlockAllocator.getEstimatedSize());
    out = rfs.create(filename);
    for (int i = 0; i < partitions; ++i) {
      IndexRecord rec = reducePartitions[i].spill(job, out, keyClass,
          valClass, codec, task.spilledRecordsCounter);
      // record offsets
      spillBytes += rec.partLength;
      spillRec.putIndex(rec, i);
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = task.mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
  long spillEndMilli = System.currentTimeMillis();
  ProcResourceValues spillEndProcVals = task.getCurrentProcResourceValues();
  mapSpillSortCounter.incCountersPerSpill(sortEndProcVals, spillEndProcVals,
      spillEndMilli - sortEndMilli, spillBytes);
}
public void spillSingleRecord(K key, V value, int part) throws IOException {
  ProcResourceValues spillStartProcVals = task.getCurrentProcResourceValues();
  long spillStartMilli = System.currentTimeMillis();
  // spill
  FSDataOutputStream out = null;
  long spillBytes = 0;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = task.mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, key.getLength() + value.getLength());
    out = rfs.create(filename);
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec,
            task.spilledRecordsCounter);
        if (i == part) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();
        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillBytes += writer.getCompressedLength();
        spillRec.putIndex(rec, i);
        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = task.mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill big record " + numBigRecordsSpills);
    ++numBigRecordsSpills;
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
  long spillEndMilli = System.currentTimeMillis();
  ProcResourceValues spillEndProcVals = task.getCurrentProcResourceValues();
  mapSpillSortCounter.incCountersPerSpill(spillStartProcVals,
      spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
  mapSpillSortCounter.incSpillSingleRecord();
}