Java 类org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl 实例源码
项目:hadoop-plus
文件:TestMerger.java
@Test
public void testInMemoryMerger() throws IOException {
JobID jobId = new JobID("a", 0);
TaskAttemptID reduceId = new TaskAttemptID(
new TaskID(jobId, TaskType.REDUCE, 0), 0);
TaskAttemptID mapId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 1), 0);
TaskAttemptID mapId2 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 2), 0);
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
null, null, new Progress(), new MROutputFiles());
// write map outputs
Map<String, String> map1 = new TreeMap<String, String>();
map1.put("apple", "disgusting");
map1.put("carrot", "delicious");
Map<String, String> map2 = new TreeMap<String, String>();
map1.put("banana", "pretty good");
byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
mapOutputBytes1.length);
System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
mapOutputBytes2.length);
// create merger and run merge
MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
mergeManager.createInMemoryMerger();
List<InMemoryMapOutput<Text, Text>> mapOutputs =
new ArrayList<InMemoryMapOutput<Text, Text>>();
mapOutputs.add(mapOutput1);
mapOutputs.add(mapOutput2);
inMemoryMerger.merge(mapOutputs);
Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
List<String> keys = new ArrayList<String>();
List<String> values = new ArrayList<String>();
readOnDiskMapOutput(conf, fs, outPath, keys, values);
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
}
项目:hadoop-TCP
文件:TestMerger.java
@Test
public void testInMemoryMerger() throws Throwable {
JobID jobId = new JobID("a", 0);
TaskAttemptID reduceId = new TaskAttemptID(
new TaskID(jobId, TaskType.REDUCE, 0), 0);
TaskAttemptID mapId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 1), 0);
TaskAttemptID mapId2 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 2), 0);
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
null, null, new Progress(), new MROutputFiles());
// write map outputs
Map<String, String> map1 = new TreeMap<String, String>();
map1.put("apple", "disgusting");
map1.put("carrot", "delicious");
Map<String, String> map2 = new TreeMap<String, String>();
map1.put("banana", "pretty good");
byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
mapOutputBytes1.length);
System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
mapOutputBytes2.length);
// create merger and run merge
MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
mergeManager.createInMemoryMerger();
List<InMemoryMapOutput<Text, Text>> mapOutputs =
new ArrayList<InMemoryMapOutput<Text, Text>>();
mapOutputs.add(mapOutput1);
mapOutputs.add(mapOutput2);
inMemoryMerger.merge(mapOutputs);
Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
List<String> keys = new ArrayList<String>();
List<String> values = new ArrayList<String>();
readOnDiskMapOutput(conf, fs, outPath, keys, values);
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
mergeManager.close();
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
}
项目:hardfs
文件:TestMerger.java
@Test
public void testInMemoryMerger() throws Throwable {
JobID jobId = new JobID("a", 0);
TaskAttemptID reduceId = new TaskAttemptID(
new TaskID(jobId, TaskType.REDUCE, 0), 0);
TaskAttemptID mapId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 1), 0);
TaskAttemptID mapId2 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 2), 0);
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
null, null, new Progress(), new MROutputFiles());
// write map outputs
Map<String, String> map1 = new TreeMap<String, String>();
map1.put("apple", "disgusting");
map1.put("carrot", "delicious");
Map<String, String> map2 = new TreeMap<String, String>();
map1.put("banana", "pretty good");
byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
mapOutputBytes1.length);
System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
mapOutputBytes2.length);
// create merger and run merge
MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
mergeManager.createInMemoryMerger();
List<InMemoryMapOutput<Text, Text>> mapOutputs =
new ArrayList<InMemoryMapOutput<Text, Text>>();
mapOutputs.add(mapOutput1);
mapOutputs.add(mapOutput2);
inMemoryMerger.merge(mapOutputs);
Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
List<String> keys = new ArrayList<String>();
List<String> values = new ArrayList<String>();
readOnDiskMapOutput(conf, fs, outPath, keys, values);
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
mergeManager.close();
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
}
项目:hadoop-on-lustre2
文件:TestMerger.java
@Test
public void testInMemoryMerger() throws Throwable {
JobID jobId = new JobID("a", 0);
TaskAttemptID reduceId = new TaskAttemptID(
new TaskID(jobId, TaskType.REDUCE, 0), 0);
TaskAttemptID mapId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 1), 0);
TaskAttemptID mapId2 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 2), 0);
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
null, null, new Progress(), new MROutputFiles());
// write map outputs
Map<String, String> map1 = new TreeMap<String, String>();
map1.put("apple", "disgusting");
map1.put("carrot", "delicious");
Map<String, String> map2 = new TreeMap<String, String>();
map1.put("banana", "pretty good");
byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
mapOutputBytes1.length);
System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
mapOutputBytes2.length);
// create merger and run merge
MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
mergeManager.createInMemoryMerger();
List<InMemoryMapOutput<Text, Text>> mapOutputs =
new ArrayList<InMemoryMapOutput<Text, Text>>();
mapOutputs.add(mapOutput1);
mapOutputs.add(mapOutput2);
inMemoryMerger.merge(mapOutputs);
Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
List<String> keys = new ArrayList<String>();
List<String> values = new ArrayList<String>();
readOnDiskMapOutput(conf, fs, outPath, keys, values);
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
mergeManager.close();
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
}