Java 类 org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat 实例源码
项目:hiped2
文件:CompositeJoin.java
/**
 * The MapReduce driver - sets up and launches the join job.
 *
 * <p>Parses the command line, then configures a map-only job (zero reduce
 * tasks) whose input is an "inner" join expression composed over the users
 * and user-logs paths, both read with {@code KeyValueTextInputFormat}.
 *
 * @param args the command-line arguments
 * @return the non-zero code returned by {@code Cli.runCmd()} if there is one;
 *         otherwise 0 when the job completes successfully and 1 when it does not
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {
  Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
  int result = cli.runCmd();
  if (result != 0) {
    // Propagate the command-line handler's own exit code unchanged.
    return result;
  }

  Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
  Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
  Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

  Configuration conf = super.getConf();

  // Job.getInstance replaces the deprecated Job(Configuration) constructor.
  Job job = Job.getInstance(conf);
  job.setJarByClass(CompositeJoin.class);
  job.setMapperClass(JoinMap.class);

  // The join is described by an expression stored in the job configuration;
  // CompositeInputFormat reads it from JOIN_EXPR.
  job.setInputFormatClass(CompositeInputFormat.class);
  job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
      CompositeInputFormat.compose("inner",
          KeyValueTextInputFormat.class, usersPath, userLogsPath)
  );

  // Map-only job: no reduce phase.
  job.setNumReduceTasks(0);

  // NOTE(review): the join expression above already names both inputs, so this
  // call may be redundant - confirm before removing.
  FileInputFormat.setInputPaths(job, userLogsPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  return job.waitForCompletion(true) ? 0 : 1;
}