Java 类org.apache.hadoop.mapreduce.Reducer 实例源码
项目:hadoop
文件:MapReduceTestUtil.java
/**
 * Builds a map-only job named "Fail-Job" that runs {@code FailMapper} over the
 * given input directories.
 *
 * <p>Any existing {@code outdir} is removed recursively first. The maximum
 * number of map attempts is set to 2 on {@code conf} before the job is created
 * from it.
 *
 * @param conf Configuration object; modified in place (map max attempts).
 * @param outdir Output directory; deleted first if it already exists.
 * @param indirs Input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  // Clear stale output left over from an earlier run.
  FileSystem outputFs = outdir.getFileSystem(conf);
  if (outputFs.exists(outdir)) {
    outputFs.delete(outdir, true);
  }
  // Set before the Job is created from conf.
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);

  Job failJob = Job.getInstance(conf);
  failJob.setJobName("Fail-Job");
  failJob.setOutputKeyClass(Text.class);
  failJob.setOutputValueClass(Text.class);
  failJob.setMapperClass(FailMapper.class);
  failJob.setReducerClass(Reducer.class);
  // Zero reduce tasks: the job is map-only.
  failJob.setNumReduceTasks(0);
  FileInputFormat.setInputPaths(failJob, indirs);
  FileOutputFormat.setOutputPath(failJob, outdir);
  return failJob;
}
项目:HotTopicsApp
文件:HotTopicsApp.java
/**
 * Reads the top-k settings and creates the ordered set that holds the
 * retained words.
 *
 * <p>{@code topk} (default 1) is the number of words to keep. {@code type}
 * (default "min") selects the ordering: "min" uses {@code TFIDFWord}'s natural
 * ordering; any other value uses the reverse of it.
 *
 * @param context reducer context supplying the job configuration
 * @throws IOException declared by the overridden method
 * @throws InterruptedException declared by the overridden method
 */
@Override
protected void setup(
    Reducer<NullWritable, Text, org.apache.hadoop.io.Text, NullWritable>.Context context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  this.k = conf.getInt("topk", 1);
  this.type = conf.get("type", "min");
  if ("min".equals(this.type)) {
    topkSet = new TreeSet<>();
  } else {
    topkSet = new TreeSet<>(new Comparator<TFIDFWord>() {
      @Override
      public int compare(TFIDFWord o1, TFIDFWord o2) {
        // Swap the operands instead of negating the result: -compareTo(...)
        // overflows when compareTo returns Integer.MIN_VALUE.
        return o2.compareTo(o1);
      }
    });
  }
}
项目:HotTopicsApp
文件:HotTopicsApp.java
/**
 * Adds every candidate line to {@code topkSet} as a {@code TFIDFWord}. After
 * each insertion that pushes the size above {@code k}, the last element under
 * the set's ordering is evicted. The retained words are then written, in set
 * order, as output keys.
 *
 * @param k2 the single (null) grouping key
 * @param v2s candidate lines, each parsed into a {@code TFIDFWord}
 * @param context reducer context receiving the output
 * @throws IOException if writing output fails
 * @throws InterruptedException if interrupted while writing output
 */
@Override
protected void reduce(NullWritable k2, Iterable<Text> v2s,
    Reducer<NullWritable, Text, Text, NullWritable>.Context context)
    throws IOException, InterruptedException {
  for (Text candidate : v2s) {
    TFIDFWord word = new TFIDFWord(candidate.toString());
    topkSet.add(word);
    boolean overCapacity = topkSet.size() > k;
    if (overCapacity) {
      topkSet.pollLast();
    }
  }
  for (TFIDFWord kept : topkSet) {
    // k3 is a reused output key object.
    k3.set(kept.toString());
    context.write(k3, NullWritable.get());
  }
}
项目:hadoop-2.6.0-cdh5.4.3
文件:MapReduceTestUtil.java
/**
 * Builds a map-only job named "Kill-Job" that runs {@code KillMapper} over the
 * given input directories. The output directory is not cleared beforehand.
 *
 * @param conf Configuration object the job is created from.
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  Job killJob = Job.getInstance(conf);
  killJob.setJobName("Kill-Job");
  killJob.setOutputKeyClass(Text.class);
  killJob.setOutputValueClass(Text.class);
  killJob.setMapperClass(KillMapper.class);
  killJob.setReducerClass(Reducer.class);
  // Zero reduce tasks: the job is map-only.
  killJob.setNumReduceTasks(0);
  FileInputFormat.setInputPaths(killJob, indirs);
  FileOutputFormat.setOutputPath(killJob, outdir);
  return killJob;
}
项目:openimaj
文件:CumulativeTimeWord.java
/**
 * For one serialised (word, time) key, determines whether the word was seen
 * among "past" values, "present" values, or both. It writes
 * (intersection, union) flags keyed by the time.
 *
 * @param wordtimeb serialised {@code ReadWritableStringLong}; only its second
 *     (long) component is used
 * @param wordBools one flag per occurrence: {@code true} = from the past,
 *     {@code false} = from the present
 * @param context reducer context receiving the (time, flags) record
 * @throws IOException if (de)serialisation or writing fails
 * @throws InterruptedException if interrupted while writing output
 */
