Java Class org.apache.hadoop.mapred.Reducer Example Source Code
Project: aliyun-oss-hadoop-fs
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
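For reference, `combinerClass` above can be any implementation of the old-API org.apache.hadoop.mapred.Reducer interface. A minimal word-count-style combiner might look like the following sketch (the class name and the Text/IntWritable types are illustrative assumptions, not taken from the project above):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical combiner: sums the partial counts emitted for each key.
public class WordCountCombiner extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}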
Project: big-c
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: flink
File: HadoopReduceCombineFunction.java
/**
 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 * @param conf The JobConf that is used to configure both Hadoop Reducers.
 */
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
    Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner, JobConf conf) {
  if (hadoopReducer == null) {
    throw new NullPointerException("Reducer may not be null.");
  }
  if (hadoopCombiner == null) {
    throw new NullPointerException("Combiner may not be null.");
  }
  if (conf == null) {
    throw new NullPointerException("JobConf may not be null.");
  }
  this.reducer = hadoopReducer;
  this.combiner = hadoopCombiner;
  this.jobConf = conf;
}
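A typical use of this constructor in a Flink DataSet program might look like the following sketch (assumptions: `input` is an existing DataSet<Tuple2<Text, LongWritable>>, and `SumReducer` is a hypothetical old-API Reducer<Text, LongWritable, Text, LongWritable>):

// Sketch: sum the counts per word, combining on the map side as well.
// Because SumReducer doubles as the combiner, its output types must
// equal its input types, matching the KEYIN/VALUEIN bound above.
DataSet<Tuple2<Text, LongWritable>> counts = input
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
        new SumReducer(),  // reducer
        new SumReducer(),  // combiner
        new JobConf()));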
Project: flink
File: HadoopReduceCombineFunction.java
/**
 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 * @param conf The JobConf that is used to configure both Hadoop Reducers.
 */
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
    Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
  if(hadoopReducer == null) {
    throw new NullPointerException("Reducer may not be null.");
  }
  if(hadoopCombiner == null) {
    throw new NullPointerException("Combiner may not be null.");
  }
  if(conf == null) {
    throw new NullPointerException("JobConf may not be null.");
  }
  this.reducer = hadoopReducer;
  this.combiner = hadoopCombiner;
  this.jobConf = conf;
}
Project: flink
File: HadoopReduceFunction.java
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.reducer.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
  Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
  TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
  this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
}
Project: hadoop-2.6.0-cdh5.4.3
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: hadoop-EAR
File: PipeReducer.java
public void configure(JobConf job) {
  super.configure(job);
  Class<?> c = job.getClass("stream.reduce.posthook", null, Mapper.class);
  if(c != null) {
    postMapper = (Mapper)ReflectionUtils.newInstance(c, job);
    LOG.info("PostHook="+c.getName());
  }
  c = job.getClass("stream.reduce.prehook", null, Reducer.class);
  if(c != null) {
    preReducer = (Reducer)ReflectionUtils.newInstance(c, job);
    oc = new InmemBufferingOutputCollector();
    LOG.info("PreHook="+c.getName());
  }
  this.ignoreKey = job.getBoolean("stream.reduce.ignoreKey", false);
}
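The two hook keys read above are ordinary JobConf class properties, so a job could wire them up roughly as follows (MyPreReducer and MyPostMapper are hypothetical classes implementing the old-API Reducer and Mapper interfaces):

JobConf job = new JobConf();
// Hypothetical hook implementations; configure(...) above instantiates
// them reflectively when these keys are set.
job.setClass("stream.reduce.prehook", MyPreReducer.class, Reducer.class);
job.setClass("stream.reduce.posthook", MyPostMapper.class, Mapper.class);
job.setBoolean("stream.reduce.ignoreKey", true);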
Project: hadoop-plus
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getOutputKeyComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: FlexMap
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: hops
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: hadoop-TCP
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getOutputKeyComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: hardfs
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getOutputKeyComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: hadoop-on-lustre2
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: RDFS
File: PipeReducer.java
public void configure(JobConf job) {
  super.configure(job);
  Class<?> c = job.getClass("stream.reduce.posthook", null, Mapper.class);
  if(c != null) {
    postMapper = (Mapper)ReflectionUtils.newInstance(c, job);
    LOG.info("PostHook="+c.getName());
  }
  c = job.getClass("stream.reduce.prehook", null, Reducer.class);
  if(c != null) {
    preReducer = (Reducer)ReflectionUtils.newInstance(c, job);
    oc = new InmemBufferingOutputCollector();
    LOG.info("PreHook="+c.getName());
  }
  this.ignoreKey = job.getBoolean("stream.reduce.ignoreKey", false);
}
Project: vs.msc.ws14
File: HadoopReduceCombineFunction.java
/**
 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 * @param conf The JobConf that is used to configure both Hadoop Reducers.
 */
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
    Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
  if(hadoopReducer == null) {
    throw new NullPointerException("Reducer may not be null.");
  }
  if(hadoopCombiner == null) {
    throw new NullPointerException("Combiner may not be null.");
  }
  if(conf == null) {
    throw new NullPointerException("JobConf may not be null.");
  }
  this.reducer = hadoopReducer;
  this.combiner = hadoopCombiner;
  this.jobConf = conf;
}
Project: incubator-tez
File: MRCombiner.java
private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
  Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);
  Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);

  OutputCollector collector = new OutputCollector() {
    @Override
    public void collect(Object key, Object value) throws IOException {
      writer.append(key, value);
    }
  };

  CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);

  while (values.moveToNext()) {
    combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
  }
}
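The "mapred.combiner.class" key looked up here is the one the old mapred API associates with JobConf.setCombinerClass(...) (in newer Hadoop versions via the deprecated-key mapping), so a job feeding this code path could register its combiner like the following sketch (WordCountCombiner is the hypothetical class sketched earlier):

JobConf job = new JobConf();
// Typed setter; registers the combiner class that runOldCombiner(...) reads.
job.setCombinerClass(WordCountCombiner.class);
// Raw-key equivalent of the lookup above:
job.setClass("mapred.combiner.class", WordCountCombiner.class, Reducer.class);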
Project: mapreduce-fork
File: MergeManager.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getOutputKeyComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: tez
File: MRCombiner.java
private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
  Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);
  Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);

  OutputCollector collector = new OutputCollector() {
    @Override
    public void collect(Object key, Object value) throws IOException {
      writer.append(key, value);
      combineOutputRecordsCounter.increment(1);
    }
  };

  CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);

  while (values.moveToNext()) {
    combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
  }
}
Project: hadoop
File: MergeManagerImpl.java
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
Project: flink
File: HadoopReduceCombineFunction.java
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.reducer.configure(jobConf);
  this.combiner.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
  TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
  this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
  this.combineCollector = new HadoopOutputCollector<>();
  this.reduceCollector = new HadoopOutputCollector<>();
}
Project: flink
File: HadoopReduceCombineFunction.java
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
  Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
  Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
  final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
  final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass);
  return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
}
Project: flink
File: HadoopReduceCombineFunction.java
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
  Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
      (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
  reducer = InstantiationUtil.instantiate(reducerClass);
  Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>> combinerClass =
      (Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>>) in.readObject();
  combiner = InstantiationUtil.instantiate(combinerClass);
  jobConf = new JobConf();
  jobConf.readFields(in);
}
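This readObject implies a symmetric writeObject on the serializing side; a sketch consistent with the reads above (not copied from the project) would be:

private void writeObject(final ObjectOutputStream out) throws IOException {
  out.writeObject(reducer.getClass());   // read back as reducerClass above
  out.writeObject(combiner.getClass());  // read back as combinerClass above
  jobConf.write(out);                    // restored via jobConf.readFields(in)
}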
Project: flink
File: HadoopReduceFunction.java
/**
 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Reducer.
 */
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
  if (hadoopReducer == null) {
    throw new NullPointerException("Reducer may not be null.");
  }
  if (conf == null) {
    throw new NullPointerException("JobConf may not be null.");
  }
  this.reducer = hadoopReducer;
  this.jobConf = conf;
}
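Usage mirrors the combinable wrapper sketched earlier, minus the combiner argument (assumptions: `input` is an existing DataSet<Tuple2<Text, LongWritable>>, and `MaxReducer` is a hypothetical old-API Reducer<Text, LongWritable, Text, LongWritable>):

// Sketch: apply a non-combinable old-API reducer per key group.
DataSet<Tuple2<Text, LongWritable>> maxima = input
    .groupBy(0)
    .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
        new MaxReducer(), new JobConf()));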
Project: flink
File: HadoopReduceFunction.java
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
  Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
  Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
  final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
  final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
  return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
Project: flink
File: HadoopReduceFunction.java
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
  Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
      (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
  reducer = InstantiationUtil.instantiate(reducerClass);
  jobConf = new JobConf();
  jobConf.readFields(in);
}
Project: flink
File: HadoopReduceCombineFunction.java
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
  Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
  Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
  final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
  final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass);
  return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
}
Project: flink
File: HadoopReduceCombineFunction.java
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
  Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
      (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
  reducer = InstantiationUtil.instantiate(reducerClass);
  Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass =
      (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
  combiner = InstantiationUtil.instantiate(combinerClass);
  jobConf = new JobConf();
  jobConf.readFields(in);
}
Project: flink
File: HadoopReduceFunction.java
/**
 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Reducer.
 */
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
  if(hadoopReducer == null) {
    throw new NullPointerException("Reducer may not be null.");
  }
  if(conf == null) {
    throw new NullPointerException("JobConf may not be null.");
  }
  this.reducer = hadoopReducer;
  this.jobConf = conf;
}
Project: flink
File: HadoopReduceFunction.java
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
  Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
  Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
  final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
  final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
  return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
Project: flink
File: HadoopReduceFunction.java
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
  Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
      (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
  reducer = InstantiationUtil.instantiate(reducerClass);
  jobConf = new JobConf();
  jobConf.readFields(in);
}
Project: hiped2
File: PipelineTest.java
@Before
public void setUp() {
  mapper1 = new IdentityMapper<Text, Text>();
  reducer1 = new IdentityReducer<Text, Text>();
  mapper2 = new IdentityMapper<Text, Text>();
  reducer2 = new IdentityReducer<Text, Text>();

  driver = new PipelineMapReduceDriver<Text, Text, Text, Text>();
  driver.addMapReduce(new Pair<Mapper, Reducer>(mapper1, reducer1));
  driver.addMapReduce(new Pair<Mapper, Reducer>(mapper2, reducer2));
}
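With both identity stages registered, a test method might drive the pipeline like this sketch (assuming MRUnit's fluent withInput/withOutput/runTest API, with illustrative data):

@Test
public void testIdentityPipeline() throws Exception {
  // Two chained identity map/reduce stages should pass a record through unchanged.
  driver.withInput(new Text("key"), new Text("value"))
      .withOutput(new Text("key"), new Text("value"))
      .runTest();
}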
Project: incubator-asterixdb-hyracks
File: HadoopReducerOperatorDescriptor.java
@SuppressWarnings("unchecked")
ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  ((org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer) reducer).super(new MRContextUtil()
      .createReduceContext(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null,
          null, null, Class.forName("org.apache.hadoop.io.NullWritable"),
          Class.forName("org.apache.hadoop.io.NullWritable")));
}
Project: incubator-asterixdb-hyracks
File: HadoopReducerOperatorDescriptor.java
@Override
public void close() throws HyracksDataException {
  // -- - close - --
  try {
    if (!jobConf.getUseNewMapper()) {
      ((org.apache.hadoop.mapred.Reducer) reducer).close();
    }
  } catch (IOException e) {
    throw new HyracksDataException(e);
  }
}
Project: vs.msc.ws14
File: HadoopReduceCombineFunction.java
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.reducer.configure(jobConf);
  this.combiner.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
  this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
  this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
  this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}
Project: vs.msc.ws14
File: HadoopReduceCombineFunction.java
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
  Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
  Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
  final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
  final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
  return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
Project: vs.msc.ws14
File: HadoopReduceCombineFunction.java
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
  Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
      (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
  reducer = InstantiationUtil.instantiate(reducerClass);
  Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass =
      (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
  combiner = InstantiationUtil.instantiate(combinerClass);
  jobConf = new JobConf();
  jobConf.readFields(in);
}
Project: vs.msc.ws14
File: HadoopReduceFunction.java
/**
 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Reducer.
 */
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
  if(hadoopReducer == null) {
    throw new NullPointerException("Reducer may not be null.");
  }
  if(conf == null) {
    throw new NullPointerException("JobConf may not be null.");
  }
  this.reducer = hadoopReducer;
  this.jobConf = conf;
}
Project: vs.msc.ws14
File: HadoopReduceFunction.java
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.reducer.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
  Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
  this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
}