@Before public void setUp() throws IOException { // Generate data final int seed = new Random().nextInt(); final DataOutputBuffer dataBuf = new DataOutputBuffer(); final RandomDatum.Generator generator = new RandomDatum.Generator(seed); for(int i = 0; i < count; ++i) { generator.next(); final RandomDatum key = generator.getKey(); final RandomDatum value = generator.getValue(); key.write(dataBuf); value.write(dataBuf); } LOG.info("Generated " + count + " records"); data = dataBuf.getData(); dataLen = dataBuf.getLength(); }
/**
 * Test hsync via SequenceFiles.
 *
 * <p>Creates a file with {@code CreateFlag.SYNC_BLOCK}, wraps the output
 * stream in a SequenceFile {@code Writer}, and calls {@code checkSyncMetric}
 * with the expected count after each flush, sync and close step.
 * {@code checkSyncMetric} is defined elsewhere; presumably it asserts a sync
 * counter on the cluster -- confirm against its definition.
 *
 * @throws Exception if cluster start-up, file I/O or a metric check fails
 */
@Test
public void testSequenceFileSync() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  try {
    final FileSystem fs = cluster.getFileSystem();
    final Path p = new Path("/testSequenceFileSync/foo");
    final int len = 1 << 16;
    FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE,
            CreateFlag.SYNC_BLOCK),
        4096, (short) 1, len, null);
    Writer w = SequenceFile.createWriter(new Configuration(),
        Writer.stream(out),
        Writer.keyClass(RandomDatum.class),
        Writer.valueClass(RandomDatum.class),
        Writer.compression(CompressionType.NONE, new DefaultCodec()));

    // hflush alone is expected to leave the count at 0.
    w.hflush();
    checkSyncMetric(cluster, 0);

    // The first hsync is expected to bring the count to 1.
    w.hsync();
    checkSyncMetric(cluster, 1);

    // Append a single random record, then hsync again: expected count 2.
    int seed = new Random().nextInt();
    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
    generator.next();
    w.append(generator.getKey(), generator.getValue());
    w.hsync();
    checkSyncMetric(cluster, 2);

    // Closing the writer is expected to leave the count unchanged ...
    w.close();
    checkSyncMetric(cluster, 2);

    // ... while closing the underlying stream is expected to raise it to 3.
    out.close();
    checkSyncMetric(cluster, 3);
  } finally {
    // Always shut the mini cluster down, even when a step above throws, so a
    // failing run does not leave the cluster running.
    cluster.shutdown();
  }
}
/**
 * Test hsync via SequenceFiles.
 *
 * <p>A file is created with {@code CreateFlag.SYNC_BLOCK} and written through
 * a SequenceFile {@code Writer}. After every flush, sync and close step,
 * {@code checkSyncMetric} is invoked with the count expected at that point.
 * {@code checkSyncMetric} lives outside this method; presumably it asserts a
 * sync counter on the cluster -- confirm against its definition.
 *
 * @throws Exception if cluster start-up, file I/O or a metric check fails
 */