@Override
protected void reduce(BytesWritable wordtimeb, Iterable<BooleanWritable> wordBools,
    Reducer<BytesWritable, BooleanWritable, LongWritable, BytesWritable>.Context context)
    throws IOException, InterruptedException {
  final ReadWritableStringLong wordAndTime =
      IOUtils.deserialize(wordtimeb.getBytes(), ReadWritableStringLong.class);
  final long time = wordAndTime.secondObject();

  boolean inPast = false;
  boolean inPresent = false;
  for (BooleanWritable flag : wordBools) {
    if (flag.get()) {
      inPast = true;
    } else {
      inPresent = true;
    }
    // Once both kinds have been observed the result cannot change, so the
    // remaining values need not be read.
    if (inPast && inPresent) {
      break;
    }
  }

  final boolean intersection = inPast && inPresent;
  final boolean union = inPast || inPresent;
  final ReadWritableBooleanBoolean flags = new ReadWritableBooleanBoolean(intersection, union);
  context.write(new LongWritable(time), new BytesWritable(IOUtils.serialize(flags)));
}
项目:openimaj
文件:ReduceValuesByTime.java
/**
 * Loads the job arguments and the indexes derived from them into static
 * state, once; later calls return immediately once {@code options} is set.
 *
 * <p>Reads {@code Values.ARGS_KEY} and {@code Values.MATLAB_OUT}. The first
 * argument is the location passed to {@code TimeIndex.readTimeCountLines} and,
 * when MATLAB output is enabled, to {@code WordIndex.readWordCountLines}. It is
 * also used as the prefix of {@code valuesLocation}.
 *
 * @param context reducer context supplying the job configuration
 * @throws IOException wrapping any failure while loading; {@code options} is
 *     left {@code null} in that case so that a later call retries
 */
protected static synchronized void loadOptions(Reducer<LongWritable, BytesWritable, NullWritable, Text>.Context context) throws IOException {
  if (options != null) {
    return;
  }
  try {
    final String[] args = context.getConfiguration().getStrings(Values.ARGS_KEY);
    matlabOut = context.getConfiguration().getBoolean(Values.MATLAB_OUT, false);
    timeIndex = TimeIndex.readTimeCountLines(args[0]);
    if (matlabOut) {
      wordIndex = WordIndex.readWordCountLines(args[0]);
      valuesLocation = args[0] + "/values/values.%d.mat";
    }
    System.out.println("timeindex loaded: " + timeIndex.size());
    // Publish options last: it is the "already loaded" guard checked above.
    // Assigning it only after every load has succeeded stops a failed attempt
    // from leaving later callers with half-initialised state.
    options = args;
  } catch (Exception e) {
    throw new IOException(e);
  }
}
项目:titan0.5.4-hbase1.1.1-custom
文件:LinkMapReduce.java
/**
 * Wraps the context configuration as a {@code ModifiableHadoopConfiguration}
 * and sets {@code direction} to the opposite of the configured
 * {@code LINK_DIRECTION}.
 *
 * <p>If {@code LINK_DIRECTION} is missing, every configuration entry is
 * dumped to the error log before failing.
 *
 * @param context reducer context supplying the configuration
 * @throws NullPointerException if {@code LINK_DIRECTION} is not configured
 * @throws IOException declared by the overridden method
 * @throws InterruptedException declared by the overridden method
 */
@Override
public void setup(final Reducer.Context context) throws IOException, InterruptedException {
  faunusConf = ModifiableHadoopConfiguration.of(DEFAULT_COMPAT.getContextConfiguration(context));
  if (!faunusConf.has(LINK_DIRECTION)) {
    Iterator<Entry<String, String>> it = DEFAULT_COMPAT.getContextConfiguration(context).iterator();
    log.error("Broken configuration missing {}", LINK_DIRECTION);
    log.error("---- Start config dump ----");
    while (it.hasNext()) {
      Entry<String, String> ent = it.next();
      log.error("k:{} -> v:{}", ent.getKey(), ent.getValue());
    }
    log.error("---- End config dump ----");
    // Keep the exception type callers may already expect, but say what is missing
    // instead of throwing a message-less NPE.
    throw new NullPointerException("Broken configuration missing " + LINK_DIRECTION);
  }
  direction = faunusConf.get(LINK_DIRECTION).opposite();
}
项目:hadoop-2.6.0-cdh5.4.3
文件:MapReduceTestUtil.java
/**
 * Builds a map-only job named "Fail-Job" that runs {@code FailMapper} over the
 * given input directories.
 *
 * <p>Any existing {@code outdir} is removed recursively first. The maximum
 * number of map attempts is set to 2 on {@code conf} before the job is created
 * from it.
 *
 * @param conf Configuration object; modified in place (map max attempts).
 * @param outdir Output directory; deleted first if it already exists.
 * @param indirs Input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  FileSystem fs = outdir.getFileSystem(conf);
  if (fs.exists(outdir)) {
    fs.delete(outdir, true);
  }
  // "mapred.map.max.attempts" is the deprecated MRv1 name of this setting; use
  // the current key (the value of MRJobConfig.MAP_MAX_ATTEMPTS) instead.
  conf.setInt("mapreduce.map.maxattempts", 2);
  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Fail-Job");
  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(FailMapper.class);
  theJob.setReducerClass(Reducer.class);
  // Zero reduce tasks: the job is map-only.
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
项目:openimaj
文件:TimeIndex.java
/**
 * Describes the time-index stage: {@code TimeIndex.Map} produces
 * (LongWritable, LongWritable) pairs, and {@code TimeIndex.Reduce} writes text
 * lines to the "times" output. The stage runs with a single reducer and a
 * {@code LongWritable} sort comparator.
 *
 * @return the stage definition
 */
