Example source code for the Java class org.apache.hadoop.mapred.OutputCollector
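The snippets below are collected from several open-source projects and show how org.apache.hadoop.mapred.OutputCollector, the output sink of the old "mapred" MapReduce API, is used inside Mapper.map and Reducer.reduce implementations. The interface has a single method, collect(K key, V value), which emits one key/value pair. As a point of reference before the project snippets, here is a minimal list-backed implementation (a sketch written for this page, not taken from any of the projects below) that is often sufficient for unit tests:

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.mapred.OutputCollector;

/** In-memory OutputCollector that records every emitted pair for later assertions. */
public class ListOutputCollector<K, V> implements OutputCollector<K, V> {
  private final List<Entry<K, V>> collected = new ArrayList<Entry<K, V>>();

  @Override
  public void collect(K key, V value) throws IOException {
    collected.add(new SimpleEntry<K, V>(key, value)); // record each emitted pair
  }

  public List<Entry<K, V>> getCollected() {
    return collected; // inspect in test assertions
  }
}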
Project: ditb
File: TestTableInputFormat.java
@Override
public void map(ImmutableBytesWritable key, Result value,
OutputCollector<NullWritable,NullWritable> output,
Reporter reporter) throws IOException {
for (Cell cell : value.listCells()) {
reporter.getCounter(TestTableInputFormat.class.getName() + ":row",
Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
.increment(1l);
reporter.getCounter(TestTableInputFormat.class.getName() + ":family",
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
.increment(1l);
reporter.getCounter(TestTableInputFormat.class.getName() + ":value",
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
.increment(1l);
}
}
Project: mumu-mapreduce
File: MaxTemperatureMapRedTest.java
@Test
public void reduce() {
MaxTemperatureMapRed.MaxTemperatureReduce maxTemperatureReduce = new MaxTemperatureMapRed.MaxTemperatureReduce();
try {
List<IntWritable> list = new ArrayList<IntWritable>();
list.add(new IntWritable(12));
list.add(new IntWritable(31));
list.add(new IntWritable(45));
list.add(new IntWritable(23));
list.add(new IntWritable(21));
maxTemperatureReduce.reduce(new Text("1901"), list.iterator(), new OutputCollector<Text, IntWritable>() {
@Override
public void collect(final Text text, final IntWritable intWritable) throws IOException {
log.info(text.toString() + " " + intWritable.get());
}
}, null);
} catch (IOException e) {
e.printStackTrace();
}
}
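The test above passes an anonymous OutputCollector that simply logs each pair and a null Reporter, which only works because the reducer never touches the reporter. A reducer of roughly the following shape would be exercised by that test (a hypothetical sketch; the project's actual MaxTemperatureReduce nested class may differ):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical sketch of a max-temperature reducer compatible with the test above. */
public class MaxTemperatureReduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    int max = Integer.MIN_VALUE;
    while (values.hasNext()) {
      max = Math.max(max, values.next().get()); // track the running maximum
    }
    output.collect(key, new IntWritable(max));  // emit one (year, maxTemperature) pair
  }
}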
Project: hadoop
File: TestDFSIO.java
@Override // IOMapperBase
void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
Long objSize) throws IOException {
long totalSize = objSize.longValue();
float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
LOG.info("Number of bytes processed = " + totalSize);
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
new Text(String.valueOf(1)));
output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
new Text(String.valueOf(totalSize)));
output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
new Text(String.valueOf(execTime)));
output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
new Text(String.valueOf(ioRateMbSec*1000)));
output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
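The keys emitted above carry a type prefix (VALUE_TYPE_LONG or VALUE_TYPE_FLOAT) so that the downstream AccumulatingReducer knows how to combine the values. A reducer following that convention would branch on the prefix roughly as below (a simplified sketch of the idea, not the actual AccumulatingReducer implementation):

// Simplified sketch: accumulate values according to the type prefix on the key.
public void reduce(Text key, Iterator<Text> values,
    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  String field = key.toString();
  if (field.startsWith(AccumulatingReducer.VALUE_TYPE_LONG)) {
    long sum = 0;
    while (values.hasNext()) {
      sum += Long.parseLong(values.next().toString()); // sum long-typed counters
    }
    output.collect(key, new Text(String.valueOf(sum)));
  } else if (field.startsWith(AccumulatingReducer.VALUE_TYPE_FLOAT)) {
    float sum = 0.0f;
    while (values.hasNext()) {
      sum += Float.parseFloat(values.next().toString()); // sum float-typed rates
    }
    output.collect(key, new Text(String.valueOf(sum)));
  }
}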
Project: hadoop
File: PipesReducer.java
/**
* Process all of the keys and values. Start up the application if we haven't
* started it yet.
*/
public void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter
) throws IOException {
isOk = false;
startApplication(output, reporter);
downlink.reduceKey(key);
while (values.hasNext()) {
downlink.reduceValue(values.next());
}
if(skipping) {
//flush the streams on every record input if running in skip mode
//so that we don't buffer other records surrounding a bad record.
downlink.flush();
}
isOk = true;
}
Project: hadoop
File: PipesReducer.java
@SuppressWarnings("unchecked")
private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
if (application == null) {
try {
LOG.info("starting application");
application =
new Application<K2, V2, K3, V3>(
job, null, output, reporter,
(Class<? extends K3>) job.getOutputKeyClass(),
(Class<? extends V3>) job.getOutputValueClass());
downlink = application.getDownlink();
} catch (InterruptedException ie) {
throw new RuntimeException("interrupted", ie);
}
int reduce=0;
downlink.runReduce(reduce, Submitter.getIsJavaRecordWriter(job));
}
}
Project: hadoop
File: ValueAggregatorCombiner.java
/** Combines values for a given key.
* @param key the key is expected to be a Text object, whose prefix indicates
* the type of aggregation to aggregate the values.
* @param values the values to combine
* @param output to collect combined values
*/
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String keyStr = key.toString();
int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
String type = keyStr.substring(0, pos);
ValueAggregator aggregator = ValueAggregatorBaseDescriptor
.generateValueAggregator(type);
while (values.hasNext()) {
aggregator.addNextValue(values.next());
}
Iterator outputs = aggregator.getCombinerOutput().iterator();
while (outputs.hasNext()) {
Object v = outputs.next();
if (v instanceof Text) {
output.collect(key, (Text)v);
} else {
output.collect(key, new Text(v.toString()));
}
}
}
Project: hadoop
File: HadoopArchives.java
public void reduce(IntWritable key, Iterator<Text> values,
OutputCollector<Text, Text> out,
Reporter reporter) throws IOException {
keyVal = key.get();
while(values.hasNext()) {
Text value = values.next();
String towrite = value.toString() + "\n";
indexStream.write(towrite.getBytes(Charsets.UTF_8));
written++;
if (written > numIndexes -1) {
// every 1000 indexes we report status
reporter.setStatus("Creating index for archives");
reporter.progress();
endIndex = keyVal;
String masterWrite = startIndex + " " + endIndex + " " + startPos
+ " " + indexStream.getPos() + " \n" ;
outStream.write(masterWrite.getBytes(Charsets.UTF_8));
startPos = indexStream.getPos();
startIndex = endIndex;
written = 0;
}
}
}
Project: hadoop
File: DistCh.java
/** Run a FileOperation */
public void map(Text key, FileOperation value,
OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
) throws IOException {
try {
value.run(jobconf);
++succeedcount;
reporter.incrCounter(Counter.SUCCEED, 1);
} catch (IOException e) {
++failcount;
reporter.incrCounter(Counter.FAIL, 1);
String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
out.collect(null, new Text(s));
LOG.info(s);
} finally {
reporter.setStatus(getCountString());
}
}
Project: hadoop
File: DataJoinMapperBase.java
public void map(Object key, Object value,
OutputCollector output, Reporter reporter) throws IOException {
if (this.reporter == null) {
this.reporter = reporter;
}
addLongValue("totalCount", 1);
TaggedMapOutput aRecord = generateTaggedMapOutput(value);
if (aRecord == null) {
addLongValue("discardedCount", 1);
return;
}
Text groupKey = generateGroupKey(aRecord);
if (groupKey == null) {
addLongValue("nullGroupKeyCount", 1);
return;
}
output.collect(groupKey, aRecord);
addLongValue("collectedCount", 1);
}
Project: hadoop
File: DataJoinReducerBase.java
public void reduce(Object key, Iterator values,
OutputCollector output, Reporter reporter) throws IOException {
if (this.reporter == null) {
this.reporter = reporter;
}
SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
Object[] tags = groups.keySet().toArray();
ResetableIterator[] groupValues = new ResetableIterator[tags.length];
for (int i = 0; i < tags.length; i++) {
groupValues[i] = groups.get(tags[i]);
}
joinAndCollect(tags, groupValues, key, output, reporter);
addLongValue("groupCount", 1);
for (int i = 0; i < tags.length; i++) {
groupValues[i].close();
}
}
Project: hadoop
File: DataJoinReducerBase.java
/**
* Perform the actual join recursively.
*
* @param tags
* a list of input tags
* @param values
* a list of value lists, each corresponding to one input source
* @param pos
* indicating the next value list to be joined
* @param partialList
* a list of values, each from one value list considered so far.
* @param key
* @param output
* @throws IOException
*/
private void joinAndCollect(Object[] tags, ResetableIterator[] values,
int pos, Object[] partialList, Object key,
OutputCollector output, Reporter reporter) throws IOException {
if (values.length == pos) {
// get a value from each source. Combine them
TaggedMapOutput combined = combine(tags, partialList);
collect(key, combined, output, reporter);
return;
}
ResetableIterator nextValues = values[pos];
nextValues.reset();
while (nextValues.hasNext()) {
Object v = nextValues.next();
partialList[pos] = v;
joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
}
}
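For context, this recursive method is driven by a non-recursive overload (the one invoked from reduce at the call sites shown earlier). A sketch of how such an entry point typically seeds the recursion with an empty partial list, under the assumption that the class follows the call shape used above:

// Sketch: non-recursive entry point that allocates the partial list and starts the recursion.
private void joinAndCollect(Object[] tags, ResetableIterator[] values,
    Object key, OutputCollector output, Reporter reporter) throws IOException {
  if (tags.length < 1) {
    return;                                       // nothing to join
  }
  Object[] partialList = new Object[tags.length]; // one slot per input source
  joinAndCollect(tags, values, 0, partialList, key, output, reporter);
}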
Project: ditb
File: TestTableMapReduceUtil.java
@Override
public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
throws IOException {
String strKey = Bytes.toString(key.get());
List<Put> result = new ArrayList<Put>();
while (values.hasNext())
result.add(values.next());
if (relation.keySet().contains(strKey)) {
Set<String> set = relation.get(strKey);
if (set != null) {
assertEquals(set.size(), result.size());
} else {
throwAccertionError("Test infrastructure error: set is null");
}
} else {
throwAccertionError("Test infrastructure error: key not found in map");
}
}
Project: ditb
File: TestTableMapReduceUtil.java
@Override
public void map(ImmutableBytesWritable row, Result result,
OutputCollector<ImmutableBytesWritable, Put> outCollector,
Reporter reporter) throws IOException {
String rowKey = Bytes.toString(result.getRow());
final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
Bytes.toBytes(PRESIDENT_PATTERN));
final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
Bytes.toBytes(ACTOR_PATTERN));
ImmutableBytesWritable outKey = null;
if (rowKey.startsWith(PRESIDENT_PATTERN)) {
outKey = pKey;
} else if (rowKey.startsWith(ACTOR_PATTERN)) {
outKey = aKey;
} else {
throw new AssertionError("unexpected rowKey");
}
String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
COLUMN_QUALIFIER));
outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add(
COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
}
Project: ditb
File: TestIdentityTableMap.java
@Test
@SuppressWarnings({ "deprecation", "unchecked" })
public void shouldCollectPredefinedTimes() throws IOException {
int recordNumber = 999;
Result resultMock = mock(Result.class);
IdentityTableMap identityTableMap = null;
try {
Reporter reporterMock = mock(Reporter.class);
identityTableMap = new IdentityTableMap();
ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class);
OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
mock(OutputCollector.class);
for (int i = 0; i < recordNumber; i++)
identityTableMap.map(bytesWritableMock, resultMock, outputCollectorMock,
reporterMock);
verify(outputCollectorMock, times(recordNumber)).collect(
Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class));
} finally {
if (identityTableMap != null)
identityTableMap.close();
}
}
Project: big-c
File: DataJoinReducerBase.java
public void reduce(Object key, Iterator values,
OutputCollector output, Reporter reporter) throws IOException {
if (this.reporter == null) {
this.reporter = reporter;
}
SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
Object[] tags = groups.keySet().toArray();
ResetableIterator[] groupValues = new ResetableIterator[tags.length];
for (int i = 0; i < tags.length; i++) {
groupValues[i] = groups.get(tags[i]);
}
joinAndCollect(tags, groupValues, key, output, reporter);
addLongValue("groupCount", 1);
for (int i = 0; i < tags.length; i++) {
groupValues[i].close();
}
}
Project: big-c
File: PipesReducer.java
/**
* Process all of the keys and values. Start up the application if we haven't
* started it yet.
*/
public void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter
) throws IOException {
isOk = false;
startApplication(output, reporter);
downlink.reduceKey(key);
while (values.hasNext()) {
downlink.reduceValue(values.next());
}
if(skipping) {
//flush the streams on every record input if running in skip mode
//so that we don't buffer other records surrounding a bad record.
downlink.flush();
}
isOk = true;
}
Project: gemfirexd-oss
File: VerifyHdfsDataUsingMR.java
@Override
public void map(Key key, Row value, OutputCollector<Text, MyRow> output, Reporter reporter) throws IOException {
String tableName = null;
try {
ResultSet rs = value.getRowAsResultSet();
tableName = rs.getMetaData().getTableName(1);
Log.getLogWriter().info("i am in a mapper and table Name is " + tableName);
int cid = rs.getInt("cid");
String cname = rs.getString("cust_name");
String addr = rs.getString("addr");
int tid = rs.getInt("tid");
Log.getLogWriter().info("mapper procesing record from " + tableName + ": " + cid + ": " + cname + ": " + addr + ": " + tid);
Text myKey = new Text(Integer.toString(cid));
MyRow myRow = new MyRow (cid, cname , addr , tid);
Log.getLogWriter().info("MAPPER writing intermediate record " + myRow.toString());
output.collect(myKey, myRow);
} catch (SQLException se) {
System.err.println("Error logging result set" + se);
}
}
Project: es-hadoop-v2.2.0
File: EsHadoopScheme.java
@Override
public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setOutputFormat(EsOutputFormat.class);
// define an output dir to prevent Cascading from setting up a TempHfs and overriding the OutputFormat
Settings set = loadSettings(conf, false);
Log log = LogFactory.getLog(EsTap.class);
InitializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, log);
InitializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, log);
InitializationUtils.setBytesConverterIfNeeded(set, CascadingLocalBytesConverter.class, log);
InitializationUtils.setFieldExtractorIfNotSet(set, CascadingFieldExtractor.class, log);
// NB: we need to set this property even though it is not being used - and since a URI causes problems, use only the resource/file
//conf.set("mapred.output.dir", set.getTargetUri() + "/" + set.getTargetResource());
HadoopCfgUtils.setFileOutputFormatDir(conf, set.getResourceWrite());
HadoopCfgUtils.setOutputCommitterClass(conf, EsOutputFormat.EsOldAPIOutputCommitter.class.getName());
if (log.isTraceEnabled()) {
log.trace("Initialized (sink) configuration " + HadoopCfgUtils.asProperties(conf));
}
}
Project: aliyun-oss-hadoop-fs
File: PipesReducer.java
/**
* Process all of the keys and values. Start up the application if we haven't
* started it yet.
*/
public void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter
) throws IOException {
isOk = false;
startApplication(output, reporter);
downlink.reduceKey(key);
while (values.hasNext()) {
downlink.reduceValue(values.next());
}
if(skipping) {
//flush the streams on every record input if running in skip mode
//so that we don't buffer other records surrounding a bad record.
downlink.flush();
}
isOk = true;
}
Project: GeoCrawler
File: NodeDumper.java
/**
* Outputs the url with the appropriate number of inlinks, outlinks, or for
* score.
*/
public void map(Text key, Node node,
OutputCollector<FloatWritable, Text> output, Reporter reporter)
throws IOException {
float number = 0;
if (inlinks) {
number = node.getNumInlinks();
} else if (outlinks) {
number = node.getNumOutlinks();
} else {
number = node.getInlinkScore();
}
// number collected with negative to be descending
output.collect(new FloatWritable(-number), key);
}
Project: gemfirexd-oss
File: BusyAirports.java
@Override
public void map(Object key, ResultSet rs,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String origAirport;
String destAirport;
try {
while (rs.next()) {
origAirport = rs.getString("ORIG_AIRPORT");
destAirport = rs.getString("DEST_AIRPORT");
reusableText.set(origAirport);
output.collect(reusableText, countOne);
reusableText.set(destAirport);
output.collect(reusableText, countOne);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
Project: GeoCrawler
File: NodeDumper.java
/**
* Outputs the host or domain as key for this record and numInlinks,
* numOutlinks or score as the value.
*/
public void map(Text key, Node node,
OutputCollector<Text, FloatWritable> output, Reporter reporter)
throws IOException {
float number = 0;
if (inlinks) {
number = node.getNumInlinks();
} else if (outlinks) {
number = node.getNumOutlinks();
} else {
number = node.getInlinkScore();
}
if (host) {
key.set(URLUtil.getHost(key.toString()));
} else {
key.set(URLUtil.getDomainName(key.toString()));
}
output.collect(key, new FloatWritable(number));
}
Project: aliyun-oss-hadoop-fs
File: HadoopArchives.java
public void reduce(IntWritable key, Iterator<Text> values,
OutputCollector<Text, Text> out,
Reporter reporter) throws IOException {
keyVal = key.get();
while(values.hasNext()) {
Text value = values.next();
String towrite = value.toString() + "\n";
indexStream.write(towrite.getBytes(Charsets.UTF_8));
written++;
if (written > numIndexes -1) {
// every 1000 indexes we report status
reporter.setStatus("Creating index for archives");
reporter.progress();
endIndex = keyVal;
String masterWrite = startIndex + " " + endIndex + " " + startPos
+ " " + indexStream.getPos() + " \n" ;
outStream.write(masterWrite.getBytes(Charsets.UTF_8));
startPos = indexStream.getPos();
startIndex = endIndex;
written = 0;
}
}
}
Project: aliyun-oss-hadoop-fs
File: DataJoinMapperBase.java
public void map(Object key, Object value,
OutputCollector output, Reporter reporter) throws IOException {
if (this.reporter == null) {
this.reporter = reporter;
}
addLongValue("totalCount", 1);
TaggedMapOutput aRecord = generateTaggedMapOutput(value);
if (aRecord == null) {
addLongValue("discardedCount", 1);
return;
}
Text groupKey = generateGroupKey(aRecord);
if (groupKey == null) {
addLongValue("nullGroupKeyCount", 1);
return;
}
output.collect(groupKey, aRecord);
addLongValue("collectedCount", 1);
}
Project: aliyun-oss-hadoop-fs
File: DataJoinReducerBase.java
/**
* Perform the actual join recursively.
*
* @param tags
* a list of input tags
* @param values
* a list of value lists, each corresponding to one input source
* @param pos
* indicating the next value list to be joined
* @param partialList
* a list of values, each from one value list considered so far.
* @param key
* @param output
* @throws IOException
*/
private void joinAndCollect(Object[] tags, ResetableIterator[] values,
int pos, Object[] partialList, Object key,
OutputCollector output, Reporter reporter) throws IOException {
if (values.length == pos) {
// get a value from each source. Combine them
TaggedMapOutput combined = combine(tags, partialList);
collect(key, combined, output, reporter);
return;
}
ResetableIterator nextValues = values[pos];
nextValues.reset();
while (nextValues.hasNext()) {
Object v = nextValues.next();
partialList[pos] = v;
joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
}
}
Project: THUTag
File: ImportDouban.java
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> collector, Reporter r) throws IOException {
String json = value.toString();
if (json.contains("\"tag\":")) {
// This is a douban raw tag.
DoubanRawTag tag = J.fromTextAsJson(value, DoubanRawTag.class);
outkey.set(Long.toString(tag.getSubject_id()));
r.incrCounter(MRCounter.NUM_TAGS, 1);
} else {
// This is a douban subject.
DoubanRawSubject subject =
J.fromTextAsJson(value, DoubanRawSubject.class);
// We use books only.
if (subject.getCat_id() != DOUBAN_BOOK_CATID) {
return;
}
outkey.set(Long.toString(subject.getId()));
r.incrCounter(MRCounter.NUM_SUBJECTS, 1);
}
collector.collect(outkey, value);
}
Project: big-c
File: HadoopArchives.java
public void reduce(IntWritable key, Iterator<Text> values,
OutputCollector<Text, Text> out,
Reporter reporter) throws IOException {
keyVal = key.get();
while(values.hasNext()) {
Text value = values.next();
String towrite = value.toString() + "\n";
indexStream.write(towrite.getBytes(Charsets.UTF_8));
written++;
if (written > numIndexes -1) {
// every 1000 indexes we report status
reporter.setStatus("Creating index for archives");
reporter.progress();
endIndex = keyVal;
String masterWrite = startIndex + " " + endIndex + " " + startPos
+ " " + indexStream.getPos() + " \n" ;
outStream.write(masterWrite.getBytes(Charsets.UTF_8));
startPos = indexStream.getPos();
startIndex = endIndex;
written = 0;
}
}
}
Project: gemfirexd-oss
File: TopBusyAirportGemfirexd.java
@Override
public void map(Object key, Row row,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String origAirport;
String destAirport;
try {
ResultSet rs = row.getRowAsResultSet();
origAirport = rs.getString("ORIG_AIRPORT");
destAirport = rs.getString("DEST_AIRPORT");
reusableText.set(origAirport);
output.collect(reusableText, countOne);
reusableText.set(destAirport);
output.collect(reusableText, countOne);
} catch (SQLException e) {
e.printStackTrace();
}
}
Project: big-c
File: ValueAggregatorReducer.java
/**
 * @param key
 *          the key is expected to be a Text object, whose prefix indicates
 *          the type of aggregation to aggregate the values. In effect, data
 *          driven computing is achieved. It is assumed that each aggregator's
 *          getReport method emits appropriate output for the aggregator. This
 *          may be further customized.
 * @param values
 *          the values to be aggregated
 */
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String keyStr = key.toString();
int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
String type = keyStr.substring(0, pos);
keyStr = keyStr.substring(pos
+ ValueAggregatorDescriptor.TYPE_SEPARATOR.length());
ValueAggregator aggregator = ValueAggregatorBaseDescriptor
.generateValueAggregator(type);
while (values.hasNext()) {
aggregator.addNextValue(values.next());
}
String val = aggregator.getReport();
key = new Text(keyStr);
output.collect(key, new Text(val));
}
Project: aliyun-maxcompute-data-collectors
File: ExplicitSetMapper.java
public void map(LongWritable key, Text val,
OutputCollector<Text, NullWritable> out, Reporter r) throws IOException {
// Try to set the field.
userRecord.setField(setCol, setVal);
Map<String, Object> fieldVals = userRecord.getFieldMap();
if (!fieldVals.get(setCol).equals(setVal)) {
throw new IOException("Could not set column value! Got back "
+ fieldVals.get(setCol));
} else {
LOG.info("Correctly changed value for col " + setCol + " to " + setVal);
}
}
Project: aliyun-maxcompute-data-collectors
File: ReparseMapper.java
public void map(LongWritable key, Text val,
OutputCollector<Text, NullWritable> out, Reporter r) throws IOException {
LOG.info("Mapper input line: " + val.toString());
try {
// Use the user's record class to parse the line back in.
userRecord.parse(val);
} catch (RecordParser.ParseError pe) {
LOG.error("Got parse error: " + pe.toString());
throw new IOException(pe);
}
LOG.info("Mapper output line: " + userRecord.toString());
out.collect(new Text(userRecord.toString()), NullWritable.get());
if (!userRecord.toString(false).equals(val.toString())) {
// Could not format record w/o end-of-record delimiter.
throw new IOException("Returned string w/o EOR has value ["
+ userRecord.toString(false) + "] when ["
+ val.toString() + "] was expected.");
}
if (!userRecord.toString().equals(val.toString() + "\n")) {
// misparsed.
throw new IOException("Returned string has value ["
+ userRecord.toString() + "] when ["
+ val.toString() + "\n] was expected.");
}
}
Project: hadoop
File: ExternalMapReduce.java
public void map(WritableComparable key, Writable value,
OutputCollector<WritableComparable, IntWritable> output,
Reporter reporter)
throws IOException {
//check for classpath
String classpath = System.getProperty("java.class.path");
if (classpath.indexOf("testjob.jar") == -1) {
throw new IOException("failed to find in the library " + classpath);
}
if (classpath.indexOf("test.jar") == -1) {
throw new IOException("failed to find the library test.jar in"
+ classpath);
}
//fork off ls to see if the file exists.
// java file.exists() will not work on
// Windows since it is a symlink
String[] argv = new String[7];
argv[0] = "ls";
argv[1] = "files_tmp";
argv[2] = "localfilelink";
argv[3] = "dfsfilelink";
argv[4] = "tarlink";
argv[5] = "ziplink";
argv[6] = "test.tgz";
Process p = Runtime.getRuntime().exec(argv);
int ret = -1;
try {
ret = p.waitFor();
} catch(InterruptedException ie) {
//do nothing here.
}
if (ret != 0) {
throw new IOException("files_tmp does not exist");
}
}
Project: hadoop
File: SliveReducer.java
@Override // Reducer
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
OperationOutput collector = null;
int reduceAm = 0;
int errorAm = 0;
logAndSetStatus(reporter, "Iterating over reduction values for key " + key);
while (values.hasNext()) {
Text value = values.next();
try {
OperationOutput val = new OperationOutput(key, value);
if (collector == null) {
collector = val;
} else {
collector = OperationOutput.merge(collector, val);
}
LOG.info("Combined " + val + " into/with " + collector);
++reduceAm;
} catch (Exception e) {
++errorAm;
logAndSetStatus(reporter, "Error iterating over reduction input "
+ value + " due to : " + StringUtils.stringifyException(e));
if (getConfig().shouldExitOnFirstError()) {
break;
}
}
}
logAndSetStatus(reporter, "Reduced " + reduceAm + " values with " + errorAm
+ " errors");
if (collector != null) {
logAndSetStatus(reporter, "Writing output " + collector.getKey() + " : "
+ collector.getOutputValue());
output.collect(collector.getKey(), collector.getOutputValue());
}
}
Project: hadoop
File: SliveMapper.java
/**
* Runs the given operation and reports on its results
*
* @param op
* the operation to run
* @param reporter
* the status reporter to notify
* @param output
* the output to write to
* @throws IOException
*/
private void runOperation(Operation op, Reporter reporter,
OutputCollector<Text, Text> output, long opNum) throws IOException {
if (op == null) {
return;
}
logAndSetStatus(reporter, "Running operation #" + opNum + " (" + op + ")");
List<OperationOutput> opOut = op.run(filesystem);
logAndSetStatus(reporter, "Finished operation #" + opNum + " (" + op + ")");
if (opOut != null && !opOut.isEmpty()) {
for (OperationOutput outData : opOut) {
output.collect(outData.getKey(), outData.getOutputValue());
}
}
}
Project: hadoop
File: LoadGeneratorMR.java
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
if (key.equals(OPEN_EXECTIME)){
executionTime[OPEN] = sum;
} else if (key.equals(NUMOPS_OPEN)){
numOfOps[OPEN] = sum;
} else if (key.equals(LIST_EXECTIME)){
executionTime[LIST] = sum;
} else if (key.equals(NUMOPS_LIST)){
numOfOps[LIST] = sum;
} else if (key.equals(DELETE_EXECTIME)){
executionTime[DELETE] = sum;
} else if (key.equals(NUMOPS_DELETE)){
numOfOps[DELETE] = sum;
} else if (key.equals(CREATE_EXECTIME)){
executionTime[CREATE] = sum;
} else if (key.equals(NUMOPS_CREATE)){
numOfOps[CREATE] = sum;
} else if (key.equals(WRITE_CLOSE_EXECTIME)){
System.out.println(WRITE_CLOSE_EXECTIME + " = " + sum);
executionTime[WRITE_CLOSE]= sum;
} else if (key.equals(NUMOPS_WRITE_CLOSE)){
numOfOps[WRITE_CLOSE] = sum;
} else if (key.equals(TOTALOPS)){
totalOps = sum;
} else if (key.equals(ELAPSED_TIME)){
totalTime = sum;
}
result.set(sum);
output.collect(key, result);
// System.out.println("Key = " + key + " Sum is =" + sum);
// printResults(System.out);
}
Project: hadoop
File: MRCaching.java
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
Project: hadoop
File: MRCaching.java
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
Project: hadoop
File: WordCount.java
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
Project: hadoop
File: WordCount.java
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
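For completeness, the map and reduce methods above are wired together through a JobConf in the old API. A typical driver using the org.apache.hadoop.mapred classes (JobConf, JobClient, TextInputFormat, TextOutputFormat, FileInputFormat, FileOutputFormat) and org.apache.hadoop.fs.Path looks roughly like this (a sketch; the inner class names Map and Reduce are assumptions, not taken from the snippet above):

public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf(WordCount.class);
  conf.setJobName("wordcount");
  conf.setOutputKeyClass(Text.class);          // key type handed to OutputCollector
  conf.setOutputValueClass(IntWritable.class); // value type handed to OutputCollector
  conf.setMapperClass(Map.class);              // assumed name of the mapper inner class
  conf.setReducerClass(Reduce.class);          // assumed name of the reducer inner class
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);
  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  JobClient.runJob(conf);                      // submit the job and wait for completion
}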
Project: hadoop
File: TestDatamerge.java
public void reduce(IntWritable key, Iterator<IntWritable> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
int seen = 0;
while (values.hasNext()) {
seen += values.next().get();
}
assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
}