/**
 * Runs every row routed to one analyzer through that analyzer, logs the row and writes it out
 * as a single ';'-separated text line with no key.
 *
 * @param analyzerKey label of the analyzer the rows were emitted to
 * @param rows the rows for that analyzer
 * @param context reducer context receiving one (NullWritable, Text) pair per row
 */
@Override
public void reduce(Text analyzerKey, Iterable<SortedMapWritable> rows, Context context)
        throws IOException, InterruptedException {
    Analyzer<?> analyzer = ConfigurationSerializer.initializeAnalyzer(
            analyzerKey.toString(), analyzerBeansConfiguration, analysisJob);

    logger.info("analyzerKey = " + analyzerKey.toString() + " rows: ");
    for (SortedMapWritable row : rows) {
        analyzer.run(RowUtils.sortedMapWritableToInputRow(row, analysisJob.getSourceColumns()), 1);
        RowUtils.printSortedMapWritable(row, logger);
        Text csvLine = CsvParser.toCsvText(row);
        context.write(NullWritable.get(), csvLine);
    }
    logger.info("end of analyzerKey = " + analyzerKey.toString() + " rows.");

    // The analyzer result is only logged here; it is not written to the job output.
    AnalyzerResult result = analyzer.getResult();
    logger.debug("analyzerResult.toString(): " + result.toString());
}
public static Text toCsvText(SortedMapWritable row) { Text finalText = new Text(); for (@SuppressWarnings("rawtypes") Iterator<Entry<WritableComparable, Writable>> iterator = row.entrySet() .iterator(); iterator.hasNext();) { @SuppressWarnings("rawtypes") Entry<WritableComparable, Writable> next = iterator.next(); if (next.getValue() instanceof Text) { Text value = ((Text) next.getValue()); finalText.set(finalText.toString() + value.toString()); } // else do not append anything - the value is null, so empty. if (iterator.hasNext()) finalText.set(finalText.toString() + ";"); else finalText.set(finalText.toString()); } return finalText; }
/**
 * Feeds a header line and a Poland line through the mapper. It checks that two rows are emitted
 * and that the Poland row carries the expected column values.
 */
@Test
public void testMapper() throws IOException {
    SortedMapWritable expectedPoland = new SortedMapWritable();
    expectedPoland.put(new Text("Country name"), new Text("Poland"));
    expectedPoland.put(new Text("ISO 3166-2"), new Text("PL"));
    expectedPoland.put(new Text("ISO 3166-3"), new Text("POL"));
    expectedPoland.put(new Text("ISO Numeric"), new Text("616"));

    mapDriver
            .withInput(
                    new LongWritable(0),
                    new Text(
                            "Country name;ISO 3166-2;ISO 3166-3;ISO Numeric;Linked to country;Synonym1;Synonym2;Synonym3"))
            .withInput(new LongWritable(44), new Text("Poland;PL;POL;616;"));

    List<Pair<Text, SortedMapWritable>> actualOutputs = mapDriver.run();
    Assert.assertEquals(2, actualOutputs.size());

    // Find the Poland row by content instead of assuming its position in the output.
    // The lookup value must be a Text: row values are Text (as in expectedPoland), and a Text is
    // never equal to a String, so containsValue("Poland") could not succeed. Its result was also
    // previously ignored, so the test asserted nothing about the row.
    SortedMapWritable actualPoland = null;
    for (Pair<Text, SortedMapWritable> output : actualOutputs) {
        if (output.getSecond().containsValue(new Text("Poland"))) {
            actualPoland = output.getSecond();
            break;
        }
    }
    Assert.assertNotNull("no mapper output row contains the value 'Poland'", actualPoland);

    for (Object column : expectedPoland.keySet()) {
        Assert.assertEquals("unexpected value for column " + column,
                expectedPoland.get(column), actualPoland.get(column));
    }
}
/**
 * Sends a header row, where every column maps to its own name, through the reducer. It checks
 * that the reducer writes the column names as one ';'-separated line.
 */
@Test
public void testReducerHeader() throws IOException {
    String[] columnNames = {
            "ISO 3166-2_ISO 3166-3",
            "Country name",
            "ISO 3166-2",
            "ISO 3166-3",
            "ISO Numeric",
            "Linked to country",
            "Synonym1",
            "Synonym2",
            "Synonym3" };

    SortedMapWritable header = new SortedMapWritable();
    for (String columnName : columnNames) {
        header.put(new Text(columnName), new Text(columnName));
    }

    List<SortedMapWritable> rows = new ArrayList<SortedMapWritable>();
    rows.add(header);
    reduceDriver.withInput(new Text("Value distribution (Country name)"), rows);

    // The expected columns follow the map's sorted key order, not the insertion order above.
    Text expectedLine = new Text(
            "Country name;ISO 3166-2;ISO 3166-2_ISO 3166-3;ISO 3166-3;ISO Numeric;Linked to country;Synonym1;Synonym2;Synonym3");
    reduceDriver.withOutput(NullWritable.get(), expectedLine);
    reduceDriver.runTest();
}
/**
 * Runs every row routed to one analyzer through that analyzer, logs it, and writes it out as a
 * Put built from the row.
 *
 * @param analyzerKey label of the analyzer the rows were emitted to
 * @param writableResults the rows for that analyzer
 * @param context reducer context receiving one (NullWritable, Put) pair per row
 */
public void reduce(Text analyzerKey, Iterable<SortedMapWritable> writableResults, Context context)
        throws IOException, InterruptedException {
    Analyzer<?> analyzer = ConfigurationSerializer.initializeAnalyzer(
            analyzerKey.toString(), analyzerBeansConfiguration, analysisJob);

    logger.info("analyzerKey = " + analyzerKey.toString() + " rows: ");
    for (SortedMapWritable writableRow : writableResults) {
        InputRow row = RowUtils.sortedMapWritableToInputRow(writableRow, analysisJob.getSourceColumns());
        analyzer.run(row, 1);

        Result hbaseResult = ResultUtils.sortedMapWritableToResult(writableRow);
        ResultUtils.printResult(hbaseResult, logger);
        Put mutation = ResultUtils.preparePut(hbaseResult);
        context.write(NullWritable.get(), mutation);
    }
    logger.info("end of analyzerKey = " + analyzerKey.toString() + " rows.");

    // The analyzer result is only logged here; it is not written to the job output.
    AnalyzerResult result = analyzer.getResult();
    logger.debug("analyzerResult.toString(): " + result.toString());
}
/**
 * Builds an HBase {@link Result} from a row whose keys are {@code "family:qualifier"} column
 * names and whose values are the cell contents.
 *
 * <p>Entries whose value is not a {@link Text} carry no content (null cells are stored as
 * NullWritable by inputRowToSortedMapWritable) and produce no cell. Without this check, any
 * null cell failed with a ClassCastException.
 *
 * <p>The column name is split on its first ':' only. HBase family names cannot contain ':', but
 * qualifiers can, and splitting on every ':' silently truncated such qualifiers.
 *
 * <p>NOTE(review): each cell uses its own value as its row key, so cells from one input row get
 * different row keys, and {@code Result.create} is given them in column order rather than
 * KeyValue order. Confirm this is what {@code ResultUtils.preparePut} expects.
 *
 * @param row the row to convert; keys must be Text {@code "family:qualifier"} names
 * @return a Result with one cell per column that has a Text value
 * @throws IllegalArgumentException if a column name contains no ':' separator
 */
public static Result sortedMapWritableToResult(SortedMapWritable row) {
    List<Cell> cells = new ArrayList<Cell>();
    for (@SuppressWarnings("rawtypes") Map.Entry<WritableComparable, Writable> rowEntry : row.entrySet()) {
        Writable columnValue = rowEntry.getValue();
        if (!(columnValue instanceof Text)) {
            // No content for this column (e.g. NullWritable): there is nothing to store.
            continue;
        }

        String columnFamilyAndName = ((Text) rowEntry.getKey()).toString();
        int separator = columnFamilyAndName.indexOf(':');
        if (separator < 0) {
            throw new IllegalArgumentException(
                    "Column name is not of the form family:qualifier: " + columnFamilyAndName);
        }
        String columnFamily = columnFamilyAndName.substring(0, separator);
        String columnName = columnFamilyAndName.substring(separator + 1);

        byte[] valueBytes = Bytes.toBytes(columnValue.toString());
        // KeyValue copies its arguments into its own buffer, so sharing valueBytes is safe.
        Cell cell = new KeyValue(valueBytes, Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), valueBytes);
        cells.add(cell);
    }
    return Result.create(cells);
}
/**
 * Merges all partial count maps for one key into a single map by summing the LongWritable
 * counts per map key.
 *
 * @param key the reduce key, written through unchanged
 * @param values partial count maps whose values must be LongWritable
 * @param context receives one (key, totals) pair
 */
protected void reduce(IntWritable key, Iterable<SortedMapWritable> values, Context context)
        throws IOException, InterruptedException {
    SortedMapWritable totals = new SortedMapWritable();
    for (SortedMapWritable partial : values) {
        for (@SuppressWarnings("rawtypes") Entry<WritableComparable, Writable> entry : partial.entrySet()) {
            LongWritable running = (LongWritable) totals.get(entry.getKey());
            long increment = ((LongWritable) entry.getValue()).get();
            if (running == null) {
                // Store a fresh LongWritable so later additions mutate totals' own object,
                // never the instance taken from the input map.
                totals.put(entry.getKey(), new LongWritable(increment));
            } else {
                running.set(running.get() + increment);
            }
        }
    }
    context.write(key, totals);
}
/**
 * Serializes {@code w} with the type-specific write method matching its runtime type.
 *
 * <p>Each listed Writable type is dispatched to its dedicated {@code writeXxx} method. Any
 * other type falls through to {@code writeWritable(w)}.
 *
 * <p>NOTE(review): the instanceof checks are order-sensitive whenever one listed type
 * subclasses another (TypedBytesWritable presumably extends BytesWritable; confirm). More
 * specific types must therefore stay ahead of their supertypes.
 *
 * @param w the value to serialize
 * @throws IOException propagated from the type-specific write methods
 */
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    // checked before BytesWritable, see the ordering note above
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
/**
 * Writes a SortedMapWritable as a map header holding the entry count, followed by each key and
 * its value in the map's iteration order.
 *
 * @param smw the map to serialize
 * @throws IOException propagated from the header write or from {@code write}
 */
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  int entryCount = smw.size();
  out.writeMapHeader(entryCount);
  for (Map.Entry<WritableComparable, Writable> pair : smw.entrySet()) {
    WritableComparable k = pair.getKey();
    Writable v = pair.getValue();
    write(k);
    write(v);
  }
}
/**
 * Serializes {@code w} with the type-specific write method matching its runtime type.
 *
 * <p>Each listed Writable type is dispatched to its dedicated {@code writeXxx} method. Any
 * other type falls through to {@code writeWritable(w)}.
 *
 * <p>NOTE(review): the instanceof checks are order-sensitive whenever one listed type
 * subclasses another (TypedBytesWritable presumably extends BytesWritable; confirm). More
 * specific types must therefore stay ahead of their supertypes.
 *
 * @param w the value to serialize
 * @throws IOException propagated from the type-specific write methods
 */
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    // checked before BytesWritable, see the ordering note above
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    // wildcard cast: the concrete key type is not known at this point
    writeSortedMap((SortedMapWritable<?>) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
/**
 * Writes a SortedMapWritable as a map header holding the entry count, followed by each key and
 * its value in the map's iteration order.
 *
 * @param smw the map to serialize; its key type does not matter here
 * @throws IOException propagated from the header write or from {@code write}
 */
public void writeSortedMap(SortedMapWritable<?> smw) throws IOException {
  final int entryCount = smw.size();
  out.writeMapHeader(entryCount);
  for (Map.Entry<? extends WritableComparable<?>, Writable> pair : smw.entrySet()) {
    final WritableComparable<?> k = pair.getKey();
    final Writable v = pair.getValue();
    write(k);
    write(v);
  }
}
/**
 * Reads a map header and then that many key/value pairs into a SortedMapWritable.
 *
 * <p>When {@code mw} is non-null the pairs are added to it; existing entries are not cleared.
 * Otherwise a new map is created.
 *
 * @param mw the map to fill, or {@code null} to allocate one
 * @return the filled map
 * @throws IOException propagated from the header read or from {@code read}
 */
public <K extends WritableComparable<? super K>> SortedMapWritable<K> readSortedMap(SortedMapWritable<K> mw)
    throws IOException {
  final SortedMapWritable<K> target = (mw == null) ? new SortedMapWritable<K>() : mw;
  int remaining = in.readMapHeader();
  while (remaining-- > 0) {
    // The key is read before its value; the order of the two reads must match the writer.
    @SuppressWarnings("unchecked")
    final K key = (K) read();
    final Writable value = read();
    target.put(key, value);
  }
  return target;
}
/**
 * Rebuilds the analyzer-beans configuration and the analysis job from the XML and datastore
 * definitions stored in the job configuration by FlatFileTool.
 *
 * @param context the reducer context whose configuration holds the serialized settings
 */
protected void setup(Reducer<Text, SortedMapWritable, NullWritable, Text>.Context context)
        throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    String datastoreLines = conf.get(FlatFileTool.ANALYZER_BEANS_CONFIGURATION_DATASTORES_KEY);
    String jobXml = conf.get(FlatFileTool.ANALYSIS_JOB_XML_KEY);

    // The datastores must be deserialized first: the job XML is resolved against them.
    this.analyzerBeansConfiguration = ConfigurationSerializer.deserializeAnalyzerBeansDatastores(datastoreLines);
    this.analysisJob = ConfigurationSerializer.deserializeAnalysisJobFromXml(jobXml, this.analyzerBeansConfiguration);

    super.setup(context);
}
/**
 * Creates the mapper delegate from the serialized settings stored in the job configuration by
 * FlatFileTool. It also creates a ';'-separated CSV parser over the job's source columns.
 *
 * @param context the mapper context whose configuration holds the serialized settings
 */