@Override
public SequenceFileTextStage<LongWritable, BytesWritable, LongWritable, LongWritable, NullWritable, Text> stage() {
  return new SequenceFileTextStage<LongWritable, BytesWritable, LongWritable, LongWritable, NullWritable, Text>() {
    @Override
    public String outname() {
      return "times";
    }

    @Override
    public Class<? extends Mapper<LongWritable, BytesWritable, LongWritable, LongWritable>> mapper() {
      return TimeIndex.Map.class;
    }

    @Override
    public Class<? extends Reducer<LongWritable, LongWritable, NullWritable, Text>> reducer() {
      return TimeIndex.Reduce.class;
    }

    @Override
    public void setup(Job job) {
      // One reducer, so all time slots are written by the same reduce task.
      job.setNumReduceTasks(1);
      job.setSortComparatorClass(LongWritable.Comparator.class);
    }
  };
}
项目:incubator-rya
文件:CountPlan.java
/**
 * Sums the counts for one prospect. When the condition below holds, it writes
 * a count {@code Mutation}.
 *
 * <p>The row is {@code indexType + DELIM + data + DELIM + reverse-index date}.
 * The column is ({@code COUNT}, data type) with the prospect's visibility and
 * the timestamp's time. The value is the decimal sum encoded as UTF-8.
 *
 * @param prospect the prospect being counted
 * @param counts partial counts to sum
 * @param timestamp time bucket of the counts
 * @param context reducer context; the mutation is written with a {@code null} key
 * @throws IOException if writing fails
 * @throws InterruptedException if interrupted while writing
 */
@Override
public void reduce(final IntermediateProspect prospect, final Iterable<LongWritable> counts, final Date timestamp, final Reducer.Context context) throws IOException, InterruptedException {
  long sum = 0;
  for (final LongWritable count : counts) {
    sum += count.get();
  }
  final String indexType = prospect.getTripleValueType().getIndexType();
  // not sure if this is the best idea..
  // NOTE(review): sum >= 0 holds for any non-negative total, which makes the
  // PREDICATE check matter only for negative totals. Kept as-is to preserve behavior.
  if ((sum >= 0) || indexType.equals(TripleValueType.PREDICATE.getIndexType())) {
    final Mutation m = new Mutation(indexType + DELIM + prospect.getData() + DELIM + ProspectorUtils.getReverseIndexDateTime(timestamp));
    final String dataType = prospect.getDataType();
    final ColumnVisibility visibility = new ColumnVisibility(prospect.getVisibility());
    final Value sumValue = new Value(Long.toString(sum).getBytes(StandardCharsets.UTF_8));
    m.put(COUNT, dataType, visibility, timestamp.getTime(), sumValue);
    context.write(null, m);
  }
}
项目:openimaj
文件:CumulativeJacardReducer.java
/**
 * Compares the distinct words of one time step with {@code this.seenwords}.
 *
 * <p>The number of distinct words already present in {@code seenwords} is the
 * intersection. All of the step's words are then added, and the resulting size
 * of {@code seenwords} is the union. The {@code JacardIndex(time, intersection,
 * union)} is written as ASCII text.
 *
 * @param time the time step
 * @param words words observed at this time step (may repeat)
 * @param context reducer context receiving the text record
 * @throws java.io.IOException if serialisation or writing fails
 * @throws InterruptedException if interrupted while writing
 */
@Override
protected void reduce(LongWritable time, java.lang.Iterable<Text> words, org.apache.hadoop.mapreduce.Reducer<LongWritable,Text,NullWritable,Text>.Context context) throws java.io.IOException, InterruptedException {
  // De-duplicate this step's words first.
  HashSet<String> stepWords = new HashSet<String>();
  for (Text word : words) {
    stepWords.add(word.toString());
  }

  long alreadySeen = 0;
  for (String word : stepWords) {
    if (this.seenwords.contains(word)) {
      alreadySeen++;
    }
    this.seenwords.add(word);
  }

  StringWriter out = new StringWriter();
  IOUtils.writeASCII(out, new JacardIndex(time.get(), alreadySeen, this.seenwords.size()));
  context.write(NullWritable.get(), new Text(out.toString()));
}
项目:hadoop-plus
文件:KNNCombiner.java
/**
 * Combiner step that keeps, for {@code key}, the {@code k} values with the
 * highest similarity ({@code getV2()}).
 *
 * <p>{@code k} is read from {@code cn.ac.ict.htc.knn.k} (default 4). Values are
 * copied into new {@code Vector2SF} objects before sorting, presumably because
 * the framework may reuse the instance handed out by the iterator.
 *
 * @param key grouping key
 * @param value candidate (id, similarity) pairs
 * @param context combiner context receiving the top-k pairs
 * @throws java.io.IOException if writing fails
 * @throws InterruptedException if interrupted while writing
 */
