private void verifyAddition(long blockId, long genStamp, long size) { final ReplicaInfo replicainfo; replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId); assertNotNull(replicainfo); // Added block has the same file as the one created by the test File file = new File(getBlockFile(blockId)); assertEquals(file.getName(), FsDatasetTestUtil.getFile(fds, bpid, blockId).getName()); // Generation stamp is same as that of created file assertEquals(genStamp, replicainfo.getGenerationStamp()); // File size matches assertEquals(size, replicainfo.getNumBytes()); }
/** Truncate a block file */ private long truncateBlockFile() throws IOException { synchronized (fds) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { File f = b.getBlockFile(); File mf = b.getMetaFile(); // Truncate a block file that has a corresponding metadata file if (f.exists() && f.length() != 0 && mf.exists()) { FileOutputStream s = null; FileChannel channel = null; try { s = new FileOutputStream(f); channel = s.getChannel(); channel.truncate(0); LOG.info("Truncated block file " + f.getAbsolutePath()); return b.getBlockId(); } finally { IOUtils.cleanup(LOG, channel, s); } } } } return 0; }
/** Truncate a block file */ private long truncateBlockFile() throws IOException { synchronized (fds) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { File f = b.getBlockFile(); File mf = b.getMetaFile(); // Truncate a block file that has a corresponding metadata file if (f.exists() && f.length() != 0 && mf.exists()) { FileOutputStream s = new FileOutputStream(f); FileChannel channel = s.getChannel(); channel.truncate(0); LOG.info("Truncated block file " + f.getAbsolutePath()); return b.getBlockId(); } } } return 0; }
/** * Truncate a block file */ private long truncateBlockFile() throws IOException { synchronized (fds) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { File f = b.getBlockFile(); File mf = b.getMetaFile(); // Truncate a block file that has a corresponding metadata file if (f.exists() && f.length() != 0 && mf.exists()) { FileOutputStream s = new FileOutputStream(f); FileChannel channel = s.getChannel(); channel.truncate(0); LOG.info("Truncated block file " + f.getAbsolutePath()); return b.getBlockId(); } } } return 0; }
private static ReplicaInPipeline getReplica(final DataNode datanode, final String bpid, final ReplicaState expectedState) throws InterruptedException { final Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas( datanode.getFSDataset(), bpid); for(int i = 0; i < 5 && replicas.size() == 0; i++) { LOG.info("wait since replicas.size() == 0; i=" + i); Thread.sleep(1000); } Assert.assertEquals(1, replicas.size()); final ReplicaInfo r = replicas.iterator().next(); Assert.assertEquals(expectedState, r.getState()); return (ReplicaInPipeline)r; }
/** * Asserts that the storage lock file in each given directory has been * released. This method works by trying to acquire the lock file itself. If * locking fails here, then the main code must have failed to release it. * * @param dirs every storage directory to check * @throws IOException if there is an unexpected I/O error */ private static void assertFileLocksReleased(Collection<String> dirs) throws IOException { for (String dir: dirs) { try { FsDatasetTestUtil.assertFileLockReleased(dir); } catch (IOException e) { LOG.warn(e); } } }
/** Delete a block file */ private long deleteBlockFile() { synchronized(fds) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { File f = b.getBlockFile(); File mf = b.getMetaFile(); // Delete a block file that has corresponding metadata file if (f.exists() && mf.exists() && f.delete()) { LOG.info("Deleting block file " + f.getAbsolutePath()); return b.getBlockId(); } } } return 0; }
/** Delete block meta file */ private long deleteMetaFile() { synchronized(fds) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { File file = b.getMetaFile(); // Delete a metadata file if (file.exists() && file.delete()) { LOG.info("Deleting metadata file " + file.getAbsolutePath()); return b.getBlockId(); } } } return 0; }
/** * Duplicate the given block on all volumes. * @param blockId * @throws IOException */ private void duplicateBlock(long blockId) throws IOException { synchronized (fds) { ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId); for (FsVolumeSpi v : fds.getVolumes()) { if (v.getStorageID().equals(b.getVolume().getStorageID())) { continue; } // Volume without a copy of the block. Make a copy now. File sourceBlock = b.getBlockFile(); File sourceMeta = b.getMetaFile(); String sourceRoot = b.getVolume().getBasePath(); String destRoot = v.getBasePath(); String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath(); String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath(); File destBlock = new File(destRoot, relativeBlockPath); File destMeta = new File(destRoot, relativeMetaPath); destBlock.getParentFile().mkdirs(); FileUtils.copyFile(sourceBlock, destBlock); FileUtils.copyFile(sourceMeta, destMeta); if (destBlock.exists() && destMeta.exists()) { LOG.info("Copied " + sourceBlock + " ==> " + destBlock); LOG.info("Copied " + sourceMeta + " ==> " + destMeta); } } } }
/** Get a random blockId that is not used already */ private long getFreeBlockId() { long id = rand.nextLong(); while (true) { id = rand.nextLong(); if (FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, id) == null) { break; } } return id; }
@Test (timeout=300000) public void testRetainBlockOnPersistentStorage() throws Exception { cluster = new MiniDFSCluster .Builder(CONF) .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) .numDataNodes(1) .build(); try { cluster.waitActive(); DataNode dataNode = cluster.getDataNodes().get(0); bpid = cluster.getNamesystem().getBlockPoolId(); fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0)); client = cluster.getFileSystem().getClient(); scanner = new DirectoryScanner(dataNode, fds, CONF); scanner.setRetainDiffs(true); FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0)); // Add a file with 1 block List<LocatedBlock> blocks = createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false); // Ensure no difference between volumeMap and disk. scan(1, 0, 0, 0, 0, 0); // Make a copy of the block on RAM_DISK and ensure that it is // picked up by the scanner. duplicateBlock(blocks.get(0).getBlock().getBlockId()); scan(2, 1, 0, 0, 0, 0, 1); verifyStorageType(blocks.get(0).getBlock().getBlockId(), false); scan(1, 0, 0, 0, 0, 0); } finally { if (scanner != null) { scanner.shutdown(); scanner = null; } cluster.shutdown(); cluster = null; } }
/** * Duplicate the given block on all volumes. * @param blockId * @throws IOException */ private void duplicateBlock(long blockId) throws IOException { synchronized (fds) { ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId); try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) { for (FsVolumeSpi v : volumes) { if (v.getStorageID().equals(b.getVolume().getStorageID())) { continue; } // Volume without a copy of the block. Make a copy now. File sourceBlock = b.getBlockFile(); File sourceMeta = b.getMetaFile(); String sourceRoot = b.getVolume().getBasePath(); String destRoot = v.getBasePath(); String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()) .getPath(); String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()) .getPath(); File destBlock = new File(destRoot, relativeBlockPath); File destMeta = new File(destRoot, relativeMetaPath); destBlock.getParentFile().mkdirs(); FileUtils.copyFile(sourceBlock, destBlock); FileUtils.copyFile(sourceMeta, destMeta); if (destBlock.exists() && destMeta.exists()) { LOG.info("Copied " + sourceBlock + " ==> " + destBlock); LOG.info("Copied " + sourceMeta + " ==> " + destMeta); } } } } }