@Override void invoke() throws Exception { DatanodeInfo[] newNodes = new DatanodeInfo[2]; newNodes[0] = nodes[0]; newNodes[1] = nodes[1]; String[] storageIDs = {"s0", "s1"}; client.getNamenode().updatePipeline(client.getClientName(), oldBlock, newBlock, newNodes, storageIDs); // close can fail if the out.close() commit the block after block received // notifications from Datanode. // Since datanodes and output stream have still old genstamps, these // blocks will be marked as corrupt after HDFS-5723 if RECEIVED // notifications reaches namenode first and close() will fail. DFSTestUtil.abortStream((DFSOutputStream) out.getWrappedStream()); }
/** * Similar with testRenameUCFileInSnapshot, but do renaming first and then * append file without closing it. Unit test for HDFS-5425. */ @Test public void testAppendFileAfterRenameInSnapshot() throws Exception { final Path test = new Path("/test"); final Path foo = new Path(test, "foo"); final Path bar = new Path(foo, "bar"); DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED); SnapshotTestHelper.createSnapshot(hdfs, test, "s0"); // rename bar --> bar2 final Path bar2 = new Path(foo, "bar2"); hdfs.rename(bar, bar2); // append file and keep it as underconstruction. FSDataOutputStream out = hdfs.append(bar2); out.writeByte(0); ((DFSOutputStream) out.getWrappedStream()).hsync( EnumSet.of(SyncFlag.UPDATE_LENGTH)); // save namespace and restart restartClusterAndCheckImage(true); }
@Override void invoke() throws Exception { DatanodeInfo[] newNodes = new DatanodeInfo[2]; newNodes[0] = nodes[0]; newNodes[1] = nodes[1]; final DatanodeManager dm = cluster.getNamesystem(0).getBlockManager() .getDatanodeManager(); final String storageID1 = dm.getDatanode(newNodes[0]).getStorageInfos()[0] .getStorageID(); final String storageID2 = dm.getDatanode(newNodes[1]).getStorageInfos()[0] .getStorageID(); String[] storageIDs = {storageID1, storageID2}; client.getNamenode().updatePipeline(client.getClientName(), oldBlock, newBlock, newNodes, storageIDs); // close can fail if the out.close() commit the block after block received // notifications from Datanode. // Since datanodes and output stream have still old genstamps, these // blocks will be marked as corrupt after HDFS-5723 if RECEIVED // notifications reaches namenode first and close() will fail. DFSTestUtil.abortStream((DFSOutputStream) out.getWrappedStream()); }
public void testBlocksScheduledCounter() throws IOException { MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null); cluster.waitActive(); FileSystem fs = cluster.getFileSystem(); //open a file an write a few bytes: FSDataOutputStream out = fs.create(new Path("/testBlockScheduledCounter")); for (int i=0; i<1024; i++) { out.write(i); } // flush to make sure a block is allocated. ((DFSOutputStream)(out.getWrappedStream())).sync(); ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>(); cluster.getNameNode().namesystem.DFSNodesStatus(dnList, dnList); DatanodeDescriptor dn = dnList.get(0); assertEquals(1, dn.getBlocksScheduled()); // close the file and the counter should go to zero. out.close(); assertEquals(0, dn.getBlocksScheduled()); }
@Override public HdfsDataOutputStream createInternal(Path f, EnumSet<CreateFlag> createFlag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt, boolean createParent) throws IOException { final DFSOutputStream dfsos = dfs.primitiveCreate(getUriPath(f), absolutePermission, createFlag, createParent, replication, blockSize, progress, bufferSize, checksumOpt); return dfs.createWrappedOutputStream(dfsos, statistics, dfsos.getInitialLen()); }
/** * Sync buffered data to DataNodes (flush to disk devices). * * @param syncFlags * Indicate the detailed semantic and actions of the hsync. * @throws IOException * @see FSDataOutputStream#hsync() */ public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { OutputStream wrappedStream = getWrappedStream(); if (wrappedStream instanceof CryptoOutputStream) { ((CryptoOutputStream) wrappedStream).flush(); wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream(); } ((DFSOutputStream) wrappedStream).hsync(syncFlags); }
/** * Test if the quota can be correctly updated when file length is updated * through fsync */ @Test (timeout=60000) public void testUpdateQuotaForFSync() throws Exception { final Path foo = new Path("/foo"); final Path bar = new Path(foo, "bar"); DFSTestUtil.createFile(dfs, bar, BLOCKSIZE, REPLICATION, 0L); dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1); FSDataOutputStream out = dfs.append(bar); out.write(new byte[BLOCKSIZE / 4]); ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory(); QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature() .getSpaceConsumed(); long ns = quota.getNameSpace(); long ds = quota.getStorageSpace(); assertEquals(2, ns); // foo and bar assertEquals(BLOCKSIZE * 2 * REPLICATION, ds); // file is under construction out.write(new byte[BLOCKSIZE / 4]); out.close(); fooNode = fsdir.getINode4Write(foo.toString()).asDirectory(); quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); ns = quota.getNameSpace(); ds = quota.getStorageSpace(); assertEquals(2, ns); assertEquals((BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, ds); // append another block DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE); quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); ns = quota.getNameSpace(); ds = quota.getStorageSpace(); assertEquals(2, ns); // foo and bar assertEquals((BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION, ds); }
/** * Test adding new blocks but without closing the corresponding the file */ @Test public void testAddBlockUC() throws Exception { DistributedFileSystem fs = cluster.getFileSystem(); final Path file1 = new Path("/file1"); DFSTestUtil.createFile(fs, file1, BLOCKSIZE - 1, REPLICATION, 0L); FSDataOutputStream out = null; try { // append files without closing the streams out = fs.append(file1); String appendContent = "appending-content"; out.writeBytes(appendContent); ((DFSOutputStream) out.getWrappedStream()).hsync( EnumSet.of(SyncFlag.UPDATE_LENGTH)); // restart NN cluster.restartNameNode(true); FSDirectory fsdir = cluster.getNamesystem().getFSDirectory(); INodeFile fileNode = fsdir.getINode4Write(file1.toString()).asFile(); BlockInfoContiguous[] fileBlocks = fileNode.getBlocks(); assertEquals(2, fileBlocks.length); assertEquals(BLOCKSIZE, fileBlocks[0].getNumBytes()); assertEquals(BlockUCState.COMPLETE, fileBlocks[0].getBlockUCState()); assertEquals(appendContent.length() - 1, fileBlocks[1].getNumBytes()); assertEquals(BlockUCState.UNDER_CONSTRUCTION, fileBlocks[1].getBlockUCState()); } finally { if (out != null) { out.close(); } } }
/** * This method gets the pipeline for the current WAL. */ @VisibleForTesting DatanodeInfo[] getPipeLine() { if (this.hdfs_out != null) { if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) { return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline(); } } return new DatanodeInfo[0]; }
public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats, long startPosition) throws IOException { super(out, stats, startPosition); Preconditions.checkArgument( out.getWrappedStream() instanceof DFSOutputStream, "CryptoOutputStream should wrap a DFSOutputStream"); }
/** * Sync buffered data to DataNodes (flush to disk devices). * * @param syncFlags * Indicate the detailed semantic and actions of the hsync. * @throws IOException * @see FSDataOutputStream#hsync() */ public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { OutputStream wrappedStream = getWrappedStream(); if (wrappedStream instanceof CryptoOutputStream) { wrappedStream.flush(); wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream(); } ((DFSOutputStream) wrappedStream).hsync(syncFlags); }
public synchronized void put(final long inodeId, final DFSOutputStream out, final DFSClient dfsc) { if (dfsc.isClientRunning()) { if (!isRunning() || isRenewerExpired()) { //start a new deamon with a new id. final int id = ++currentId; daemon = new Daemon(new Runnable() { @Override public void run() { try { if (LOG.isDebugEnabled()) { LOG.debug("Lease renewer daemon for " + clientsString() + " with renew id " + id + " started"); } LeaseRenewer.this.run(id); } catch(InterruptedException e) { LOG.debug("LeaseRenewer is interrupted.", e); } finally { synchronized(LeaseRenewer.this) { Factory.INSTANCE.remove(LeaseRenewer.this); } if (LOG.isDebugEnabled()) { LOG.debug("Lease renewer daemon for " + clientsString() + " with renew id " + id + " exited"); } } } @Override public String toString() { return String.valueOf(LeaseRenewer.this); } }); daemon.start(); } dfsc.putFileBeingWritten(inodeId, out); emptyTime = Long.MAX_VALUE; } }
@Test public void testRenewal() throws Exception { // Keep track of how many times the lease gets renewed final AtomicInteger leaseRenewalCount = new AtomicInteger(); Mockito.doAnswer(new Answer<Boolean>() { @Override public Boolean answer(InvocationOnMock invocation) throws Throwable { leaseRenewalCount.incrementAndGet(); return true; } }).when(MOCK_DFSCLIENT).renewLease(); // Set up a file so that we start renewing our lease. DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); long fileId = 123L; renewer.put(fileId, mockStream, MOCK_DFSCLIENT); // Wait for lease to get renewed long failTime = Time.monotonicNow() + 5000; while (Time.monotonicNow() < failTime && leaseRenewalCount.get() == 0) { Thread.sleep(50); } if (leaseRenewalCount.get() == 0) { Assert.fail("Did not renew lease at all!"); } renewer.closeFile(fileId, MOCK_DFSCLIENT); }
/** * Test adding new blocks but without closing the corresponding the file */ @Test public void testAddBlockUC() throws Exception { DistributedFileSystem fs = cluster.getFileSystem(); final Path file1 = new Path("/file1"); DFSTestUtil.createFile(fs, file1, BLOCKSIZE - 1, REPLICATION, 0L); FSDataOutputStream out = null; try { // append files without closing the streams out = fs.append(file1); String appendContent = "appending-content"; out.writeBytes(appendContent); ((DFSOutputStream) out.getWrappedStream()).hsync( EnumSet.of(SyncFlag.UPDATE_LENGTH)); // restart NN cluster.restartNameNode(true); FSDirectory fsdir = cluster.getNamesystem().getFSDirectory(); INodeFile fileNode = fsdir.getINode4Write(file1.toString()).asFile(); BlockInfo[] fileBlocks = fileNode.getBlocks(); assertEquals(2, fileBlocks.length); assertEquals(BLOCKSIZE, fileBlocks[0].getNumBytes()); assertEquals(BlockUCState.COMPLETE, fileBlocks[0].getBlockUCState()); assertEquals(appendContent.length() - 1, fileBlocks[1].getNumBytes()); assertEquals(BlockUCState.UNDER_CONSTRUCTION, fileBlocks[1].getBlockUCState()); } finally { if (out != null) { out.close(); } } }
/** * Verify that locked bytes are correctly updated when the client goes * away unexpectedly during a write. */ @Test public void testWritePipelineFailure() throws IOException, TimeoutException, InterruptedException { getClusterBuilder().setNumDatanodes(1).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset(); Path path = new Path("/" + METHOD_NAME + ".dat"); EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST); // Write 1 byte to the file and kill the writer. final FSDataOutputStream fos = fs.create(path, FsPermission.getFileDefault(), createFlags, BUFFER_LENGTH, REPL_FACTOR, BLOCK_SIZE, null); fos.write(new byte[1]); fos.hsync(); DFSTestUtil.abortStream((DFSOutputStream) fos.getWrappedStream()); waitForLockedBytesUsed(fsd, osPageSize); // Delete the file and ensure locked RAM goes to zero. fs.delete(path, false); DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); waitForLockedBytesUsed(fsd, 0); }
/** * Test if the quota can be correctly updated when file length is updated * through fsync */ @Test (timeout=60000) public void testUpdateQuotaForFSync() throws Exception { final Path foo = new Path("/foo"); final Path bar = new Path(foo, "bar"); DFSTestUtil.createFile(dfs, bar, BLOCKSIZE, REPLICATION, 0L); dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1); FSDataOutputStream out = dfs.append(bar); out.write(new byte[BLOCKSIZE / 4]); ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory(); Quota.Counts quota = fooNode.getDirectoryWithQuotaFeature() .getSpaceConsumed(); long ns = quota.get(Quota.NAMESPACE); long ds = quota.get(Quota.DISKSPACE); assertEquals(2, ns); // foo and bar assertEquals(BLOCKSIZE * 2 * REPLICATION, ds); // file is under construction out.write(new byte[BLOCKSIZE / 4]); out.close(); fooNode = fsdir.getINode4Write(foo.toString()).asDirectory(); quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); ns = quota.get(Quota.NAMESPACE); ds = quota.get(Quota.DISKSPACE); assertEquals(2, ns); assertEquals((BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, ds); // append another block DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE); quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); ns = quota.get(Quota.NAMESPACE); ds = quota.get(Quota.DISKSPACE); assertEquals(2, ns); // foo and bar assertEquals((BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION, ds); }
public void testRestartNameNode(boolean waitSafeMode) throws Exception { String file = "/testRestartNameNode" + waitSafeMode; // Create a file and write data. FSDataOutputStream out = fs.create(new Path(file)); String clientName = ((DistributedFileSystem) fs).getClient().getClientName(); byte[] buffer = new byte[FILE_LEN]; random.nextBytes(buffer); out.write(buffer); ((DFSOutputStream) out.getWrappedStream()).sync(); // Now shutdown the namenode and try to close the file. cluster.shutdownNameNode(0); Thread closeThread = new CloseThread(out, file, clientName); closeThread.start(); Thread.sleep(CLOSE_FILE_TIMEOUT / 4); // Restart the namenode and verify the close file worked. if (!waitSafeMode) { cluster.restartNameNode(0, new String[]{}, false); cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_LEAVE); } else { cluster.restartNameNode(0); } closeThread.join(5000); assertTrue(pass); }
private void createFile(FileSystem fs, FSDataOutputStream out, String fileName, int fileLen) throws IOException { Random random = new Random(fileName.hashCode()); byte buffer[] = new byte[fileLen]; random.nextBytes(buffer); out.write(buffer); out.sync(); ((DFSOutputStream) out.getWrappedStream()).abortForTests(); }