@Test public void testCopyStreamTargetExists() throws Exception { FSDataOutputStream out = mock(FSDataOutputStream.class); whenFsCreate().thenReturn(out); when(mockFs.getFileStatus(eq(path))).thenReturn(fileStat); target.refreshStatus(); // so it's updated as existing cmd.setOverwrite(true); when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat); when(mockFs.delete(eq(path), eq(false))).thenReturn(true); when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true); FSInputStream in = mock(FSInputStream.class); when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); tryCopyStream(in, true); verify(mockFs).delete(eq(path), anyBoolean()); verify(mockFs).rename(eq(tmpPath), eq(path)); verify(mockFs, never()).delete(eq(tmpPath), anyBoolean()); verify(mockFs, never()).close(); }
@Test public void testInterruptedCopyBytes() throws Exception { FSDataOutputStream out = mock(FSDataOutputStream.class); whenFsCreate().thenReturn(out); when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat); FSInputStream in = mock(FSInputStream.class); // make IOUtils.copyBytes fail when(in.read(any(byte[].class), anyInt(), anyInt())).thenThrow( new InterruptedIOException()); tryCopyStream(in, false); verify(mockFs).delete(eq(tmpPath), anyBoolean()); verify(mockFs, never()).rename(any(Path.class), any(Path.class)); verify(mockFs, never()).delete(eq(path), anyBoolean()); verify(mockFs, never()).close(); }
/**
 * When the final rename of the temporary file is interrupted, the rename
 * is attempted once, the temporary file is deleted, the target is never
 * deleted, and the file system is never closed.
 */
@Test
public void testInterruptedRename() throws Exception {
  // Source stream that reports end-of-stream on the first read.
  FSInputStream emptySource = mock(FSInputStream.class);
  when(emptySource.read(any(byte[].class), anyInt(), anyInt()))
      .thenReturn(-1);

  FSDataOutputStream createdStream = mock(FSDataOutputStream.class);
  whenFsCreate().thenReturn(createdStream);
  when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);

  // The rename step is the one that fails in this scenario.
  when(mockFs.rename(eq(tmpPath), eq(path)))
      .thenThrow(new InterruptedIOException());

  tryCopyStream(emptySource, false);

  verify(mockFs).delete(eq(tmpPath), anyBoolean());
  verify(mockFs).rename(eq(tmpPath), eq(path));
  verify(mockFs, never()).delete(eq(path), anyBoolean());
  verify(mockFs, never()).close();
}
/**
 * Try to open a file for reading several times.
 *
 * If the open fails because lease recovery hasn't completed (reported as an
 * IOException whose message contains "Cannot obtain block length for
 * LocatedBlock"), wait one second and retry, for at most 10 attempts. Any
 * other IOException, including one with no message, is rethrown
 * immediately.
 *
 * If the calling thread is interrupted while waiting between attempts, the
 * interrupt status is restored and the most recent open failure is thrown
 * instead of continuing to retry.
 *
 * @param dfs file system whose underlying client is used for the open
 * @param pathName name of the file to open
 * @return the stream returned by the first successful open
 * @throws IOException the most recent open failure
 */
private static FSInputStream dfsOpenFileWithRetries(DistributedFileSystem dfs,
    String pathName) throws IOException {
  final int maxTries = 10;
  IOException exc = null;
  for (int tries = 0; tries < maxTries; tries++) {
    try {
      return dfs.dfs.open(pathName);
    } catch (IOException e) {
      exc = e;
    }

    // getMessage() may be null; a message-less failure cannot be the
    // lease-recovery case, so it is not retriable.
    String message = exc.getMessage();
    if (message == null
        || !message.contains("Cannot obtain block length for LocatedBlock")) {
      throw exc;
    }

    // No point in sleeping after the final attempt.
    if (tries < maxTries - 1) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ie) {
        // Preserve the interrupt for callers and stop retrying.
        Thread.currentThread().interrupt();
        throw exc;
      }
    }
  }
  throw exc;
}
/**
 * Opens the file at the given path for reading through a seekable byte
 * channel obtained from the client.
 *
 * @param path file to open
 * @param bufferSize not consulted by this implementation
 * @return a data input stream wrapping the seekable channel
 * @throws FileNotFoundException if the path refers to a directory
 * @throws IOException if the status lookup or channel creation fails
 */
