/**
 * Creates a Mover that works against the given NameNode connection and
 * shares a caller-supplied retry counter.
 *
 * The constructor reads the moved-window width, the mover thread count, the
 * per-node concurrent-move limit and the maximum number of retry attempts
 * from {@code conf}, then builds the Dispatcher, an empty StorageMap and the
 * storage-policy table.
 *
 * @param nnc connector handed to the Dispatcher; also supplies the target
 *          paths
 * @param conf configuration the tuning values are read from
 * @param retryCount retry counter stored on this instance; owned by the
 *          caller
 */
Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
  final long windowWidth = conf.getLong(
      DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
      DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
  final int threadCount = conf.getInt(
      DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
      DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
  final int perNodeMoveLimit = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
  final int maxAttempts = conf.getInt(
      DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
      DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);

  // One slot per possible storage-policy id.
  final int policyTableSize = 1 << BlockStoragePolicySuite.ID_BIT_LENGTH;

  this.retryMaxAttempts = maxAttempts;
  this.retryCount = retryCount;
  this.storages = new StorageMap();
  this.targetPaths = nnc.getTargetPaths();
  this.blockStoragePolicies = new BlockStoragePolicy[policyTableSize];

  // Both node-name sets are empty. The literal 0 is passed positionally;
  // NOTE(review): presumably a dispatcher thread count - confirm against the
  // Dispatcher constructor.
  this.dispatcher = new Dispatcher(
      nnc,
      Collections.<String> emptySet(),
      Collections.<String> emptySet(),
      windowWidth,
      threadCount,
      0,
      perNodeMoveLimit,
      conf);
}
/** * @return whether there is still remaining migration work for the next * round */ private boolean processNamespace() throws IOException { getSnapshottableDirs(); boolean hasRemaining = false; for (Path target : targetPaths) { hasRemaining |= processPath(target.toUri().getPath()); } // wait for pending move to finish and retry the failed migration boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets .values()); if (hasFailed) { if (retryCount.get() == retryMaxAttempts) { throw new IOException("Failed to move some block's after " + retryMaxAttempts + " retries."); } else { retryCount.incrementAndGet(); } } else { // Reset retry count if no failure. retryCount.set(0); } hasRemaining |= hasFailed; return hasRemaining; }
/**
 * Creates a Mover that works against the given NameNode connection.
 *
 * The constructor reads the moved-window width, the mover thread count and
 * the per-node concurrent-move limit from {@code conf}, then builds the
 * Dispatcher, an empty StorageMap and the storage-policy table.
 *
 * @param nnc connector handed to the Dispatcher; also supplies the target
 *          paths
 * @param conf configuration the tuning values are read from
 */
Mover(NameNodeConnector nnc, Configuration conf) {
  final long windowWidth = conf.getLong(
      DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
      DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
  final int threadCount = conf.getInt(
      DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
      DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
  final int perNodeMoveLimit = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);

  // One slot per possible storage-policy id.
  final int policyTableSize = 1 << BlockStoragePolicySuite.ID_BIT_LENGTH;

  this.storages = new StorageMap();
  this.targetPaths = nnc.getTargetPaths();
  this.blockStoragePolicies = new BlockStoragePolicy[policyTableSize];

  // Both node-name sets are empty. The literal 0 is passed positionally;
  // NOTE(review): presumably a dispatcher thread count - confirm against the
  // Dispatcher constructor.
  this.dispatcher = new Dispatcher(
      nnc,
      Collections.<String> emptySet(),
      Collections.<String> emptySet(),
      windowWidth,
      threadCount,
      0,
      perNodeMoveLimit,
      conf);
}
/** * @return whether there is still remaining migration work for the next * round */ private Result processNamespace() throws IOException { getSnapshottableDirs(); Result result = new Result(); for (Path target : targetPaths) { processPath(target.toUri().getPath(), result); } // wait for pending move to finish and retry the failed migration boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets .values()); boolean hasSuccess = Dispatcher.checkForSuccess(storages.targets .values()); if (hasFailed && !hasSuccess) { if (retryCount.get() == retryMaxAttempts) { result.setRetryFailed(); LOG.error("Failed to move some block's after " + retryMaxAttempts + " retries."); return result; } else { retryCount.incrementAndGet(); } } else { // Reset retry count if no failure. retryCount.set(0); } result.updateHasRemaining(hasFailed); return result; }
/** * @return whether there is still remaining migration work for the next * round */ private boolean processNamespace() { getSnapshottableDirs(); boolean hasRemaining = false; for (Path target : targetPaths) { hasRemaining |= processPath(target.toUri().getPath()); } // wait for pending move to finish and retry the failed migration hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values()); return hasRemaining; }