protected void reduce(
    Text key,
    java.lang.Iterable<Vector2SF> value,
    org.apache.hadoop.mapreduce.Reducer<Text, Vector2SF, Text, Vector2SF>.Context context)
    throws java.io.IOException, InterruptedException {
  final int k = context.getConfiguration().getInt("cn.ac.ict.htc.knn.k", 4);

  ArrayList<Vector2SF> candidates = new ArrayList<Vector2SF>();
  for (Vector2SF v : value) {
    candidates.add(new Vector2SF(v.getV1(), v.getV2()));
  }

  // Highest similarity first.
  Collections.sort(candidates, new Comparator<Vector2SF>() {
    @Override
    public int compare(Vector2SF first, Vector2SF second) {
      return Double.compare(second.getV2(), first.getV2());
    }
  });

  int emitCount = Math.min(k, candidates.size());
  for (int i = 0; i < emitCount; i++) {
    context.write(key, candidates.get(i));
  }
}
项目:hadoop-plus
文件:MapReduceTestUtil.java
/**
 * Builds a map-only job named "Kill-Job" that runs {@code KillMapper} over the
 * given input directories. The output directory is not cleared beforehand.
 *
 * @param conf Configuration object the job is created from.
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  Job killJob = Job.getInstance(conf);
  killJob.setJobName("Kill-Job");
  killJob.setOutputKeyClass(Text.class);
  killJob.setOutputValueClass(Text.class);
  killJob.setMapperClass(KillMapper.class);
  killJob.setReducerClass(Reducer.class);
  // Zero reduce tasks: the job is map-only.
  killJob.setNumReduceTasks(0);
  FileInputFormat.setInputPaths(killJob, indirs);
  FileOutputFormat.setOutputPath(killJob, outdir);
  return killJob;
}
项目:kylin
文件:MapReduceUtil.java
/**
 * Estimates the number of reduce tasks for in-memory cubing from the cuboid
 * size statistics of {@code cubeSeg}.
 *
 * <p>The estimate is {@code totalSizeMB / reducerInputMB * reducerCountRatio},
 * rounded, then clamped to the configured minimum and maximum reducer counts.
 *
 * @param cubeSeg segment whose statistics and config are used
 * @param cuboidScheduler scheduler passed to the statistics reader
 * @return the number of reduce tasks to use
 * @throws IOException if the cube statistics cannot be read
 */
public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
    throws IOException {
  KylinConfig kylinConfig = cubeSeg.getConfig();
  Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
  double totalSizeInM = 0;
  for (Double cuboidSize : cubeSizeMap.values()) {
    totalSizeInM += cuboidSize;
  }
  double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
  double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
  // number of reduce tasks. Math.round returns a long; clamp in long arithmetic
  // before narrowing. Casting first would wrap an estimate above
  // Integer.MAX_VALUE (e.g. a tiny or zero per-reducer input size) to -1 or
  // another small value, which then clamps to the minimum instead of the maximum.
  long estimatedReduceTasks = Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
  // at least 1 reducer by default
  long clampedReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), estimatedReduceTasks);
  // no more than 500 reducer by default
  clampedReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), clampedReduceTasks);
  int numReduceTasks = (int) clampedReduceTasks;
  logger.info("Having total map input MB " + Math.round(totalSizeInM));
  logger.info("Having per reduce MB " + perReduceInputMB);
  logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
  return numReduceTasks;
}
项目:openimaj
文件:TimeIndex.java
/**
 * Sums the counts for one time slot and writes "timeslot,total" as a CSV line.
 *
 * <p>Failures are reported on stderr and the record is skipped rather than
 * failing the task. On interruption, the thread's interrupt status is
 * restored.
 *
 * @param timeslot the time slot key
 * @param counts partial counts for the slot
 * @param context reducer context receiving the CSV line
 */
@Override
public void reduce(LongWritable timeslot, Iterable<LongWritable> counts, Reducer<LongWritable,LongWritable,NullWritable,Text>.Context context){
  try {
    String timeStr = timeslot.toString();
    long total = 0;
    for (LongWritable count : counts) {
      total += count.get();
    }
    StringWriter swriter = new StringWriter();
    CSVPrinter writer = new CSVPrinter(swriter);
    writer.write(new String[]{timeStr, total + ""});
    writer.flush();
    context.write(NullWritable.get(), new Text(swriter.toString()));
  } catch (InterruptedException e) {
    // Restore the interrupt flag so the caller can still observe the interruption.
    Thread.currentThread().interrupt();
    System.err.println("Couldn't reduce to final file: " + e);
  } catch (Exception e) {
    // Include the cause; the original message alone made failures undiagnosable.
    System.err.println("Couldn't reduce to final file: " + e);
  }
}
项目:hadoop
文件:MapReduceTestUtil.java
/**
 * Builds a map-only job named "Kill-Job" that runs {@code KillMapper} over the
 * given input directories. The output directory is not cleared beforehand.
 *
 * @param conf Configuration object the job is created from.
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  Job killJob = Job.getInstance(conf);
  killJob.setJobName("Kill-Job");
  killJob.setOutputKeyClass(Text.class);
  killJob.setOutputValueClass(Text.class);
  killJob.setMapperClass(KillMapper.class);
  killJob.setReducerClass(Reducer.class);
  // Zero reduce tasks: the job is map-only.
  killJob.setNumReduceTasks(0);
  FileInputFormat.setInputPaths(killJob, indirs);
  FileOutputFormat.setOutputPath(killJob, outdir);
  return killJob;
}
项目:hadoop
文件:TestLineRecordReaderJobs.java
/**
 * Creates a job using the base {@link Mapper} and {@link Reducer} classes that
 * reads {@code inputDir} and writes {@code outputDir}. The job is then run to
 * completion with verbose progress output.
 *
 * <p>The boolean result of {@code waitForCompletion} is not checked.
 *
 * @param conf configuration the job is created from
 * @throws IOException if job submission or monitoring fails
 * @throws InterruptedException if interrupted while waiting for the job
 * @throws ClassNotFoundException if a job class cannot be resolved
 */
