Example source code for the Java class org.apache.hadoop.mapred.lib.NLineInputFormat
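All of the examples on this page share one pattern: build a JobConf, select org.apache.hadoop.mapred.lib.NLineInputFormat (the old mapred API) so that each map task receives a fixed number of input lines rather than a byte-based split, optionally override mapred.line.input.format.linespermap (the default is 1 line per map), and submit with JobClient.runJob. A minimal, self-contained sketch of that pattern follows; the class name NLineExample, the identity mapper, and the value 500 are illustrative assumptions, not taken from any of the projects below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.NLineInputFormat;

public class NLineExample {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(NLineExample.class);
        conf.setJobName("nline-example");
        // NLineInputFormat produces one split per N input lines, so each map
        // task sees exactly N records of (byte offset, line text).
        conf.setInputFormat(NLineInputFormat.class);
        // Old-API property used by the snippets on this page; default is 1.
        conf.setInt("mapred.line.input.format.linespermap", 500);
        conf.setMapperClass(IdentityMapper.class); // map-only pass-through
        conf.setNumReduceTasks(0);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(LongWritable.class); // offset key from NLineInputFormat
        conf.setOutputValueClass(Text.class);       // the line itself
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}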
Project: fst-bench
File: BayesData.java
private void createBayesData() throws IOException, URISyntaxException {
    log.info("creating bayes text data ... ");

    JobConf job = new JobConf();
    Path fout = options.getResultPath();
    Utils.checkHdfsPath(fout);

    String jobname = "Create bayes data";
    job.setJobName(jobname);

    Utils.shareDict(options, job);

    setBayesOptions(job);
    FileInputFormat.setInputPaths(job, dummy.getPath());
    job.setInputFormat(NLineInputFormat.class);

    job.setJarByClass(CreateBayesPages.class);
    job.setMapperClass(CreateBayesPages.class);
    job.setNumReduceTasks(0);

    FileOutputFormat.setOutputPath(job, fout);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    log.info("Running Job: " + jobname);
    log.info("Pages file " + dummy.getPath() + " as input");
    log.info("Bayes data file " + fout + " as output");
    JobClient.runJob(job);
    log.info("Finished Running Job: " + jobname);
}
Project: geolint
File: ManifestCheckHadoop.java
@Override
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(ManifestCheckHadoop.class);
    // String to use for the job name and output folder in HDFS
    String name = "ManifestCheckHadoop_" + System.currentTimeMillis();
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(name));
    conf.setJobName(name);
    // set the mapper to this class' mapper
    conf.setMapperClass(ManifestCheckMap.class);
    //conf.setReducerClass(Reduce.class);
    // this input format splits the input at one line per map by default;
    // override it to 1000 lines per map
    conf.setInputFormat(NLineInputFormat.class);
    conf.setInt("mapred.line.input.format.linespermap", 1000);
    // sets how the output is written, cf. OutputFormat
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    // we only want one reduce task
    conf.setNumReduceTasks(1);
    JobClient.runJob(conf);
    return 0;
}
Project: geolint
File: ManifestGenHadoop.java
@Override
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(ManifestGenHadoop.class);
    // String to use for the job name and output folder in HDFS
    String name = "ManifestGenHadoop_" + System.currentTimeMillis();
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(name));
    conf.setJobName(name);
    // set the mapper to this class' mapper
    conf.setMapperClass(ManifestGenMap.class);
    // this input format splits the input at one line per map by default;
    // override it to 1000 lines per map
    conf.setInputFormat(NLineInputFormat.class);
    // When this was 200, a job took 22 mins (230k PDFs);
    // when this was 1000, the same job took 16 mins
    conf.setInt("mapred.line.input.format.linespermap", 1000);
    // sets how the output is written, cf. OutputFormat
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    // we only want one reduce task
    conf.setNumReduceTasks(1);
    JobClient.runJob(conf);
    return 0;
}
Project: geolint
File: GeoLintHadoop.java
@Override
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(GeoLintHadoop.class);
    // String to use for the job name and output folder in HDFS
    String name = "GeoLintHadoop_" + System.currentTimeMillis();
    // set the task timeout to 30 mins, as we may transfer and checksum ~4GB files
    conf.set("mapred.task.timeout", Integer.toString(30 * 60 * 1000));
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(name));
    conf.setJobName(name);
    // set the mapper to this class' mapper
    conf.setMapperClass(GeoLintMap.class);
    //conf.setReducerClass(GeoLintReduce.class);
    // this input format splits the input at one line per map by default;
    // override it to 2000 lines per map
    conf.setInputFormat(NLineInputFormat.class);
    conf.setInt("mapred.line.input.format.linespermap", 2000);
    // sets how the output is written, cf. OutputFormat
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    // we only want 28 reduce tasks, as we have 28 reduce slots
    conf.setNumReduceTasks(28);
    JobClient.runJob(conf);
    return 0;
}
Project: fst-bench
File: PagerankData.java
private void createPageRankNodesDirectly() throws IOException {
    log.info("Creating PageRank nodes...", null);

    Path fout = new Path(options.getResultPath(), VERTICALS_DIR_NAME);

    JobConf job = new JobConf(PagerankData.class);
    String jobname = "Create pagerank nodes";
    job.setJobName(jobname);

    setPageRankNodesOptions(job);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, dummy.getPath());
    job.setInputFormat(NLineInputFormat.class);

    if (balance) {
        /***
         * Balance the output order of nodes, to protect the
         * pagerank benchmark run from potential data skew
         */
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setMapperClass(BalancedLinkNodesMapper.class);
        job.setReducerClass(BalancedLinkNodesReducer.class);
        //job.setPartitionerClass(ModulusPartitioner.class);

        if (options.getNumReds() > 0) {
            job.setNumReduceTasks(options.getNumReds());
        } else {
            job.setNumReduceTasks(Utils.getMaxNumReds());
        }
    } else {
        job.setMapOutputKeyClass(Text.class);
        job.setMapperClass(DummyToNodesMapper.class);
        job.setNumReduceTasks(0);
    }

    if (options.isSequenceOut()) {
        job.setOutputFormat(SequenceFileOutputFormat.class);
    } else {
        job.setOutputFormat(TextOutputFormat.class);
    }

    if (null != options.getCodecClass()) {
        job.set("mapred.output.compression.type", "BLOCK");
        job.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, options.getCodecClass());
    }

    FileOutputFormat.setOutputPath(job, fout);

    log.info("Running Job: " + jobname);
    log.info("Dummy file " + dummy.getPath() + " as input");
    log.info("Vertices file " + fout + " as output");
    JobClient.runJob(job);
    log.info("Finished Running Job: " + jobname);
}
Project: fst-bench
File: PagerankData.java
private void createPageRankLinksDirectly() throws IOException, URISyntaxException {
    log.info("Creating PageRank links", null);

    JobConf job = new JobConf(PagerankData.class);
    String jobname = "Create pagerank links";

    Path fout = new Path(options.getResultPath(), EDGES_DIR_NAME);

    job.setJobName(jobname);
    setPageRankLinksOptions(job);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    //job.setMapOutputKeyClass(LongWritable.class);
    //job.setMapOutputValueClass(Text.class);

    job.setNumReduceTasks(0);

    FileInputFormat.setInputPaths(job, dummy.getPath());
    job.setInputFormat(NLineInputFormat.class);
    job.setMapperClass(DummyToPageRankLinksMapper.class);

    if (options.isSequenceOut()) {
        job.setOutputFormat(SequenceFileOutputFormat.class);
    } else {
        job.setOutputFormat(TextOutputFormat.class);
    }

    if (null != options.getCodecClass()) {
        job.set("mapred.output.compression.type", "BLOCK");
        job.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, options.getCodecClass());
    }

    FileOutputFormat.setOutputPath(job, fout);

    log.info("Running Job: " + jobname);
    log.info("Dummy file " + dummy.getPath() + " as input");
    log.info("Edges file " + fout + " as output");
    JobClient.runJob(job);
    log.info("Finished Running Job: " + jobname);
}
Project: fst-bench
File: HiveData.java
private void createRankingsTableDirectly() throws IOException, URISyntaxException {
    log.info("Creating table rankings...");

    Path fout = new Path(options.getResultPath(), RANKINGS);

    JobConf job = new JobConf(HiveData.class);
    String jobname = "Create rankings";

    /** TODO: switch to a more effective approach, as this operation may cause
     * about a 2 min delay (originally ~15 min in total)
     */
    setRankingsOptions(job);
    job.setJobName(jobname);
    job.set("mapred.reduce.slowstart.completed.maps", "0.3");
    job.set("mapreduce.job.reduce.slowstart.completedmaps", "0.3");

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(JoinBytesInt.class);

    // note: only the last setJarByClass call takes effect
    job.setJarByClass(DummyToRankingsMapper.class);
    job.setJarByClass(JoinBytesIntCombiner.class);
    job.setJarByClass(GenerateRankingsReducer.class);

    job.setMapperClass(DummyToRankingsMapper.class);
    job.setCombinerClass(JoinBytesIntCombiner.class);
    job.setReducerClass(GenerateRankingsReducer.class);

    if (options.getNumReds() > 0) {
        job.setNumReduceTasks(options.getNumReds());
    } else {
        job.setNumReduceTasks(Utils.getMaxNumReds());
    }

    job.setInputFormat(NLineInputFormat.class);
    FileInputFormat.setInputPaths(job, dummy.getPath());

    job.set("mapred.map.output.compression.type", "BLOCK");
    job.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
    MapFileOutputFormat.setCompressOutput(job, true);
    //MapFileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.LzoCodec.class);
    MapFileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.DefaultCodec.class);

    if (options.isSequenceOut()) {
        job.setOutputFormat(SequenceFileOutputFormat.class);
    } else {
        job.setOutputFormat(TextOutputFormat.class);
    }

    if (null != options.getCodecClass()) {
        job.set("mapred.output.compression.type", "BLOCK");
        job.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, options.getCodecClass());
    }

    FileOutputFormat.setOutputPath(job, fout);

    log.info("Running Job: " + jobname);
    log.info("Pages file " + dummy.getPath() + " as input");
    log.info("Rankings file " + fout + " as output");
    JobClient.runJob(job);
    log.info("Finished Running Job: " + jobname);
}
Project: fst-bench
File: HiveData.java
private void createUserVisitsTableDirectly() throws IOException, URISyntaxException {
    log.info("Creating user visits...");

    Path rankings = new Path(options.getResultPath(), RANKINGS);
    Path fout = new Path(options.getResultPath(), USERVISITS);

    JobConf job = new JobConf(HiveData.class);
    String jobname = "Create uservisits";
    job.setJobName(jobname);
    setVisitsOptions(job);

    /***
     * Set distributed cache files for table generation;
     * cache files include:
     * 1. user agents
     * 2. country codes and language codes
     * 3. search keys
     */
    Path uagentPath = new Path(options.getWorkPath(), uagentf);
    DistributedCache.addCacheFile(uagentPath.toUri(), job);

    Path countryPath = new Path(options.getWorkPath(), countryf);
    DistributedCache.addCacheFile(countryPath.toUri(), job);

    Path searchkeyPath = new Path(options.getWorkPath(), searchkeyf);
    DistributedCache.addCacheFile(searchkeyPath.toUri(), job);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(JoinBytesInt.class);

    MultipleInputs.addInputPath(job, dummy.getPath(),
            NLineInputFormat.class, DummyToAccessNoMapper.class);

    if (options.isSequenceOut()) {
        MultipleInputs.addInputPath(job, rankings,
                SequenceFileInputFormat.class, SequenceRankingsToUrlsMapper.class);
    } else {
        MultipleInputs.addInputPath(job, rankings,
                TextInputFormat.class, TextRankingsToUrlsMapper.class);
    }

    job.setCombinerClass(JoinBytesIntCombiner.class);
    job.setReducerClass(CreateUserVisitsReducer.class);

    if (options.getNumReds() > 0) {
        job.setNumReduceTasks(options.getNumReds());
    } else {
        job.setNumReduceTasks(Utils.getMaxNumReds());
    }
    //job.setNumReduceTasks(options.slots/2);

    if (options.isSequenceOut()) {
        job.setOutputFormat(SequenceFileOutputFormat.class);
    } else {
        job.setOutputFormat(TextOutputFormat.class);
    }

    if (null != options.getCodecClass()) {
        job.set("mapred.output.compression.type", "BLOCK");
        job.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, options.getCodecClass());
    }

    FileOutputFormat.setOutputPath(job, fout);

    log.info("Running Job: " + jobname);
    log.info("Dummy file " + dummy.getPath() + " as input");
    log.info("Rankings file " + rankings + " as input");
    log.info("Output file " + fout);
    JobClient.runJob(job);
    log.info("Finished Running Job: " + jobname);
}
Project: fst-bench
File: NutchData.java
private void createNutchUrls() throws IOException, URISyntaxException {
    log.info("Creating nutch urls ...");

    JobConf job = new JobConf(NutchData.class);
    Path urls = new Path(options.getWorkPath(), URLS_DIR_NAME);
    Utils.checkHdfsPath(urls);

    String jobname = "Create nutch urls";
    job.setJobName(jobname);
    setNutchOptions(job);

    FileInputFormat.setInputPaths(job, dummy.getPath());
    job.setInputFormat(NLineInputFormat.class);
    job.setMapperClass(CreateUrlHash.class);
    job.setNumReduceTasks(0);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputFormat(MapFileOutputFormat.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    MapFileOutputFormat.setOutputPath(job, urls);
    //SequenceFileOutputFormat.setOutputPath(job, fout);
    /*
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    */

    log.info("Running Job: " + jobname);
    log.info("Pages file " + dummy.getPath() + " as input");
    log.info("Urls file " + urls + " as output");
    JobClient.runJob(job);
    log.info("Finished Running Job: " + jobname);

    log.info("Cleaning temp files...");
    Utils.cleanTempFiles(urls);
}
Project: fst-bench
File: NutchData.java
private void createNutchIndexData() throws IOException, URISyntaxException {
    log.info("creating nutch index files ... ");

    JobConf job = new JobConf(NutchData.class);
    Utils.shareUrls(URLS_DIR_NAME, options, job);
    Utils.shareDict(options, job);
    setNutchOptions(job);

    Path fsegments = new Path(options.getResultPath(), SEGMENTS_DIR_NAME);
    Utils.checkHdfsPath(fsegments, true);

    segment = new Path(fsegments, generateSegmentName());
    Utils.checkHdfsPath(segment, true);

    String jobname = "Create nutch index data";
    job.setJobName(jobname);
    job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());

    FileInputFormat.setInputPaths(job, dummy.getPath());
    job.setInputFormat(NLineInputFormat.class);

    job.setMapperClass(CreateNutchPages.class);
    job.setCombinerClass(CombineReferences.class);
    job.setReducerClass(CreateLinks.class);

    if (options.getNumReds() > 0) {
        job.setNumReduceTasks(options.getNumReds());
    } else {
        job.setNumReduceTasks(Utils.getMaxNumMaps());
    }

    FileOutputFormat.setOutputPath(job, segment);
    job.setOutputFormat(NutchOutputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(References.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NutchParse.class);

    log.info("Running Job: " + jobname);
    log.info("Pages file " + dummy.getPath() + " as input");
    log.info("Segment " + segment + " as output");
    JobClient.runJob(job);
    log.info("Finished Running Job: " + jobname);

    log.info("Cleaning temp files...");
    Utils.cleanTempFiles(segment);
}
Project: systemml
File: CleanupMR.java
public static boolean runJob( DMLConfig conf )
    throws Exception
{
    boolean ret = false;

    try
    {
        JobConf job;
        job = new JobConf(CleanupMR.class);
        job.setJobName("Cleanup-MR");

        //set up SystemML local tmp dir
        String dir = conf.getTextValue(DMLConfig.LOCAL_TMP_DIR);
        MRJobConfiguration.setSystemMLLocalTmpDir(job, dir);

        //set mappers, reducers
        int numNodes = InfrastructureAnalyzer.getRemoteParallelNodes();
        job.setMapperClass(CleanupMapper.class); //map-only
        job.setNumMapTasks(numNodes); //numMappers
        job.setNumReduceTasks( 0 );

        //set input/output format, input path
        String inFileName = conf.getTextValue(DMLConfig.SCRATCH_SPACE) + "/cleanup_tasks";
        job.setInputFormat(NLineInputFormat.class);
        job.setOutputFormat(NullOutputFormat.class);

        Path path = new Path( inFileName );
        FileInputFormat.setInputPaths(job, path);
        writeCleanupTasksToFile(path, numNodes);

        //disable automatic task timeouts and speculative task execution
        job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
        job.setMapSpeculativeExecution(false);

        /////
        // execute the MR job
        RunningJob runjob = JobClient.runJob(job);
        ret = runjob.isSuccessful();
    }
    catch(Exception ex)
    {
        //don't raise an exception, just gracefully log an error message
        LOG.error("Failed to run cleanup MR job. ", ex);
    }

    return ret;
}