@Test
public void testSequenceFileSync() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  try {
    final FileSystem fs = cluster.getFileSystem();
    final Path p = new Path("/testSequenceFileSync/foo");
    final int len = 1 << 16;
    FSDataOutputStream out = fs.create(p, FsPermission.getDefault(), EnumSet
        .of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
        4096, (short) 1, len, null);
    Writer w = SequenceFile
        .createWriter(new Configuration(), Writer.stream(out),
            Writer.keyClass(RandomDatum.class),
            Writer.valueClass(RandomDatum.class),
            Writer.compression(CompressionType.NONE, new DefaultCodec()));

    // Expected count after a plain hflush: 0.
    w.hflush();
    checkSyncMetric(cluster, 0);

    // Expected count after the first hsync: 1.
    w.hsync();
    checkSyncMetric(cluster, 1);

    // Write one random record and hsync again: expected count 2.
    int seed = new Random().nextInt();
    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
    generator.next();
    w.append(generator.getKey(), generator.getValue());
    w.hsync();
    checkSyncMetric(cluster, 2);

    // Expected count is still 2 after closing the writer ...
    w.close();
    checkSyncMetric(cluster, 2);

    // ... and 3 once the underlying stream itself is closed.
    out.close();
    checkSyncMetric(cluster, 3);
  } finally {
    // Shut the mini cluster down on every exit path; without this a failed
    // check would skip the shutdown and leave the cluster running.
    cluster.shutdown();
  }
}
private static void codecTest(Configuration conf, int seed, int count, String codecClass) throws IOException { // Create the codec CompressionCodec codec = null; try { codec = (CompressionCodec) ReflectionUtils.newInstance(conf.getClassByName(codecClass), conf); } catch (ClassNotFoundException cnfe) { throw new IOException("Illegal codec!"); } LOG.info("Created a Codec object of type: " + codecClass); // Generate data DataOutputBuffer data = new DataOutputBuffer(); RandomDatum.Generator generator = new RandomDatum.Generator(seed); for(int i=0; i < count; ++i) { generator.next(); RandomDatum key = generator.getKey(); RandomDatum value = generator.getValue(); key.write(data); value.write(data); } DataInputBuffer originalData = new DataInputBuffer(); DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData)); originalData.reset(data.getData(), 0, data.getLength()); LOG.info("Generated " + count + " records"); // Compress data DataOutputBuffer compressedDataBuffer = new DataOutputBuffer(); CompressionOutputStream deflateFilter = codec.createOutputStream(compressedDataBuffer); DataOutputStream deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter)); deflateOut.write(data.getData(), 0, data.getLength()); deflateOut.flush(); deflateFilter.finish(); LOG.info("Finished compressing data"); // De-compress data DataInputBuffer deCompressedDataBuffer = new DataInputBuffer(); deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, compressedDataBuffer.getLength()); CompressionInputStream inflateFilter = codec.createInputStream(deCompressedDataBuffer); DataInputStream inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter)); // Check for(int i=0; i < count; ++i) { RandomDatum k1 = new RandomDatum(); RandomDatum v1 = new RandomDatum(); k1.readFields(originalIn); v1.readFields(originalIn); RandomDatum k2 = new RandomDatum(); RandomDatum v2 = new RandomDatum(); k2.readFields(inflateIn); v2.readFields(inflateIn); 
} LOG.info("SUCCESS! Completed checking " + count + " records"); }
private static void codecTest(Configuration conf, int seed, int count, String codecClass) throws IOException { // Create the codec CompressionCodec codec = null; try { codec = (CompressionCodec) ReflectionUtils.newInstance(conf.getClassByName(codecClass), conf); } catch (ClassNotFoundException cnfe) { throw new IOException("Illegal codec!"); } LOG.info("Created a Codec object of type: " + codecClass); // Generate data DataOutputBuffer data = new DataOutputBuffer(); RandomDatum.Generator generator = new RandomDatum.Generator(seed); for(int i=0; i < count; ++i) { generator.next(); RandomDatum key = generator.getKey(); RandomDatum value = generator.getValue(); key.write(data); value.write(data); } DataInputBuffer originalData = new DataInputBuffer(); DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData)); originalData.reset(data.getData(), 0, data.getLength()); LOG.info("Generated " + count + " records"); // Compress data DataOutputBuffer compressedDataBuffer = new DataOutputBuffer(); CompressionOutputStream deflateFilter = codec.createOutputStream(compressedDataBuffer); DataOutputStream deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter)); deflateOut.write(data.getData(), 0, data.getLength()); deflateOut.flush(); deflateFilter.finish(); LOG.info("Finished compressing data"); // De-compress data DataInputBuffer deCompressedDataBuffer = new DataInputBuffer(); deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, compressedDataBuffer.getLength()); CompressionInputStream inflateFilter = codec.createInputStream(deCompressedDataBuffer); DataInputStream inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter)); // Check for(int i=0; i < count; ++i) { RandomDatum k1 = new RandomDatum(); RandomDatum v1 = new RandomDatum(); k1.readFields(originalIn); v1.readFields(originalIn); RandomDatum k2 = new RandomDatum(); RandomDatum v2 = new RandomDatum(); k2.readFields(inflateIn); v2.readFields(inflateIn); 
assertTrue("original and compressed-then-decompressed-output not equal", k1.equals(k2) && v1.equals(v2)); } LOG.info("SUCCESS! Completed checking " + count + " records"); }