Java class org.apache.hadoop.mapreduce.Reducer.Context: example source code
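The snippets below are collected from various open-source projects; each one uses Reducer.Context to emit output, read the job configuration, update counters, or report progress. As a quick orientation, here is a minimal sketch of a reducer touching the most common Context methods; the class name, configuration key, and counter group are illustrative and not taken from any project below.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private int threshold;

    @Override
    protected void setup(Context context) {
        // Context gives every task access to the job Configuration.
        threshold = context.getConfiguration().getInt("sum.threshold", 0);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        if (sum >= threshold) {
            context.write(key, new IntWritable(sum));                    // emit a key/value pair
        } else {
            context.getCounter("SumReducer", "filtered").increment(1);   // custom counter
        }
    }
}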
Project: SOAPgaea
File: VCFRecalibrator.java
/**
* Recalibrate one VCF file.
* @param id
* @param context
* @throws InterruptedException
* @throws IOException
*/
public void recalVCF(int id, Context context) throws IOException, InterruptedException{
long start,end;
start = System.currentTimeMillis();
recalTable.getRecalibrationTable();
end = System.currentTimeMillis();
System.err.println("recal table time:"+(end-start)/1000+"s");
start = System.currentTimeMillis();
recalTable.indexData();
end = System.currentTimeMillis();
System.err.println("recal table index time:"+(end-start)/1000+"s");
for (final Tranche t : recalTable.getTranches()) {
if (t.ts >= options.getTSFilterLevel()) {
tranches.add(t);
}
}
// this algorithm wants the tranches ordered from best (lowest truth sensitivity) to worst (highest truth sensitivity)
Collections.reverse(tranches);
}
Project: big-c
File: HistogramRatings.java
public void map(Object key, Text value,
Context context) throws IOException, InterruptedException{
int rating, reviewIndex, movieIndex;
String reviews, tok, ratingStr;
String line = value.toString();
movieIndex = line.indexOf(":");
if (movieIndex > 0) {
reviews = line.substring(movieIndex + 1);
StringTokenizer token = new StringTokenizer(reviews, ",");
while (token.hasMoreTokens()) {
tok = token.nextToken();
reviewIndex = tok.indexOf("_");
ratingStr = tok.substring(reviewIndex + 1);
rating = Integer.parseInt(ratingStr);
context.write(new IntWritable(rating), one);
}
}
}
Project: newsRecommender
File: TFIDF2.java
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
// all records for the same word are grouped together
float sum = 0;
List<String> vals = new ArrayList<String>();
for (Text str : values) {
int index = str.toString().lastIndexOf(" ");
sum += Integer.parseInt(str.toString().substring(index + 1)); // count this word's occurrences across all documents
vals.add(str.toString().substring(0, index)); // keep the per-document part
}
double tmp = Math.log10(totalArticle * 1.0 / (sum * 1.0)); // IDF: log of the total article count divided by this word's occurrence count
for (int j = 0; j < vals.size(); j++) {
String val = vals.get(j);
String newsID=val.substring(0,val.indexOf(" "));
String end = val.substring(val.lastIndexOf(" "));
float f_end = Float.parseFloat(end); // read the TF value
val += " ";
val += f_end * tmp; // TF-IDF value
// context.write(key, new Text(val));
context.write(new Text(newsID), new Text(key+" "+val.substring(val.indexOf(" ")+1)));
}
}
Project: ldbc_snb_datagen
File: PersonActivityGenerator.java
public void generateActivityForBlock(int seed, ArrayList<Person> block, Context context) throws IOException {
randomFarm_.resetRandomGenerators(seed);
forumId = 0;
messageId = 0;
SN.machineId = seed;
personActivitySerializer_.reset();
int counter = 0;
float personGenerationTime = 0.0f;
for (Person p : block) {
long start = System.currentTimeMillis();
generateActivity(p, block);
if (DatagenParams.updateStreams) {
updateSerializer_.changePartition();
}
if (counter % 1000 == 0) {
context.setStatus("Generating activity of person " + counter + " of block " + seed);
context.progress();
}
float time = (System.currentTimeMillis() - start) / 1000.0f;
personGenerationTime += time;
counter++;
}
System.out.println("Average person activity generation time " + personGenerationTime / (float) block.size());
}
Project: bigdata-fingerprint
File: Util.java
@SuppressWarnings("rawtypes")
public static LocalStructure [] readDistributedCacheFingerprint(Context context, String fpid, boolean discarding) throws IOException {
URI[] input_files = context.getCacheFiles();
@SuppressWarnings("unchecked")
Class<? extends LocalStructure> MatcherClass = (Class<? extends LocalStructure>) Util.getClassFromProperty(context, "matcher");
// Compute the localstructures of the input fingerprint
// and store so that all maps and reduces can access.
for(URI input_file : input_files) {
// String[] lines = Util.readFileByLines(FilenameUtils.getName(input_file.getPath()));
String[] lines = Util.readFileByLines(input_file.getPath());
for(String line : lines) {
if(LocalStructure.decodeFpid(line).equals(fpid))
return LocalStructure.extractLocalStructures(MatcherClass, line);
}
}
System.err.println("readDistributedCacheFingerprint: input fingerprint " + fpid + " not found");
return null;
}
Project: htools
File: HashPartitioner.java
public static String getOutfile(Context context) {
try {
if (HashPartitioner.class.isAssignableFrom(context.getPartitionerClass())) {
String outpath = context.getConfiguration().get(HPCONFIG);
if (outpath != null) {
int partition = Job.getReducerId(context);
return PrintTools.sprintf("%s/partition.%5d", outpath, partition);
} else {
log.info("must use setOutPath on HashPartitioner");
}
}
} catch (ClassNotFoundException ex) {
log.exception(ex, "HashPartitioner");
}
throw new RuntimeException("fatal");
}
Project: SentimentAnalysis
File: WriteIndicesSetDriver.java
@Override
public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
Text resText = null;
String prim_key = new String(row.get());
for(KeyValue kv : value.raw()) {
try{
double norm = (double)Integer.parseInt(new String(kv.getValue())) / prim_key.length();
//double norm = (double)Integer.parseInt(new String(kv.getValue()));
//double norm = (double)Integer.parseInt(new String(kv.getValue())) / kv.getQualifier().toString().length();
resText = new Text(prim_key + "," + String.valueOf(norm));
String qual = new String (kv.getQualifier());
context.write(new Text(qual), resText);
//System.out.println("WriteIndicesMapper: w_i = " + prim_key + " w_c = " + qual + " <w_c>, <w_i, norm_ic> = " + resText);
}
catch(Exception e) {
System.out.println("Exception in mapper for key = " + prim_key);
}
}
}
Project: recsys-offline
File: Step32.java
public void map(VarLongWritable key,VectorWritable value,Context context) throws IOException, InterruptedException{
long userID=key.get();
Vector userVector=value.get();
Iterator<Vector.Element> it=userVector.nonZeroes().iterator();
IntWritable itemi=new IntWritable();
while(it.hasNext()){
Vector.Element e=it.next();
int itemIndex=e.index();
float preferenceValue=(float)e.get();
itemi.set(itemIndex);
context.write(itemi, new VectorOrPrefWritable(userID,preferenceValue));
System.out.println("item :"+itemi+",userand val:"+userID+","+preferenceValue);
}
}
Project: chinesesegmentor
File: ParallelTraining.java
@Override
protected void reduce(NullWritable key, Iterable<TrainingWeights> values, Context context)
throws IOException, InterruptedException {
TrainingWeights result = null;
int total = 0;
for (TrainingWeights weights : values) {
if (result == null) {
result = weights;
} else {
addWeights(result, weights);
}
total++;
}
if (total > 1) {
divideWeights(result, total);
}
context.write(NullWritable.get(), result);
}
Project: chinesesegmentor
File: CalcFeatureWeights.java
@Override
protected void reduce(IntWritable key, Iterable<MyKey> values, Context context)
throws IOException, InterruptedException {
double w = 0;
int total = 0;
double[] array = new double[6];
for (MyKey value : values) {
total++;
w += value.score * value.score;
array[value.id] = value.score;
}
if (total != 6) {
throw new IOException("not 6 for: " + key.get());
}
MyKey k = new MyKey(key.get(), w);
MyValue v = new MyValue(array);
context.write(k, v);
}
Project: mutual-information-words
File: wordcountReduce.java
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Set<String> docs = new HashSet<String>();
Text output = new Text();
StringBuffer result = new StringBuffer();
for (Text val : values) {
docs.add(val.toString());
}
result.append(docs.size() + "#");
for (String doc : docs) {
result.append(doc + "#");
}
output.set(result.toString().substring(0, result.length() - 1));
context.write(key, output);
}
Project: SOAPgaea
File: BasicReport.java
public boolean constructMapReport(SamRecordDatum datum, ReferenceShare genome, String chrName, Context context) {
ChromosomeInformationShare chrInfo = genome.getChromosomeInfo(chrName);
rTracker.setTrackerAttribute(ReadType.TOTALREADS);
// If the read's end position exceeds the chromosome length, skip this read and move on to the next one
if(datum.getEnd() >= chrInfo.getLength()) {
context.getCounter("Exception", "read end pos more than chr end pos").increment(1);
return false;
}
rTracker.setTrackerAttribute(ReadType.MAPPED);
bTracker.setTrackerAttribute(BaseType.TOTALBASE.setCount(datum.getBaseCount()));
if(datum.isUniqueAlignment()) {
rTracker.setTrackerAttribute(ReadType.UNIQUE);
}
if ((datum.getFlag() & 0x400) != 0) {
rTracker.setTrackerAttribute(ReadType.DUP);
}
if ((datum.getFlag() & 0x40) != 0 && (datum.getFlag() & 0x8) == 0) {
rTracker.setTrackerAttribute(ReadType.PE);
}
String cigar = datum.getCigarString();
if (cigar.contains("S") || cigar.contains("H")) {
rTracker.setTrackerAttribute(ReadType.CLIPPED);
}
if (cigar.contains("D") || cigar.contains("I")) {
rTracker.setTrackerAttribute(ReadType.INDEL);
}
if (isMismatch(datum, chrInfo)) {
rTracker.setTrackerAttribute(ReadType.MISMATCHREADS);
}
return true;
}
Project: SOAPgaea
File: RecalibratorContextWriter.java
@SuppressWarnings({ "rawtypes", "unchecked" })
public RecalibratorContextWriter(Context ctx,boolean multiple) {
if(multiple)
mos = new MultipleOutputs<NullWritable, Text>(ctx);
this.context = ctx;
value = new SamRecordWritable();
}
Project: Apriori_Hadoop
File: Reduce.java
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int countItemFreq = 0;
for (IntWritable value : values){
countItemFreq += value.get();
}
int minsup = Integer.parseInt(context.getConfiguration().get("minsup"));
if (countItemFreq >= minsup)
{
context.write(key, new IntWritable(countItemFreq));
}
}
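The minsup threshold read through context.getConfiguration() above has to be placed in the Configuration by the driver before the job is submitted. A minimal sketch of that driver fragment, with the job name and threshold value chosen only for illustration:

Configuration conf = new Configuration();
conf.set("minsup", "3");                       // threshold later read by the reducer via context.getConfiguration()
Job job = Job.getInstance(conf, "apriori frequent itemsets");
job.setReducerClass(Reduce.class);             // the reducer shown above
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);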
Project: big-c
File: MutiWordcount.java
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
Project: big-c
File: MutiWordcount.java
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
Project: big-c
File: HistogramRatings.java
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
Project: HadoopKMeansClustering
File: KReducer.java
@Override
public void setup(Context context) throws IOException{
//get job configuration
Configuration conf = context.getConfiguration();
columns = Arrays.stream(conf.getStrings("columns"))
.map( s -> Integer.parseInt(s)).toArray(Integer[]::new);
k = (int) conf.getInt("k", 10);
currentIteration = conf.getInt("currentIteration", 0);
lastIteration = conf.getBoolean("lastIteration", false);
mos = new MultipleOutputs(context);
}
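The MultipleOutputs instance created in setup() is normally written to during reduce() and must be closed in cleanup(). The sketch below shows that pattern; the named output "centroids" and the key/value types are assumptions, since the rest of KReducer is not reproduced here.

@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text value : values) {
        if (lastIteration) {
            mos.write("centroids", key, value);   // final iteration goes to a named output (name assumed)
        } else {
            context.write(key, value);            // intermediate iterations use the default output
        }
    }
}

@Override
public void cleanup(Context context) throws IOException, InterruptedException {
    mos.close();                                  // flush and close all named outputs
}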
Project: newsRecommender
File: TFIDF2.java
public void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
String[] lineSplits = value.toString().split("\t");
String newsID = lineSplits[1];
String content = lineSplits[4];
String publishTime=lineSplits[5];
Calendar cal1 = Calendar.getInstance();
try {
cal1.setTime(new SimpleDateFormat("yyyy年MM月dd日HH:mm").parse(publishTime));
publishTime=Long.toString(cal1.getTimeInMillis());
} catch (Exception e) {
publishTime="0";
}
context.write(new Text(newsID+"|"+publishTime+"|"+content),new Text(""));
}
Project: newsRecommender
File: TFIDF2.java
public void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
int all = 0; // total number of words in this article
String[] lineSplits = value.toString().split("\\|");
newsID = lineSplits[0];
publishTime = lineSplits[1];
content = lineSplits[2];
Analyzer analyzer = new IKAnalyzer(false);
TokenStream ts = analyzer.tokenStream("", new StringReader(content));
ts.reset();
CharTermAttribute cta = ts.getAttribute(CharTermAttribute.class);
Map<String, Long> splitWordMap = new HashMap<String, Long>();
while (ts.incrementToken()) {
word = cta.toString();
word += " ";
word += newsID;
all++;
if (splitWordMap.containsKey(word))
splitWordMap.put(word, splitWordMap.get(word) + 1);
else
splitWordMap.put(word, 1L);
}
Iterator<Map.Entry<String, Long>> iter = splitWordMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Long> entry = iter.next();
String key1 = entry.getKey();
Long val = entry.getValue();
// The key below also carries the article's total word count, which is needed later when building each article's term-frequency matrix.
context.write(new Text(key1+"|"+all+"|"+publishTime), new Text((Float.parseFloat(val.toString()) / all)
+ ""));
}
}
Project: newsRecommender
File: TFIDF2.java
public void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
String val = value.toString().replaceAll("\t", " "); // replace the TAB separator in the value with a space
int index = val.indexOf(" ");
String s1 = val.substring(0, index); // the word, used as the key, e.g. hello
String s2 = val.substring(index + 1); // the rest, used as the value, e.g. test1 0.11764706
s2 += " ";
s2 += "1"; // for counting how many articles the word appears in; "1" marks one occurrence, e.g. test1 0.11764706 1
context.write(new Text(s1), new Text(s2));
}
Project: newsRecommender
File: TFIDF2.java
public void map(LongWritable key, Text value, Context cxt) throws IOException,
InterruptedException {
String[] values = value.toString().split("\t");
String[] values2=values[1].split(" ");
newKey.setSymbol(values[0]);
newKey.setSymbol2(values2[0]);
newKey.setValue(Double.parseDouble(values2[1]));
newKey.setValue2(Double.parseDouble(values2[2]));
cxt.write(newKey, NullWritable.get());
}
Project: newsRecommender
File: TFIDF2.java
public void reduce(CustomKey key, Iterable<NullWritable> values, Context cxt)
throws IOException, InterruptedException {
int limit=0;
for (NullWritable v : values) {
if(++limit<=50)
cxt.write(key, v);
}
}
Project: search
File: RejectingUpdateConflictResolver.java
@Override
public Iterator<SolrInputDocument> orderUpdates(Text key, Iterator<SolrInputDocument> updates, Context ctx) {
SolrInputDocument firstUpdate = null;
while (updates.hasNext()) {
if (firstUpdate == null) {
firstUpdate = updates.next();
assert firstUpdate != null;
} else {
throw new IllegalArgumentException("Update conflict! Documents with the same unique key are forbidden: "
+ key);
}
}
assert firstUpdate != null;
return Collections.singletonList(firstUpdate).iterator();
}
Project: search
File: RetainMostRecentUpdateConflictResolver.java
/** Returns the most recent document among the colliding updates */
protected Iterator<SolrInputDocument> getMaximum(Iterator<SolrInputDocument> updates, String fieldName,
Comparator child, Context context) {
SolrInputDocumentComparator comp = new SolrInputDocumentComparator(fieldName, child);
SolrInputDocument max = null;
long numDupes = 0;
long numOutdated = 0;
while (updates.hasNext()) {
SolrInputDocument next = updates.next();
assert next != null;
if (max == null) {
max = next;
} else {
int c = comp.compare(next, max);
if (c == 0) {
LOG.debug("Ignoring document version because it is a duplicate: {}", next);
numDupes++;
} else if (c > 0) {
LOG.debug("Ignoring document version because it is outdated: {}", max);
max = next;
numOutdated++;
} else {
LOG.debug("Ignoring document version because it is outdated: {}", next);
numOutdated++;
}
}
}
assert max != null;
if (numDupes > 0) {
context.getCounter(COUNTER_GROUP, DUPLICATES_COUNTER_NAME).increment(numDupes);
}
if (numOutdated > 0) {
context.getCounter(COUNTER_GROUP, OUTDATED_COUNTER_NAME).increment(numOutdated);
}
return Collections.singletonList(max).iterator();
}
Project: FlexMap
File: MutiWordcount.java
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
Project: FlexMap
File: MutiWordcount.java
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
Project: htools
File: DayPartitioner.java
/**
* @param context Reducer context
* @return a YYYY-MM-DD date string based on the configured start date, end date,
* and the reducer number.
*/
public static String getDate(Context context) {
Configuration conf = context.getConfiguration();
int reducer = ContextTools.getTaskID(context);
long start = conf.getLong(starttimelabel, 0) + reducer * secperday;
return DateTools.FORMAT.Y_M_D.formatEpoch(start);
}
Project: iis
File: ImportInformationSpaceReducerTest.java
@Before
public void init() throws Exception {
reducer = new ImportInformationSpaceReducer() {
@Override
protected MultipleOutputs instantiateMultipleOutputs(Context context) {
return multipleOutputs;
}
};
}
Project: Ankus
File: MahoutDotProductDistributedCache.java
@Override
public void map(IntWritable r, VectorWritable v, Context context) throws IOException {
try {
for (Entry<String, Vector> w : classWeights.entrySet()) {
context.write(new Text(String.valueOf(r.get())+"_"+w.getKey()), new DoubleWritable(v.get().dot(w.getValue())));
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
Project: Ankus
File: SubtractColumnMeans.java
@Override
public void map(LongWritable r, VectorWritable v, Context context) throws IOException {
try {
Vector newV = v.get().minus(columnMeans);
context.write(new IntWritable((int)r.get()), new VectorWritable(newV));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
Project: SentimentAnalysis
File: CalculateBetaDriver.java
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String [] words = line.split("\t");
String [] sentence = words[1].split("=");
String [] scores = sentence[1].split(";");
String posScore = scores[0];
String negScore = scores[1];
context.write(new Text(words[0]), new Text(posScore + ";" + negScore));
}
Project: SentimentAnalysis
File: CalculateBetaDriver.java
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sumPos = 0;
double sumNeg = 0;
for(Text word: values) {
String [] scores = word.toString().split(";");
sumPos += Double.valueOf(scores[0]);
sumNeg += Double.valueOf(scores[1]);
}
context.write(key, new Text(sumPos + ";" + sumNeg));
}
Project: SentimentAnalysis
File: FinalScoreCalculation.java
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String [] words = line.split("\t");
context.write(new Text(words[0]), new Text(words[1]));
}
Project: SentimentAnalysis
File: FinalScoreCalculation.java
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for(Text text : values) {
Put put = new Put(key.getBytes());
String word = text.toString();
String [] sentence = word.split("=");
String [] scores = sentence[1].split(";");
String posScore = scores[0];
String negScore = scores[1];
double beta = GeneralDriver.getBeta();
double finScore = Double.valueOf(posScore) - beta * Double.valueOf(negScore);
put.add(Bytes.toBytes("score"), Bytes.toBytes(sentence[0]), Bytes.toBytes(finScore));
context.write(null, put);
}
}
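Writing the Put with a null key is the usual pattern for a reducer that writes directly into HBase. The driver binds such a reducer to a target table with TableMapReduceUtil; a sketch follows, where the table name and the reducer class name are assumptions rather than values from this project.

// Hedged driver fragment: initTableReducerJob binds the reducer's output to an HBase table.
TableMapReduceUtil.initTableReducerJob(
        "final_scores",                       // assumed output table name
        FinalScoreCalculation.class,          // assumed name of the reducer emitting Put objects, as above
        job);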
Project: SentimentAnalysis
File: WriteIndicesSetDriver.java
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for(Text str : values) {
sb.append(str.toString() + "///");
}
context.write(key, new Text(sb.toString()));
}
Project: recsys-offline
File: Step32.java
public void reduce(IntWritable key,Iterable<VectorOrPrefWritable> values ,Context context ) throws IOException, InterruptedException{
for(VectorOrPrefWritable va:values){
context.write(key, va);
System.err.println("key"+key.toString()+",vlaue"+va);
}
}
Project: SEARUM
File: RuleMiningReducer.java
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
Parameters params = new Parameters(context.getConfiguration().get(
"minConfidence", ""));
/* Get Item Frequent List from DC */
for (Pair<String, Long> e : ARM.readFList(context.getConfiguration())) {
freqItemMap.put(e.getFirst(), e.getSecond());
}
minConfidence = Double.valueOf(params.get("minConfidence", "0.1"));
}
Project: chinesesegmentor
File: ParallelTraining2.java
@Override
public void map(Object key, Text value, final Context context) throws IOException,
InterruptedException {
String s = value.toString().split("\t", 2)[1];
Instance instance = gson.fromJson(s, Instance.class);
instances.add(instance);
}
Project: chinesesegmentor
File: ParallelTraining2.java
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
throws IOException, InterruptedException {
double sum = 0;
int num = 0;
for (DoubleWritable weights : values) {
sum += weights.get();
num++;
}
context.getCounter("TrainingReducer", "num" + num).increment(1);
context.write(key, new DoubleWritable(sum / num));
}
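Counters incremented through context.getCounter() in a reducer like the one above are aggregated by the framework and can be read back from the Job object after it finishes. A small driver-side sketch, where the counter name is assumed to match one actually incremented during the run:

if (job.waitForCompletion(true)) {
    // Each distinct "num" + num value above becomes its own counter in the "TrainingReducer" group.
    Counters counters = job.getCounters();
    long keysWithThreeValues = counters.findCounter("TrainingReducer", "num3").getValue();
    System.out.println("keys averaged over 3 values: " + keysWithThreeValues);
}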