/**
 * Zero-copy-style read: fills a buffer obtained from {@code bufferPool}
 * with up to {@code maxLength} bytes.
 *
 * @param bufferPool pool to borrow the destination buffer from; required.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options (unused by this implementation).
 * @return a buffer positioned at the start of the data read, with its limit
 *         set so that {@code remaining()} equals the bytes read; or null at
 *         EOF (the pooled buffer is returned to the pool in that case).
 * @throws IOException if no buffer pool was supplied or the read fails.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (bufferPool == null) {
    throw new IOException("Please specify buffer pool.");
  }
  ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
  int pos = buffer.position();
  int n = read(buffer);
  if (n >= 0) {
    // Bound the limit to the bytes actually read; the pool may hand back a
    // buffer whose capacity/limit exceeds maxLength, and a short read would
    // otherwise leave stale bytes visible past pos + n.
    buffer.limit(pos + n);
    buffer.position(pos);
    return buffer;
  }
  // EOF: return the borrowed buffer to the pool instead of leaking it.
  bufferPool.putBuffer(buffer);
  return null;
}
/**
 * Get or create a memory map for this replica.
 *
 * There are two kinds of ClientMmap objects we could fetch here: one that
 * will always read pre-checksummed data, and one that may read data that
 * hasn't been checksummed.
 *
 * If we fetch the former, "safe" kind of ClientMmap, we have to increment
 * the anchor count on the shared memory slot.  This will tell the DataNode
 * not to munlock the block until this ClientMmap is closed.
 * If we fetch the latter, we don't bother with anchoring.
 *
 * @param opts     The options to use, such as SKIP_CHECKSUMS.
 *
 * @return         null on failure; the ClientMmap otherwise.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  // Anchor only when we must serve verified data: checksums are enabled
  // and the caller did not ask to skip them.
  boolean anchor = verifyChecksum &&
      !opts.contains(ReadOption.SKIP_CHECKSUMS);
  if (anchor) {
    if (!createNoChecksumContext()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("can't get an mmap for " + block + " of " + filename +
            " since SKIP_CHECKSUMS was not given, " +
            "we aren't skipping checksums, and the block is not mlocked.");
      }
      return null;
    }
  }
  ClientMmap clientMmap = null;
  try {
    clientMmap = replica.getOrCreateClientMmap(anchor);
  } finally {
    // If mmap creation failed (returned null or threw), release the
    // no-checksum context we acquired above so the anchor count balances.
    if ((clientMmap == null) && anchor) {
      releaseNoChecksumContext();
    }
  }
  return clientMmap;
}
/**
 * Get or create a memory map for this replica.
 *
 * There are two kinds of ClientMmap objects we could fetch here: one that
 * will always read pre-checksummed data, and one that may read data that
 * hasn't been checksummed.
 *
 * If we fetch the former, "safe" kind of ClientMmap, we have to increment
 * the anchor count on the shared memory slot.  This will tell the DataNode
 * not to munlock the block until this ClientMmap is closed.
 * If we fetch the latter, we don't bother with anchoring.
 *
 * @param opts     The options to use, such as SKIP_CHECKSUMS.
 *
 * @return         null on failure; the ClientMmap otherwise.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  final boolean skipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
  // We anchor (and take a no-checksum context) only when verified data is
  // required: checksumming is on and the caller did not opt out of it.
  final boolean anchor = verifyChecksum && !skipChecksums;
  if (anchor && !createNoChecksumContext()) {
    LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not " +
        "given, we aren't skipping checksums, and the block is not " +
        "mlocked.", block, filename);
    return null;
  }
  ClientMmap mmap = null;
  try {
    mmap = replica.getOrCreateClientMmap(anchor);
  } finally {
    // Balance the context acquired above if the mmap could not be created.
    if (anchor && (mmap == null)) {
      releaseNoChecksumContext();
    }
  }
  return mmap;
}
@Override public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException { checkStream(); try { if (outBuffer.remaining() > 0) { // Have some decrypted data unread, need to reset. ((Seekable) in).seek(getPos()); resetStreamOffset(getPos()); } final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in). read(bufferPool, maxLength, opts); if (buffer != null) { final int n = buffer.remaining(); if (n > 0) { streamOffset += buffer.remaining(); // Read n bytes final int pos = buffer.position(); decrypt(buffer, n, pos); } } return buffer; } catch (ClassCastException e) { throw new UnsupportedOperationException("This stream does not support " + "enhanced byte buffer access."); } }
@Test(timeout=120000) public void testHasEnhancedByteBufferAccess() throws Exception { OutputStream out = getOutputStream(defaultBufferSize); writeData(out); InputStream in = getInputStream(defaultBufferSize); final int len1 = dataLen / 8; // ByteBuffer size is len1 ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).read( getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); int n1 = buffer.remaining(); byte[] readData = new byte[n1]; buffer.get(readData); byte[] expectedData = new byte[n1]; System.arraycopy(data, 0, expectedData, 0, n1); Assert.assertArrayEquals(readData, expectedData); ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer); // Read len1 bytes readData = new byte[len1]; readAll(in, readData, 0, len1); expectedData = new byte[len1]; System.arraycopy(data, n1, expectedData, 0, len1); Assert.assertArrayEquals(readData, expectedData); // ByteBuffer size is len1 buffer = ((HasEnhancedByteBufferAccess) in).read( getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); int n2 = buffer.remaining(); readData = new byte[n2]; buffer.get(readData); expectedData = new byte[n2]; System.arraycopy(data, n1 + len1, expectedData, 0, n2); Assert.assertArrayEquals(readData, expectedData); ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer); in.close(); }
/**
 * Zero-copy read that accounts the time spent blocked in the underlying
 * stream as operator wait time.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  operatorStats.startWait();
  try {
    final ByteBuffer result = underlyingIs.read(bufferPool, maxLength, opts);
    return result;
  } finally {
    // Always stop the wait clock, even if the read throws.
    operatorStats.stopWait();
  }
}
/**
 * Reads up to {@code maxLength} bytes, preferring a zero-copy mmap read
 * when short-circuit mmap is enabled and falling back to a pooled-buffer
 * copy otherwise.
 *
 * @return a buffer with the data read, EMPTY_BUFFER for a zero-length
 *         request, or null at EOF.
 */
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  }
  if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if ((blockReader == null) || (blockEnd == -1)) {
    if (pos >= getFileLength()) {
      return null; // at EOF
    }
    // No usable blockReader (none yet, or the current one is exhausted):
    // position a fresh one at 'pos' and recompute blockEnd.  We already
    // know we are not at EOF (checked above).
    if (!seekToBlockSource(pos) || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }
  if (dfsClient.getConf().shortCircuitMmapEnabled) {
    ByteBuffer zeroCopyBuf = tryReadZeroCopy(maxLength, opts);
    if (zeroCopyBuf != null) {
      return zeroCopyBuf;
    }
  }
  // Zero-copy unavailable; fall back to copying into a pooled buffer and
  // remember which pool owns it so releaseBuffer can return it.
  ByteBuffer fallbackBuf =
      ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (fallbackBuf != null) {
    getExtendedReadBuffers().put(fallbackBuf, bufferPool);
  }
  return fallbackBuf;
}
/**
 * Zero-copy read that translates low-level {@code FSError}s into the
 * wrapper's exception type via {@code FileSystemWrapper.propagateFSError}.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  try {
    return underlyingIs.read(bufferPool, maxLength, opts);
  } catch (FSError e) {
    // Normalize filesystem errors for callers of the wrapper.
    throw FileSystemWrapper.propagateFSError(e);
  }
}
/**
 * Zero-copy read that accounts the time spent blocked in the parent
 * stream's read as operator wait time.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  operatorStats.startWait();
  try {
    final ByteBuffer result = super.read(bufferPool, maxLength, opts);
    return result;
  } finally {
    // Always stop the wait clock, even if the read throws.
    operatorStats.stopWait();
  }
}
/**
 * Reads up to {@code maxLength} bytes, preferring a zero-copy mmap read
 * when short-circuit mmap is enabled in the short-circuit configuration,
 * and falling back to a pooled-buffer copy otherwise.
 *
 * @return a buffer with the data read, EMPTY_BUFFER for a zero-length
 *         request, or null at EOF.
 */
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  }
  if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if ((blockReader == null) || (blockEnd == -1)) {
    if (pos >= getFileLength()) {
      return null; // at EOF
    }
    // No usable blockReader (none yet, or the current one is exhausted):
    // position a fresh one at 'pos' and recompute blockEnd.  We already
    // know we are not at EOF (checked above).
    if (!seekToBlockSource(pos) || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }
  if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) {
    ByteBuffer zeroCopyBuf = tryReadZeroCopy(maxLength, opts);
    if (zeroCopyBuf != null) {
      return zeroCopyBuf;
    }
  }
  // Zero-copy unavailable; fall back to copying into a pooled buffer and
  // remember which pool owns it so releaseBuffer can return it.
  ByteBuffer fallbackBuf =
      ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (fallbackBuf != null) {
    getExtendedReadBuffers().put(fallbackBuf, bufferPool);
  }
  return fallbackBuf;
}
/**
 * May need online read recovery, zero-copy read doesn't make
 * sense, so don't support it.
 *
 * @throws UnsupportedOperationException always; this stream never supports
 *         enhanced byte buffer access.
 */
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
    int maxLength, EnumSet<ReadOption> opts)
        throws IOException, UnsupportedOperationException {
  throw new UnsupportedOperationException(
      "Not support enhanced byte buffer access.");
}
/**
 * Reads up to {@code maxLength} bytes, preferring a zero-copy mmap read
 * when short-circuit mmap is enabled and falling back to a pooled-buffer
 * copy otherwise.
 *
 * @return a buffer with the data read, EMPTY_BUFFER for a zero-length
 *         request, or null at EOF.
 */
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  }
  if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if ((blockReader == null) || (blockEnd == -1)) {
    if (pos >= getFileLength()) {
      return null; // at EOF
    }
    // No usable blockReader (none yet, or the current one is exhausted):
    // position a fresh one at 'pos' and recompute blockEnd.  We already
    // know we are not at EOF (checked above).
    if (!seekToBlockSource(pos) || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }
  if (dfsClient.getConf().shortCircuitMmapEnabled) {
    ByteBuffer zeroCopyBuf = tryReadZeroCopy(maxLength, opts);
    if (zeroCopyBuf != null) {
      return zeroCopyBuf;
    }
  }
  // Zero-copy unavailable; fall back to copying into a pooled buffer and
  // remember which pool owns it so releaseBuffer can return it.
  ByteBuffer fallbackBuf =
      ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (fallbackBuf != null) {
    extendedReadBuffers.put(fallbackBuf, bufferPool);
  }
  return fallbackBuf;
}
/**
 * Reads up to {@code maxLength} bytes through the enhanced byte-buffer
 * API, selecting the checksum-verifying read options when
 * {@code verifyChecksums} is set and the checksum-skipping options
 * otherwise.
 */
