@Override void prepare() throws Exception { final Path filePath = new Path(file); DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0); // append to the file and leave the last block under construction out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND), null, null); byte[] appendContent = new byte[100]; new Random().nextBytes(appendContent); out.write(appendContent); ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); LocatedBlocks blks = dfs.getClient() .getLocatedBlocks(file, BlockSize + 1); assertEquals(1, blks.getLocatedBlocks().size()); nodes = blks.get(0).getLocations(); oldBlock = blks.get(0).getBlock(); LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline( oldBlock, client.getClientName()); newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(), oldBlock.getNumBytes(), newLbk.getBlock().getGenerationStamp()); }
/** * Similar with testRenameUCFileInSnapshot, but do renaming first and then * append file without closing it. Unit test for HDFS-5425. */ @Test public void testAppendFileAfterRenameInSnapshot() throws Exception { final Path test = new Path("/test"); final Path foo = new Path(test, "foo"); final Path bar = new Path(foo, "bar"); DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED); SnapshotTestHelper.createSnapshot(hdfs, test, "s0"); // rename bar --> bar2 final Path bar2 = new Path(foo, "bar2"); hdfs.rename(bar, bar2); // append file and keep it as underconstruction. FSDataOutputStream out = hdfs.append(bar2); out.writeByte(0); ((DFSOutputStream) out.getWrappedStream()).hsync( EnumSet.of(SyncFlag.UPDATE_LENGTH)); // save namespace and restart restartClusterAndCheckImage(true); }
/**
 * Test hsync (with updating block length in NameNode) while no data is
 * actually written yet.
 *
 * @throws IOException if a cluster, file system or stream operation fails
 */
@Test
public void hSyncUpdateLength_00() throws IOException {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
  try {
    // Obtained inside the try so the cluster is shut down even if this fails.
    DistributedFileSystem fileSystem = cluster.getFileSystem();
    try {
      Path path = new Path(fName);
      FSDataOutputStream stm = fileSystem.create(path, true, 4096, (short) 2,
          AppendTestUtil.BLOCK_SIZE);
      System.out.println("Created file " + path.toString());
      ((DFSOutputStream) stm.getWrappedStream())
          .hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
      // Nothing has been written, so the reported length must still be zero.
      long currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(0L, currentFileLength);
      stm.close();
    } finally {
      fileSystem.close();
    }
  } finally {
    // Runs even when closing the file system throws, so the mini cluster
    // is never left running after the test.
    cluster.shutdown();
  }
}
/**
 * Appends to a file without closing it, syncs the length, snapshots the
 * directory, deletes the file's parent directory and then runs the lease
 * manager's lease checks under the namesystem write lock.
 *
 * The lease period is set to (100, 200) for the duration of the test and
 * reset to {@code HdfsConstants.LEASE_SOFTLIMIT_PERIOD} /
 * {@code HdfsConstants.LEASE_HARDLIMIT_PERIOD} afterwards.
 */
