FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf, StorageType storageType) throws IOException { this.dataset = dataset; this.storageID = storageID; this.reserved = conf.getLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); this.storageType = storageType; final int maxNumThreads = dataset.datanode.getConf().getInt( DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT ); ThreadFactory workerFactory = new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d") .build(); cacheExecutor = new ThreadPoolExecutor( 1, maxNumThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), workerFactory); cacheExecutor.allowCoreThreadTimeOut(true); }
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf, StorageType storageType) throws IOException { this.dataset = dataset; this.storageID = storageID; this.reserved = conf.getLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); this.reservedForRbw = new AtomicLong(0L); this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); this.storageType = storageType; this.configuredCapacity = -1; cacheExecutor = initializeCacheExecutor(parent); }
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf, StorageType storageType) throws IOException { this.dataset = dataset; this.storageID = storageID; this.reserved = conf.getLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); this.reservedForReplicas = new AtomicLong(0L); this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); this.storageType = storageType; this.configuredCapacity = -1; cacheExecutor = initializeCacheExecutor(parent); }
@Override public long getRawCapacity() throws IOException { try (FsVolumeReferences volRefs = dataset.getFsVolumeReferences()) { Preconditions.checkState(volRefs.size() != 0); DF df = new DF(new File(volRefs.get(0).getBasePath()), dataset.datanode.getConf()); return df.getCapacity(); } }
public void testVolumeSizeWithBytes() throws Exception { Configuration conf = new Configuration(); File data_dir = MiniDFSCluster.getDataDirectory(conf); // Need to create data_dir, otherwise DF doesn't work on non-existent dir. data_dir.mkdirs(); DF df = new DF(data_dir, conf); long reserved = 10000; conf.setLong("dfs.datanode.du.reserved", reserved); verifyVolumeSize(conf, reserved, df); }
public void testVolumeSizeWithPercent() throws Exception { Configuration conf = new Configuration(); File data_dir = MiniDFSCluster.getDataDirectory(conf); // Need to create data_dir, otherwise DF doesn't work on non-existent dir. data_dir.mkdirs(); DF df = new DF(data_dir, conf); long reserved = (long) (df.getCapacity() * 0.215); conf.setFloat("dfs.datanode.du.reserved.percent", 21.5f); verifyVolumeSize(conf, reserved, df); }
FSVolume(FSDataset dataset, File currentDir, Configuration conf) throws IOException { this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); this.reserved = usage.getReserved(); this.dataset = dataset; this.namespaceMap = new NamespaceMap(); this.dfsUsage = new DU(currentDir, conf); this.dfsUsage.start(); this.nativeIOExecutor = Executors.newSingleThreadExecutor(); }
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf) throws IOException { this.dataset = dataset; this.storageID = storageID; this.reserved = conf.getLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); }
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf) throws IOException { this.dataset = dataset; this.storageID = storageID; this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); }
FSVolume(File currentDir, Configuration conf) throws IOException { this.reserved = conf.getLong("dfs.datanode.du.reserved", 0); this.currentDir = currentDir; File parent = currentDir.getParentFile(); final File finalizedDir = new File( currentDir, DataStorage.STORAGE_DIR_FINALIZED); // Files that were being written when the datanode was last shutdown // are now moved back to the data directory. It is possible that // in the future, we might want to do some sort of datanode-local // recovery for these blocks. For example, crc validation. // this.tmpDir = new File(parent, "tmp"); if (tmpDir.exists()) { FileUtil.fullyDelete(tmpDir); } this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW); if (rbwDir.exists() && !supportAppends) { FileUtil.fullyDelete(rbwDir); } this.dataDir = new FSDir(finalizedDir); if (!rbwDir.mkdirs()) { // create rbw directory if not exist if (!rbwDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + rbwDir.toString()); } } if (!tmpDir.mkdirs()) { if (!tmpDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + tmpDir.toString()); } } this.usage = new DF(parent, conf); this.dfsUsage = new DU(parent, conf); this.dfsUsage.start(); }
FSVolume(FSDataset dataset, File currentDir, Configuration conf) throws IOException { this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); this.reserved = usage.getReserved(); this.dataset = dataset; this.namespaceMap = new NamespaceMap(); this.dfsUsage = new DU(currentDir, conf); this.dfsUsage.start(); }
public CheckedVolume(File dirToCheck, boolean required) throws IOException { df = new DF(dirToCheck, conf); this.required = required; volume = df.getFilesystem(); }
FSVolume(File currentDir, Configuration conf) throws IOException { this.reserved = conf.getLong("dfs.datanode.du.reserved", 0); this.dataDir = new FSDir(currentDir); this.currentDir = currentDir; boolean supportAppends = conf.getBoolean("dfs.support.append", false); File parent = currentDir.getParentFile(); this.detachDir = new File(parent, "detach"); if (detachDir.exists()) { recoverDetachedBlocks(currentDir, detachDir); } // remove all blocks from "tmp" directory. These were either created // by pre-append clients (0.18.x) or are part of replication request. // They can be safely removed. this.tmpDir = new File(parent, "tmp"); if (tmpDir.exists()) { FileUtil.fullyDelete(tmpDir); } // Files that were being written when the datanode was last shutdown // should not be deleted. blocksBeingWritten = new File(parent, "blocksBeingWritten"); if (blocksBeingWritten.exists()) { if (supportAppends) { recoverBlocksBeingWritten(blocksBeingWritten); } else { FileUtil.fullyDelete(blocksBeingWritten); } } if (!blocksBeingWritten.mkdirs()) { if (!blocksBeingWritten.isDirectory()) { throw new IOException("Mkdirs failed to create " + blocksBeingWritten.toString()); } } if (!tmpDir.mkdirs()) { if (!tmpDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + tmpDir.toString()); } } if (!detachDir.mkdirs()) { if (!detachDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + detachDir.toString()); } } this.usage = new DF(parent, conf); this.dfsUsage = new DU(parent, conf); this.dfsUsage.start(); }