public void createAndRunJob(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job lineJob = Job.getInstance(conf);
  lineJob.setJarByClass(TestLineRecordReaderJobs.class);
  FileInputFormat.addInputPath(lineJob, inputDir);
  FileOutputFormat.setOutputPath(lineJob, outputDir);
  lineJob.setMapperClass(Mapper.class);
  lineJob.setReducerClass(Reducer.class);
  lineJob.waitForCompletion(true);
}
项目:hadoop
文件:Chain.java
/**
 * Stores the state this runner needs: the chain reducer context, the reducer
 * to execute, and the writer that receives its output.
 *
 * @param context reducer context built for this chain stage
 * @param reducer the reducer instance to run
 * @param rw record writer receiving the reducer's output
 */
ReduceRunner(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context,
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
    RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException,
    InterruptedException {
  this.rw = rw;
  this.chainContext = context;
  this.reducer = reducer;
}
项目:hadoop
文件:Chain.java
/**
 * Wraps {@code context} in a {@code ChainReduceContextImpl} whose output goes to
 * {@code rw}. The result is exposed as a {@link Reducer.Context} through a
 * {@code WrappedReducer}.
 *
 * @param rw record writer receiving the reducer's output
 * @param context underlying reduce context
 * @param conf configuration for the chained reducer
 * @return the wrapped reducer context
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    RecordWriter<KEYOUT, VALUEOUT> rw,
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> chainReduceContext =
      new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(context, rw, conf);
  WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> wrapper =
      new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>();
  return wrapper.getReducerContext(chainReduceContext);
}
项目:hadoop
文件:Chain.java
/**
 * Runs {@code reducer} over {@code context}, with output sent through a
 * {@code ChainRecordWriter} built from that same context. The writer is closed
 * afterwards; it is not closed if {@code reducer.run} throws.
 *
 * @param context task context supplying input and receiving output
 * @throws IOException if the reducer or writer fails
 * @throws InterruptedException if interrupted while reducing
 */
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> writer =
      new ChainRecordWriter<KEYOUT, VALUEOUT>(context);
  // Raw cast: the incoming context is a TaskInputOutputContext, while
  // createReduceContext takes a ReduceContext.
  ReduceContext rawReduceContext = (ReduceContext) context;
  Reducer.Context chainedContext = createReduceContext(writer, rawReduceContext, rConf);
  reducer.run(chainedContext);
  writer.close(context);
}
项目:hadoop
文件:Chain.java
/**
 * Creates a {@code ReduceRunner} that reads from {@code inputContext} and
 * writes to {@code outputQueue}, and appends it to {@code threads}.
 *
 * <p>The output key/value classes come from {@code rConf}, defaulting to
 * {@code Object} when unset.
 *
 * @param inputContext context supplying the reducer's input
 * @param outputQueue queue receiving the reducer's output
 * @throws IOException declared for the runner construction
 * @throws InterruptedException declared for the runner construction
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {
  Class<?> outKeyClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, Object.class);
  Class<?> outValueClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, Object.class);
  RecordWriter queueWriter =
      new ChainRecordWriter(outKeyClass, outValueClass, outputQueue, rConf);
  Reducer.Context queueContext =
      createReduceContext(queueWriter, (ReduceContext) inputContext, rConf);
  threads.add(new ReduceRunner(queueContext, reducer, queueWriter));
}
项目:hadoop
文件:JobContextImpl.java
/**
 * Get the combiner class for the job.
 *
 * <p>Reads the class configured under {@code COMBINE_CLASS_ATTR}. The default
 * passed to {@code getClass} is {@code null}.
 *
 * @return the combiner class for the job, or {@code null} if none is set.
 * @throws ClassNotFoundException declared for callers; kept for compatibility
 */
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
    throws ClassNotFoundException {
  Class<?> configured = conf.getClass(COMBINE_CLASS_ATTR, null);
  return (Class<? extends Reducer<?,?,?,?>>) configured;
}
项目:hadoop
文件:JobContextImpl.java
/**
 * Get the {@link Reducer} class for the job.
 *
 * <p>Reads the class configured under {@code REDUCE_CLASS_ATTR}, falling back
 * to the base {@link Reducer} class.
 *
 * @return the {@link Reducer} class for the job.
 * @throws ClassNotFoundException declared for callers; kept for compatibility
 */
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getReducerClass()
    throws ClassNotFoundException {
  Class<?> configured = conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
  return (Class<? extends Reducer<?,?,?,?>>) configured;
}
项目:ditb
文件:HashTable.java
/**
 * Builds the hash-table MapReduce job for {@code tableHash.tableName}.
 *
 * <p>Partition boundaries are generated into {@code PARTITIONS_FILE_NAME} under
 * {@code destPath} and used by a {@code TotalOrderPartitioner}. The job runs
 * {@code tableHash.numHashFiles} identity reducers, and output goes through
 * {@code MapFileOutputFormat} to {@code HASH_DATA_DIR} under {@code destPath}.
 * The job name defaults to "hashTable_" + table name unless
 * {@code mapreduce.job.name} is set.
 *
 * @param args command-line arguments (not read by this method)
 * @return the configured job (not yet submitted)
 * @throws IOException if partition generation or job setup fails
 */
