Java class org.apache.hadoop.mapred.lib.MultipleInputs example source code
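The listings below come from real projects. For orientation, here is a minimal, self-contained sketch of the pattern they all share: one job, several input paths, each path bound to its own InputFormat and Mapper via MultipleInputs.addInputPath, with all map output converging on a single shuffle and reducer. The paths and the TextSideMapper / KeyValueSideMapper / JoinReducer classes are placeholders invented for this sketch, not taken from any of the projects below.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;

public class MultipleInputsSketch {

    // Mapper for the plain-text side: emits (line, "A").
    public static class TextSideMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
            out.collect(value, new Text("A"));
        }
    }

    // Mapper for the key/value side: emits (key, "B:" + value).
    public static class KeyValueSideMapper extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value,
                OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
            out.collect(key, new Text("B:" + value));
        }
    }

    // The reducer sees tagged values from both sides under the same key.
    public static class JoinReducer extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
            StringBuilder joined = new StringBuilder();
            while (values.hasNext()) {
                joined.append(values.next().toString()).append(' ');
            }
            out.collect(key, new Text(joined.toString().trim()));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(MultipleInputsSketch.class);
        job.setJobName("multiple-inputs-sketch");

        // Each input path is bound to its own InputFormat and Mapper.
        MultipleInputs.addInputPath(job, new Path("/tmp/side-a"),
                TextInputFormat.class, TextSideMapper.class);
        MultipleInputs.addInputPath(job, new Path("/tmp/side-b"),
                KeyValueTextInputFormat.class, KeyValueSideMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputFormat(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/side-joined"));

        JobClient.runJob(job);
    }
}

One constraint worth noting: all mappers registered this way feed the same shuffle, so they must emit the same map output key and value types (both emit Text/Text here), exactly as the project examples below do with their shared intermediate types.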
Project: fst-bench
File: HiveData.java
private void createUserVisitsTableDirectly() throws IOException, URISyntaxException {

    log.info("Creating user visits...");

    Path rankings = new Path(options.getResultPath(), RANKINGS);
    Path fout = new Path(options.getResultPath(), USERVISITS);

    JobConf job = new JobConf(HiveData.class);
    String jobname = "Create uservisits";
    job.setJobName(jobname);
    setVisitsOptions(job);

    /***
     * Set distributed cache files for table generation;
     * the cached files are:
     * 1. user agents
     * 2. country codes and language codes
     * 3. search keys
     */
    Path uagentPath = new Path(options.getWorkPath(), uagentf);
    DistributedCache.addCacheFile(uagentPath.toUri(), job);

    Path countryPath = new Path(options.getWorkPath(), countryf);
    DistributedCache.addCacheFile(countryPath.toUri(), job);

    Path searchkeyPath = new Path(options.getWorkPath(), searchkeyf);
    DistributedCache.addCacheFile(searchkeyPath.toUri(), job);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(JoinBytesInt.class);

    MultipleInputs.addInputPath(job, dummy.getPath(),
            NLineInputFormat.class, DummyToAccessNoMapper.class);

    if (options.isSequenceOut()) {
        MultipleInputs.addInputPath(job, rankings,
                SequenceFileInputFormat.class, SequenceRankingsToUrlsMapper.class);
    } else {
        MultipleInputs.addInputPath(job, rankings,
                TextInputFormat.class, TextRankingsToUrlsMapper.class);
    }

    job.setCombinerClass(JoinBytesIntCombiner.class);
    job.setReducerClass(CreateUserVisitsReducer.class);

    if (options.getNumReds() > 0) {
        job.setNumReduceTasks(options.getNumReds());
    } else {
        job.setNumReduceTasks(Utils.getMaxNumReds());
    }
    // job.setNumReduceTasks(options.slots/2);

    if (options.isSequenceOut()) {
        job.setOutputFormat(SequenceFileOutputFormat.class);
    } else {
        job.setOutputFormat(TextOutputFormat.class);
    }

    if (null != options.getCodecClass()) {
        job.set("mapred.output.compression.type", "BLOCK");
        job.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, options.getCodecClass());
    }

    FileOutputFormat.setOutputPath(job, fout);

    log.info("Running Job: " + jobname);
    log.info("Dummy file " + dummy.getPath() + " as input");
    log.info("Rankings file " + rankings + " as input");
    log.info("Output file " + fout);
    JobClient.runJob(job);
    log.info("Finished Running Job: " + jobname);
}
Project: hadoop-EAR
File: HadoopArchives.java
private void appendFromArchive(Path harSrc, List<Path> relativePaths, Path harDst) throws IOException {

    Path outputPath = harDst;
    FileOutputFormat.setOutputPath(conf, outputPath);
    FileSystem outFs = outputPath.getFileSystem(conf);

    if (!outFs.exists(outputPath)) {
        throw new IOException("Invalid Output. HAR File " + outputPath + " doesn't exist");
    }
    if (outFs.isFile(outputPath)) {
        throw new IOException("Invalid Output. HAR File " + outputPath
            + " must be represented as a directory");
    }

    long totalSize = writeFromArchiveFilesToProcess(harSrc, relativePaths);

    // make it a har path
    FileSystem fs1 = harSrc.getFileSystem(conf);
    URI uri = fs1.getUri();
    Path parentPath = new Path("har://" + "hdfs-" + uri.getHost() + ":" +
        uri.getPort() + fs1.makeQualified(harSrc).toUri().getPath());
    FileSystem fs = parentPath.getFileSystem(conf);

    conf.set(SRC_LIST_LABEL, srcFiles.toString());
    conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
    conf.setLong(TOTAL_SIZE_LABEL, totalSize);

    long partSize = conf.getLong(HAR_PARTSIZE_LABEL, HAR_PARTSIZE_DEFAULT);
    int numMaps = (int) (totalSize / partSize);
    // run at least one map.
    conf.setNumMapTasks(numMaps == 0 ? 1 : numMaps);
    conf.setNumReduceTasks(1);
    conf.setOutputFormat(NullOutputFormat.class);
    conf.setMapOutputKeyClass(IntWritable.class);
    conf.setMapOutputValueClass(Text.class);
    conf.set("hadoop.job.history.user.location", "none");

    // make sure no speculative execution is done
    conf.setSpeculativeExecution(false);

    // set starting offset for mapper
    int partId = findFirstAvailablePartId(outputPath);
    conf.setInt(PART_ID_OFFSET, partId);

    Path index = new Path(outputPath, HarFileSystem.INDEX_NAME);
    Path indexDirectory = new Path(outputPath, HarFileSystem.INDEX_NAME + ".copy");
    outFs.mkdirs(indexDirectory);
    Path indexCopy = new Path(indexDirectory, "data");
    outFs.rename(index, indexCopy);

    MultipleInputs.addInputPath(conf, jobDirectory, HArchiveInputFormat.class,
        HArchivesMapper.class);
    MultipleInputs.addInputPath(conf, indexDirectory, TextInputFormat.class,
        HArchivesConvertingMapper.class);
    conf.setReducerClass(HArchivesMergingReducer.class);

    JobClient.runJob(conf);
    cleanJobDirectory();
}
Project: Acacia
File: CSRConverter.java
public static void main(String[] args) throws Exception {
    if (!validArgs(args)) {
        printUsage();
        return;
    }

    // These are the temp paths that are created on HDFS
    String dir1 = "/user/miyuru/csrconverter-output";
    String dir2 = "/user/miyuru/csrconverter-output-sorted";

    // We first delete the temporary directories if they exist on the HDFS
    FileSystem fs1 = FileSystem.get(new JobConf());

    System.out.println("Deleting the dir : " + dir1);
    if (fs1.exists(new Path(dir1))) {
        fs1.delete(new Path(dir1), true);
    }
    System.out.println("Done deleting the dir : " + dir1);

    System.out.println("Deleting the dir : " + dir2);
    if (fs1.exists(new Path(dir2))) {
        fs1.delete(new Path(dir2), true);
    }

    Path notinPath = new Path("/user/miyuru/notinverts/notinverts");
    if (!fs1.exists(notinPath)) {
        fs1.create(notinPath);
    }
    System.out.println("Done deleting the dir : " + dir2);

    // Note on Aug 23 2014: Sometimes the MapReduce job hangs after this point;
    // need to see why.
    VertexCounterClient.setDefaultGraphID(args[3], args[2]);

    // First job creates the inverted index
    JobConf conf = new JobConf(CSRConverter.class);
    conf.set("org.acacia.partitioner.hbase.zookeeper.quorum", args[1]);
    conf.set("org.acacia.partitioner.hbase.table", args[2]);
    conf.set("org.acacia.partitioner.hbase.contacthost", args[3]);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    // conf.setMapperClass(InvertedMapper.class);
    conf.setReducerClass(InvertedReducer.class);
    // conf.setInputFormat(TextInputFormat.class);
    conf.setInputFormat(NLinesInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // FileInputFormat.setInputPaths(conf, new Path(args[0]));
    MultipleInputs.addInputPath(conf, new Path(args[0]),
        NLinesInputFormat.class, InvertedMapper.class);
    MultipleInputs.addInputPath(conf, new Path(
        "/user/miyuru/notinverts/notinverts"), TextInputFormat.class,
        InvertedMapper.class);
    FileOutputFormat.setOutputPath(conf, new Path(dir1));

    // Also, for the moment we turn off speculative execution
    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    conf.setNumMapTasks(96);
    conf.setNumReduceTasks(96);
    conf.setPartitionerClass(VertexPartitioner.class);
    conf.set("vertex-count", args[4]);
    conf.set("zero-flag", args[5]);

    Job job = new Job(conf, "csr_inverter");
    job.setSortComparatorClass(SortComparator.class);
    job.waitForCompletion(true);
}