Java class org.apache.hadoop.io.SortedMapWritable: example source code
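Before the per-project snippets, a minimal sketch of the class itself may help: SortedMapWritable is a Writable map that keeps its entries sorted by key and serializes through the standard write/readFields contract. The round trip below uses only stock Hadoop I/O classes; the sample keys and values are made up for illustration.

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SortedMapWritable;
import org.apache.hadoop.io.Text;

public class SortedMapWritableDemo {
  public static void main(String[] args) throws Exception {
    SortedMapWritable row = new SortedMapWritable();
    row.put(new Text("ISO 3166-2"), new Text("PL"));
    row.put(new Text("Country name"), new Text("Poland"));

    // Serialize through the Writable contract...
    DataOutputBuffer out = new DataOutputBuffer();
    row.write(out);

    // ...and read it back into a fresh instance.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    SortedMapWritable copy = new SortedMapWritable();
    copy.readFields(in);

    // Prints [Country name, ISO 3166-2]: iteration order follows the keys.
    System.out.println(copy.keySet());
  }
}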
Project: hadoop-datacleaner
File: FlatFileReducer.java
@Override
public void reduce(Text analyzerKey, Iterable<SortedMapWritable> rows, Context context) throws IOException,
    InterruptedException {
  Analyzer<?> analyzer = ConfigurationSerializer.initializeAnalyzer(analyzerKey.toString(),
      analyzerBeansConfiguration, analysisJob);
  logger.info("analyzerKey = " + analyzerKey.toString() + " rows: ");
  for (SortedMapWritable rowWritable : rows) {
    InputRow inputRow = RowUtils.sortedMapWritableToInputRow(rowWritable, analysisJob.getSourceColumns());
    analyzer.run(inputRow, 1);
    RowUtils.printSortedMapWritable(rowWritable, logger);
    Text finalText = CsvParser.toCsvText(rowWritable);
    context.write(NullWritable.get(), finalText);
  }
  logger.info("end of analyzerKey = " + analyzerKey.toString() + " rows.");
  AnalyzerResult analyzerResult = analyzer.getResult();
  logger.debug("analyzerResult.toString(): " + analyzerResult.toString());
}
Project: hadoop-datacleaner
File: CsvParser.java
public static Text toCsvText(SortedMapWritable row) {
  StringBuilder csv = new StringBuilder();
  for (@SuppressWarnings("rawtypes")
      Iterator<Entry<WritableComparable, Writable>> iterator = row.entrySet().iterator(); iterator.hasNext();) {
    @SuppressWarnings("rawtypes")
    Entry<WritableComparable, Writable> next = iterator.next();
    if (next.getValue() instanceof Text) {
      csv.append(((Text) next.getValue()).toString());
    } // else append nothing - the value is null, so the field stays empty.
    if (iterator.hasNext()) {
      csv.append(';');
    }
  }
  return new Text(csv.toString());
}
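A quick usage sketch for the method above (the sample row is made up; note that entries iterate in key order, and a non-Text value such as NullWritable contributes an empty field):

SortedMapWritable row = new SortedMapWritable();
row.put(new Text("Country name"), new Text("Poland"));
row.put(new Text("ISO 3166-2"), new Text("PL"));
row.put(new Text("ISO Numeric"), NullWritable.get());

// Prints "Poland;PL;" - keys sort as Country name, ISO 3166-2, ISO Numeric.
System.out.println(CsvParser.toCsvText(row));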
Project: hadoop-datacleaner
File: FlatFileMapperReducerTest.java
@Test
public void testMapper() throws IOException {
  SortedMapWritable expectedPoland = new SortedMapWritable();
  expectedPoland.put(new Text("Country name"), new Text("Poland"));
  expectedPoland.put(new Text("ISO 3166-2"), new Text("PL"));
  expectedPoland.put(new Text("ISO 3166-3"), new Text("POL"));
  expectedPoland.put(new Text("ISO Numeric"), new Text("616"));

  mapDriver
      .withInput(
          new LongWritable(0),
          new Text(
              "Country name;ISO 3166-2;ISO 3166-3;ISO Numeric;Linked to country;Synonym1;Synonym2;Synonym3"))
      .withInput(new LongWritable(44), new Text("Poland;PL;POL;616;"));

  List<Pair<Text, SortedMapWritable>> actualOutputs = mapDriver.run();

  Assert.assertEquals(2, actualOutputs.size());
  Pair<Text, SortedMapWritable> actualOutputPoland = actualOutputs.get(0);
  // The original called containsValue("Poland") and dropped the boolean result;
  // the values are Text, so the check must wrap the literal and actually assert.
  Assert.assertTrue(actualOutputPoland.getSecond().containsValue(new Text("Poland")));
}
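The mapDriver and reduceDriver fixtures used throughout these tests come from Apache MRUnit, but their setup is not shown on this page. A hedged sketch of what the @Before method presumably looks like (the no-arg mapper/reducer constructors are assumptions):

// org.apache.hadoop.mrunit.mapreduce.MapDriver / ReduceDriver
private MapDriver<LongWritable, Text, Text, SortedMapWritable> mapDriver;
private ReduceDriver<Text, SortedMapWritable, NullWritable, Text> reduceDriver;

@Before
public void setUp() {
  mapDriver = MapDriver.newMapDriver(new FlatFileMapper());
  reduceDriver = ReduceDriver.newReduceDriver(new FlatFileReducer());
}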
Project: hadoop-datacleaner
File: FlatFileMapperReducerTest.java
@Test
public void testReducerHeader() throws IOException {
  List<SortedMapWritable> rows = new ArrayList<SortedMapWritable>();

  SortedMapWritable header = new SortedMapWritable();
  header.put(new Text("ISO 3166-2_ISO 3166-3"), new Text("ISO 3166-2_ISO 3166-3"));
  header.put(new Text("Country name"), new Text("Country name"));
  header.put(new Text("ISO 3166-2"), new Text("ISO 3166-2"));
  header.put(new Text("ISO 3166-3"), new Text("ISO 3166-3"));
  header.put(new Text("ISO Numeric"), new Text("ISO Numeric"));
  header.put(new Text("Linked to country"), new Text("Linked to country"));
  header.put(new Text("Synonym1"), new Text("Synonym1"));
  header.put(new Text("Synonym2"), new Text("Synonym2"));
  header.put(new Text("Synonym3"), new Text("Synonym3"));
  rows.add(header);

  reduceDriver.withInput(new Text("Value distribution (Country name)"), rows);
  reduceDriver
      .withOutput(
          NullWritable.get(),
          new Text(
              "Country name;ISO 3166-2;ISO 3166-2_ISO 3166-3;ISO 3166-3;ISO Numeric;Linked to country;Synonym1;Synonym2;Synonym3"));
  reduceDriver.runTest();
}
Project: hadoop-datacleaner
File: HBaseTableReducer.java
public void reduce(Text analyzerKey, Iterable<SortedMapWritable> writableResults, Context context)
    throws IOException, InterruptedException {
  Analyzer<?> analyzer = ConfigurationSerializer.initializeAnalyzer(analyzerKey.toString(),
      analyzerBeansConfiguration, analysisJob);
  logger.info("analyzerKey = " + analyzerKey.toString() + " rows: ");
  for (SortedMapWritable rowWritable : writableResults) {
    InputRow inputRow = RowUtils.sortedMapWritableToInputRow(rowWritable, analysisJob.getSourceColumns());
    analyzer.run(inputRow, 1);
    Result result = ResultUtils.sortedMapWritableToResult(rowWritable);
    ResultUtils.printResult(result, logger);
    Put put = ResultUtils.preparePut(result);
    context.write(NullWritable.get(), put);
  }
  logger.info("end of analyzerKey = " + analyzerKey.toString() + " rows.");
  AnalyzerResult analyzerResult = analyzer.getResult();
  logger.debug("analyzerResult.toString(): " + analyzerResult.toString());
}
Project: hadoop-datacleaner
File: ResultUtils.java
public static Result sortedMapWritableToResult(SortedMapWritable row) {
  List<Cell> cells = new ArrayList<Cell>();
  for (@SuppressWarnings("rawtypes")
      Map.Entry<WritableComparable, Writable> rowEntry : row.entrySet()) {
    Text columnFamilyAndName = (Text) rowEntry.getKey();
    Text columnValue = (Text) rowEntry.getValue();
    // Keys are expected in "family:qualifier" form.
    String[] split = columnFamilyAndName.toString().split(":");
    String columnFamily = split[0];
    String columnName = split[1];
    // Note that the cell value doubles as the row key here.
    Cell cell = new KeyValue(Bytes.toBytes(columnValue.toString()), Bytes.toBytes(columnFamily),
        Bytes.toBytes(columnName), Bytes.toBytes(columnValue.toString()));
    cells.add(cell);
  }
  return Result.create(cells);
}
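A quick check of what that conversion produces, using the standard HBase client API (the sample map is made up):

SortedMapWritable row = new SortedMapWritable();
row.put(new Text("mainFamily:country"), new Text("Poland"));

Result result = ResultUtils.sortedMapWritableToResult(row);
// Look the value up by family and qualifier; prints "Poland".
byte[] value = result.getValue(Bytes.toBytes("mainFamily"), Bytes.toBytes("country"));
System.out.println(Bytes.toString(value));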
Project: hadoop-map-reduce-patterns
File: MedianAndStandardDeviationCommentLengthByHour.java
protected void reduce(IntWritable key, Iterable<SortedMapWritable> values, Context context)
    throws IOException, InterruptedException {
  SortedMapWritable outValue = new SortedMapWritable();
  for (SortedMapWritable v : values) {
    for (@SuppressWarnings("rawtypes")
        Entry<WritableComparable, Writable> entry : v.entrySet()) {
      LongWritable count = (LongWritable) outValue.get(entry.getKey());
      if (count != null) {
        count.set(count.get() + ((LongWritable) entry.getValue()).get());
      } else {
        outValue.put(entry.getKey(),
            new LongWritable(((LongWritable) entry.getValue()).get()));
      }
    }
  }
  context.write(key, outValue);
}
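This reducer only merges the per-hour histograms mapping comment length to occurrence count; the median and standard deviation promised by the class name are computed from the merged map afterwards. A hedged, illustrative helper for that step (the IntWritable key type and the helper itself are assumptions, not code from the project):

// Illustrative: derive median and stddev from a length -> count histogram.
static void medianAndStdDev(SortedMapWritable histogram) {
  long totalCount = 0;
  long sum = 0;
  double sumOfSquares = 0;
  for (@SuppressWarnings("rawtypes")
      Map.Entry<WritableComparable, Writable> e : histogram.entrySet()) {
    long length = ((IntWritable) e.getKey()).get();
    long count = ((LongWritable) e.getValue()).get();
    totalCount += count;
    sum += length * count;
    sumOfSquares += (double) length * length * count;
  }

  // Keys iterate in ascending order, so walk until half the mass is covered.
  long seen = 0;
  long median = 0;
  for (@SuppressWarnings("rawtypes")
      Map.Entry<WritableComparable, Writable> e : histogram.entrySet()) {
    seen += ((LongWritable) e.getValue()).get();
    if (seen * 2 >= totalCount) {
      median = ((IntWritable) e.getKey()).get();
      break;
    }
  }

  double mean = (double) sum / totalCount;
  double stdDev = Math.sqrt(sumOfSquares / totalCount - mean * mean);
  System.out.println("median=" + median + ", stdDev=" + stdDev);
}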
Project: hadoop
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: hadoop
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
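writeSortedMap serializes the map as a typed-bytes map: writeMapHeader emits the MAP type code and the entry count, then each key and value goes through the write dispatcher above with its own type tag. A minimal writing-only sketch; the DataOutput-based construction is an assumption about the typedbytes package API:

DataOutputBuffer buffer = new DataOutputBuffer();
// Assumed constructor: TypedBytesWritableOutput over a DataOutput.
TypedBytesWritableOutput tbOut = new TypedBytesWritableOutput(buffer);

SortedMapWritable map = new SortedMapWritable();
map.put(new Text("country"), new Text("Poland"));

// Wire format: MAP code, entry count, then tagged key/value pairs.
tbOut.writeSortedMap(map);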
Project: aliyun-oss-hadoop-fs
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable<?>) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: aliyun-oss-hadoop-fs
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable<?> smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<? extends WritableComparable<?>, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
Project: aliyun-oss-hadoop-fs
File: TypedBytesWritableInput.java
public <K extends WritableComparable<? super K>>
    SortedMapWritable<K> readSortedMap(SortedMapWritable<K> mw)
    throws IOException {
  if (mw == null) {
    mw = new SortedMapWritable<K>();
  }
  int length = in.readMapHeader();
  for (int i = 0; i < length; i++) {
    @SuppressWarnings("unchecked")
    K key = (K) read();
    Writable value = read();
    mw.put(key, value);
  }
  return mw;
}
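Note the null-tolerant contract: passing null makes the method allocate a fresh map, otherwise it fills the one supplied. A small usage sketch (here input stands for a TypedBytesWritableInput positioned at a serialized map; that positioning detail is an assumption):

// Fill an existing map...
SortedMapWritable<Text> reused = input.readSortedMap(new SortedMapWritable<Text>());
// ...or let the method allocate one.
SortedMapWritable<Text> fresh = input.readSortedMap(null);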
Project: big-c
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: big-c
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
Project: hadoop-2.6.0-cdh5.4.3
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: hadoop-2.6.0-cdh5.4.3
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
Project: hadoop-plus
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: hadoop-plus
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
Project: hops
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: hops
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
Project: hadoop-TCP
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: hadoop-TCP
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
Project: hadoop-on-lustre
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: hadoop-on-lustre
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
Project: hardfs
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: hardfs
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
Project: hadoop-on-lustre2
File: TypedBytesWritableOutput.java
public void write(Writable w) throws IOException {
  if (w instanceof TypedBytesWritable) {
    writeTypedBytes((TypedBytesWritable) w);
  } else if (w instanceof BytesWritable) {
    writeBytes((BytesWritable) w);
  } else if (w instanceof ByteWritable) {
    writeByte((ByteWritable) w);
  } else if (w instanceof BooleanWritable) {
    writeBoolean((BooleanWritable) w);
  } else if (w instanceof IntWritable) {
    writeInt((IntWritable) w);
  } else if (w instanceof VIntWritable) {
    writeVInt((VIntWritable) w);
  } else if (w instanceof LongWritable) {
    writeLong((LongWritable) w);
  } else if (w instanceof VLongWritable) {
    writeVLong((VLongWritable) w);
  } else if (w instanceof FloatWritable) {
    writeFloat((FloatWritable) w);
  } else if (w instanceof DoubleWritable) {
    writeDouble((DoubleWritable) w);
  } else if (w instanceof Text) {
    writeText((Text) w);
  } else if (w instanceof ArrayWritable) {
    writeArray((ArrayWritable) w);
  } else if (w instanceof MapWritable) {
    writeMap((MapWritable) w);
  } else if (w instanceof SortedMapWritable) {
    writeSortedMap((SortedMapWritable) w);
  } else if (w instanceof Record) {
    writeRecord((Record) w);
  } else {
    writeWritable(w); // last resort
  }
}
Project: hadoop-on-lustre2
File: TypedBytesWritableOutput.java
public void writeSortedMap(SortedMapWritable smw) throws IOException {
  out.writeMapHeader(smw.size());
  for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
    write(entry.getKey());
    write(entry.getValue());
  }
}
Project: hadoop-datacleaner
File: FlatFileReducer.java
protected void setup(Reducer<Text, SortedMapWritable, NullWritable, Text>.Context context) throws IOException,
    InterruptedException {
  Configuration mapReduceConfiguration = context.getConfiguration();
  String datastoresConfigurationLines = mapReduceConfiguration
      .get(FlatFileTool.ANALYZER_BEANS_CONFIGURATION_DATASTORES_KEY);
  String analysisJobXml = mapReduceConfiguration.get(FlatFileTool.ANALYSIS_JOB_XML_KEY);
  analyzerBeansConfiguration = ConfigurationSerializer
      .deserializeAnalyzerBeansDatastores(datastoresConfigurationLines);
  analysisJob = ConfigurationSerializer.deserializeAnalysisJobFromXml(analysisJobXml, analyzerBeansConfiguration);
  super.setup(context);
}
Project: hadoop-datacleaner
File: FlatFileMapper.java
protected void setup(Mapper<LongWritable, Text, Text, SortedMapWritable>.Context context) throws IOException,
    InterruptedException {
  Configuration mapReduceConfiguration = context.getConfiguration();
  String datastoresConfigurationLines = mapReduceConfiguration
      .get(FlatFileTool.ANALYZER_BEANS_CONFIGURATION_DATASTORES_KEY);
  String analysisJobXml = mapReduceConfiguration.get(FlatFileTool.ANALYSIS_JOB_XML_KEY);
  this.mapperDelegate = new MapperDelegate(datastoresConfigurationLines, analysisJobXml);
  csvParser = new CsvParser(mapperDelegate.getAnalysisJob().getSourceColumns(), ";");
  super.setup(context);
}
Project: hadoop-datacleaner
File: FlatFileTool.java
private int runMapReduceJob(String input, String output, Configuration mapReduceConfiguration) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job job = Job.getInstance(mapReduceConfiguration);
  job.setJarByClass(FlatFileMapper.class);
  job.setJobName(this.getClass().getName());

  FileInputFormat.setInputPaths(job, new Path(input));
  FileOutputFormat.setOutputPath(job, new Path(output));

  job.setMapperClass(FlatFileMapper.class);
  job.setReducerClass(FlatFileReducer.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(SortedMapWritable.class);
  job.setNumReduceTasks(1);

  // TODO externalize to args?
  // mapReduceConfiguration.addResource(new Path("/etc/hadoop/conf/core-site.xml"));

  FileSystem fileSystem = FileSystem.get(mapReduceConfiguration);
  if (fileSystem.exists(new Path(output))) {
    fileSystem.delete(new Path(output), true);
  }

  boolean success = job.waitForCompletion(true);
  return success ? 0 : 1;
}
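A hedged sketch of how this job is presumably launched, assuming FlatFileTool implements the standard org.apache.hadoop.util.Tool interface and that the two configuration keys read by the setup() methods above are set before submission (the serialized values and paths below are placeholders):

Configuration conf = new Configuration();
// Keys match what FlatFileMapper/FlatFileReducer read in setup().
conf.set(FlatFileTool.ANALYZER_BEANS_CONFIGURATION_DATASTORES_KEY, datastoresConfigurationLines);
conf.set(FlatFileTool.ANALYSIS_JOB_XML_KEY, analysisJobXml);

// Assumed: FlatFileTool implements Tool, so ToolRunner can drive it.
int exitCode = ToolRunner.run(conf, new FlatFileTool(), new String[] { inputPath, outputPath });
System.exit(exitCode);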
Project: hadoop-datacleaner
File: FlatFileMapperReducerTest.java
@Test
public void testReducerPoland() throws IOException {
  List<SortedMapWritable> rows = new ArrayList<SortedMapWritable>();

  SortedMapWritable poland = new SortedMapWritable();
  poland.put(new Text("Country name"), new Text("Poland"));
  poland.put(new Text("ISO 3166-2"), new Text("PL"));
  poland.put(new Text("ISO 3166-3"), new Text("POL"));
  rows.add(poland);

  reduceDriver.withInput(new Text("Value distribution (Country name)"), rows);
  reduceDriver.withOutput(NullWritable.get(), new Text("Poland;PL;POL"));
  reduceDriver.runTest();
}
Project: hadoop-datacleaner
File: MapperEmitter.java
public void emit(ConsumeRowResult consumeRowResult, List<AnalyzerJob> analyzerJobs) throws IOException, InterruptedException {
  Iterator<OutcomeSink> outcomeSinksIterator = consumeRowResult.getOutcomeSinks().iterator();
  for (InputRow transformedRow : consumeRowResult.getRows()) {
    SortedMapWritable rowWritable = RowUtils.inputRowToSortedMapWritable(transformedRow);
    OutcomeSink outcomeSink = outcomeSinksIterator.next();
    for (AnalyzerJob analyzerJob : analyzerJobs) {
      if (isAnalyzerSatisfied(analyzerJob, outcomeSink)) {
        String analyzerLabel = LabelUtils.getLabel(analyzerJob);
        logger.info("Emitting " + transformedRow + " to " + analyzerLabel);
        callback.write(new Text(analyzerLabel), rowWritable);
      }
    }
  }
}
Project: hadoop-datacleaner
File: RowUtils.java
public static void printSortedMapWritable(SortedMapWritable row, Logger logger) {
  logger.info("Row: ");
  for (@SuppressWarnings("rawtypes")
      Map.Entry<WritableComparable, Writable> entry : row.entrySet()) {
    Text columnName = (Text) entry.getKey();
    if (entry.getValue() instanceof Text) {
      Text columnValue = (Text) entry.getValue();
      logger.info("\t" + columnName + " = " + columnValue);
    } else {
      logger.info("\t" + columnName + " = " + null);
    }
  }
}
Project: hadoop-datacleaner
File: RowUtils.java
public static SortedMapWritable inputRowToSortedMapWritable(InputRow inputRow) {
  SortedMapWritable rowWritable = new SortedMapWritable();
  for (InputColumn<?> inputColumn : inputRow.getInputColumns()) {
    String columnName = inputColumn.getName();
    Object value = inputRow.getValue(inputColumn);
    if (value != null) {
      rowWritable.put(new Text(columnName), new Text(value.toString()));
    } else {
      rowWritable.put(new Text(columnName), NullWritable.get());
    }
  }
  return rowWritable;
}
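The NullWritable branch above pairs with printSortedMapWritable earlier in this file, which logs null for any non-Text value. A tiny check using only Hadoop types (the logger is assumed to be an SLF4J logger, which matches the Logger parameter above):

SortedMapWritable row = new SortedMapWritable();
row.put(new Text("Country name"), new Text("Poland"));
row.put(new Text("Synonym1"), NullWritable.get());

// Logs "Country name = Poland" and "Synonym1 = null".
RowUtils.printSortedMapWritable(row, LoggerFactory.getLogger("demo"));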
Project: hadoop-datacleaner
File: MapperEmitterTest.java
@Before
public void setUp() {
  this.mapperEmitter = new MapperEmitter(new MapperEmitter.Callback() {
    public void write(Text text, SortedMapWritable row) throws IOException, InterruptedException {
      emitList.add(row);
    }
  });
}
Project: hadoop-datacleaner
File: HBaseTableMapper.java
@Override
protected void setup(
    org.apache.hadoop.mapreduce.Mapper</* KEYIN */ImmutableBytesWritable, /* VALUEIN */Result, /* KEYOUT */Text, /* VALUEOUT */SortedMapWritable>.Context context)
    throws IOException, InterruptedException {
  Configuration mapReduceConfiguration = context.getConfiguration();
  String datastoresConfigurationLines = mapReduceConfiguration
      .get(HadoopDataCleanerTool.ANALYZER_BEANS_CONFIGURATION_DATASTORES_KEY);
  String analysisJobXml = mapReduceConfiguration.get(HadoopDataCleanerTool.ANALYSIS_JOB_XML_KEY);
  mapperDelegate = new MapperDelegate(datastoresConfigurationLines, analysisJobXml);
  hBaseParser = new HBaseParser(mapperDelegate.getAnalysisJob().getSourceColumns());
  super.setup(context);
}
Project: hadoop-datacleaner
File: HBaseTableReducer.java
@Override
protected void setup(
    org.apache.hadoop.mapreduce.Reducer</* KEYIN */Text, /* VALUEIN */SortedMapWritable, /* KEYOUT */NullWritable, /* VALUEOUT */Mutation>.Context context)
    throws IOException, InterruptedException {
  Configuration mapReduceConfiguration = context.getConfiguration();
  String datastoresConfigurationLines = mapReduceConfiguration
      .get(HadoopDataCleanerTool.ANALYZER_BEANS_CONFIGURATION_DATASTORES_KEY);
  String analysisJobXml = mapReduceConfiguration.get(HadoopDataCleanerTool.ANALYSIS_JOB_XML_KEY);
  analyzerBeansConfiguration = ConfigurationSerializer
      .deserializeAnalyzerBeansDatastores(datastoresConfigurationLines);
  analysisJob = ConfigurationSerializer.deserializeAnalysisJobFromXml(analysisJobXml, analyzerBeansConfiguration);
  super.setup(context);
}