public Job createSubmittableJob(String[] args) throws IOException {
  Path partitionsFile = new Path(destPath, PARTITIONS_FILE_NAME);
  generatePartitions(partitionsFile);

  Configuration toolConf = getConf();
  String jobName = toolConf.get("mapreduce.job.name", "hashTable_" + tableHash.tableName);
  Job hashJob = Job.getInstance(toolConf, jobName);
  Configuration hashJobConf = hashJob.getConfiguration();
  hashJobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
  hashJob.setJarByClass(HashTable.class);

  TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
      HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, hashJob);

  // Total-order partitioning plus reducers groups region output into hash files.
  hashJob.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(hashJobConf, partitionsFile);
  hashJob.setReducerClass(Reducer.class); // identity reducer
  hashJob.setNumReduceTasks(tableHash.numHashFiles);
  hashJob.setOutputKeyClass(ImmutableBytesWritable.class);
  hashJob.setOutputValueClass(ImmutableBytesWritable.class);
  hashJob.setOutputFormatClass(MapFileOutputFormat.class);
  FileOutputFormat.setOutputPath(hashJob, new Path(destPath, HASH_DATA_DIR));
  return hashJob;
}
项目:ditb
文件:IntegrationTestBigLinkedList.java
/**
 * Closes {@code this.connection} if one is held, then delegates to the
 * superclass cleanup.
 *
 * <p>The superclass cleanup runs in a {@code finally}, so it is no longer
 * skipped when closing the connection throws.
 *
 * @param context reducer context
 * @throws IOException if closing the connection or superclass cleanup fails
 * @throws InterruptedException if superclass cleanup is interrupted
 */