protected void setup(Mapper<LongWritable, Text, Text, SortedMapWritable>.Context context)
        throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    String datastoreLines = conf.get(FlatFileTool.ANALYZER_BEANS_CONFIGURATION_DATASTORES_KEY);
    String jobXml = conf.get(FlatFileTool.ANALYSIS_JOB_XML_KEY);

    this.mapperDelegate = new MapperDelegate(datastoreLines, jobXml);
    this.csvParser = new CsvParser(this.mapperDelegate.getAnalysisJob().getSourceColumns(), ";");

    super.setup(context);
}
private int runMapReduceJob(String input, String output, Configuration mapReduceConfiguration) throws IOException, InterruptedException, ClassNotFoundException { Job job = Job.getInstance(mapReduceConfiguration); job.setJarByClass(FlatFileMapper.class); job.setJobName(this.getClass().getName()); FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); job.setMapperClass(FlatFileMapper.class); job.setReducerClass(FlatFileReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(SortedMapWritable.class); job.setNumReduceTasks(1); // TODO externalize to args? // mapReduceConfiguration.addResource(new Path("/etc/hadoop/conf/core-site.xml")); FileSystem fileSystem = FileSystem.get(mapReduceConfiguration); if (fileSystem.exists(new Path(output))) fileSystem.delete(new Path(output), true); boolean success = job.waitForCompletion(true); return success ? 0 : 1; }
/**
 * Sends a single three-column Poland row through the reducer. It checks that the row is written
 * as "Poland;PL;POL".
 */
@Test
public void testReducerPoland() throws IOException {
    SortedMapWritable polandRow = new SortedMapWritable();
    polandRow.put(new Text("Country name"), new Text("Poland"));
    polandRow.put(new Text("ISO 3166-2"), new Text("PL"));
    polandRow.put(new Text("ISO 3166-3"), new Text("POL"));

    List<SortedMapWritable> input = new ArrayList<SortedMapWritable>();
    input.add(polandRow);

    Text analyzerLabel = new Text("Value distribution (Country name)");
    reduceDriver.withInput(analyzerLabel, input);
    reduceDriver.withOutput(NullWritable.get(), new Text("Poland;PL;POL"));
    reduceDriver.runTest();
}
/**
 * Sends each transformed row to every analyzer whose requirements are satisfied by that row's
 * outcome sink. Rows are keyed by the analyzer's label.
 *
 * <p>Rows and outcome sinks are consumed in lockstep: the n-th row is paired with the n-th sink.
 *
 * @param consumeRowResult the transformed rows and their outcome sinks
 * @param analyzerJobs the analyzers that may receive the rows
 */
public void emit(ConsumeRowResult consumeRowResult, List<AnalyzerJob> analyzerJobs)
        throws IOException, InterruptedException {
    Iterator<OutcomeSink> sinks = consumeRowResult.getOutcomeSinks().iterator();
    for (InputRow row : consumeRowResult.getRows()) {
        SortedMapWritable serializedRow = RowUtils.inputRowToSortedMapWritable(row);
        OutcomeSink sink = sinks.next();
        for (AnalyzerJob job : analyzerJobs) {
            if (!isAnalyzerSatisfied(job, sink)) {
                continue;
            }
            String label = LabelUtils.getLabel(job);
            logger.info("Emitting " + row + " to " + label);
            callback.write(new Text(label), serializedRow);
        }
    }
}
/**
 * Logs a row at info level: a "Row: " line, then one tab-indented "name = value" line per
 * column.
 *
 * <p>Keys must be Text. Non-Text values (e.g. NullWritable) are logged as "null".
 *
 * @param row the row to log
 * @param logger the logger to write to
 */
public static void printSortedMapWritable(SortedMapWritable row, Logger logger) {
    logger.info("Row: ");
    for (@SuppressWarnings("rawtypes") Map.Entry<WritableComparable, Writable> column : row.entrySet()) {
        Text name = (Text) column.getKey();
        Writable raw = column.getValue();
        Text shown = (raw instanceof Text) ? (Text) raw : null;
        logger.info("\t" + name + " = " + shown);
    }
}
/**
 * Converts an InputRow into a SortedMapWritable keyed by column name.
 *
 * <p>Non-null values are stored as their {@code toString()} wrapped in a Text. Null values are
 * stored as NullWritable, so every column of the row is present in the map.
 *
 * @param inputRow the row to convert
 * @return a new map with one entry per input column
 */
public static SortedMapWritable inputRowToSortedMapWritable(InputRow inputRow) {
    SortedMapWritable result = new SortedMapWritable();
    for (InputColumn<?> column : inputRow.getInputColumns()) {
        String name = column.getName();
        Object cell = inputRow.getValue(column);
        result.put(new Text(name), cell == null ? NullWritable.get() : new Text(cell.toString()));
    }
    return result;
}
/**
 * Creates a MapperEmitter whose callback records every emitted row in {@code emitList}, so
 * tests can inspect what was emitted. The key is ignored.
 */
@Before
public void setUp() {
    MapperEmitter.Callback recordingCallback = new MapperEmitter.Callback() {
        public void write(Text key, SortedMapWritable row) throws IOException, InterruptedException {
            emitList.add(row);
        }
    };
    this.mapperEmitter = new MapperEmitter(recordingCallback);
}
/**
 * Creates the mapper delegate from the serialized settings stored in the job configuration by
 * HadoopDataCleanerTool. It also creates an HBaseParser over the job's source columns.
 *
 * @param context the mapper context whose configuration holds the serialized settings
 */
@Override
protected void setup(
        org.apache.hadoop.mapreduce.Mapper</* KEYIN */ImmutableBytesWritable, /* VALUEIN */Result, /* KEYOUT */Text, /* VALUEOUT */SortedMapWritable>.Context context)
        throws IOException, InterruptedException {
    final Configuration conf = context.getConfiguration();
    final String datastoreLines = conf.get(HadoopDataCleanerTool.ANALYZER_BEANS_CONFIGURATION_DATASTORES_KEY);
    final String jobXml = conf.get(HadoopDataCleanerTool.ANALYSIS_JOB_XML_KEY);

    this.mapperDelegate = new MapperDelegate(datastoreLines, jobXml);
    this.hBaseParser = new HBaseParser(this.mapperDelegate.getAnalysisJob().getSourceColumns());

    super.setup(context);
}
/**
 * Rebuilds the analyzer-beans configuration and the analysis job from the XML and datastore
 * definitions stored in the job configuration by HadoopDataCleanerTool.
 *
 * @param context the reducer context whose configuration holds the serialized settings
 */
@Override
protected void setup(
        org.apache.hadoop.mapreduce.Reducer</* KEYIN */Text, /* VALUEIN */SortedMapWritable, /* KEYOUT */NullWritable, /* VALUEOUT */Mutation>.Context context)
        throws IOException, InterruptedException {
    final Configuration conf = context.getConfiguration();
    final String datastoreLines = conf.get(HadoopDataCleanerTool.ANALYZER_BEANS_CONFIGURATION_DATASTORES_KEY);
    final String jobXml = conf.get(HadoopDataCleanerTool.ANALYSIS_JOB_XML_KEY);

    // The datastores must be deserialized first: the job XML is resolved against them.
    this.analyzerBeansConfiguration = ConfigurationSerializer.deserializeAnalyzerBeansDatastores(datastoreLines);
    this.analysisJob = ConfigurationSerializer.deserializeAnalysisJobFromXml(jobXml, this.analyzerBeansConfiguration);

    super.setup(context);
}