@Override
public int run(String[] args) throws Exception {
    // Drives a single MapReduce job that scans an HBase table ("struct" column
    // family), maps rows with HBaseMapper, reduces with HBaseReducer, and then
    // post-processes the reducer output with AggregateMapper chained after the
    // reducer via ChainReducer.
    //
    // args[0] = HBase table name to scan; args[1] = HDFS output directory.
    // Returns 0 on success, 1 on failure.
    Configuration conf = this.getConf();
    Configuration reduceConf = new Configuration(false);
    Configuration mapConf = new Configuration(false);

    Job job = Job.getInstance(conf, "correlate logs");
    job.setJarByClass(CorrelateLogs.class);

    Scan scan = new Scan();
    scan.setCaching(500);       // fetch rows in batches to cut RPC round trips
    scan.setCacheBlocks(false); // full scans should not churn the region server block cache
    scan.addFamily(Bytes.toBytes("struct"));

    // Configures TableInputFormat plus the mapper: job input comes from the
    // HBase table named by args[0], not from the filesystem.
    TableMapReduceUtil.initTableMapperJob(args[0], scan, HBaseMapper.class,
            Text.class, LongWritable.class, job);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setNumReduceTasks(1);

    ChainReducer.setReducer(job, HBaseReducer.class, Text.class, LongWritable.class,
            Text.class, LongPairWritable.class, reduceConf);
    ChainReducer.addMapper(job, AggregateMapper.class, Text.class, LongPairWritable.class,
            Text.class, DoubleWritable.class, mapConf);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // FIX: the original also called TextInputFormat.addInputPath(job, new Path(args[0])).
    // That was dead and misleading — args[0] is an HBase table name, not a file
    // path, and the job's input format is TableInputFormat (set above by
    // initTableMapperJob), which never reads the file-input path property.
    TextOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Appends a full map+reduce stage to the job chain.
 *
 * If the chain is empty or the previous stage already ended in a reducer,
 * a brand-new Job is created and appended to {@code this.jobs}; otherwise
 * the mapper/reducer pair is chained onto the most recent (map-only) job
 * via ChainMapper/ChainReducer. Always leaves {@code this.state} at
 * {@code State.REDUCER}.
 *
 * {@code combiner} and {@code comparator} may be null, in which case they
 * are skipped. {@code configuration} is overlaid on this builder's base
 * configuration and passed to the chained mapper and reducer.
 *
 * Wraps any IOException from job construction in a RuntimeException.
 */
@Override public void addMapReduce(final Class<? extends Mapper> mapper, final Class<? extends Reducer> combiner, final Class<? extends Reducer> reducer, final Class<? extends WritableComparator> comparator, final Class<? extends WritableComparable> mapOutputKey, final Class<? extends WritableComparable> mapOutputValue, final Class<? extends WritableComparable> reduceOutputKey, final Class<? extends WritableComparable> reduceOutputValue, final Configuration configuration) {
    Configuration mergedConf = overlayConfiguration(getConf(), configuration);
    try {
        final Job job;
        if (State.NONE == this.state || State.REDUCER == this.state) {
            // Previous stage is closed (or nothing has been built yet):
            // start a fresh job for this map+reduce pair.
            // Create a new job with a reference to mergedConf
            job = Job.getInstance(mergedConf);
            job.setJobName(makeClassName(mapper) + ARROW + makeClassName(reducer));
            HBaseAuthHelper.setHBaseAuthToken(mergedConf, job);
            this.jobs.add(job);
        } else {
            // Mid-chain (state == MAPPER): extend the most recent job and
            // record the new stage names in its job name.
            job = this.jobs.get(this.jobs.size() - 1);
            job.setJobName(job.getJobName() + ARROW + makeClassName(mapper) + ARROW + makeClassName(reducer));
        }
        // NOTE(review): the reduce-task count is read from this builder's base
        // conf (getConf()), not from mergedConf — a per-stage override of
        // "mapreduce.job.reduces" in `configuration` is ignored here. Confirm
        // this is intended.
        job.setNumReduceTasks(this.getConf().getInt("mapreduce.job.reduces", this.getConf().getInt("mapreduce.tasktracker.reduce.tasks.maximum", 1)));
        // Map phase consumes (NullWritable, FaunusVertex) records and emits the
        // caller-supplied intermediate k/v; the reducer turns those into the
        // stage's final output k/v.
        ChainMapper.addMapper(job, mapper, NullWritable.class, FaunusVertex.class, mapOutputKey, mapOutputValue, mergedConf);
        ChainReducer.setReducer(job, reducer, mapOutputKey, mapOutputValue, reduceOutputKey, reduceOutputValue, mergedConf);
        if (null != comparator)
            job.setSortComparatorClass(comparator);
        if (null != combiner)
            job.setCombinerClass(combiner);
        // Default map-output compression on (DefaultCodec), but only when the
        // caller has not already set these properties explicitly.
        if (null == job.getConfiguration().get(MAPREDUCE_MAP_OUTPUT_COMPRESS, null))
            job.getConfiguration().setBoolean(MAPREDUCE_MAP_OUTPUT_COMPRESS, true);
        if (null == job.getConfiguration().get(MAPREDUCE_MAP_OUTPUT_COMPRESS_CODEC, null))
            job.getConfiguration().setClass(MAPREDUCE_MAP_OUTPUT_COMPRESS_CODEC, DefaultCodec.class, CompressionCodec.class);
        this.state = State.REDUCER;
    } catch (IOException e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}
/**
 * Appends a map-only stage to the job chain.
 *
 * Behavior depends on the chain state:
 * - NONE: a fresh map-only job (zero reduce tasks) is created and appended
 *   to {@code this.jobs}.
 * - NONE or MAPPER: the mapper is chained before the (eventual) reducer via
 *   ChainMapper; state becomes MAPPER.
 * - otherwise (REDUCER): the mapper runs after the current job's reducer,
 *   chained via ChainReducer.addMapper; state stays REDUCER.
 *
 * Mappers consume (NullWritable, FaunusVertex) records and emit the
 * caller-supplied {@code mapOutputKey}/{@code mapOutputValue} pair.
 * Wraps any IOException from job construction in a RuntimeException.
 */
@Override public void addMap(final Class<? extends Mapper> mapper, final Class<? extends WritableComparable> mapOutputKey, final Class<? extends WritableComparable> mapOutputValue, Configuration configuration) {
    Configuration mergedConf = overlayConfiguration(getConf(), configuration);
    try {
        final Job job;
        if (State.NONE == this.state) {
            // No open job yet: start a new map-only job.
            // Create a new job with a reference to mergedConf
            job = Job.getInstance(mergedConf);
            job.setNumReduceTasks(0);
            job.setJobName(makeClassName(mapper));
            HBaseAuthHelper.setHBaseAuthToken(mergedConf, job);
            this.jobs.add(job);
        } else {
            // Extend the most recent job and record the new mapper in its name.
            job = this.jobs.get(this.jobs.size() - 1);
            job.setJobName(job.getJobName() + ARROW + makeClassName(mapper));
        }
        if (State.MAPPER == this.state || State.NONE == this.state) {
            // Pre-reduce position: chain through ChainMapper.
            ChainMapper.addMapper(job, mapper, NullWritable.class, FaunusVertex.class, mapOutputKey, mapOutputValue, mergedConf);
            /* In case no reducer is defined later for this job, set the job
             * output k/v to match the mapper output k-v. Output formats that
             * care about their configured k-v classes (such as
             * SequenceFileOutputFormat) require these to be set correctly lest
             * they throw an exception at runtime.
             *
             * ChainReducer.setReducer overwrites these k-v settings, so if a
             * reducer is added onto this job later, these settings will be
             * overridden by the actual reducer's output k-v.
             */
            job.setOutputKeyClass(mapOutputKey);
            job.setOutputValueClass(mapOutputValue);
            this.state = State.MAPPER;
            logger.info("Added mapper " + job.getJobName() + " via ChainMapper with output (" + mapOutputKey + "," + mapOutputValue + "); current state is " + state);
        } else {
            // Post-reduce position: the mapper runs on the reducer's output.
            // NOTE(review): unlike the branch above, the job's final output
            // k/v classes are not updated here — presumably
            // ChainReducer.addMapper handles that; confirm.
            ChainReducer.addMapper(job, mapper, NullWritable.class, FaunusVertex.class, mapOutputKey, mapOutputValue, mergedConf);
            this.state = State.REDUCER;
            logger.info("Added mapper " + job.getJobName() + " via ChainReducer with output (" + mapOutputKey + "," + mapOutputValue + "); current state is " + state);
        }
    } catch (IOException e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}