@Test
public void testLease() throws Exception {
  try {
    NameNodeAdapter.setLeasePeriod(fsn, 100, 200);
    final Path foo = new Path(dir, "foo");
    final Path bar = new Path(foo, "bar");
    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, 0);
    HdfsDataOutputStream out = appendFileWithoutClosing(bar, 100);
    out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
    SnapshotTestHelper.createSnapshot(hdfs, dir, "s0");

    hdfs.delete(foo, true);
    // NOTE(review): presumably waits for the lease periods set above to
    // elapse before the checks run - confirm the units of setLeasePeriod.
    Thread.sleep(1000);

    // Take the lock before entering the try block so the finally clause can
    // never release a lock that was not acquired.
    fsn.writeLock();
    try {
      NameNodeAdapter.getLeaseManager(fsn).runLeaseChecks();
    } finally {
      fsn.writeUnlock();
    }
  } finally {
    NameNodeAdapter.setLeasePeriod(fsn, HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
        HdfsConstants.LEASE_HARDLIMIT_PERIOD);
  }
}
public void hsyncWithSizeUpdate() throws IOException { if (out != null) { if (out instanceof HdfsDataOutputStream) { try { ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); } catch (NoSuchMethodError e){ // We are probably working with an older version of hadoop jars which does not have the // hsync function with SyncFlag. Use the hsync version that does not update the size. out.hsync(); } } else { out.hsync(); } } }
@Override void prepare() throws Exception { final Path filePath = new Path(file); DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0); // append to the file and leave the last block under construction out = this.client.append(file, BlockSize, null, null); byte[] appendContent = new byte[100]; new Random().nextBytes(appendContent); out.write(appendContent); ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); LocatedBlocks blks = dfs.getClient() .getLocatedBlocks(file, BlockSize + 1); assertEquals(1, blks.getLocatedBlocks().size()); nodes = blks.get(0).getLocations(); oldBlock = blks.get(0).getBlock(); LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline( oldBlock, client.getClientName()); newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(), oldBlock.getNumBytes(), newLbk.getBlock().getGenerationStamp()); }
/**
 * Test hsync (with updating block length in NameNode) while no data is
 * actually written yet.
 *
 * @throws IOException if a cluster, file system or stream operation fails
 */
@Test
public void hSyncUpdateLength_00() throws IOException {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
  try {
    // Obtained inside the try so the cluster is shut down even if this fails.
    DistributedFileSystem fileSystem =
        (DistributedFileSystem) cluster.getFileSystem();
    try {
      Path path = new Path(fName);
      FSDataOutputStream stm = fileSystem.create(path, true, 4096, (short) 2,
          AppendTestUtil.BLOCK_SIZE);
      System.out.println("Created file " + path.toString());
      ((DFSOutputStream) stm.getWrappedStream())
          .hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
      // Nothing has been written, so the reported length must still be zero.
      long currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(0L, currentFileLength);
      stm.close();
    } finally {
      fileSystem.close();
    }
  } finally {
    // Runs even when closing the file system throws, so the mini cluster
    // is never left running after the test.
    cluster.shutdown();
  }
}
/**
 * Test hsync (with updating block length in NameNode) while no data is
 * actually written yet.
 *
 * @throws IOException if a cluster, file system or stream operation fails
 */
@Test
public void hSyncUpdateLength_00() throws IOException {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
  try {
    // Obtained inside the try so the cluster is shut down even if this fails.
    DistributedFileSystem fileSystem =
        (DistributedFileSystem) cluster.getFileSystem();
    try {
      Path path = new Path(fName);
      FSDataOutputStream stm = fileSystem
          .create(path, true, 4096, (short) 2, AppendTestUtil.BLOCK_SIZE);
      System.out.println("Created file " + path.toString());
      ((DFSOutputStream) stm.getWrappedStream())
          .hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
      // Nothing has been written, so the reported length must still be zero.
      long currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(0L, currentFileLength);
      stm.close();
    } finally {
      fileSystem.close();
    }
  } finally {
    // Runs even when closing the file system throws, so the mini cluster
    // is never left running after the test.
    cluster.shutdown();
  }
}
/** * Check the commit status with the given offset * @param commitOffset the offset to commit * @param channel the channel to return response * @param xid the xid of the commit request * @param preOpAttr the preOp attribute * @param fromRead whether the commit is triggered from read request * @return one commit status: COMMIT_FINISHED, COMMIT_WAIT, * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR */ public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) { if (!fromRead) { Preconditions.checkState(channel != null && preOpAttr != null); // Keep stream active updateLastAccessTime(); } Preconditions.checkState(commitOffset >= 0); COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid, preOpAttr, fromRead); if (LOG.isDebugEnabled()) { LOG.debug("Got commit status: " + ret.name()); } // Do the sync outside the lock if (ret == COMMIT_STATUS.COMMIT_DO_SYNC || ret == COMMIT_STATUS.COMMIT_FINISHED) { try { // Sync file data and length fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); ret = COMMIT_STATUS.COMMIT_FINISHED; // Remove COMMIT_DO_SYNC status // Nothing to do for metadata since attr related change is pass-through } catch (ClosedChannelException cce) { if (pendingWrites.isEmpty()) { ret = COMMIT_STATUS.COMMIT_FINISHED; } else { ret = COMMIT_STATUS.COMMIT_ERROR; } } catch (IOException e) { LOG.error("Got stream error during data sync: " + e); // Do nothing. Stream will be closed eventually by StreamMonitor. // status = Nfs3Status.NFS3ERR_IO; ret = COMMIT_STATUS.COMMIT_ERROR; } } return ret; }
/**
 * Flushes buffered data out to every replica of the block. After the call
 * the data sits in the DataNodes' buffers, though not necessarily in their
 * OS buffers.
 *
 * The operation is synchronous: once it returns, the flushed data is
 * guaranteed to be visible to new readers. It does not guarantee that the
 * data has reached persistent storage on the datanode.
 * Block allocations are persisted on namenode.
 */
@Override
public void hflush() throws IOException {
  final TraceScope traceScope = dfsClient.getPathTraceScope("hflush", src);
  try {
    // Flush only (no sync) and with no sync flags set.
    final EnumSet<SyncFlag> noFlags = EnumSet.noneOf(SyncFlag.class);
    flushOrSync(false, noFlags);
  } finally {
    traceScope.close();
  }
}
/**
 * Delegates to {@code flushOrSync} with the sync argument set to
 * {@code true} and an empty {@code SyncFlag} set. The call is wrapped in a
 * path trace scope named "hsync", which is closed on every exit path.
 */
@Override
public void hsync() throws IOException {
  final TraceScope traceScope = dfsClient.getPathTraceScope("hsync", src);
  try {
    final EnumSet<SyncFlag> noFlags = EnumSet.noneOf(SyncFlag.class);
    flushOrSync(true, noFlags);
  } finally {
    traceScope.close();
  }
}
/** * Test the fsimage loading while there is file under construction. */ @Test (timeout=60000) public void testLoadImageWithAppending() throws Exception { Path sub1 = new Path(dir, "sub1"); Path sub1file1 = new Path(sub1, "sub1file1"); Path sub1file2 = new Path(sub1, "sub1file2"); DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, REPLICATION, seed); DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, REPLICATION, seed); hdfs.allowSnapshot(dir); hdfs.createSnapshot(dir, "s0"); HdfsDataOutputStream out = appendFileWithoutClosing(sub1file1, BLOCKSIZE); out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); // save namespace and restart cluster hdfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); hdfs.saveNamespace(); hdfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); cluster.shutdown(); cluster = new MiniDFSCluster.Builder(conf).format(false) .numDataNodes(REPLICATION).build(); cluster.waitActive(); fsn = cluster.getNamesystem(); hdfs = cluster.getFileSystem(); }
/** * Test adding new blocks but without closing the corresponding the file */ @Test public void testAddBlockUC() throws Exception { DistributedFileSystem fs = cluster.getFileSystem(); final Path file1 = new Path("/file1"); DFSTestUtil.createFile(fs, file1, BLOCKSIZE - 1, REPLICATION, 0L); FSDataOutputStream out = null; try { // append files without closing the streams out = fs.append(file1); String appendContent = "appending-content"; out.writeBytes(appendContent); ((DFSOutputStream) out.getWrappedStream()).hsync( EnumSet.of(SyncFlag.UPDATE_LENGTH)); // restart NN cluster.restartNameNode(true); FSDirectory fsdir = cluster.getNamesystem().getFSDirectory(); INodeFile fileNode = fsdir.getINode4Write(file1.toString()).asFile(); BlockInfoContiguous[] fileBlocks = fileNode.getBlocks(); assertEquals(2, fileBlocks.length); assertEquals(BLOCKSIZE, fileBlocks[0].getNumBytes()); assertEquals(BlockUCState.COMPLETE, fileBlocks[0].getBlockUCState()); assertEquals(appendContent.length() - 1, fileBlocks[1].getNumBytes()); assertEquals(BlockUCState.UNDER_CONSTRUCTION, fileBlocks[1].getBlockUCState()); } finally { if (out != null) { out.close(); } } }
/** * The test uses * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} * to write a file with a custom block size so the writes will be * happening across block' boundaries */ @Test public void hFlush_02() throws IOException { Configuration conf = new HdfsConfiguration(); int customPerChecksumSize = 512; int customBlockSize = customPerChecksumSize * 3; // Modify defaul filesystem settings conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); doTheJob(conf, fName, customBlockSize, (short) 2, false, EnumSet.noneOf(SyncFlag.class)); }
/** * The test uses * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} * to write a file with a custom block size so the writes will be * happening across block's and checksum' boundaries */ @Test public void hFlush_03() throws IOException { Configuration conf = new HdfsConfiguration(); int customPerChecksumSize = 400; int customBlockSize = customPerChecksumSize * 3; // Modify defaul filesystem settings conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); doTheJob(conf, fName, customBlockSize, (short) 2, false, EnumSet.noneOf(SyncFlag.class)); }
/** * The test calls * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}. * Similar with {@link #hFlush_02()} , it writes a file with a custom block * size so the writes will be happening across block' boundaries */ @Test public void hSyncUpdateLength_02() throws IOException { Configuration conf = new HdfsConfiguration(); int customPerChecksumSize = 512; int customBlockSize = customPerChecksumSize * 3; // Modify defaul filesystem settings conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); doTheJob(conf, fName, customBlockSize, (short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH)); }
@Test public void hSyncEndBlock_02() throws IOException { Configuration conf = new HdfsConfiguration(); int customPerChecksumSize = 512; int customBlockSize = customPerChecksumSize * 3; // Modify defaul filesystem settings conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); doTheJob(conf, fName, customBlockSize, (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK)); }
/** * The test calls * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}. * Similar with {@link #hFlush_03()} , it writes a file with a custom block * size so the writes will be happening across block's and checksum' * boundaries. */ @Test public void hSyncUpdateLength_03() throws IOException { Configuration conf = new HdfsConfiguration(); int customPerChecksumSize = 400; int customBlockSize = customPerChecksumSize * 3; // Modify defaul filesystem settings conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); doTheJob(conf, fName, customBlockSize, (short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH)); }
@Test public void hSyncEndBlock_03() throws IOException { Configuration conf = new HdfsConfiguration(); int customPerChecksumSize = 400; int customBlockSize = customPerChecksumSize * 3; // Modify defaul filesystem settings conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); doTheJob(conf, fName, customBlockSize, (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK)); }
public void execute(Tuple tuple) { try { byte[] bytes = this.format.format(tuple); synchronized (this.writeLock) { out.write(bytes); this.offset += bytes.length; if (this.syncPolicy.mark(tuple, this.offset)) { if (this.out instanceof HdfsDataOutputStream) { ((HdfsDataOutputStream) this.out).hsync(EnumSet .of(SyncFlag.UPDATE_LENGTH)); } else { this.out.hsync(); } this.syncPolicy.reset(); } } this.collector.ack(tuple); if (this.rotationPolicy.mark(tuple, this.offset)) { rotateOutputFile(); // synchronized this.offset = 0; this.rotationPolicy.reset(); } } catch (IOException e) { LOG.warn("write/sync failed.", e); this.collector.fail(tuple); } }