/** Positioned read. It is thread-safe */ @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { checkStream(); try { final int n = ((PositionedReadable) in).read(position, buffer, offset, length); if (n > 0) { // This operation does not change the current offset of the file decrypt(position, buffer, offset, n); } return n; } catch (ClassCastException e) { throw new UnsupportedOperationException("This stream does not support " + "positioned read."); } }
/** Positioned read fully. It is thread-safe */ @Override public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { checkStream(); try { ((PositionedReadable) in).readFully(position, buffer, offset, length); if (length > 0) { // This operation does not change the current offset of the file decrypt(position, buffer, offset, length); } } catch (ClassCastException e) { throw new UnsupportedOperationException("This stream does not support " + "positioned readFully."); } }
@Test public void testOnMessageSuccessful() throws IOException { InputStream mis = mock(InputStream.class, withSettings().extraInterfaces(Seekable.class, PositionedReadable.class)); doReturn(42).when(mis).read(any(byte[].class), anyInt(), anyInt()); FSDataInputStream fdis = new FSDataInputStream(mis); Response response = getResponse(7L, 4096, fdis); InOrder inOrder = Mockito.inOrder(mis); inOrder.verify((Seekable) mis).seek(7); inOrder.verify(mis).read(any(byte[].class), anyInt(), anyInt()); assertEquals(42, ((DFS.GetFileDataResponse) response.pBody).getRead()); assertEquals(42, response.dBodies[0].readableBytes()); }
@Test public void testOnMessageEOF() throws IOException { InputStream mis = mock(InputStream.class, withSettings().extraInterfaces(Seekable.class, PositionedReadable.class)); doReturn(-1).when(mis).read(any(byte[].class), anyInt(), anyInt()); FSDataInputStream fdis = new FSDataInputStream(mis); Response response = getResponse(7L, 4096, fdis); InOrder inOrder = Mockito.inOrder(mis); inOrder.verify((Seekable) mis).seek(7); inOrder.verify(mis).read(any(byte[].class), anyInt(), anyInt()); assertEquals(-1, ((DFS.GetFileDataResponse) response.pBody).getRead()); assertEquals(0, response.dBodies.length); }
/** Get input stream. */ private PositionedReadable in() throws IOException { synchronized (mux) { if (opened) { if (err != null) throw err; } else { opened = true; try { in = fs.open(path, bufSize); if (in == null) throw new IOException("Failed to open input stream (file system returned null): " + path); } catch (IOException e) { err = e; throw err; } } return in; } }
/** * This method returns the current position in the stream. * * @return Current position in stream as a long */ @Override public long getPos() throws IOException { if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){ //This way of getting the current position will not work for file //size which can be fit in an int and hence can not be returned by //available method. return (this.maxAvailableData - this.in.available()); } else{ return ((Seekable)this.in).getPos(); } }
private int readAll(InputStream in, long pos, byte[] b, int off, int len) throws IOException { int n = 0; int total = 0; while (n != -1) { total += n; if (total >= len) { break; } n = ((PositionedReadable) in).read(pos + total, b, off + total, len - total); } return total; }
private void readFullyCheck(InputStream in, int pos) throws Exception { byte[] result = new byte[dataLen - pos]; ((PositionedReadable) in).readFully(pos, result); byte[] expectedData = new byte[dataLen - pos]; System.arraycopy(data, pos, expectedData, 0, dataLen - pos); Assert.assertArrayEquals(result, expectedData); result = new byte[dataLen]; // Exceeds maximum length try { ((PositionedReadable) in).readFully(pos, result); Assert.fail("Read fully exceeds maximum length should fail."); } catch (IOException e) { } }
/** * Read bytes starting from the specified position. This requires rawStream is an instance of * {@link PositionedReadable}. */ public int read(long position, byte[] buffer, int offset, int length) throws IOException { if (!(rawStream instanceof PositionedReadable)) { throw new UnsupportedOperationException("positioned read is not supported by the internal stream"); } throttle(); int readLen = ((PositionedReadable) rawStream).read(position, buffer, offset, length); if (readLen != -1) { bytesRead += readLen; } return readLen; }
/** * Read bytes starting from the specified position. This requires rawStream is * an instance of {@link PositionedReadable}. */ public int read(long position, byte[] buffer, int offset, int length) throws IOException { if (!(rawStream instanceof PositionedReadable)) { throw new UnsupportedOperationException( "positioned read is not supported by the internal stream"); } throttle(); int readLen = ((PositionedReadable) rawStream).read(position, buffer, offset, length); if (readLen != -1) { bytesRead += readLen; } return readLen; }
/** * Read bytes starting from the specified position. This requires rawStream is * an instance of {@link PositionedReadable}. * @param position * @param buffer * @param offset * @param length * @return the number of bytes read */ public int read(long position, byte[] buffer, int offset, int length) throws IOException { if (!(rawStream instanceof PositionedReadable)) { throw new UnsupportedOperationException( "positioned read is not supported by the internal stream"); } throttle(); int readLen = ((PositionedReadable) rawStream).read(position, buffer, offset, length); if (readLen != -1) { bytesRead += readLen; } return readLen; }