public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums)
    throws IOException {
  final EnumSet<ReadOption> options =
      verifyChecksums ? CHECK_SUM : NO_CHECK_SUM;
  return this.in.read(this.pool, maxLength, options);
}
/**
 * Opens the split's file and positions the stream at the split start,
 * initializing the read-statistics, buffer-pool, and read-option fields
 * used by later reads.
 *
 * @param job         job configuration; also supplies the
 *                    "bytecount.skipChecksums" flag.
 * @param splitStart  byte offset of this split within the file.
 * @param splitLength length of this split in bytes.
 * @param file        path of the input file.
 * @throws IOException if the file cannot be opened or seeked.
 */
private void initialize(Configuration job, long splitStart, long splitLength,
    Path file) throws IOException {
  start = splitStart;
  end = start + splitLength;
  pos = start;
  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);
  this.readStats = new ReadStatistics();
  this.bufferPool = new ElasticByteBufferPool();
  // Optionally skip checksum verification on zero-copy reads.
  boolean skipChecksums = job.getBoolean("bytecount.skipChecksums", false);
  this.readOption =
      skipChecksums ? EnumSet.of(ReadOption.SKIP_CHECKSUMS) : EnumSet
          .noneOf(ReadOption.class);
  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null != codec) {
    // Compressed input: wrap the raw stream in a decompressing stream.
    // NOTE(review): no seek is done here — presumably compressed inputs are
    // always read as a single whole-file split; confirm with the caller.
    isCompressedInput = true;
    decompressor = CodecPool.getDecompressor(codec);
    CompressionInputStream cIn =
        codec.createInputStream(fileIn, decompressor);
    filePosition = cIn;
    inputStream = cIn;
    LOG.info(
        "Compressed input; cannot compute number of records in the split");
  } else {
    // Uncompressed input: seek the raw stream directly to the split start.
    fileIn.seek(start);
    filePosition = fileIn;
    inputStream = fileIn;
    LOG.info("Split pos = " + start + " length " + splitLength);
  }
}
/**
 * Memory-mapped access is not provided by this reader.
 *
 * @param opts read options (ignored).
 * @return always null, signalling that callers must fall back to
 *         ordinary reads.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  return null;
}