/** Seek to a position. */ @Override public void seek(long pos) throws IOException { Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset."); checkStream(); try { /* * If data of target pos in the underlying stream has already been read * and decrypted in outBuffer, we just need to re-position outBuffer. */ if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) { int forward = (int) (pos - (streamOffset - outBuffer.remaining())); if (forward > 0) { outBuffer.position(outBuffer.position() + forward); } } else { ((Seekable) in).seek(pos); resetStreamOffset(pos); } } catch (ClassCastException e) { throw new UnsupportedOperationException("This stream does not support " + "seek."); } }
/** Test get position. */ @Test(timeout=120000) public void testGetPos() throws Exception { OutputStream out = getOutputStream(defaultBufferSize); writeData(out); // Default buffer size InputStream in = getInputStream(defaultBufferSize); byte[] result = new byte[dataLen]; int n1 = readAll(in, result, 0, dataLen / 3); Assert.assertEquals(n1, ((Seekable) in).getPos()); int n2 = readAll(in, result, n1, dataLen - n1); Assert.assertEquals(n1 + n2, ((Seekable) in).getPos()); in.close(); }
@Test public void testOnMessageSuccessful() throws IOException { InputStream mis = mock(InputStream.class, withSettings().extraInterfaces(Seekable.class, PositionedReadable.class)); doReturn(42).when(mis).read(any(byte[].class), anyInt(), anyInt()); FSDataInputStream fdis = new FSDataInputStream(mis); Response response = getResponse(7L, 4096, fdis); InOrder inOrder = Mockito.inOrder(mis); inOrder.verify((Seekable) mis).seek(7); inOrder.verify(mis).read(any(byte[].class), anyInt(), anyInt()); assertEquals(42, ((DFS.GetFileDataResponse) response.pBody).getRead()); assertEquals(42, response.dBodies[0].readableBytes()); }
@Test public void testOnMessageEOF() throws IOException { InputStream mis = mock(InputStream.class, withSettings().extraInterfaces(Seekable.class, PositionedReadable.class)); doReturn(-1).when(mis).read(any(byte[].class), anyInt(), anyInt()); FSDataInputStream fdis = new FSDataInputStream(mis); Response response = getResponse(7L, 4096, fdis); InOrder inOrder = Mockito.inOrder(mis); inOrder.verify((Seekable) mis).seek(7); inOrder.verify(mis).read(any(byte[].class), anyInt(), anyInt()); assertEquals(-1, ((DFS.GetFileDataResponse) response.pBody).getRead()); assertEquals(0, response.dBodies.length); }
/** Seek to a position. */ @Override public void seek(long pos) throws IOException { if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } checkStream(); try { /* * If data of target pos in the underlying stream has already been read * and decrypted in outBuffer, we just need to re-position outBuffer. */ if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) { int forward = (int) (pos - (streamOffset - outBuffer.remaining())); if (forward > 0) { outBuffer.position(outBuffer.position() + forward); } } else { ((Seekable) in).seek(pos); resetStreamOffset(pos); } } catch (ClassCastException e) { throw new UnsupportedOperationException("This stream does not support " + "seek."); } }
@Override public boolean next(LongWritable key, Text value) throws IOException { if (this.pos < this.end) { if (readUntilMatch(this.startTag, false)) { this.recordStartPos = this.pos - this.startTag.length; try { this.buffer.write(this.startTag); if (readUntilMatch(this.endTag, true)) { key.set(this.recordStartPos); value.set(this.buffer.getData(), 0, this.buffer.getLength()); return true; } } finally { if (this.fsin instanceof Seekable) { if (this.pos != ((Seekable) this.fsin).getPos()) { throw new RuntimeException("bytes consumed error!"); } } this.buffer.reset(); } } } return false; }
/** * If input stream is {@link org.apache.hadoop.fs.Seekable}, return it's * current position, otherwise return 0; */ public static long getInputStreamOffset(InputStream in) throws IOException { if (in instanceof Seekable) { return ((Seekable) in).getPos(); } return 0; }
@Override public boolean seekToNewSource(long targetPos) throws IOException { Preconditions.checkArgument(targetPos >= 0, "Cannot seek to negative offset."); checkStream(); try { boolean result = ((Seekable) in).seekToNewSource(targetPos); resetStreamOffset(targetPos); return result; } catch (ClassCastException e) { throw new UnsupportedOperationException("This stream does not support " + "seekToNewSource."); } }
@Override public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException { checkStream(); try { if (outBuffer.remaining() > 0) { // Have some decrypted data unread, need to reset. ((Seekable) in).seek(getPos()); resetStreamOffset(getPos()); } final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in). read(bufferPool, maxLength, opts); if (buffer != null) { final int n = buffer.remaining(); if (n > 0) { streamOffset += buffer.remaining(); // Read n bytes final int pos = buffer.position(); decrypt(buffer, n, pos); } } return buffer; } catch (ClassCastException e) { throw new UnsupportedOperationException("This stream does not support " + "enhanced byte buffer access."); } }
/** * This method returns the current position in the stream. * * @return Current position in stream as a long */ @Override public long getPos() throws IOException { if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){ //This way of getting the current position will not work for file //size which can be fit in an int and hence can not be returned by //available method. return (this.maxAvailableData - this.in.available()); } else{ return ((Seekable)this.in).getPos(); } }
private void seekCheck(InputStream in, int pos) throws Exception { byte[] result = new byte[dataLen]; ((Seekable) in).seek(pos); int n = readAll(in, result, 0, dataLen); Assert.assertEquals(dataLen, n + pos); byte[] readData = new byte[n]; System.arraycopy(result, 0, readData, 0, n); byte[] expectedData = new byte[n]; System.arraycopy(data, pos, expectedData, 0, n); Assert.assertArrayEquals(readData, expectedData); }
/** Test skip. */ @Test(timeout=120000) public void testSkip() throws Exception { OutputStream out = getOutputStream(defaultBufferSize); writeData(out); // Default buffer size InputStream in = getInputStream(defaultBufferSize); byte[] result = new byte[dataLen]; int n1 = readAll(in, result, 0, dataLen / 3); Assert.assertEquals(n1, ((Seekable) in).getPos()); long skipped = in.skip(dataLen / 3); int n2 = readAll(in, result, 0, dataLen); Assert.assertEquals(dataLen, n1 + skipped + n2); byte[] readData = new byte[n2]; System.arraycopy(result, 0, readData, 0, n2); byte[] expectedData = new byte[n2]; System.arraycopy(data, dataLen - n2, expectedData, 0, n2); Assert.assertArrayEquals(readData, expectedData); try { skipped = in.skip(-3); Assert.fail("Skip Negative length should fail."); } catch (IllegalArgumentException e) { GenericTestUtils.assertExceptionContains("Negative skip length", e); } // Skip after EOF skipped = in.skip(3); Assert.assertEquals(skipped, 0); in.close(); }
private void seekToNewSourceCheck(InputStream in, int targetPos) throws Exception { byte[] result = new byte[dataLen]; ((Seekable) in).seekToNewSource(targetPos); int n = readAll(in, result, 0, dataLen); Assert.assertEquals(dataLen, n + targetPos); byte[] readData = new byte[n]; System.arraycopy(result, 0, readData, 0, n); byte[] expectedData = new byte[n]; System.arraycopy(data, targetPos, expectedData, 0, n); Assert.assertArrayEquals(readData, expectedData); }
/** * Creates a new instance with the mandatory characters for handling newlines transparently. * @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()} * @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input. */ public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) { byte[] lineSeparator = settings.getNewLineDelimiter(); byte normalizedLineSeparator = settings.getNormalizedNewLine(); Preconditions.checkArgument(lineSeparator != null && (lineSeparator.length == 1 || lineSeparator.length == 2), "Invalid line separator. Expected 1 to 2 characters"); Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable."); boolean isCompressed = input instanceof CompressionInputStream ; Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream."); // splits aren't allowed with compressed data. The split length will be the compressed size which means we'll normally end prematurely. if(isCompressed && endPos > 0){ endPos = Long.MAX_VALUE; } this.input = input; this.seekable = (Seekable) input; this.settings = settings; if(input instanceof FSDataInputStream){ this.inputFS = (FSDataInputStream) input; this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable; }else{ this.inputFS = null; this.bufferReadable = false; } this.startPos = startPos; this.endPos = endPos; this.lineSeparator1 = lineSeparator[0]; this.lineSeparator2 = lineSeparator.length == 2 ? lineSeparator[1] : NULL_BYTE; this.normalizedLineSeparator = normalizedLineSeparator; this.buffer = readBuffer; this.bStart = buffer.memoryAddress(); this.bStartMinus1 = bStart -1; this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity()); }
/** * Creates a new instance with the mandatory characters for handling newlines transparently. * lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()} * normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input. */ public TextInput(TextParsingSettings settings, InputStream input, ArrowBuf readBuffer, long startPos, long endPos) { this.lineSeparator = settings.getNewLineDelimiter(); byte normalizedLineSeparator = settings.getNormalizedNewLine(); Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable."); boolean isCompressed = input instanceof CompressionInputStream ; Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream."); // splits aren't allowed with compressed data. The split length will be the compressed size which means we'll normally end prematurely. if(isCompressed && endPos > 0){ endPos = Long.MAX_VALUE; } this.input = input; this.seekable = (Seekable) input; this.settings = settings; if(input instanceof FSDataInputStream){ this.inputFS = (FSDataInputStream) input; this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable; }else{ this.inputFS = null; this.bufferReadable = false; } this.startPos = startPos; this.endPos = endPos; this.normalizedLineSeparator = normalizedLineSeparator; this.buffer = readBuffer; this.bStart = buffer.memoryAddress(); this.bStartMinus1 = bStart -1; this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity()); }