@Override
public FSDataInputStream open(final Path path, final int bufferSize)
    throws IOException {
  LOG.debug("Opening '{}' for reading.", path);

  // Directories cannot be read as a byte stream.
  if (getFileStatus(path).isDirectory()) {
    throw new FileNotFoundException(
        String.format("Can't open %s because it is a directory", path));
  }

  final MantaSeekableByteChannel seekableChannel =
      client.getSeekableByteChannel(mantaPath(path));
  final FSInputStream seekableStream =
      new MantaSeekableInputStream(seekableChannel);
  return new FSDataInputStream(seekableStream);
}
/**
 * Answers a byte-range request. Exactly one range is supported: it is
 * answered with a partial-content status, a matching Content-Range header
 * and the requested bytes. A null range list, an empty one, or one with
 * several ranges is answered with range-not-satisfiable and an empty body.
 *
 * @param in stream to read from
 * @param out stream to write to
 * @param response http response to use
 * @param contentLength for the response header
 * @param ranges to write to respond with
 * @throws IOException on error sending the response
 */
static void sendPartialData(FSInputStream in, OutputStream out,
    HttpServletResponse response, long contentLength,
    List<InclusiveByteRange> ranges) throws IOException {
  // Guard: anything other than exactly one range is rejected.
  if (ranges == null || ranges.size() != 1) {
    response.setContentLength(0);
    response.setStatus(
        HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
    response.setHeader("Content-Range",
        InclusiveByteRange.to416HeaderRangeString(contentLength));
    return;
  }

  InclusiveByteRange range = ranges.get(0);
  long bytesToSend = range.getSize(contentLength);
  response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
  response.setHeader("Content-Range",
      range.toHeaderRangeString(contentLength));
  copyFromOffset(in, out, range.getFirst(contentLength), bytesToSend);
}
/**
 * Writes a partial-content response for a single requested byte range.
 * When the request carries no range, or more than one (multiple ranges
 * are unsupported), the response is range-not-satisfiable with a zero
 * content length.
 *
 * @param in
 *          stream to read from
 * @param out
 *          stream to write to
 * @param response
 *          http response to use
 * @param contentLength
 *          for the response header
 * @param ranges
 *          to write to respond with
 * @throws IOException
 *           on error sending the response
 */
static void sendPartialData(FSInputStream in, OutputStream out,
    HttpServletResponse response, long contentLength,
    List<InclusiveByteRange> ranges) throws IOException {
  final boolean hasSingleRange = ranges != null && ranges.size() == 1;

  if (hasSingleRange) {
    InclusiveByteRange requested = ranges.get(0);
    long length = requested.getSize(contentLength);
    response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
    response.setHeader("Content-Range",
        requested.toHeaderRangeString(contentLength));
    long firstByte = requested.getFirst(contentLength);
    copyFromOffset(in, out, firstByte, length);
  } else {
    // Zero or multiple ranges: reply 416 with no body.
    response.setContentLength(0);
    response.setStatus(
        HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
    response.setHeader("Content-Range",
        InclusiveByteRange.to416HeaderRangeString(contentLength));
  }
}
@Test public void testWriteTo() throws IOException { FSInputStream fsin = new MockFSInputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream(); // new int[]{s_1, c_1, s_2, c_2, ..., s_n, c_n} means to test // reading c_i bytes starting at s_i int[] pairs = new int[]{0, 10000, 50, 100, 50, 6000, 1000, 2000, 0, 1, 0, 0, 5000, 0,}; assertTrue("Pairs array must be even", pairs.length % 2 == 0); for (int i = 0; i < pairs.length; i += 2) { StreamFile.copyFromOffset(fsin, os, pairs[i], pairs[i + 1]); assertArrayEquals( "Reading " + pairs[i + 1] + " bytes from offset " + pairs[i], getOutputArray(pairs[i], pairs[i + 1]), os.toByteArray()); os.reset(); } }