Java 类org.apache.hadoop.mapreduce.ReduceContext 实例源码
项目:hadoop-2.6.0-cdh5.4.3
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:hadoop-plus
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:FlexMap
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:hops
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:hadoop-TCP
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:hardfs
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:hadoop-on-lustre2
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:geowave
文件:RasterTileResizeCombiner.java
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final ReduceContext<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, Object> context )
throws IOException,
InterruptedException {
final GridCoverage mergedCoverage = helper.getMergedCoverage(
key,
values);
if (mergedCoverage != null) {
context.write(
key,
mergedCoverage);
}
}
项目:geowave
文件:SimpleFeatureOutputReducer.java
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final ReduceContext<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, Object> context )
throws IOException,
InterruptedException {
final Iterator<Object> valIt = values.iterator();
if (valIt.hasNext()) {
key.setAdapterId(outputAdapter.getAdapterId());
final SimpleFeature feature = getSimpleFeature(
key,
valIt.next());
context.write(
key,
feature);
}
}
项目:mapreduce-fork
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:hadoop
文件:ChainReduceContextImpl.java
public ChainReduceContextImpl(
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
this.base = base;
this.rw = output;
this.conf = conf;
}
项目:hadoop
文件:Chain.java
/**
* Create a reduce context that is based on ChainMapContext and the given
* record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
RecordWriter<KEYOUT, VALUEOUT> rw,
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rw, conf);
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getReducerContext(reduceContext);
return reducerContext;
}
项目:hadoop
文件:Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}
项目:hadoop
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:aliyun-oss-hadoop-fs
文件:ChainReduceContextImpl.java
public ChainReduceContextImpl(
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
this.base = base;
this.rw = output;
this.conf = conf;
}
项目:aliyun-oss-hadoop-fs
文件:Chain.java
/**
* Create a reduce context that is based on ChainMapContext and the given
* record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
RecordWriter<KEYOUT, VALUEOUT> rw,
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rw, conf);
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getReducerContext(reduceContext);
return reducerContext;
}
项目:aliyun-oss-hadoop-fs
文件:Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}
项目:aliyun-oss-hadoop-fs
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:big-c
文件:ChainReduceContextImpl.java
public ChainReduceContextImpl(
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
this.base = base;
this.rw = output;
this.conf = conf;
}
项目:big-c
文件:Chain.java
/**
* Create a reduce context that is based on ChainMapContext and the given
* record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
RecordWriter<KEYOUT, VALUEOUT> rw,
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rw, conf);
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getReducerContext(reduceContext);
return reducerContext;
}
项目:big-c
文件:Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}
项目:big-c
文件:Chain.java
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
项目:sPCA
文件:DummyRecordWriter.java
@SuppressWarnings({ "unchecked", "rawtypes" })
private static <K1, V1, K2, V2> Reducer<K1, V1, K2, V2>.Context buildNewReducerContext(
Configuration configuration, RecordWriter<K2, V2> output,
Class<K1> keyClass, Class<V1> valueClass) throws Exception {
Class<?> reduceContextImplClass = Class
.forName("org.apache.hadoop.mapreduce.task.ReduceContextImpl");
Constructor<?> cons = reduceContextImplClass.getConstructors()[0];
Object reduceContextImpl = cons.newInstance(configuration,
new TaskAttemptID(), new MockIterator(), null, null, output, null,
new DummyStatusReporter(), null, keyClass, valueClass);
Class<?> wrappedReducerClass = Class
.forName("org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer");
Object wrappedReducer = wrappedReducerClass.newInstance();
Method getReducerContext = wrappedReducerClass.getMethod(
"getReducerContext", ReduceContext.class);
return (Reducer.Context) getReducerContext.invoke(wrappedReducer,
reduceContextImpl);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:ChainReduceContextImpl.java
public ChainReduceContextImpl(
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
this.base = base;
this.rw = output;
this.conf = conf;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:Chain.java
/**
* Create a reduce context that is based on ChainMapContext and the given
* record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
RecordWriter<KEYOUT, VALUEOUT> rw,
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rw, conf);
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getReducerContext(reduceContext);
return reducerContext;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}
项目:hadoop-plus
文件:ChainReduceContextImpl.java
public ChainReduceContextImpl(
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
this.base = base;
this.rw = output;
this.conf = conf;
}
项目:hadoop-plus
文件:Chain.java
/**
* Create a reduce context that is based on ChainMapContext and the given
* record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
RecordWriter<KEYOUT, VALUEOUT> rw,
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rw, conf);
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getReducerContext(reduceContext);
return reducerContext;
}
项目:hadoop-plus
文件:Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}
项目:FlexMap
文件:ChainReduceContextImpl.java
public ChainReduceContextImpl(
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
this.base = base;
this.rw = output;
this.conf = conf;
}
项目:FlexMap
文件:Chain.java
/**
* Create a reduce context that is based on ChainMapContext and the given
* record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
RecordWriter<KEYOUT, VALUEOUT> rw,
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rw, conf);
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getReducerContext(reduceContext);
return reducerContext;
}
项目:FlexMap
文件:Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}
项目:hops
文件:ChainReduceContextImpl.java
public ChainReduceContextImpl(
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
this.base = base;
this.rw = output;
this.conf = conf;
}
项目:hops
文件:Chain.java
/**
* Create a reduce context that is based on ChainMapContext and the given
* record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
RecordWriter<KEYOUT, VALUEOUT> rw,
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rw, conf);
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getReducerContext(reduceContext);
return reducerContext;
}
项目:hops
文件:Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}
项目:Cubert
文件:PhaseContext.java
public static void create(ReduceContext context, Configuration conf) throws IOException
{
redContext = context;
isMapper = false;
initCommonConfig(conf);
PigStatusReporter.getInstance().setContext(new MRTaskContext(context));
}
项目:Cubert
文件:TestOperators.java
@SuppressWarnings("rawtypes")
@BeforeTest
void setupConf() throws IOException
{
Configuration conf = new JobConf();
conf.setBoolean(CubertStrings.USE_COMPACT_SERIALIZATION, false);
PhaseContext.create((MapContext) new TestContext(), conf);
PhaseContext.create((ReduceContext) new TestContext(), conf);
}
项目:hadoop-TCP
文件:ChainReduceContextImpl.java
public ChainReduceContextImpl(
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
this.base = base;
this.rw = output;
this.conf = conf;
}
项目:hadoop-TCP
文件:Chain.java
/**
* Create a reduce context that is based on ChainMapContext and the given
* record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
RecordWriter<KEYOUT, VALUEOUT> rw,
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rw, conf);
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getReducerContext(reduceContext);
return reducerContext;
}
项目:hadoop-TCP
文件:Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}