/**
 * Returns the corresponding Writable object for this column type.
 */
public Writable getWritableInstance(com.cloudera.recordservice.core.Schema.Type type) {
  switch (type) {
    case BOOLEAN: return new BooleanWritable();
    case TINYINT: return new ByteWritable();
    case SMALLINT: return new ShortWritable();
    case INT: return new IntWritable();
    case BIGINT: return new LongWritable();
    case FLOAT: return new FloatWritable();
    case DOUBLE: return new DoubleWritable();
    case VARCHAR:
    case CHAR:
    case STRING: return new Text();
    case TIMESTAMP_NANOS: return new TimestampNanosWritable();
    case DECIMAL: return new DecimalWritable();
    default:
      // Report the offending type rather than this object's toString().
      throw new UnsupportedOperationException("Unexpected type: " + type);
  }
}
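// Hypothetical usage sketch (not from the source): preallocating one reusable
// Writable per schema column, the way reset() further down expects
// columnValObjects_ to be populated. It assumes this method, schema_, and
// getColumnInfo(i).type.typeId are all available in the same class, as they
// appear to be in the reset() snippet.
Writable[] columnValObjects = new Writable[schema_.getNumColumns()];
for (int i = 0; i < schema_.getNumColumns(); ++i) {
  columnValObjects[i] = getWritableInstance(schema_.getColumnInfo(i).type.typeId);
}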
public int compareTo(ShortArrayWritable that) {
  Writable[] self = this.get();
  Writable[] other = that.get();
  if (self.length != other.length) {
    // Length decides first.
    return Integer.valueOf(self.length).compareTo(Integer.valueOf(other.length));
  } else {
    // Then, compare every pair of elements.
    for (int i = 0; i < self.length; i++) {
      short s = ((ShortWritable) self[i]).get();
      short o = ((ShortWritable) other[i]).get();
      if (s != o) return Integer.valueOf(s).compareTo(Integer.valueOf(o));
    }
    // Same length, same elements => same array.
    return 0;
  }
}
@Override
protected void reduce(ShortWritable inKey, Iterable<FloatArrayWritable> inValues,
    Context context) throws IOException, InterruptedException {
  // This task sums all the partial results for one stripe of the vector
  // v_k and adds the teleportation factor.
  Configuration conf = context.getConfiguration();
  int numPages = Integer.parseInt(conf.get("pagerank.num_pages"));
  float beta = Float.parseFloat(conf.get("pagerank.damping_factor"));
  FloatWritable[] vi = null;

  for (FloatArrayWritable inValue : inValues) {
    Writable[] partialVi = inValue.get();
    if (vi == null) {
      // vi is initialized here in order to know the correct size of
      // the stripe (the last stripe can be incomplete).
      vi = new FloatWritable[partialVi.length];
      for (int k = 0; k < vi.length; k++) {
        vi[k] = new FloatWritable(0);
      }
    }
    // Sum the partial results.
    for (int k = 0; k < vi.length; k++) {
      vi[k].set(vi[k].get() + ((FloatWritable) partialVi[k]).get());
    }
  }

  // Add the teleportation factor.
  for (int k = 0; k < vi.length; k++) {
    vi[k].set(beta * vi[k].get() + (1 - beta) / numPages);
  }
  context.write(inKey, new FloatArrayWritable(vi));
}
private void pageRankIteration(int iter, Configuration conf, Path outputDir)
    throws Exception {
  // This job performs an iteration of the power iteration method to
  // compute PageRank. The map task processes each block M_{i,j}, loads
  // the corresponding stripe j of the vector v_{k-1} and produces the
  // partial result of the stripe i of the vector v_k. The reduce task
  // sums all the partial results of v_k and adds the teleportation factor
  // (the combiner only sums all the partial results). See Section 5.2
  // (and 5.2.3 in particular) of Mining of Massive Datasets
  // (http://infolab.stanford.edu/~ullman/mmds.html) for details. The
  // output is written in a "vk" subdir of the output dir, where k is the
  // iteration number. MapFileOutputFormat is used to keep an array of the
  // stripes of v.
  Job job = Job.getInstance(conf, "PageRank:Iteration");
  job.setJarByClass(PageRank.class);

  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setMapperClass(PageRankIterationMapper.class);
  job.setMapOutputKeyClass(ShortWritable.class);
  job.setMapOutputValueClass(FloatArrayWritable.class);
  job.setCombinerClass(PageRankIterationCombiner.class);
  job.setReducerClass(PageRankIterationReducer.class);
  job.setOutputFormatClass(MapFileOutputFormat.class);
  job.setOutputKeyClass(ShortWritable.class);
  job.setOutputValueClass(FloatArrayWritable.class);

  FileInputFormat.addInputPath(job, new Path(outputDir, "M"));
  FileOutputFormat.setOutputPath(job, new Path(outputDir, "v" + iter));

  job.waitForCompletion(true);
}
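// Hypothetical driver sketch (not part of the source) showing how the iteration
// job above could be chained. The configuration keys are the ones read by the
// mapper/combiner/reducer snippets in this section; numPages, blockSize,
// numIterations and outputDir are assumed to come from earlier setup, and the
// job that builds the block matrix under outputDir/"M" is assumed to have run
// first.
Configuration conf = new Configuration();
conf.set("pagerank.num_pages", String.valueOf(numPages));
conf.set("pagerank.block_size", String.valueOf(blockSize));
conf.set("pagerank.damping_factor", String.valueOf(0.85f));
for (int iter = 1; iter <= numIterations; iter++) {
  // Each call creates its own Job from a copy of conf, so updating the
  // iteration number here only affects the job launched below.
  conf.set("pagerank.iteration", String.valueOf(iter));
  pageRankIteration(iter, conf, outputDir);
}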
@Override
public void map(ShortWritable inKey, FloatArrayWritable inValue, Context context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  short blockSize = Short.parseShort(conf.get("pagerank.block_size"));
  int topResults = Integer.parseInt(conf.get("pagerank.top_results"));

  Writable[] vStripe = inValue.get();
  for (int i = 0; i < vStripe.length; i++) {
    int page = 1 + (inKey.get() - 1) * blockSize + i;
    float pageRank = ((FloatWritable) vStripe[i]).get();

    // The elements in the queue are sorted (in non-decreasing order) by
    // PageRank. The queue is filled up until it contains topResults
    // elements. Then, a new element will be added only if its PageRank
    // is greater than the lowest PageRank in the queue. If the queue is
    // full and a new element is added, the one with the lowest PageRank
    // is removed from the queue.
    if (topN.size() < topResults || pageRank >= topN.peek().getKey()) {
      topN.add(new AbstractMap.SimpleEntry<Float, Integer>(pageRank, page));
      if (topN.size() > topResults) {
        topN.poll();
      }
    }
  }
}
@Override
protected void reduce(ShortWritable inKey, Iterable<FloatArrayWritable> inValues,
    Context context) throws IOException, InterruptedException {
  // This task sums all the partial results for one stripe of the vector
  // v_k. It is a separate class since PageRankIterationReducer also adds
  // the teleportation factor.
  FloatWritable[] vi = null;

  for (FloatArrayWritable inValue : inValues) {
    Writable[] partialVi = inValue.get();
    if (vi == null) {
      // vi is initialized here in order to know the correct size of
      // the stripe (the last stripe can be incomplete).
      vi = new FloatWritable[partialVi.length];
      for (int k = 0; k < vi.length; k++) {
        vi[k] = new FloatWritable(0);
      }
    }
    // Sum the partial results.
    for (int k = 0; k < vi.length; k++) {
      vi[k].set(vi[k].get() + ((FloatWritable) partialVi[k]).get());
    }
  }
  context.write(inKey, new FloatArrayWritable(vi));
}
private void setupMapper(String intermediateTable) throws IOException {
  // FileInputFormat.setInputPaths(job, input);
  String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
  HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
  job.setInputFormatClass(HCatInputFormat.class);
  job.setMapperClass(FactDistinctColumnsMapper.class);
  job.setCombinerClass(FactDistinctColumnsCombiner.class);
  job.setMapOutputKeyClass(ShortWritable.class);
  job.setMapOutputValueClass(Text.class);
}
/**
 * Writes an {@link Object} to the {@link DataOutput}.
 *
 * @param obj the object to write.
 * @param out the data output stream.
 * @throws IOException if an I/O error occurs.
 */
public static final void writeObject(Object obj, DataOutput out) throws IOException {
  try {
    if (obj == null) {
      throw new IOException("Writing object is not defined: null.");
    } else if (ClassUtils.isBoolean(obj)) {
      (new BooleanWritable((boolean) obj)).write(out);
    } else if (ClassUtils.isByte(obj)) {
      (new ByteWritable((byte) obj)).write(out);
    } else if (ClassUtils.isShort(obj)) {
      (new ShortWritable((short) obj)).write(out);
    } else if (ClassUtils.isInteger(obj)) {
      (new IntWritable((int) obj)).write(out);
    } else if (ClassUtils.isLong(obj)) {
      (new LongWritable((long) obj)).write(out);
    } else if (ClassUtils.isFloat(obj)) {
      (new FloatWritable((float) obj)).write(out);
    } else if (ClassUtils.isDouble(obj)) {
      (new DoubleWritable((double) obj)).write(out);
    } else if (ClassUtils.isString(obj)) {
      Text.writeString(out, (String) obj);
    } else if (ClassUtils.isEnum(obj)) {
      (new IntWritable(((Enum<?>) obj).ordinal())).write(out);
    } else if (ClassUtils.isArray(obj)) {
      int length = Array.getLength(obj);
      writeObject(length, out);
      for (int j = 0; j < length; j++) {
        writeObject(Array.get(obj, j), out);
      }
    } else {
      ((Writable) obj).write(out);
    }
  } catch (IllegalArgumentException exc) {
    throw new IOException(exc);
  }
}
/** read short value */
static short readShort(DataInput in) throws IOException {
  ShortWritable uShort = TL_DATA.get().U_SHORT;
  uShort.readFields(in);
  return uShort.get();
}
/** write short value */
static void writeShort(short value, DataOutputStream out) throws IOException {
  ShortWritable uShort = TL_DATA.get().U_SHORT;
  uShort.set(value);
  uShort.write(out);
}
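// A minimal sketch (an assumption, not shown in the source) of the thread-local
// holder the two helpers above rely on; the real TL_DATA container may carry
// additional reusable Writables besides U_SHORT.
static final class TLData {
  final ShortWritable U_SHORT = new ShortWritable();
}

static final ThreadLocal<TLData> TL_DATA = ThreadLocal.withInitial(TLData::new);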
@Override
public ShortWritable getPrimitiveWritableObject(Object o) {
  return o == null ? null : new ShortWritable((Short) o);
}
/**
 * Resets the data in this RecordServiceRecord by translating the column data from the
 * given Record into the internal array of Writables (columnVals_). The schemas are
 * expected to match; only minimal error checking is performed.
 * This is a performance critical method.
 */
public void reset(Record record) {
  if (record.getSchema().cols.size() != schema_.getNumColumns()) {
    throw new IllegalArgumentException(String.format("Schema for new record does " +
        "not match existing schema: %d (new) != %d (existing)",
        record.getSchema().cols.size(), schema_.getNumColumns()));
  }
  for (int i = 0; i < schema_.getNumColumns(); ++i) {
    if (record.isNull(i)) {
      columnVals_[i] = null;
      continue;
    }
    columnVals_[i] = columnValObjects_[i];
    com.cloudera.recordservice.core.Schema.ColumnDesc cInfo = schema_.getColumnInfo(i);
    Preconditions.checkNotNull(cInfo);
    switch (cInfo.type.typeId) {
      case BOOLEAN:
        ((BooleanWritable) columnValObjects_[i]).set(record.nextBoolean(i));
        break;
      case TINYINT:
        ((ByteWritable) columnValObjects_[i]).set(record.nextByte(i));
        break;
      case SMALLINT:
        ((ShortWritable) columnValObjects_[i]).set(record.nextShort(i));
        break;
      case INT:
        ((IntWritable) columnValObjects_[i]).set(record.nextInt(i));
        break;
      case BIGINT:
        ((LongWritable) columnValObjects_[i]).set(record.nextLong(i));
        break;
      case FLOAT:
        ((FloatWritable) columnValObjects_[i]).set(record.nextFloat(i));
        break;
      case DOUBLE:
        ((DoubleWritable) columnValObjects_[i]).set(record.nextDouble(i));
        break;
      case STRING:
      case VARCHAR:
      case CHAR:
        ByteArray s = record.nextByteArray(i);
        ((Text) columnValObjects_[i]).set(s.byteBuffer().array(), s.offset(), s.len());
        break;
      case TIMESTAMP_NANOS:
        ((TimestampNanosWritable) columnValObjects_[i]).set(record.nextTimestampNanos(i));
        break;
      case DECIMAL:
        ((DecimalWritable) columnValObjects_[i]).set(record.nextDecimal(i));
        break;
      default:
        throw new RuntimeException("Unsupported type: " + cInfo);
    }
  }
}
@Test
public void testReadAllTypes() throws IOException, InterruptedException {
  Configuration config = new Configuration();
  RecordServiceInputFormat.RecordServiceRecordReader reader =
      new RecordServiceInputFormat.RecordServiceRecordReader();

  SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
  format.setTimeZone(TimeZone.getTimeZone("GMT"));

  try {
    RecordServiceConfig.setInputTable(config, null, "rs.alltypes");
    List<InputSplit> splits = PlanUtil.getSplits(config, new Credentials()).splits;
    int numRows = 0;
    for (InputSplit split : splits) {
      reader.initialize(split,
          new TaskAttemptContextImpl(new JobConf(config), new TaskAttemptID()));
      while (reader.nextKeyValue()) {
        RecordServiceRecord value = reader.getCurrentValue();
        if (((BooleanWritable) value.getColumnValue(0)).get()) {
          assertEquals(0, ((ByteWritable) value.getColumnValue(1)).get());
          assertEquals(1, ((ShortWritable) value.getColumnValue(2)).get());
          assertEquals(2, ((IntWritable) value.getColumnValue(3)).get());
          assertEquals(3, ((LongWritable) value.getColumnValue(4)).get());
          assertEquals(4.0, ((FloatWritable) value.getColumnValue(5)).get(), 0.1);
          assertEquals(5.0, ((DoubleWritable) value.getColumnValue(6)).get(), 0.1);
          assertEquals("hello", value.getColumnValue(7).toString());
          assertEquals("vchar1", value.getColumnValue(8).toString());
          assertEquals("char1", value.getColumnValue(9).toString());
          assertEquals("2015-01-01", format.format(
              ((TimestampNanosWritable) value.getColumnValue(10)).get().toTimeStamp()));
          assertEquals(new BigDecimal("3.1415920000"),
              ((DecimalWritable) value.getColumnValue(11)).get().toBigDecimal());
        } else {
          assertEquals(6, ((ByteWritable) value.getColumnValue(1)).get());
          assertEquals(7, ((ShortWritable) value.getColumnValue(2)).get());
          assertEquals(8, ((IntWritable) value.getColumnValue(3)).get());
          assertEquals(9, ((LongWritable) value.getColumnValue(4)).get());
          assertEquals(10.0, ((FloatWritable) value.getColumnValue(5)).get(), 0.1);
          assertEquals(11.0, ((DoubleWritable) value.getColumnValue(6)).get(), 0.1);
          assertEquals("world", value.getColumnValue(7).toString());
          assertEquals("vchar2", value.getColumnValue(8).toString());
          assertEquals("char2", value.getColumnValue(9).toString());
          assertEquals("2016-01-01", format.format(
              ((TimestampNanosWritable) value.getColumnValue(10)).get().toTimeStamp()));
          assertEquals(new BigDecimal("1234.5678900000"),
              ((DecimalWritable) value.getColumnValue(11)).get().toBigDecimal());
        }
        ++numRows;
      }
    }
    assertEquals(2, numRows);
  } finally {
    reader.close();
  }
}
/**
 * Converts an object from Hive's value system to Pig's value system.
 * See HCatBaseStorer#getJavaObj() for the Pig->Hive conversion.
 * @param o object from the Hive value system
 * @return object in the Pig value system
 */
public static Object extractPigObject(
    Object o, com.cloudera.recordservice.core.Schema.TypeDesc itemType) throws Exception {
  // Note that HCatRecordSerDe.serializePrimitiveField() will be called before this,
  // thus some type promotion/conversion may occur: e.g. Short to Integer. We should
  // refactor this so that it's happening in one place per module/product that we are
  // integrating with. All Pig conversion should be done here, etc.
  if (o == null) {
    return null;
  }
  Object result;
  switch (itemType.typeId) {
    case BOOLEAN:
      result = ((BooleanWritable) o).get();
      break;
    case TINYINT:
      result = ((ByteWritable) o).get();
      break;
    case SMALLINT:
      result = (int) ((ShortWritable) o).get();
      break;
    case INT:
      result = ((IntWritable) o).get();
      break;
    case BIGINT:
      result = ((LongWritable) o).get();
      break;
    case FLOAT:
      result = ((FloatWritable) o).get();
      break;
    case DOUBLE:
      result = ((DoubleWritable) o).get();
      break;
    case STRING:
    case VARCHAR:
    case CHAR:
      result = o.toString();
      break;
    case TIMESTAMP_NANOS:
      TimestampNanos timestampNanos = ((TimestampNanosWritable) o).get();
      // TODO: make sure this is correct
      result = new DateTime(timestampNanos.toTimeStamp(),
          DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT")));
      break;
    case DECIMAL:
      Decimal decimal = ((DecimalWritable) o).get();
      result = decimal.toBigDecimal();
      break;
    default:
      result = o;
      break;
  }
  return result;
}
public ShortArrayWritable() { super(ShortWritable.class); }
public ShortArrayWritable(Writable[] values) { super(ShortWritable.class, values); }
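// Putting the pieces together: the two constructors above and the compareTo shown
// earlier suggest a thin ArrayWritable subclass of ShortWritable elements
// (assuming org.apache.hadoop.io.ShortWritable). A minimal sketch under that
// assumption; the WritableComparable bound is inferred from its use as a
// MapReduce key below, not stated in the source.
public class ShortArrayWritable extends ArrayWritable
    implements WritableComparable<ShortArrayWritable> {

  public ShortArrayWritable() {
    super(ShortWritable.class);
  }

  public ShortArrayWritable(Writable[] values) {
    super(ShortWritable.class, values);
  }

  @Override
  public int compareTo(ShortArrayWritable that) {
    // Shorter arrays sort first; otherwise compare element by element.
    Writable[] self = this.get();
    Writable[] other = that.get();
    if (self.length != other.length) {
      return Integer.compare(self.length, other.length);
    }
    for (int i = 0; i < self.length; i++) {
      int cmp = Short.compare(((ShortWritable) self[i]).get(),
          ((ShortWritable) other[i]).get());
      if (cmp != 0) return cmp;
    }
    return 0;
  }
}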
@Override
public void map(LongWritable inKey, Text inValue, Context context)
    throws IOException, InterruptedException {
  // This task gets a line from links-simple-sorted.txt that contains the
  // out links of a page v. It produces results with keys (i, j)
  // corresponding to the indexes of the block M_{i,j} in which each
  // link v -> w should be stored. The value is (v, w, degree(v)).
  Configuration conf = context.getConfiguration();
  short blockSize = Short.parseShort(conf.get("pagerank.block_size"));

  String[] lineParts = inValue.toString().split(":\\s+");
  String[] vOutlinks = lineParts[1].split("\\s+");

  ShortWritable[] blockIndexes = new ShortWritable[2];
  blockIndexes[0] = new ShortWritable();
  blockIndexes[1] = new ShortWritable();
  ShortWritable[] blockEntry = new ShortWritable[3];
  blockEntry[0] = new ShortWritable();
  blockEntry[1] = new ShortWritable();
  blockEntry[2] = new ShortWritable();

  int v, w;
  short i, j;
  v = Integer.parseInt(lineParts[0]);
  j = (short) ((v - 1) / blockSize + 1);

  for (int k = 0; k < vOutlinks.length; k++) {
    w = Integer.parseInt(vOutlinks[k]);
    i = (short) ((w - 1) / blockSize + 1);

    // Indexes of the block M_{i,j}.
    blockIndexes[0].set(i);
    blockIndexes[1].set(j);
    // One entry of the block M_{i,j} corresponding to the v -> w link.
    // The sparse block representation also needs information about
    // the degree of the page v.
    blockEntry[0].set((short) ((v - 1) % blockSize));
    blockEntry[1].set((short) ((w - 1) % blockSize));
    blockEntry[2].set((short) vOutlinks.length);
    context.write(new ShortArrayWritable(blockIndexes),
        new ShortArrayWritable(blockEntry));
  }
}
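// Worked example of the index arithmetic above, assuming pagerank.block_size = 1000:
// for an input line "1005: 3 ...", the out-link 1005 -> 3 gives
//   j = (1005 - 1) / 1000 + 1 = 2   and   i = (3 - 1) / 1000 + 1 = 1,
// so the entry ((1005 - 1) % 1000, (3 - 1) % 1000, degree(1005)) = (4, 2, degree)
// is emitted under the key for block M_{1,2}.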
public MatrixBlockWritable() { super(ShortWritable.class); }
public MatrixBlockWritable(Writable[][] values) { super(ShortWritable.class, values); }
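// The (Class, Writable[][]) super calls above match the constructors of Hadoop's
// TwoDArrayWritable, so MatrixBlockWritable is presumably a thin subclass of it.
// A minimal sketch under that assumption; the parent type is not confirmed by the
// source.
public class MatrixBlockWritable extends TwoDArrayWritable {

  public MatrixBlockWritable() {
    super(ShortWritable.class);
  }

  public MatrixBlockWritable(Writable[][] values) {
    super(ShortWritable.class, values);
  }
}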
@Override
public void reduce(ShortArrayWritable inKey, Iterable<ShortArrayWritable> inValues,
    Context context) throws IOException, InterruptedException {
  // This task receives all the entries in M_{i,j} and builds the compact
  // representation of the block. See Section 5.2.4 of Mining of Massive
  // Datasets (http://infolab.stanford.edu/~ullman/mmds.html) for details.
  // Only blocks with at least one nonzero entry are generated.
  Configuration conf = context.getConfiguration();
  short blockSize = Short.parseShort(conf.get("pagerank.block_size"));

  short vIndexInBlock, wIndexInBlock, vDegree;
  List<List<Short>> blockColumns = new ArrayList<List<Short>>(blockSize);
  for (int k = 0; k < blockSize; k++) {
    blockColumns.add(new ArrayList<Short>());
  }

  for (ShortArrayWritable inValue : inValues) {
    Writable[] blockEntry = inValue.get();
    vIndexInBlock = ((ShortWritable) blockEntry[0]).get();
    wIndexInBlock = ((ShortWritable) blockEntry[1]).get();
    vDegree = ((ShortWritable) blockEntry[2]).get();
    if (blockColumns.get(vIndexInBlock).isEmpty()) {
      blockColumns.get(vIndexInBlock).add(vDegree);
    }
    blockColumns.get(vIndexInBlock).add(wIndexInBlock);
  }

  ShortWritable[][] blockColumnWritables = new ShortWritable[blockColumns.size()][];
  for (int k = 0; k < blockColumns.size(); k++) {
    List<Short> column = blockColumns.get(k);
    blockColumnWritables[k] = new ShortWritable[column.size()];
    for (int l = 0; l < column.size(); l++) {
      blockColumnWritables[k][l] = new ShortWritable();
      blockColumnWritables[k][l].set(column.get(l).shortValue());
    }
  }

  context.write(inKey, new MatrixBlockWritable(blockColumnWritables));
}
@Override
public void map(ShortArrayWritable inKey, MatrixBlockWritable inValue,
    Context context) throws IOException, InterruptedException {
  // This task gets each block M_{i,j}, loads the corresponding stripe j
  // of the vector v_{k-1} and produces the partial result of the stripe i
  // of the vector v_k.
  Configuration conf = context.getConfiguration();
  int iter = Integer.parseInt(conf.get("pagerank.iteration"));
  int numPages = Integer.parseInt(conf.get("pagerank.num_pages"));
  short blockSize = Short.parseShort(conf.get("pagerank.block_size"));

  Writable[] blockIndexes = inKey.get();
  short i = ((ShortWritable) blockIndexes[0]).get();
  short j = ((ShortWritable) blockIndexes[1]).get();

  int vjSize = (j > numPages / blockSize) ? (numPages % blockSize) : blockSize;
  FloatWritable[] vj = new FloatWritable[vjSize];

  if (iter == 1) {
    // Initial PageRank vector with 1/n for all pages.
    for (int k = 0; k < vj.length; k++) {
      vj[k] = new FloatWritable(1.0f / numPages);
    }
  } else {
    // Load the stripe j of the vector v_{k-1} from the MapFiles.
    Path outputDir = MapFileOutputFormat.getOutputPath(context).getParent();
    Path vjDir = new Path(outputDir, "v" + (iter - 1));
    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(vjDir, conf);
    Partitioner<ShortWritable, FloatArrayWritable> partitioner =
        new HashPartitioner<ShortWritable, FloatArrayWritable>();
    ShortWritable key = new ShortWritable(j);
    FloatArrayWritable value = new FloatArrayWritable();
    MapFileOutputFormat.getEntry(readers, partitioner, key, value);
    Writable[] writables = value.get();
    for (int k = 0; k < vj.length; k++) {
      vj[k] = (FloatWritable) writables[k];
    }
    for (MapFile.Reader reader : readers) {
      reader.close();
    }
  }

  // Initialize the partial result i of the vector v_k.
  int viSize = (i > numPages / blockSize) ? (numPages % blockSize) : blockSize;
  FloatWritable[] vi = new FloatWritable[viSize];
  for (int k = 0; k < vi.length; k++) {
    vi[k] = new FloatWritable(0);
  }

  // Multiply M_{i,j} by the stripe j of the vector v_{k-1} to obtain the
  // partial result i of the vector v_k.
  Writable[][] blockColumns = inValue.get();
  for (int k = 0; k < blockColumns.length; k++) {
    Writable[] blockColumn = blockColumns[k];
    if (blockColumn.length > 0) {
      int vDegree = ((ShortWritable) blockColumn[0]).get();
      for (int columnIndex = 1; columnIndex < blockColumn.length; columnIndex++) {
        int l = ((ShortWritable) blockColumn[columnIndex]).get();
        vi[l].set(vi[l].get() + (1.0f / vDegree) * vj[k].get());
      }
    }
  }

  context.write(new ShortWritable(i), new FloatArrayWritable(vi));
}
private void setupMapper() throws IOException {
  String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME);
  String[] dbTableNames = HadoopUtil.parseHiveTableName(tableName);

  log.info("setting hcat input format, db name {} , table name {}",
      dbTableNames[0], dbTableNames[1]);

  HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);

  job.setInputFormatClass(HCatInputFormat.class);
  job.setMapperClass(IIDistinctColumnsMapper.class);
  job.setCombinerClass(IIDistinctColumnsCombiner.class);
  job.setMapOutputKeyClass(ShortWritable.class);
  job.setMapOutputValueClass(Text.class);
}