@Override
protected void cleanup(
    Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
    throws IOException, InterruptedException {
  try {
    if (this.connection != null) {
      this.connection.close();
    }
  } finally {
    super.cleanup(context);
  }
}
项目:aliyun-oss-hadoop-fs
文件:MapReduceTestUtil.java
/**
 * Builds a map-only job named "Fail-Job" that runs {@code FailMapper} over the
 * given input directories.
 *
 * <p>Any existing {@code outdir} is removed recursively first. The maximum
 * number of map attempts is set to 2 on {@code conf} before the job is created
 * from it.
 *
 * @param conf Configuration object; modified in place (map max attempts).
 * @param outdir Output directory; deleted first if it already exists.
 * @param indirs Input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  // Clear stale output left over from an earlier run.
  FileSystem outputFs = outdir.getFileSystem(conf);
  if (outputFs.exists(outdir)) {
    outputFs.delete(outdir, true);
  }
  // Set before the Job is created from conf.
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);

  Job failJob = Job.getInstance(conf);
  failJob.setJobName("Fail-Job");
  failJob.setOutputKeyClass(Text.class);
  failJob.setOutputValueClass(Text.class);
  failJob.setMapperClass(FailMapper.class);
  failJob.setReducerClass(Reducer.class);
  // Zero reduce tasks: the job is map-only.
  failJob.setNumReduceTasks(0);
  FileInputFormat.setInputPaths(failJob, indirs);
  FileOutputFormat.setOutputPath(failJob, outdir);
  return failJob;
}
项目:aliyun-oss-hadoop-fs
文件:MapReduceTestUtil.java
/**
 * Builds a map-only job named "Kill-Job" that runs {@code KillMapper} over the
 * given input directories. The output directory is not cleared beforehand.
 *
 * @param conf Configuration object the job is created from.
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  Job killJob = Job.getInstance(conf);
  killJob.setJobName("Kill-Job");
  killJob.setOutputKeyClass(Text.class);
  killJob.setOutputValueClass(Text.class);
  killJob.setMapperClass(KillMapper.class);
  killJob.setReducerClass(Reducer.class);
  // Zero reduce tasks: the job is map-only.
  killJob.setNumReduceTasks(0);
  FileInputFormat.setInputPaths(killJob, indirs);
  FileOutputFormat.setOutputPath(killJob, outdir);
  return killJob;
}
项目:aliyun-oss-hadoop-fs
文件:TestLineRecordReaderJobs.java
/**
 * Creates a job using the base {@link Mapper} and {@link Reducer} classes that
 * reads {@code inputDir} and writes {@code outputDir}. The job is then run to
 * completion with verbose progress output.
 *
 * <p>The boolean result of {@code waitForCompletion} is not checked.
 *
 * @param conf configuration the job is created from
 * @throws IOException if job submission or monitoring fails
 * @throws InterruptedException if interrupted while waiting for the job
 * @throws ClassNotFoundException if a job class cannot be resolved
 */
public void createAndRunJob(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job lineJob = Job.getInstance(conf);
  lineJob.setJarByClass(TestLineRecordReaderJobs.class);
  FileInputFormat.addInputPath(lineJob, inputDir);
  FileOutputFormat.setOutputPath(lineJob, outputDir);
  lineJob.setMapperClass(Mapper.class);
  lineJob.setReducerClass(Reducer.class);
  lineJob.waitForCompletion(true);
}
项目:aliyun-oss-hadoop-fs
文件:Chain.java
/**
 * Stores the state this runner needs: the chain reducer context, the reducer
 * to execute, and the writer that receives its output.
 *
 * @param context reducer context built for this chain stage
 * @param reducer the reducer instance to run
 * @param rw record writer receiving the reducer's output
 */
ReduceRunner(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context,
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
    RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException,
    InterruptedException {
  this.rw = rw;
  this.chainContext = context;
  this.reducer = reducer;
}
项目:aliyun-oss-hadoop-fs
文件:Chain.java
/**
 * Wraps {@code context} in a {@code ChainReduceContextImpl} whose output goes to
 * {@code rw}. The result is exposed as a {@link Reducer.Context} through a
 * {@code WrappedReducer}.
 *
 * @param rw record writer receiving the reducer's output
 * @param context underlying reduce context
 * @param conf configuration for the chained reducer
 * @return the wrapped reducer context
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    RecordWriter<KEYOUT, VALUEOUT> rw,
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> chainReduceContext =
      new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(context, rw, conf);
  WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> wrapper =
      new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>();
  return wrapper.getReducerContext(chainReduceContext);
}
项目:aliyun-oss-hadoop-fs
文件:Chain.java
/**
 * Runs {@code reducer} over {@code context}, with output sent through a
 * {@code ChainRecordWriter} built from that same context. The writer is closed
 * afterwards; it is not closed if {@code reducer.run} throws.
 *
 * @param context task context supplying input and receiving output
 * @throws IOException if the reducer or writer fails
 * @throws InterruptedException if interrupted while reducing
 */
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> writer =
      new ChainRecordWriter<KEYOUT, VALUEOUT>(context);
  // Raw cast: the incoming context is a TaskInputOutputContext, while
  // createReduceContext takes a ReduceContext.
  ReduceContext rawReduceContext = (ReduceContext) context;
  Reducer.Context chainedContext = createReduceContext(writer, rawReduceContext, rConf);
  reducer.run(chainedContext);
  writer.close(context);
}
项目:aliyun-oss-hadoop-fs
文件:Chain.java
/**
 * Creates a {@code ReduceRunner} that reads from {@code inputContext} and
 * writes to {@code outputQueue}, and appends it to {@code threads}.
 *
 * <p>The output key/value classes come from {@code rConf}, defaulting to
 * {@code Object} when unset.
 *
 * @param inputContext context supplying the reducer's input
 * @param outputQueue queue receiving the reducer's output
 * @throws IOException declared for the runner construction
 * @throws InterruptedException declared for the runner construction
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {
  Class<?> outKeyClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, Object.class);
  Class<?> outValueClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, Object.class);
  RecordWriter queueWriter =
      new ChainRecordWriter(outKeyClass, outValueClass, outputQueue, rConf);
  Reducer.Context queueContext =
      createReduceContext(queueWriter, (ReduceContext) inputContext, rConf);
  threads.add(new ReduceRunner(queueContext, reducer, queueWriter));
}
项目:aliyun-oss-hadoop-fs
文件:JobContextImpl.java
/**
 * Get the combiner class for the job.
 *
 * <p>Reads the class configured under {@code COMBINE_CLASS_ATTR}. The default
 * passed to {@code getClass} is {@code null}.
 *
 * @return the combiner class for the job, or {@code null} if none is set.
 * @throws ClassNotFoundException declared for callers; kept for compatibility
 */
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
    throws ClassNotFoundException {
  Class<?> configured = conf.getClass(COMBINE_CLASS_ATTR, null);
  return (Class<? extends Reducer<?,?,?,?>>) configured;
}
项目:aliyun-oss-hadoop-fs
文件:JobContextImpl.java
/**
 * Get the {@link Reducer} class for the job.
 *
 * <p>Reads the class configured under {@code REDUCE_CLASS_ATTR}, falling back
 * to the base {@link Reducer} class.
 *
 * @return the {@link Reducer} class for the job.
 * @throws ClassNotFoundException declared for callers; kept for compatibility
 */
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getReducerClass()
    throws ClassNotFoundException {
  Class<?> configured = conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
  return (Class<? extends Reducer<?,?,?,?>>) configured;
}
项目:elephant56
文件:GlobalDistributedDriver.java
/**
 * Creates the job for {@code currentGenerationNumber} of the global
 * distributed model.
 *
 * <p>The superclass builds the job with {@code GlobalMapper},
 * {@code Partitioner} and {@code Reducer}. This method then sets the input
 * population folder with initialisation disabled, and registers the fitness
 * value and fitness evaluation classes. Finally it sets zero reduce tasks, so
 * the job is map-only.
 *
 * @param configuration base configuration
 * @param numberOfNodes number of nodes, passed to the superclass
 * @param currentGenerationNumber generation being run; {@code currentGenerationNumber - 1}
 *     is also passed to the superclass
 * @param generationNameFormat name format for generations
 * @param currentGenerationsBlockReportsFolderPath folder for block reports
 * @param individualWrapperSchema schema of the individual wrapper
 * @return the configured job
 * @throws IOException if job creation fails
 */
public Job createJob(
    Configuration configuration,
    int numberOfNodes,
    long currentGenerationNumber,
    String generationNameFormat,
    Path currentGenerationsBlockReportsFolderPath,
    Schema individualWrapperSchema
) throws IOException {
  final long previousGenerationNumber = currentGenerationNumber - 1L;
  final Job job = super.createJob(configuration, numberOfNodes,
      currentGenerationNumber, currentGenerationNumber,
      previousGenerationNumber, currentGenerationNumber,
      generationNameFormat, currentGenerationsBlockReportsFolderPath, individualWrapperSchema,
      GlobalMapper.class, Partitioner.class, Reducer.class);

  // Input: the population folder, without the initialisation step.
  NodesInputFormat.setInputPopulationFolderPath(job, this.getInputFolderPath());
  NodesInputFormat.activateInitialisation(job, false);

  // Register the configured fitness value and fitness evaluation classes.
  final Configuration jobConfiguration = job.getConfiguration();
  jobConfiguration.setClass(Constants.CONFIGURATION_FITNESS_VALUE_CLASS, this.fitnessValueClass,
      FitnessValue.class);
  jobConfiguration.setClass(Constants.CONFIGURATION_FITNESS_EVALUATION_CLASS, this.fitnessEvaluationClass,
      FitnessEvaluation.class);

  // Map-only: no reduce tasks are run.
  job.setNumReduceTasks(0);
  return job;
}
项目:big-c
文件:MapReduceTestUtil.java
/**
 * Builds a map-only job named "Fail-Job" that runs {@code FailMapper} over the
 * given input directories.
 *
 * <p>Any existing {@code outdir} is removed recursively first. The maximum
 * number of map attempts is set to 2 on {@code conf} before the job is created
 * from it.
 *
 * @param conf Configuration object; modified in place (map max attempts).
 * @param outdir Output directory; deleted first if it already exists.
 * @param indirs Input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  // Clear stale output left over from an earlier run.
  FileSystem outputFs = outdir.getFileSystem(conf);
  if (outputFs.exists(outdir)) {
    outputFs.delete(outdir, true);
  }
  // Set before the Job is created from conf.
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);

  Job failJob = Job.getInstance(conf);
  failJob.setJobName("Fail-Job");
  failJob.setOutputKeyClass(Text.class);
  failJob.setOutputValueClass(Text.class);
  failJob.setMapperClass(FailMapper.class);
  failJob.setReducerClass(Reducer.class);
  // Zero reduce tasks: the job is map-only.
  failJob.setNumReduceTasks(0);
  FileInputFormat.setInputPaths(failJob, indirs);
  FileOutputFormat.setOutputPath(failJob, outdir);
  return failJob;
}
项目:big-c
文件:MapReduceTestUtil.java
/**
 * Builds a map-only job named "Kill-Job" that runs {@code KillMapper} over the
 * given input directories. The output directory is not cleared beforehand.
 *
 * @param conf Configuration object the job is created from.
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  Job killJob = Job.getInstance(conf);
  killJob.setJobName("Kill-Job");
  killJob.setOutputKeyClass(Text.class);
  killJob.setOutputValueClass(Text.class);
  killJob.setMapperClass(KillMapper.class);
  killJob.setReducerClass(Reducer.class);
  // Zero reduce tasks: the job is map-only.
  killJob.setNumReduceTasks(0);
  FileInputFormat.setInputPaths(killJob, indirs);
  FileOutputFormat.setOutputPath(killJob, outdir);
  return killJob;
}
项目:big-c
文件:TestLineRecordReaderJobs.java
/**
 * Creates a job using the base {@link Mapper} and {@link Reducer} classes that
 * reads {@code inputDir} and writes {@code outputDir}. The job is then run to
 * completion with verbose progress output.
 *
 * <p>The boolean result of {@code waitForCompletion} is not checked.
 *
 * @param conf configuration the job is created from
 * @throws IOException if job submission or monitoring fails
 * @throws InterruptedException if interrupted while waiting for the job
 * @throws ClassNotFoundException if a job class cannot be resolved
 */
public void createAndRunJob(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job lineJob = Job.getInstance(conf);
  lineJob.setJarByClass(TestLineRecordReaderJobs.class);
  FileInputFormat.addInputPath(lineJob, inputDir);
  FileOutputFormat.setOutputPath(lineJob, outputDir);
  lineJob.setMapperClass(Mapper.class);
  lineJob.setReducerClass(Reducer.class);
  lineJob.waitForCompletion(true);
}
项目:big-c
文件:Chain.java
/**
 * Stores the state this runner needs: the chain reducer context, the reducer
 * to execute, and the writer that receives its output.
 *
 * @param context reducer context built for this chain stage
 * @param reducer the reducer instance to run
 * @param rw record writer receiving the reducer's output
 */
ReduceRunner(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context,
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
    RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException,
    InterruptedException {
  this.rw = rw;
  this.chainContext = context;
  this.reducer = reducer;
}
项目:big-c
文件:Chain.java
/**
 * Wraps {@code context} in a {@code ChainReduceContextImpl} whose output goes to
 * {@code rw}. The result is exposed as a {@link Reducer.Context} through a
 * {@code WrappedReducer}.
 *
 * @param rw record writer receiving the reducer's output
 * @param context underlying reduce context
 * @param conf configuration for the chained reducer
 * @return the wrapped reducer context
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    RecordWriter<KEYOUT, VALUEOUT> rw,
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> chainReduceContext =
      new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(context, rw, conf);
  WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> wrapper =
      new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>();
  return wrapper.getReducerContext(chainReduceContext);
}