Example source code for the Java class org.apache.hadoop.mapreduce.lib.partition.InputSampler
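All of the snippets on this page follow the same total-order-sort pattern: configure the job, have InputSampler.writePartitionFile sample the input and write split points to a partition file, and point TotalOrderPartitioner at that file so each reducer receives a contiguous, sorted key range. A minimal, self-contained sketch of that pattern follows; the class name MySortJob, the Text key/value types, and the reducer count are illustrative assumptions, not code from any of the projects below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class MySortJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "total order sort");
        job.setJarByClass(MySortJob.class);
        // The identity mapper and reducer are enough to sort a SequenceFile of Text pairs.
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(4);
        SequenceFileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        // The partition file must be written after the input paths, input format and
        // reducer count are configured, and before the job is submitted.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[1] + "_partitions.lst"));
        InputSampler.writePartitionFile(job, new InputSampler.RandomSampler<Text, Text>(0.1, 1000));
        job.setPartitionerClass(TotalOrderPartitioner.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The examples below vary the sampler (RandomSampler vs. IntervalSampler), the number of reducers, and how the partition file is shipped to the tasks, but the ordering of the calls above is common to all of them.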
Project: ldbc_snb_datagen
File: HadoopFileSorter.java
/**
* Sorts a Hadoop sequence file.
*
* @param inputFileName The name of the file to sort.
* @param outputFileName The name of the sorted file.
* @throws Exception if the sort job does not complete successfully.
*/
public void run(String inputFileName, String outputFileName) throws Exception {
int numThreads = conf.getInt("ldbc.snb.datagen.generator.numThreads", 1);
Job job = Job.getInstance(conf, "Sorting " + inputFileName);
FileInputFormat.setInputPaths(job, new Path(inputFileName));
FileOutputFormat.setOutputPath(job, new Path(outputFileName));
job.setMapOutputKeyClass(K);
job.setMapOutputValueClass(V);
job.setOutputKeyClass(K);
job.setOutputValueClass(V);
job.setNumReduceTasks(numThreads);
job.setJarByClass(V);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 1000);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(inputFileName + "_partition.lst"));
InputSampler.writePartitionFile(job, sampler);
job.setPartitionerClass(TotalOrderPartitioner.class);
if (!job.waitForCompletion(true)) {
throw new Exception();
}
}
Project: TeraSort-Local-Hadoop-MR-Spark
File: TeraSort.java
@Override
public int run(String[] newargs) throws Exception {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(TeraSort.class);
// set mapper and reducer class
job.setMapperClass(TeraMapper.class);
job.setReducerClass(TeraReducer.class);
// set number of reducers
job.setNumReduceTasks(32);
job.setInputFormatClass(KeyValueTextInputFormat.class);
// set the map output key class to Text
job.setMapOutputKeyClass(Text.class);
// set the reducer output key and value classes; both are Text
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// set input path for the job
FileInputFormat.addInputPath(job, new Path(newargs[0]));
Path partitionFile = new Path(new Path(newargs[2]), "partitioning");
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
// use a random sampler to write the partition file
InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 32);
InputSampler.writePartitionFile(job, sampler);
// set partitioner to TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
// set output directory for the job
FileOutputFormat.setOutputPath(job, new Path(newargs[1]));
int ret = job.waitForCompletion(true) ? 0 : 1;
logger.info("Done");
return ret;
}
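The three-argument RandomSampler constructor used above takes the per-record sampling frequency, the maximum number of samples to keep, and the maximum number of input splits to read; the two-argument form reads from every split. A short comparison, with illustrative values only:

// Keep at most 1000 keys, sampling each record with probability 0.01, from all splits.
InputSampler.Sampler<Text, Text> sampleAllSplits = new InputSampler.RandomSampler<>(0.01, 1000);
// Same, but read at most 32 input splits, which bounds sampling time on large inputs.
InputSampler.Sampler<Text, Text> sampleSomeSplits = new InputSampler.RandomSampler<>(0.01, 1000, 32);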
Project: hdt-mr
File: HDTBuilderDriver.java
protected boolean runDictionaryJob() throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
boolean jobOK;
Job job = null;
BufferedWriter bufferedWriter;
// if output path exists...
if (this.dictionaryFS.exists(this.conf.getDictionaryOutputPath())) {
if (this.conf.getDeleteDictionaryOutputPath()) { // ... and option provided, delete recursively
this.dictionaryFS.delete(this.conf.getDictionaryOutputPath(), true);
} else { // ... and option not provided, fail
System.out.println("Dictionary output path does exist: " + this.conf.getDictionaryOutputPath());
System.out.println("Select other path or use option -dd to overwrite");
System.exit(-1);
}
}
// Sample the SequenceFileInputFormat input to do a total sort and create the final output
job = new Job(this.conf.getConfigurationObject(), this.conf.getDictionaryJobName() + " phase 2");
job.setJarByClass(HDTBuilderDriver.class);
System.out.println("samples = " + this.conf.getDictionarySamplesPath());
System.out.println("output = " + this.conf.getDictionaryOutputPath());
FileInputFormat.addInputPath(job, this.conf.getDictionarySamplesPath());
FileOutputFormat.setOutputPath(job, this.conf.getDictionaryOutputPath());
job.setInputFormatClass(SequenceFileInputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
// Identity Mapper
// job.setMapperClass(Mapper.class);
job.setCombinerClass(DictionaryCombiner.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setReducerClass(DictionaryReducer.class);
job.setNumReduceTasks(this.conf.getDictionaryReducers());
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
System.out.println("Sampling started");
InputSampler.writePartitionFile(job, new InputSampler.IntervalSampler<Text, Text>(this.conf.getSampleProbability()));
String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
DistributedCache.createSymlink(job.getConfiguration());
System.out.println("Sampling finished");
MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.SHARED, SequenceFileOutputFormat.class, Text.class, NullWritable.class);
MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.SUBJECTS, SequenceFileOutputFormat.class, Text.class, NullWritable.class);
MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.PREDICATES, SequenceFileOutputFormat.class, Text.class, NullWritable.class);
MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.OBJECTS, SequenceFileOutputFormat.class, Text.class, NullWritable.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, com.hadoop.compression.lzo.LzoCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
jobOK = job.waitForCompletion(true);
this.numShared = job.getCounters().findCounter(Counters.Shared).getValue();
this.numSubjects = job.getCounters().findCounter(Counters.Subjects).getValue();
this.numPredicates = job.getCounters().findCounter(Counters.Predicates).getValue();
this.numObjects = job.getCounters().findCounter(Counters.Objects).getValue();
bufferedWriter = new BufferedWriter(new OutputStreamWriter(this.dictionaryFS.create(this.conf.getDictionaryCountersFile())));
bufferedWriter.write(HDTBuilderConfiguration.SHARED + "=" + this.numShared + "\n");
bufferedWriter.write(HDTBuilderConfiguration.SUBJECTS + "=" + this.numSubjects + "\n");
bufferedWriter.write(HDTBuilderConfiguration.PREDICATES + "=" + this.numPredicates + "\n");
bufferedWriter.write(HDTBuilderConfiguration.OBJECTS + "=" + this.numObjects + "\n");
bufferedWriter.close();
return jobOK;
}
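Unlike the RandomSampler used in most of the other examples, the IntervalSampler above takes keys at regular intervals (roughly every 1/freq-th record) rather than at random, which suits inputs whose keys are already spread evenly across splits. A minimal sketch with illustrative values, assuming a job configured as above:

// Keep roughly one key per 1000 records, reading from at most 10 input splits.
InputSampler.Sampler<Text, Text> sampler = new InputSampler.IntervalSampler<>(0.001, 10);
InputSampler.writePartitionFile(job, sampler);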
Project: hdt-mr
File: HDTBuilderDriver.java
protected boolean runTriplesJob() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
Job job = null;
boolean jobOK;
// if triples output path exists...
if (this.triplesFS.exists(this.conf.getTriplesOutputPath())) {
if (this.conf.getDeleteTriplesOutputPath()) { // ... and option provided, delete recursively
this.triplesFS.delete(this.conf.getTriplesOutputPath(), true);
} else { // ... and option not provided, fail
System.out.println("Triples output path does exist: " + this.conf.getTriplesOutputPath());
System.out.println("Select other path or use option -dt to overwrite");
System.exit(-1);
}
}
job = new Job(this.conf.getConfigurationObject(), this.conf.getTriplesJobName() + " phase 2");
job.setJarByClass(HDTBuilderDriver.class);
FileInputFormat.addInputPath(job, this.conf.getTriplesSamplesPath());
FileOutputFormat.setOutputPath(job, this.conf.getTriplesOutputPath());
job.setInputFormatClass(SequenceFileInputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
job.setSortComparatorClass(TripleSPOComparator.class);
job.setGroupingComparatorClass(TripleSPOComparator.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setOutputKeyClass(TripleSPOWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(this.conf.getTriplesReducers());
System.out.println("Sampling started");
InputSampler.writePartitionFile(job, new InputSampler.IntervalSampler<Text, Text>(this.conf.getSampleProbability()));
String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
DistributedCache.createSymlink(job.getConfiguration());
System.out.println("Sampling finished");
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, com.hadoop.compression.lzo.LzoCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
jobOK = job.waitForCompletion(true);
return jobOK;
}
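Both hdt-mr jobs register the partition file in the DistributedCache under the TotalOrderPartitioner.DEFAULT_PATH fragment and create a symlink, because they leave the partition file at its default, task-relative location. An alternative, sketched below under the assumption that the new-API TotalOrderPartitioner on Hadoop 2.x is in use (partitionFile and sampler are placeholder variables, not from the project), is to store the partition file at an explicit HDFS path, which the partitioner can read directly without a cache entry:

// With an absolute path in the configuration, the new-API TotalOrderPartitioner
// loads the split points from HDFS itself; no cache file or symlink is usually needed.
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
InputSampler.writePartitionFile(job, sampler);
job.setPartitionerClass(TotalOrderPartitioner.class);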
Project: tree-index
File: SortDriver.java
public int run(String[] args) throws Exception {
SortConfig config = new SortConfig();
config.fromArray(args);
Job job = Job.getInstance(getConf());
job.setJobName("sort");
job.setJarByClass(SortDriver.class);
// define the path
Path inputPath = new Path(config.getInput());
Path partitionFilePath = new Path(config.getPartition());
Path outputPath = new Path(config.getOutput());
Path metaPath = new Path(config.getMeta());
LOGGER.info("use " + inputPath.toString() + " as sort input");
LOGGER.info("use " + partitionFilePath.toString() + " as partition");
LOGGER.info("use " + outputPath.toString() + " as sort output");
LOGGER.info("use " + metaPath.toString() + " as meta output");
// define the mapper
// use the identity mapper, which is the default implementation
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.setInputPaths(job, inputPath);
// define the reducer
job.getConfiguration().set(SortReducer.META_BASE_CONFIG_NAME, metaPath.toString());
job.setReducerClass(SortReducer.class);
job.setNumReduceTasks(NUM_REDUCER);
// use text output for easier debugging; sequence file output would be faster
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(job, outputPath);
// set partition
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFilePath);
// set the sampler
InputSampler.writePartitionFile(job, new InputSampler.RandomSampler(
1, 10000));
// set multiple output
MultipleOutputs.addNamedOutput(job, "meta", TextOutputFormat.class,
IntWritable.class, Text.class);
// clean up the old output path
outputPath.getFileSystem(job.getConfiguration()).delete(outputPath, true);
metaPath.getFileSystem(job.getConfiguration()).delete(metaPath, true);
// run the job and wait until it complete
return job.waitForCompletion(true) ? 0 : 1;
}
Project: hiped2
File: TotalSortMapReduce.java
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
int numReducers = 2;
Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));
InputSampler.Sampler<Text, Text> sampler =
new InputSampler.RandomSampler<Text, Text>
(0.1,
10000,
10);
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(TotalSortMapReduce.class);
job.setNumReduceTasks(numReducers);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
InputSampler.writePartitionFile(job, sampler);
URI partitionUri = new URI(partitionFile.toString() +
"#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, conf);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
Project: ldbc_snb_datagen
File: HadoopFileRanker.java
/**
* Sorts a Hadoop sequence file and assigns a long rank to each element.
*
* @param inputFileName The name of the file to sort.
* @param outputFileName The name of the sorted file.
* @throws Exception if either job does not complete successfully.
*/
public void run(String inputFileName, String outputFileName) throws Exception {
int numThreads = conf.getInt("ldbc.snb.datagen.generator.numThreads", 1);
if (keySetterName != null) {
conf.set("keySetterClassName", keySetterName);
}
/** First Job to sort the key-value pairs and to count the number of elements processed by each reducer.**/
Job jobSort = Job.getInstance(conf, "Sorting " + inputFileName);
FileInputFormat.setInputPaths(jobSort, new Path(inputFileName));
FileOutputFormat
.setOutputPath(jobSort, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"));
if (keySetterName != null) {
jobSort.setMapperClass(HadoopFileRankerSortMapper.class);
}
jobSort.setMapOutputKeyClass(K);
jobSort.setMapOutputValueClass(V);
jobSort.setOutputKeyClass(BlockKey.class);
jobSort.setOutputValueClass(V);
jobSort.setNumReduceTasks(numThreads);
jobSort.setReducerClass(HadoopFileRankerSortReducer.class);
jobSort.setJarByClass(V);
jobSort.setInputFormatClass(SequenceFileInputFormat.class);
jobSort.setOutputFormatClass(SequenceFileOutputFormat.class);
InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 1000);
TotalOrderPartitioner.setPartitionFile(jobSort.getConfiguration(), new Path(inputFileName + "_partition.lst"));
InputSampler.writePartitionFile(jobSort, sampler);
jobSort.setPartitionerClass(TotalOrderPartitioner.class);
if (!jobSort.waitForCompletion(true)) {
throw new Exception();
}
/** Second Job to assign the rank to each element.**/
Job jobRank = Job.getInstance(conf, "Sorting " + inputFileName);
FileInputFormat
.setInputPaths(jobRank, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"));
FileOutputFormat.setOutputPath(jobRank, new Path(outputFileName));
jobRank.setMapOutputKeyClass(BlockKey.class);
jobRank.setMapOutputValueClass(V);
jobRank.setOutputKeyClass(LongWritable.class);
jobRank.setOutputValueClass(V);
jobRank.setSortComparatorClass(BlockKeyComparator.class);
jobRank.setNumReduceTasks(numThreads);
jobRank.setReducerClass(HadoopFileRankerFinalReducer.class);
jobRank.setJarByClass(V);
jobRank.setInputFormatClass(SequenceFileInputFormat.class);
jobRank.setOutputFormatClass(SequenceFileOutputFormat.class);
jobRank.setPartitionerClass(HadoopFileRankerPartitioner.class);
if (!jobRank.waitForCompletion(true)) {
throw new Exception();
}
try {
FileSystem fs = FileSystem.get(conf);
for (int i = 0; i < numThreads; ++i) {
fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rank_" + i), true);
}
fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"), true);
} catch (IOException e) {
System.err.println(e.getMessage());
}
}
Project: hbase-in-action
File: BulkImportJobExample.java
/**
* Fixes a potential overlap of generated regions/splits for a dataset with many identical keys. For instance,
* let the samples be {1,1,1,1,3,3,3,5,6} and the number of partitions be 3. The original implementation would
* produce the splits 1-1, 3-3, 3-6; note the overlap between the 2nd and 3rd partitions (a small worked
* illustration follows the method below).
*
* @param job
* @param sampler
* @param <K>
* @param <V>
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
public static <K, V> void writePartitionFile(Job job, InputSampler.Sampler<K, V> sampler)
throws IOException, ClassNotFoundException, InterruptedException {
LinkedList<K> splits = new LinkedList<K>();
Configuration conf = job.getConfiguration();
final InputFormat inf =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
int numPartitions = job.getNumReduceTasks();
K[] samples = sampler.getSample(inf, job);
LOG.info("Using " + samples.length + " samples");
RawComparator<K> comparator = (RawComparator<K>) job.getGroupingComparator();
Arrays.sort(samples, comparator);
Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
FileSystem fs = dst.getFileSystem(conf);
if (fs.exists(dst)) fs.delete(dst, false);
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
float stepSize = samples.length / (float) numPartitions;
K lastKey = null;
K currentKey = null;
int lastKeyIndex = -1;
for (int i = 1; i < numPartitions; ++i) {
int currentKeyOffset = Math.round(stepSize * i);
if (lastKeyIndex > currentKeyOffset) {
long keyOffset = lastKeyIndex - currentKeyOffset;
float errorRate = keyOffset / (float) samples.length;
LOG.warn(
String.format("Partitions overlap. Consider using a different Sampler " +
"and/or increase the number of samples and/or use more splits to take samples from. " +
"Next sample would have been %s key overlaps by a distance of %d (factor %f) ", samples[currentKeyOffset], keyOffset, errorRate));
currentKeyOffset = lastKeyIndex + 1;
}
currentKey = samples[currentKeyOffset];
while (lastKey != null && comparator.compare(currentKey, lastKey) == 0) {
currentKeyOffset++;
if (currentKeyOffset >= samples.length) {
LOG.info("Last 10 elements:");
for (int d = samples.length - 1; d > samples.length - 11; d--) {
LOG.debug(samples[d]);
}
throw new IOException("Not enough samples, stopped at partition " + i);
}
currentKey = samples[currentKeyOffset];
}
writer.append(currentKey, nullValue);
lastKey = currentKey;
lastKeyIndex = currentKeyOffset;
splits.add(currentKey);
}
writer.close();
LOG.info("********************************************* ");
LOG.info(" START KEYs for new Regions: ");
for (K split : splits) {
LOG.info("* " + split.toString());
}
}
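A hypothetical, standalone illustration of the duplicate-key case described in the javadoc above; the numbers are the ones given there, and none of this code comes from the project:

// 9 sorted sample keys with heavy duplication, 3 partitions.
Integer[] samples = {1, 1, 1, 1, 3, 3, 3, 5, 6};
int numPartitions = 3;
float stepSize = samples.length / (float) numPartitions; // 3.0
for (int i = 1; i < numPartitions; ++i) {
    // The stock selection simply rounds: indices 3 and 6, i.e. split keys 1 and 3.
    System.out.println("split " + i + " -> " + samples[Math.round(stepSize * i)]);
}
// Both chosen boundaries land inside runs of identical keys, which is what produces
// the overlapping ranges (1-1, 3-3, 3-6) mentioned in the javadoc; the method above
// avoids this by advancing past keys equal to the previous split key.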
Project: ldbc_snb_datagen_deprecated2015
File: HadoopFileRanker.java
/** Sorts a Hadoop sequence file and assigns a rank to each element.
*
* @param inputFileName The name of the file to sort.
* @param outputFileName The name of the sorted file.
* @throws Exception
*/
public void run( String inputFileName, String outputFileName ) throws Exception {
int numThreads = conf.getInt("numThreads",1);
/** First Job to sort the key-value pairs and to count the number of elements processed by each reducer.**/
Job jobSort = new Job(conf, "Sorting "+inputFileName);
FileInputFormat.setInputPaths(jobSort, new Path(inputFileName));
FileOutputFormat.setOutputPath(jobSort, new Path(conf.get("outputDir")+"/hadoop/rankIntermediate"));
jobSort.setMapOutputKeyClass(K);
jobSort.setMapOutputValueClass(V);
jobSort.setOutputKeyClass(ComposedKey.class);
jobSort.setOutputValueClass(V);
jobSort.setNumReduceTasks(numThreads);
jobSort.setReducerClass(HadoopFileRankerSortReducer.class);
jobSort.setJarByClass(V);
jobSort.setInputFormatClass(SequenceFileInputFormat.class);
jobSort.setOutputFormatClass(SequenceFileOutputFormat.class);
InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 1000);
TotalOrderPartitioner.setPartitionFile(jobSort.getConfiguration(), new Path(inputFileName + "_partition.lst"));
InputSampler.writePartitionFile(jobSort, sampler);
jobSort.setPartitionerClass(TotalOrderPartitioner.class);
jobSort.waitForCompletion(true);
/** Second Job to assign the rank to each element.**/
Job jobRank = new Job(conf, "Sorting "+inputFileName);
FileInputFormat.setInputPaths(jobRank, new Path(conf.get("outputDir")+"/hadoop/rankIntermediate"));
FileOutputFormat.setOutputPath(jobRank, new Path(outputFileName));
jobRank.setMapOutputKeyClass(ComposedKey.class);
jobRank.setMapOutputValueClass(V);
jobRank.setOutputKeyClass(LongWritable.class);
jobRank.setOutputValueClass(V);
jobRank.setSortComparatorClass(ComposedKeyComparator.class);
jobRank.setNumReduceTasks(numThreads);
jobRank.setReducerClass(HadoopFileRankerFinalReducer.class);
jobRank.setJarByClass(V);
jobRank.setInputFormatClass(SequenceFileInputFormat.class);
jobRank.setOutputFormatClass(SequenceFileOutputFormat.class);
jobRank.setPartitionerClass(HadoopFileRankerPartitioner.class);
jobRank.waitForCompletion(true);
try{
FileSystem fs = FileSystem.get(conf);
for(int i = 0; i < numThreads;++i ) {
fs.delete(new Path(conf.get("outputDir")+"/hadoop/rank_"+i),true);
}
fs.delete(new Path(conf.get("outputDir")+"/hadoop/rankIntermediate"),true);
} catch(IOException e) {
System.err.println(e.getMessage());
}
}
Project: hadoop-map-reduce-patterns
File: TotalOrderSortingStage.java
@SuppressWarnings("unchecked")
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Path inputPath = new Path(args[0]);
Path partitionFile = new Path(args[1] + "_partitions.lst");
Path outputStage = new Path(args[1] + "_staging");
Path outputOrder = new Path(args[1]);
// Configure job to prepare for sampling
Job sampleJob = new Job(conf, "TotalOrderSortingStage");
sampleJob.setJarByClass(TotalOrderSortingStage.class);
// Use the mapper implementation with zero reduce tasks
sampleJob.setMapperClass(LastAccessMapper.class);
sampleJob.setNumReduceTasks(0);
sampleJob.setOutputKeyClass(Text.class);
sampleJob.setOutputValueClass(Text.class);
TextInputFormat.setInputPaths(sampleJob, inputPath);
// Set the output format to a sequence file
sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);
// Submit the job and get completion code.
int code = sampleJob.waitForCompletion(true) ? 0 : 1;
if (code == 0) {
Job orderJob = new Job(conf, "TotalOrderSortingStage");
orderJob.setJarByClass(TotalOrderSortingStage.class);
// Here, use the identity mapper to output the key/value pairs in
// the SequenceFile
orderJob.setMapperClass(Mapper.class);
orderJob.setReducerClass(ValuesReducer.class);
// Set the number of reduce tasks to an appropriate number for the
// amount of data being sorted
orderJob.setNumReduceTasks(10);
// Use Hadoop's TotalOrderPartitioner class
orderJob.setPartitionerClass(TotalOrderPartitioner.class);
// Set the partition file
TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
partitionFile);
orderJob.setOutputKeyClass(Text.class);
orderJob.setOutputValueClass(Text.class);
// Set the input to the previous job's output
orderJob.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
// Set the output path to the command line parameter
TextOutputFormat.setOutputPath(orderJob, outputOrder);
// Set the separator to an empty string
orderJob.getConfiguration().set(
"mapred.textoutputformat.separator", "");
// Use the InputSampler to go through the output of the previous
// job, sample it, and create the partition file
InputSampler.writePartitionFile(orderJob,
new InputSampler.RandomSampler(.001, 10000));
// Submit the job
code = orderJob.waitForCompletion(true) ? 0 : 2;
}
// Clean up the partition file and the staging directory
FileSystem.get(new Configuration()).delete(partitionFile, false);
FileSystem.get(new Configuration()).delete(outputStage, true);
return code;
}
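One side note on the configuration key used above: mapred.textoutputformat.separator is the pre-Hadoop-2 property name. It appears to remain usable as a deprecated alias, but on Hadoop 2.x and later the renamed key is:

// Hadoop 2.x name of the TextOutputFormat key/value separator property.
orderJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "");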