Java 类org.apache.hadoop.mapred.Merger 实例源码
项目:lustre-connector-for-hadoop
文件:LustreFsShuffle.java
@SuppressWarnings("unchecked")
public RawKeyValueIterator finish() throws Throwable {
// merge config params
Class<K> keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>) jobConf.getMapOutputValueClass();
final RawComparator<K> comparator = (RawComparator<K>) jobConf.getOutputKeyComparator();
// Wait for on-going merges to complete
merger.close();
LOG.info("finalMerge called with " + segmentsToBeMerged.size() + " on-disk map-outputs");
List<Segment<K, V>> segments = new ArrayList<Segment<K, V>>();
long onDiskBytes = 0;
for (Segment<K, V> segment : segmentsToBeMerged) {
long fileLength = segment.getLength();
onDiskBytes += fileLength;
LOG.debug("Disk file: " + segment + " Length is " + fileLength);
segments.add(segment);
}
segmentsToBeMerged.clear();
LOG.info("Merging " + segmentsToBeMerged.size() + " files, " + onDiskBytes + " bytes from disk");
Collections.sort(segments, new Comparator<Segment<K, V>>() {
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
if (o1.getLength() == o2.getLength()) {
return 0;
}
return o1.getLength() < o2.getLength() ? -1 : 1;
}
});
return Merger.merge(jobConf, lustrefs, keyClass, valueClass, segments, segments.size(), mergeTempDir,
comparator, reporter, spilledRecordsCounter, null, null);
}
项目:hadoop
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:aliyun-oss-hadoop-fs
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:big-c
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:hadoop-plus
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:hadoop-plus
文件:TestMerger.java
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
}
项目:FlexMap
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:hops
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:hadoop-TCP
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:hadoop-TCP
文件:TestMerger.java
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
}
项目:hardfs
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:hardfs
文件:TestMerger.java
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
}
项目:hadoop-on-lustre2
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:hadoop-on-lustre2
文件:TestMerger.java
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
}
项目:mapreduce-fork
文件:MergeManager.java
@Override
public void merge(List<MapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
MapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
项目:lustre-connector-for-hadoop
文件:LustreFsShuffle.java
@SuppressWarnings("unchecked")
@Override
public void merge(List<Segment<K,V>> segments) throws IOException {
// sanity check
if (segments == null || segments.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
Class<K> keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>) jobConf.getMapOutputValueClass();
final RawComparator<K> comparator = (RawComparator<K>) jobConf.getOutputKeyComparator();
long approxOutputSize = 0;
int bytesPerSum = jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + segments.size()
+ " map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (Segment<K,V> segment : segments) {
approxOutputSize += segment.getLength();
}
// add the checksum length
approxOutputSize += ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath = new Path(reduceDir, "file-" + (numPasses++)).suffix(Task.MERGED_OUTPUT_PREFIX);
Writer<K, V> writer = new Writer<K, V>(jobConf, lustrefs.create(outputPath),
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, null, true);
RawKeyValueIterator iter = null;
try {
iter = Merger.merge(jobConf, lustrefs, keyClass, valueClass, segments, ioSortFactor, mergeTempDir,
comparator, reporter, spilledRecordsCounter, mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
} catch (IOException e) {
lustrefs.delete(outputPath, true);
throw e;
}
addSegmentToMerge(new Segment<K, V>(jobConf, lustrefs, outputPath, codec, false, null));
LOG.info(reduceId + " Finished merging " + segments.size()
+ " map output files on disk of total-size " + approxOutputSize + "."
+ " Local output file is " + outputPath + " of size "
+ lustrefs.getFileStatus(outputPath).getLen());
}
项目:hadoop
文件:MergeManagerImpl.java
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
}
项目:hadoop
文件:TestMerger.java
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
final float epsilon = 0.00001f;
// Reading 6 keys total, 3 each in 2 segments, so each key read moves the
// progress forward 1/6th of the way. Initially the first keys from each
// segment have been read as part of the merge setup, so progress = 2/6.
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// The first next() returns one of the keys already read during merge setup
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// At this point we've exhausted all of the keys in one segment
// so getting the next key will return the already cached key from the
// other segment
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
// Now there should be no more input
Assert.assertFalse(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.getKey() == null);
Assert.assertEquals(0, mergeQueue.getValue().getData().length);
}
项目:aliyun-oss-hadoop-fs
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
项目:aliyun-oss-hadoop-fs
文件:MergeManagerImpl.java
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
}
项目:aliyun-oss-hadoop-fs
文件:TestMerger.java
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
final float epsilon = 0.00001f;
// Reading 6 keys total, 3 each in 2 segments, so each key read moves the
// progress forward 1/6th of the way. Initially the first keys from each
// segment have been read as part of the merge setup, so progress = 2/6.
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// The first next() returns one of the keys already read during merge setup
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// At this point we've exhausted all of the keys in one segment
// so getting the next key will return the already cached key from the
// other segment
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
// Now there should be no more input
Assert.assertFalse(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.getKey() == null);
Assert.assertEquals(0, mergeQueue.getValue().getData().length);
}
项目:big-c
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
项目:big-c
文件:MergeManagerImpl.java
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
}
项目:big-c
文件:TestMerger.java
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
final float epsilon = 0.00001f;
// Reading 6 keys total, 3 each in 2 segments, so each key read moves the
// progress forward 1/6th of the way. Initially the first keys from each
// segment have been read as part of the merge setup, so progress = 2/6.
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// The first next() returns one of the keys already read during merge setup
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// At this point we've exhausted all of the keys in one segment
// so getting the next key will return the already cached key from the
// other segment
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
// Now there should be no more input
Assert.assertFalse(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.getKey() == null);
Assert.assertEquals(0, mergeQueue.getValue().getData().length);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:MergeManagerImpl.java
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestMerger.java
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
final float epsilon = 0.00001f;
// Reading 6 keys total, 3 each in 2 segments, so each key read moves the
// progress forward 1/6th of the way. Initially the first keys from each
// segment have been read as part of the merge setup, so progress = 2/6.
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// The first next() returns one of the keys already read during merge setup
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// At this point we've exhausted all of the keys in one segment
// so getting the next key will return the already cached key from the
// other segment
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
// Now there should be no more input
Assert.assertFalse(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.getKey() == null);
Assert.assertEquals(0, mergeQueue.getValue().getData().length);
}
项目:hadoop-plus
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
Writer<K,V> writer =
new Writer<K,V>(jobConf, rfs, outputPath,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
项目:hadoop-plus
文件:MergeManagerImpl.java
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
Writer<K,V> writer =
new Writer<K,V>(jobConf, rfs, outputPath,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
}
项目:FlexMap
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
项目:FlexMap
文件:MergeManagerImpl.java
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
}
项目:FlexMap
文件:TestMerger.java
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
final float epsilon = 0.00001f;
// Reading 6 keys total, 3 each in 2 segments, so each key read moves the
// progress forward 1/6th of the way. Initially the first keys from each
// segment have been read as part of the merge setup, so progress = 2/6.
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// The first next() returns one of the keys already read during merge setup
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// At this point we've exhausted all of the keys in one segment
// so getting the next key will return the already cached key from the
// other segment
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
// Now there should be no more input
Assert.assertFalse(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
}
项目:hops
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
项目:hops
文件:MergeManagerImpl.java
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
}
项目:hops
文件:TestMerger.java
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
final float epsilon = 0.00001f;
// Reading 6 keys total, 3 each in 2 segments, so each key read moves the
// progress forward 1/6th of the way. Initially the first keys from each
// segment have been read as part of the merge setup, so progress = 2/6.
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// The first next() returns one of the keys already read during merge setup
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// At this point we've exhausted all of the keys in one segment
// so getting the next key will return the already cached key from the
// other segment
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
// Now there should be no more input
Assert.assertFalse(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.getKey() == null);
Assert.assertEquals(0, mergeQueue.getValue().getData().length);
}
项目:hadoop-TCP
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
Writer<K,V> writer =
new Writer<K,V>(jobConf, rfs, outputPath,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
项目:hadoop-TCP
文件:MergeManagerImpl.java
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
Writer<K,V> writer =
new Writer<K,V>(jobConf, rfs, outputPath,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
}
项目:hardfs
文件:MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
Writer<K,V> writer =
new Writer<K,V>(jobConf, rfs, outputPath,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
项目:hardfs
文件:MergeManagerImpl.java
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}
long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);
LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
Writer<K,V> writer =
new Writer<K,V>(jobConf, rfs, outputPath,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
}