private static void checkOuterConsistency(Job job, Path[] src) throws IOException {
  Path outf = FileOutputFormat.getOutputPath(job);
  FileStatus[] outlist = cluster.getFileSystem().listStatus(outf,
      new Utils.OutputFileUtils.OutputFilesFilter());
  assertEquals("number of part files is more than 1. It is " + outlist.length,
      1, outlist.length);
  assertTrue("output file with zero length " + outlist[0].getLen(),
      0 < outlist[0].getLen());
  SequenceFile.Reader r = new SequenceFile.Reader(cluster.getFileSystem(),
      outlist[0].getPath(), job.getConfiguration());
  IntWritable k = new IntWritable();
  IntWritable v = new IntWritable();
  while (r.next(k, v)) {
    assertEquals("counts do not match", v.get(),
        countProduct(k, src, job.getConfiguration()));
  }
  r.close();
}
public void reduce(IntWritable key, Iterator<Text> values,
    OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
  keyVal = key.get();
  while (values.hasNext()) {
    Text value = values.next();
    String towrite = value.toString() + "\n";
    indexStream.write(towrite.getBytes(Charsets.UTF_8));
    written++;
    if (written > numIndexes - 1) {
      // every 1000 indexes we report status
      reporter.setStatus("Creating index for archives");
      reporter.progress();
      endIndex = keyVal;
      String masterWrite = startIndex + " " + endIndex + " " + startPos
          + " " + indexStream.getPos() + " \n";
      outStream.write(masterWrite.getBytes(Charsets.UTF_8));
      startPos = indexStream.getPos();
      startIndex = endIndex;
      written = 0;
    }
  }
}
@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  String[] keyVal = value.toString().split("\\t");
  double[] Ai = new double[Bh];
  int i = Integer.parseInt(keyVal[0]) - 1;
  String[] values = keyVal[1].split(",");
  for (int j = 0; j < values.length; j++) {
    Ai[j] = Double.parseDouble(values[j]);
  }
  double[] Ci = new double[Bw];
  StringBuilder result = new StringBuilder(prefix);
  for (int j = 0; j < Bw; j++) {
    Ci[j] = 0d;
    for (int k = 0; k < Bh; k++) {
      Ci[j] += Ai[k] * B[k][j];
    }
    result.append(Ci[j]);
    if (j != Bw - 1) {
      result.append(",");
    }
  }
  context.write(new IntWritable(i + 1), new Text(result.toString()));
}
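// A minimal, self-contained sketch of the row-times-matrix product the mapper above
// computes, outside of MapReduce: Ci[j] = sum over k of Ai[k] * B[k][j]. The sample
// values below are illustrative only; they are not taken from the job's actual input.
public class RowTimesMatrixExample {
  public static void main(String[] args) {
    double[] Ai = {1.0, 2.0};            // one row of A (Bh = 2)
    double[][] B = {{3.0, 4.0, 5.0},     // a Bh x Bw matrix (Bw = 3)
                    {6.0, 7.0, 8.0}};
    double[] Ci = new double[B[0].length];
    for (int j = 0; j < Ci.length; j++) {
      for (int k = 0; k < Ai.length; k++) {
        Ci[j] += Ai[k] * B[k][j];
      }
    }
    // Prints [15.0, 18.0, 21.0]; the mapper emits the same values comma-joined.
    System.out.println(java.util.Arrays.toString(Ci));
  }
}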
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  int errors = 0;
  MarkableIterator<IntWritable> mitr =
      new MarkableIterator<IntWritable>(values.iterator());
  switch (key.get()) {
    case 0:
      errors += test0(key, mitr);
      break;
    case 1:
      errors += test1(key, mitr);
      break;
    case 2:
      errors += test2(key, mitr);
      break;
    case 3:
      errors += test3(key, mitr);
      break;
    default:
      break;
  }
  context.write(key, new IntWritable(errors));
}
/**
 * Binds values to the ? placeholders in the SQL statement.
 */
@Override
public void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value,
    PreparedStatement pstmt, IDimensionConverter converter) throws SQLException, IOException {
  StatsUserDimension statsUserDimension = (StatsUserDimension) key;
  MapWritableValue mapWritableValue = (MapWritableValue) value;
  IntWritable newInstallUsers =
      (IntWritable) mapWritableValue.getValue().get(new IntWritable(-1));
  int i = 0;
  pstmt.setInt(++i, converter.getDimensionIdByValue(statsUserDimension.getStatsCommon().getPlatform()));
  pstmt.setInt(++i, converter.getDimensionIdByValue(statsUserDimension.getStatsCommon().getDate()));
  pstmt.setInt(++i, converter.getDimensionIdByValue(statsUserDimension.getBrowser()));
  pstmt.setInt(++i, newInstallUsers.get());
  pstmt.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
  pstmt.setInt(++i, newInstallUsers.get());
  pstmt.addBatch();
}
@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  String doc = value.toString();
  String text = slice(doc, "<text", "</text>", true);
  if (text.length() < 1) {
    return;
  }
  char txt[] = text.toLowerCase().toCharArray();
  for (int i = 0; i < txt.length; ++i) {
    if (!((txt[i] >= 'a' && txt[i] <= 'z') || (txt[i] >= 'A' && txt[i] <= 'Z'))) {
      txt[i] = ' ';
    }
  }
  String id = slice(doc, "<id>", "</id>", false);
  if (id.length() < 1) {
    return;
  }
  StringTokenizer itr = new StringTokenizer(String.valueOf(txt));
  int sum = itr.countTokens();
  while (itr.hasMoreTokens()) {
    String s = itr.nextToken();
    word.set(id + '-' + s);
    IntWritable tmp[] = {new IntWritable(sum), new IntWritable(1)};
    IntArrayWritable temp = new IntArrayWritable(tmp);
    context.write(word, temp);
  }
}
public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  String line = value.toString();
  line = line.trim().toLowerCase();
  line = line.replaceAll("[^a-z]+", " ");
  String[] words = line.split("\\s+"); // split by ' ', '\t', '\n', etc.
  if (words.length < 2) {
    return;
  }
  StringBuilder sb;
  for (int i = 0; i < words.length - 1; i++) {
    sb = new StringBuilder();
    for (int j = 0; i + j < words.length && j < noGram; j++) {
      sb.append(" ");
      sb.append(words[i + j]);
      // emits every gram of length 1..noGram starting at position i
      context.write(new Text(sb.toString().trim()), new IntWritable(1));
    }
  }
}
@Override
protected void reduce(twoDimensionIndexWritable key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  if (key.getMatrixKind().equals(MatrixKind.Corpus)) {
    context.write(key, values.iterator().next());
    return;
  } else if (key.getMatrixKind().equals(MatrixKind.DocTopic)
      || key.getMatrixKind().equals(MatrixKind.TopicWord)) {
    int count = 0;
    for (Text text : values) {
      count += Integer.parseInt(text.toString());
    }
    if (key.getMatrixKind().equals(MatrixKind.DocTopic)) {
      writer1.append(new twoDimensionIndexWritable(key.getM(), key.getN()),
          new IntWritable(count));
    } else {
      writer2.append(new twoDimensionIndexWritable(key.getM(), key.getN()),
          new IntWritable(count));
    }
  }
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  conf.set("xmlinput.start", "<page>");
  conf.set("xmlinput.end", "</page>");
  Job job = Job.getInstance(conf);
  job.setJobName("PageWordCount");
  job.setJarByClass(PageWordCount.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setMapperClass(PageWordCountMap.class);
  job.setCombinerClass(PageWordCountReduce.class);
  job.setReducerClass(PageWordCountReduce.class);
  job.setInputFormatClass(XmlInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.waitForCompletion(true);
}
public void testNestedIterable() throws Exception {
  Random r = new Random();
  Writable[] writs = {
    new BooleanWritable(r.nextBoolean()),
    new FloatWritable(r.nextFloat()),
    new FloatWritable(r.nextFloat()),
    new IntWritable(r.nextInt()),
    new LongWritable(r.nextLong()),
    new BytesWritable("dingo".getBytes()),
    new LongWritable(r.nextLong()),
    new IntWritable(r.nextInt()),
    new BytesWritable("yak".getBytes()),
    new IntWritable(r.nextInt())
  };
  TupleWritable sTuple = makeTuple(writs);
  assertTrue("Bad count", writs.length == verifIter(writs, sTuple, 0));
}
public Job createJob() throws IOException {
  Configuration conf = getConf();
  conf.setInt(MRJobConfig.NUM_MAPS, 1);
  Job job = Job.getInstance(conf, "test");
  job.setNumReduceTasks(1);
  job.setJarByClass(CredentialsTestJob.class);
  job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
  job.setInputFormatClass(SleepJob.SleepInputFormat.class);
  job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setSpeculativeExecution(false);
  job.setJobName("test job");
  FileInputFormat.addInputPath(job, new Path("ignored"));
  return job;
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf);
  job.setMapperClass(DataDividerMapper.class);
  job.setReducerClass(DataDividerReducer.class);
  job.setJarByClass(DataDividerByUser.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(Text.class);
  TextInputFormat.setInputPaths(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(args[1]));
  job.waitForCompletion(true);
}
@Test
public void reduce() {
  MaxTemperatureMapRed.MaxTemperatureReduce maxTemperatureReduce =
      new MaxTemperatureMapRed.MaxTemperatureReduce();
  try {
    List<IntWritable> list = new ArrayList<IntWritable>();
    list.add(new IntWritable(12));
    list.add(new IntWritable(31));
    list.add(new IntWritable(45));
    list.add(new IntWritable(23));
    list.add(new IntWritable(21));
    maxTemperatureReduce.reduce(new Text("1901"), list.iterator(),
        new OutputCollector<Text, IntWritable>() {
          @Override
          public void collect(final Text text, final IntWritable intWritable)
              throws IOException {
            log.info(text.toString() + " " + intWritable.get());
          }
        }, null);
  } catch (IOException e) {
    e.printStackTrace();
  }
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "test");
  job.setMapperClass(testMapper.class);
  job.setPartitionerClass(testPartitioner.class);
  job.setReducerClass(testReducer.class);
  job.setNumReduceTasks(10);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  if (!job.waitForCompletion(true)) {
    return;
  }
}
/**
 * Verify IntervalSampler in mapred.lib.InputSampler, which is added back
 * for binary compatibility of M/R 1.x
 */
@Test (timeout = 30000)
@SuppressWarnings("unchecked") // IntWritable comparator not typesafe
public void testMapredIntervalSampler() throws Exception {
  final int TOT_SPLITS = 16;
  final int PER_SPLIT_SAMPLE = 4;
  final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
  final double FREQ = 1.0 / TOT_SPLITS;
  org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable> sampler =
      new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
          <IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
  int inits[] = new int[TOT_SPLITS];
  for (int i = 0; i < TOT_SPLITS; ++i) {
    inits[i] = i;
  }
  Job ignored = Job.getInstance();
  Object[] samples = sampler.getSample(new TestInputSamplerIF(
      NUM_SAMPLES, TOT_SPLITS, inits), ignored);
  assertEquals(NUM_SAMPLES, samples.length);
  Arrays.sort(samples, new IntWritable.Comparator());
  for (int i = 0; i < NUM_SAMPLES; ++i) {
    assertEquals(i, ((IntWritable)samples[i]).get());
  }
}
public void map(IntWritable key, IntWritable val, Context context)
    throws IOException, InterruptedException {
  int k = key.get();
  final int vali = val.get();
  final String kvstr = "Unexpected tuple: " + stringify(key, val);
  if (0 == k % (srcs * srcs)) {
    assertTrue(kvstr, vali == k * 10 / srcs + srcs - 1);
  } else {
    final int i = k % srcs;
    assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
  }
  context.write(key, one);
  // If the user modifies the key or any of the values in the tuple, it
  // should not affect the rest of the join.
  key.set(-1);
  val.set(0);
}
public static void main(String[] args) throws Exception {
  if (args.length != 2) {
    System.err.println("Usage: MaxTemperatureWithCombiner <input path> <output path>");
    System.exit(-1);
  }
  Job job = new Job();
  job.setJarByClass(MaxTemperatureWithCombiner.class);
  job.setJobName("Max Temperature With Combiner");
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setMapperClass(MaxTemperatureMapper.class);
  job.setCombinerClass(MaxTemperatureReducer.class);
  job.setReducerClass(MaxTemperatureReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
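// MaxTemperatureReducer is wired in above as both combiner and reducer. That only
// works because taking a maximum is commutative and associative, so applying it to
// partial map outputs before the shuffle cannot change the final result. Below is a
// minimal sketch of such a max-selecting reducer (an assumption for illustration,
// not necessarily this project's exact class).
public static class MaxTemperatureReducerSketch
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}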
/**
 * Tests one of the mappers throwing an exception.
 *
 * @throws Exception
 */
public void testChainFail() throws Exception {
  Configuration conf = createJobConf();
  Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0, input);
  job.setJobName("chain");
  ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
      LongWritable.class, Text.class, null);
  ChainMapper.addMapper(job, FailMap.class, LongWritable.class, Text.class,
      IntWritable.class, Text.class, null);
  ChainMapper.addMapper(job, Mapper.class, IntWritable.class, Text.class,
      LongWritable.class, Text.class, null);
  job.waitForCompletion(true);
  assertTrue("Job Not failed", !job.isSuccessful());
}
public void readFields(DataInput dataInput) throws IOException {
  Text text = new Text();
  text.readFields(dataInput);
  Logger.println("value wrapper read class:" + text.toString());
  String className = text.toString();
  if (className.equals(IntWritable.class.getSimpleName())) {
    value = new IntWritable();
  } else if (className.equals(NewOldCustomElement.class.getSimpleName())) {
    value = new NewOldCustomElement();
  } else if (className.equals(CustomerFlowElement.class.getSimpleName())) {
    value = new CustomerFlowElement();
  } else {
    throw new IOException("can not read fields " + className);
  }
  value.readFields(dataInput);
}
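// A hedged sketch of the write() counterpart implied by the readFields() above: the
// wrapper serializes the value's simple class name as a Text, then the value itself,
// which is exactly the order readFields() consumes. This is an assumption about the
// enclosing wrapper class, not code taken from the source project.
@Override
public void write(DataOutput dataOutput) throws IOException {
  // write the class tag first so readFields() knows which Writable to instantiate
  new Text(value.getClass().getSimpleName()).write(dataOutput);
  value.write(dataOutput);
}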
private void writeInStoreHour() throws IOException, InterruptedException {
  KeyWrapper cycleKey = new KeyWrapper();
  cycleKey.setType(new Text(MapKeyConfig.IN_STORE_HOUR));
  LongWritable longWritable = new LongWritable();
  cycleKey.setMillisTime(longWritable);
  IntWritable value = new IntWritable(1);
  List<Long> inStoreHours = statistic.getInStoreHours();
  for (Long inStoreTime : inStoreHours) {
    longWritable.set(IntervalCalculator.getInStoreInterval(inStoreTime));
    context.write(cycleKey, new ValueWrapper(value));
  }
}
/**
 * Sets up the actual job.
 *
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
  String tableName = args[0];
  Path outputDir = new Path(args[1]);
  String reportSeparatorString = (args.length > 2) ? args[2] : ":";
  conf.set("ReportSeparator", reportSeparatorString);
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(CellCounter.class);
  Scan scan = getConfiguredScanForJob(conf, args);
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
  job.setNumReduceTasks(1);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setReducerClass(IntSumReducer.class);
  return job;
}
@Override
public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  // get a collection of "clean words" from a complete sentence
  String line = value.toString();
  line = line.trim().toLowerCase();
  line = line.replaceAll("[^a-z]", " ");
  String[] words = line.split("\\s+");
  // generate n-gram entries from this sentence
  StringBuilder sb;
  if (words.length < 2) {
    return;
  }
  for (int i = 0; i < (words.length - 1); i++) {
    sb = new StringBuilder();
    sb.append(words[i]);
    for (int j = 1; (i + j) < words.length && j < GRAM_NUMBER; j++) {
      sb.append(" ");
      sb.append(words[i + j]);
      context.write(new Text(sb.toString().trim()), new IntWritable(1));
    }
  }
}
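// Illustrative only: a tiny standalone driver running the same n-gram loop as the
// mapper above, showing what it emits for one sample sentence with GRAM_NUMBER = 3.
// The sample text and the constant value are assumptions made for this example, not
// the job's configuration.
public class NGramExample {
  private static final int GRAM_NUMBER = 3;

  public static void main(String[] args) {
    String line = "I love big data".trim().toLowerCase().replaceAll("[^a-z]", " ");
    String[] words = line.split("\\s+");
    for (int i = 0; i < words.length - 1; i++) {
      StringBuilder sb = new StringBuilder(words[i]);
      for (int j = 1; (i + j) < words.length && j < GRAM_NUMBER; j++) {
        sb.append(" ").append(words[i + j]);
        // Emits: "i love", "i love big", "love big", "love big data", "big data"
        System.out.println(sb.toString().trim() + "\t1");
      }
    }
  }
}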
/** {@inheritDoc} */
@Override
public void init(Job job) {
  // setup mapper
  job.setMapperClass(PartitionMapper.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(SummationWritable.class);

  // setup partitioner
  job.setPartitionerClass(IndexPartitioner.class);

  // setup reducer
  job.setReducerClass(SummingReducer.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(TaskResult.class);
  final Configuration conf = job.getConfiguration();
  final int nParts = conf.getInt(N_PARTS, 1);
  job.setNumReduceTasks(nParts);

  // setup input
  job.setInputFormatClass(SummationInputFormat.class);
}
public void reduce(BytesWritable key, Iterator<IntWritable> values,
    OutputCollector<BytesWritable, IntWritable> output, Reporter reporter)
    throws IOException {
  int ones = 0;
  int twos = 0;
  while (values.hasNext()) {
    IntWritable count = values.next();
    if (count.equals(sortInput)) {
      ++ones;
    } else if (count.equals(sortOutput)) {
      ++twos;
    } else {
      throw new IOException("Invalid 'value' of " + count.get() +
          " for (key,value): " + key.toString());
    }
  }
  // Check to ensure there are equal no. of ones and twos
  if (ones != twos) {
    throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos +
        ") for (key, value): " + key.toString());
  }
}
private Job jobListFriends(String inputPath, String outputPath)
    throws IOException, InterruptedException, ClassNotFoundException {
  Job job = new Job();
  job.setJarByClass(WordCount.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);
  job.setInputFormatClass(KeyValueTextInputFormat.class); // Need to change the import
  job.setOutputFormatClass(TextOutputFormat.class);
  FileInputFormat.addInputPath(job, new Path(inputPath));
  FileOutputFormat.setOutputPath(job, new Path(outputPath));
  job.waitForCompletion(true);
  return job;
}
/**
 * Tests PipesPartitioner:
 * sets and gets data from PipesPartitioner.
 */
@Test
public void testPipesPartitioner() {
  PipesPartitioner<IntWritable, Text> partitioner =
      new PipesPartitioner<IntWritable, Text>();
  JobConf configuration = new JobConf();
  Submitter.getJavaPartitioner(configuration);
  partitioner.configure(new JobConf());
  IntWritable iw = new IntWritable(4);
  // the cache is empty
  assertEquals(0, partitioner.getPartition(iw, new Text("test"), 2));
  // set data into cache
  PipesPartitioner.setNextPartition(3);
  // get data from cache
  assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
}
@Override
public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  // input: user,movie,rating
  String[] user_movie_rating = value.toString().trim().split(",");
  int userID = Integer.parseInt(user_movie_rating[0]);
  String movieID = user_movie_rating[1];
  String rating = user_movie_rating[2];
  context.write(new IntWritable(userID), new Text(movieID + ":" + rating));
}
@Override
public void reduce(Text key, Iterable<IntWritable> pageCnts, Context context)
    throws IOException, InterruptedException {
  // sum, count, average and finalAvg are instance fields, so this is a running
  // average accumulated across every key the reducer has seen so far
  for (IntWritable cnt : pageCnts) {
    sum += cnt.get();
  }
  count += 1;
  average = sum / count;
  finalAvg.set(average);
  context.write(new Text("Average Page Count = "), finalAvg);
}
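// A minimal alternative sketch (an assumption, not the original class): if the running
// fields above are ints, sum / count truncates. Computing the average per reduce() call
// with double arithmetic and local variables avoids both the truncation and the shared
// mutable state.
public static class AverageReduceSketch
    extends Reducer<Text, IntWritable, Text, DoubleWritable> {
  private final DoubleWritable finalAvg = new DoubleWritable();

  @Override
  public void reduce(Text key, Iterable<IntWritable> pageCnts, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    long count = 0;
    for (IntWritable cnt : pageCnts) {
      sum += cnt.get();
      count++;
    }
    finalAvg.set((double) sum / count);
    context.write(new Text("Average Page Count = "), finalAvg);
  }
}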
public static void main(String[] args) throws Exception {
  Configuration con = new Configuration();
  Job bookJob = Job.getInstance(con, "Average Page Count");
  bookJob.setJarByClass(AveragePageCount.class);
  bookJob.setMapperClass(TextMapper.class);
  bookJob.setReducerClass(AverageReduce.class);
  bookJob.setOutputKeyClass(Text.class);
  bookJob.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(bookJob, new Path("C:/Hadoop/books.txt"));
  FileOutputFormat.setOutputPath(bookJob, new Path("C:/Hadoop/BookOutput"));
  if (bookJob.waitForCompletion(true)) {
    System.exit(0);
  }
}
static void configureWordCount(FileSystem fs, JobConf conf, String input,
    int numMaps, int numReduces, Path inDir, Path outDir) throws IOException {
  fs.delete(outDir, true);
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Mkdirs failed to create " + inDir.toString());
  }
  DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  file.writeBytes(input);
  file.close();
  FileSystem.setDefaultUri(conf, fs.getUri());
  conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.YARN_FRAMEWORK_NAME);
  conf.setJobName("wordcount");
  conf.setInputFormat(TextInputFormat.class);
  // the keys are words (strings)
  conf.setOutputKeyClass(Text.class);
  // the values are counts (ints)
  conf.setOutputValueClass(IntWritable.class);
  conf.set("mapred.mapper.class", "testjar.ClassWordCount$MapClass");
  conf.set("mapred.combine.class", "testjar.ClassWordCount$Reduce");
  conf.set("mapred.reducer.class", "testjar.ClassWordCount$Reduce");
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReduces);
  // set the tests jar file
  conf.setJarByClass(TestMiniMRClasspath.class);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  FileSystem fs = FileSystem.get(indexFilePath.toUri(), context.getConfiguration());
  SequenceFile.Writer writer = new SequenceFile.Writer(fs, context.getConfiguration(),
      indexFilePath, Text.class, IntWritable.class);
  for (String word : wordToIndex.keySet()) {
    writer.append(new Text(word), new IntWritable(wordToIndex.get(word)));
  }
  writer.append(new Text(), new IntWritable(m));
  writer.append(new Text(), new IntWritable(wordToIndex.size()));
  writer.close();
  super.cleanup(context);
}
public void reduce(IntWritable key, Iterator<IntWritable> values,
    OutputCollector<IntWritable, Text> out, Reporter reporter) throws IOException {
  // check key order
  int currentKey = key.get();
  if (currentKey > lastKey) {
    fail("Keys not in sorted descending order");
  }
  lastKey = currentKey;
  // check order of values
  IntWritable previous = new IntWritable(Integer.MAX_VALUE);
  int valueCount = 0;
  while (values.hasNext()) {
    IntWritable current = values.next();
    // Check that the values are sorted
    if (current.compareTo(previous) > 0) {
      fail("Values generated by Mapper not in order");
    }
    previous = current;
    ++valueCount;
  }
  if (valueCount != 5) {
    fail("Values not grouped by primary key");
  }
  out.collect(key, new Text("success"));
}
public int[] getValues() {
  IntWritable[] writables = get();
  int[] values = new int[writables.length];
  for (int i = 0; i < writables.length; i++) {
    values[i] = writables[i].get();
  }
  return values;
}
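// A hedged sketch of the wrapper class this getter and the earlier
// "new IntArrayWritable(tmp)" call suggest: an ArrayWritable subclass fixed to
// IntWritable, with a typed get() so the helper above compiles. The exact body is
// an assumption for illustration, not the project's actual source.
public class IntArrayWritable extends ArrayWritable {
  public IntArrayWritable() {
    super(IntWritable.class);
  }

  public IntArrayWritable(IntWritable[] values) {
    super(IntWritable.class, values);
  }

  // ArrayWritable.get() returns Writable[], so copy the elements into an
  // IntWritable[] to give callers a typed view.
  @Override
  public IntWritable[] get() {
    Writable[] raw = super.get();
    IntWritable[] typed = new IntWritable[raw.length];
    for (int i = 0; i < raw.length; i++) {
      typed[i] = (IntWritable) raw[i];
    }
    return typed;
  }
}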
@Override
public void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value,
    PreparedStatement pstmt, IDimensionConverter converter) throws SQLException, IOException {
  StatsUserDimension statsUserDimension = (StatsUserDimension) key;
  MapWritableValue mapWritableValue = (MapWritableValue) value;
  IntWritable newInstallUsers =
      (IntWritable) mapWritableValue.getValue().get(new IntWritable(-1));
  int i = 0;
  pstmt.setInt(++i, converter.getDimensionIdByValue(statsUserDimension.getStatsCommon().getPlatform()));
  pstmt.setInt(++i, converter.getDimensionIdByValue(statsUserDimension.getStatsCommon().getDate()));
  pstmt.setInt(++i, newInstallUsers.get());
  pstmt.setString(++i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
  pstmt.setInt(++i, newInstallUsers.get());
  pstmt.addBatch();
}
@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  String doc[] = value.toString().split(String.valueOf('\t'));
  doc = doc[0].split(String.valueOf('-'));
  word.set(doc[1]);
  context.write(word, new IntWritable(1));
  if (!M_id.contains(doc[0])) {
    M_id.add(doc[0]);
    word.set(String.valueOf("0pages"));
    context.write(word, new IntWritable(1));
  }
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  context.write(key, new IntWritable(sum));
}
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  int seen = 0;
  for (IntWritable value : values) {
    seen += value.get();
  }
  assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
  context.write(key, new IntWritable(seen));
}
public void reduce(IntWritable key, Iterator<IntWritable> values,
    OutputCollector<IntWritable, Text> out, Reporter reporter) throws IOException {
  // check key order
  int currentKey = key.get();
  if (currentKey < lastKey) {
    fail("Keys not in sorted ascending order");
  }
  lastKey = currentKey;
  // check order of values
  IntWritable previous = new IntWritable(Integer.MIN_VALUE);
  int valueCount = 0;
  while (values.hasNext()) {
    IntWritable current = values.next();
    // Check that the values are sorted
    if (current.compareTo(previous) < 0) {
      fail("Values generated by Mapper not in order");
    }
    previous = current;
    ++valueCount;
  }
  if (valueCount != 5) {
    fail("Values not grouped by primary key");
  }
  out.collect(key, new Text("success"));
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}