/**
 * Reads up to {@code maxLength} bytes into a {@link ByteBuffer}.
 *
 * <p>When the wrapped stream implements {@code HasEnhancedByteBufferAccess}
 * the call is forwarded to it unchanged. Otherwise the data is obtained via
 * {@code ByteBufferUtil.fallbackRead}, and a non-null result is recorded in
 * {@code extendedReadBuffers} together with the pool it was requested with.
 *
 * <p>The capability is probed with {@code instanceof} rather than by catching
 * {@link ClassCastException}, so a {@code ClassCastException} raised inside
 * the wrapped stream is no longer mistaken for "not supported".
 *
 * @param bufferPool pool used to obtain a buffer when one has to be allocated
 * @param maxLength  maximum number of bytes to return
 * @param opts       read options passed through to the wrapped stream
 * @return the buffer produced by the wrapped stream or by the fallback read;
 *         may be {@code null} if either of them returns {@code null}
 * @throws IOException if the underlying read fails
 * @throws UnsupportedOperationException if raised by the delegate or the
 *         fallback read
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (in instanceof HasEnhancedByteBufferAccess) {
    return ((HasEnhancedByteBufferAccess) in)
        .read(bufferPool, maxLength, opts);
  }
  ByteBuffer buffer =
      ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (buffer != null) {
    // Remember which pool the buffer belongs to.
    extendedReadBuffers.put(buffer, bufferPool);
  }
  return buffer;
}
/**
 * Reads up to {@code maxLength} bytes into a buffer taken from
 * {@code bufferPool}.
 *
 * <p>On success the returned buffer's position is where the data starts and
 * its limit marks the end of the bytes just read, so {@code remaining()} is
 * the number of bytes read. If nothing is handed to the caller (end of
 * stream, or the read throws) the buffer is restored to the state it was
 * obtained in and returned to the pool instead of being dropped.
 *
 * @param bufferPool pool to take the buffer from; must not be {@code null}
 * @param maxLength  maximum number of bytes to read
 * @param opts       read options (not consulted by this implementation)
 * @return a buffer holding the bytes read, or {@code null} when
 *         {@code read(ByteBuffer)} reports a negative count
 * @throws IOException if {@code bufferPool} is {@code null} or the read fails
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (bufferPool == null) {
    throw new IOException("Please specify buffer pool.");
  }
  final ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
  final int start = buffer.position();
  final int originalLimit = buffer.limit();
  boolean handedOut = false;
  try {
    // The pool may return a larger buffer than requested; never read more
    // than the caller asked for.
    if (buffer.remaining() > maxLength) {
      buffer.limit(start + maxLength);
    }
    final int n = read(buffer);
    if (n < 0) {
      return null;
    }
    // Expose exactly the bytes that were read: [start, start + n).
    buffer.position(start);
    buffer.limit(start + n);
    handedOut = true;
    return buffer;
  } finally {
    if (!handedOut) {
      // Undo our changes before giving the buffer back to the pool.
      buffer.limit(originalLimit);
      buffer.position(start);
      bufferPool.putBuffer(buffer);
    }
  }
}
/**
 * Releases a buffer previously obtained from this stream.
 *
 * <p>When the wrapped stream implements {@code HasEnhancedByteBufferAccess}
 * the release is forwarded to it. Otherwise the buffer is looked up in
 * {@code extendedReadBuffers} and handed back to the pool stored for it.
 *
 * <p>The capability is probed with {@code instanceof} rather than by catching
 * {@link ClassCastException}, so a {@code ClassCastException} raised inside
 * the wrapped stream is not silently reinterpreted.
 *
 * @param buffer the buffer to release
 * @throws IllegalArgumentException if the wrapped stream lacks enhanced
 *         byte-buffer access and the buffer is not tracked by this stream
 */
@Override
public void releaseBuffer(ByteBuffer buffer) {
  if (in instanceof HasEnhancedByteBufferAccess) {
    ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
    return;
  }
  ByteBufferPool bufferPool = extendedReadBuffers.remove(buffer);
  if (bufferPool == null) {
    throw new IllegalArgumentException("tried to release a buffer "
        + "that was not created by this stream.");
  }
  bufferPool.putBuffer(buffer);
}
@Override public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException { checkStream(); try { if (outBuffer.remaining() > 0) { // Have some decrypted data unread, need to reset. ((Seekable) in).seek(getPos()); resetStreamOffset(getPos()); } final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in). read(bufferPool, maxLength, opts); if (buffer != null) { final int n = buffer.remaining(); if (n > 0) { streamOffset += buffer.remaining(); // Read n bytes final int pos = buffer.position(); decrypt(buffer, n, pos); } } return buffer; } catch (ClassCastException e) { throw new UnsupportedOperationException("This stream does not support " + "enhanced byte buffer access."); } }
/**
 * Creates a trivial, non-pooling {@code ByteBufferPool}.
 *
 * <p>Every request allocates a fresh buffer of exactly {@code length} bytes,
 * direct or heap according to the {@code direct} flag. Honouring the flag
 * matters because a caller that asks for a heap buffer may rely on
 * {@link ByteBuffer#array()}, which a direct buffer does not support.
 * Returned buffers are not retained.
 *
 * @return a new allocate-on-demand buffer pool
 */
private ByteBufferPool getBufferPool() {
  return new ByteBufferPool() {
    @Override
    public ByteBuffer getBuffer(boolean direct, int length) {
      return direct
          ? ByteBuffer.allocateDirect(length)
          : ByteBuffer.allocate(length);
    }

    @Override
    public void putBuffer(ByteBuffer buffer) {
      // Intentionally empty: buffers are not kept for reuse.
    }
  };
}
/**
 * Forwards the read to {@code underlyingIs}, bracketing the call with
 * {@code operatorStats.startWait()} / {@code operatorStats.stopWait()}.
 * {@code stopWait()} runs whether the read returns or throws.
 *
 * @param bufferPool pool passed through to the underlying stream
 * @param maxLength  maximum number of bytes to read
 * @param opts       read options passed through to the underlying stream
 * @return whatever the underlying stream returns
 * @throws IOException if the underlying read fails
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  final ByteBuffer result;
  operatorStats.startWait();
  try {
    result = underlyingIs.read(bufferPool, maxLength, opts);
  } finally {
    operatorStats.stopWait();
  }
  return result;
}
/**
 * Reads up to {@code maxLength} bytes, trying a zero-copy read first when
 * short-circuit mmap is enabled and otherwise copying through
 * {@code ByteBufferUtil.fallbackRead}.
 *
 * @param bufferPool pool used by the fallback read
 * @param maxLength  maximum number of bytes to read; must not be negative
 * @param opts       read options passed to the zero-copy attempt
 * @return {@code EMPTY_BUFFER} when {@code maxLength} is zero; {@code null}
 *         when a new block reader is needed but the position is at or past
 *         the file length; otherwise the zero-copy buffer or the result of
 *         the fallback read (which may itself be {@code null})
 * @throws IllegalArgumentException if {@code maxLength} is negative
 * @throws IOException if a block reader cannot be set up at the current
 *         position, or the read fails
 */
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
    int maxLength, EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  }

  final boolean needsBlockReader = (blockReader == null) || (blockEnd == -1);
  if (needsBlockReader) {
    if (pos >= getFileLength()) {
      return null;
    }
    /*
     * Either there is no blockReader or the current one has nothing left,
     * so seekToBlockSource is asked for a fresh blockReader (which also
     * recalculates blockEnd). EOF was already ruled out just above.
     */
    final boolean sought = seekToBlockSource(pos);
    if (!sought || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }

  if (dfsClient.getConf().shortCircuitMmapEnabled) {
    final ByteBuffer zeroCopy = tryReadZeroCopy(maxLength, opts);
    if (zeroCopy != null) {
      return zeroCopy;
    }
  }

  final ByteBuffer copied =
      ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (copied != null) {
    // Track the pool so the buffer can be handed back to it later.
    getExtendedReadBuffers().put(copied, bufferPool);
  }
  return copied;
}
/**
 * Releases a buffer handed out by this stream.
 *
 * <p>{@code EMPTY_BUFFER} is ignored. Any other buffer is removed from the
 * extended-read-buffer registry; a registered {@code ClientMmap} is closed
 * quietly, and a registered {@code ByteBufferPool} gets the buffer back.
 *
 * @param buffer the buffer to release
 * @throws IllegalArgumentException if the buffer is not registered with this
 *         stream
 */
@Override
public synchronized void releaseBuffer(ByteBuffer buffer) {
  if (buffer != EMPTY_BUFFER) {
    final Object owner = getExtendedReadBuffers().remove(buffer);
    if (owner == null) {
      throw new IllegalArgumentException("tried to release a buffer " +
          "that was not created by this stream, " + buffer);
    }
    if (owner instanceof ClientMmap) {
      final ClientMmap mmap = (ClientMmap) owner;
      IOUtils.closeQuietly(mmap);
    } else if (owner instanceof ByteBufferPool) {
      final ByteBufferPool pool = (ByteBufferPool) owner;
      pool.putBuffer(buffer);
    }
  }
}
/**
 * Forwards the read to {@code underlyingIs}. An {@code FSError} thrown by
 * the delegate is passed to {@code FileSystemWrapper.propagateFSError} and
 * the exception it returns is thrown instead.
 *
 * @param bufferPool pool passed through to the underlying stream
 * @param maxLength  maximum number of bytes to read
 * @param opts       read options passed through to the underlying stream
 * @return whatever the underlying stream returns
 * @throws IOException if the underlying read fails
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  final ByteBuffer result;
  try {
    result = underlyingIs.read(bufferPool, maxLength, opts);
  } catch (FSError fsError) {
    throw FileSystemWrapper.propagateFSError(fsError);
  }
  return result;
}
/**
 * Delegates to the superclass read, bracketing the call with
 * {@code operatorStats.startWait()} / {@code operatorStats.stopWait()}.
 * {@code stopWait()} runs whether the read returns or throws.
 *
 * @param bufferPool pool passed through to the superclass
 * @param maxLength  maximum number of bytes to read
 * @param opts       read options passed through to the superclass
 * @return whatever the superclass read returns
 * @throws IOException if the superclass read fails
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  final ByteBuffer result;
  operatorStats.startWait();
  try {
    result = super.read(bufferPool, maxLength, opts);
  } finally {
    operatorStats.stopWait();
  }
  return result;
}
/**
 * Reads up to {@code maxLength} bytes, trying a zero-copy read first when
 * the short-circuit configuration reports mmap as enabled and otherwise
 * copying through {@code ByteBufferUtil.fallbackRead}.
 *
 * @param bufferPool pool used by the fallback read
 * @param maxLength  maximum number of bytes to read; must not be negative
 * @param opts       read options passed to the zero-copy attempt
 * @return {@code EMPTY_BUFFER} when {@code maxLength} is zero; {@code null}
 *         when a new block reader is needed but the position is at or past
 *         the file length; otherwise the zero-copy buffer or the result of
 *         the fallback read (which may itself be {@code null})
 * @throws IllegalArgumentException if {@code maxLength} is negative
 * @throws IOException if a block reader cannot be set up at the current
 *         position, or the read fails
 */
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
    int maxLength, EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  }

  final boolean needsBlockReader = (blockReader == null) || (blockEnd == -1);
  if (needsBlockReader) {
    if (pos >= getFileLength()) {
      return null;
    }
    /*
     * Either there is no blockReader or the current one has nothing left,
     * so seekToBlockSource is asked for a fresh blockReader (which also
     * recalculates blockEnd). EOF was already ruled out just above.
     */
    final boolean sought = seekToBlockSource(pos);
    if (!sought || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }

  final boolean mmapEnabled =
      dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled();
  if (mmapEnabled) {
    final ByteBuffer zeroCopy = tryReadZeroCopy(maxLength, opts);
    if (zeroCopy != null) {
      return zeroCopy;
    }
  }

  final ByteBuffer copied =
      ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (copied != null) {
    // Track the pool so the buffer can be handed back to it later.
    getExtendedReadBuffers().put(copied, bufferPool);
  }
  return copied;
}
/**
 * Enhanced byte-buffer (zero-copy) reads are deliberately unsupported here:
 * reads on this stream may need online read recovery, so a zero-copy read
 * does not make sense.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
    int maxLength, EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  final String reason = "Not support enhanced byte buffer access.";
  throw new UnsupportedOperationException(reason);
}