Java class org.apache.hadoop.hdfs.server.namenode.Namesystem: example source code

Project: hadoop    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem,
    final BlockManager blockManager, final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info("Setting heartbeat recheck interval to " + staleInterval
        + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
        + " is less than "
        + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
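
A minimal sketch (not part of the listing) of a Configuration under which this constructor takes the override branch: stale-datanode avoidance for writes is enabled and the stale interval is shorter than the recheck interval. The helper name staleAwareConf and the interval values are illustrative; the DFSConfigKeys names are the ones used above.

// Hypothetical helper, for illustration only.
static Configuration staleAwareConf() {
  Configuration conf = new Configuration();
  conf.setBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
  conf.setLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 20 * 1000L);   // 20s
  conf.setInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 60 * 1000); // 1 min
  // staleInterval (20s) < recheckInterval (60s) and avoidance is on, so the
  // constructor logs the override and heartbeatRecheckInterval becomes 20000 ms.
  return conf;
}
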
Project: aliyun-oss-hadoop-fs    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem,
    final BlockManager blockManager, final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info("Setting heartbeat recheck interval to " + staleInterval
        + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
        + " is less than "
        + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
Project: aliyun-oss-hadoop-fs    File: TestHeartbeatHandling.java
@Test
public void testHeartbeatStopWatch() throws Exception {
  Namesystem ns = Mockito.mock(Namesystem.class);
  BlockManager bm = Mockito.mock(BlockManager.class);
  Configuration conf = new Configuration();
  long recheck = 2000;
  conf.setLong(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, recheck);
  HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf);
  monitor.restartHeartbeatStopWatch();
  assertFalse(monitor.shouldAbortHeartbeatCheck(0));
  // sleep shorter than recheck and verify shouldn't abort
  Thread.sleep(100);
  assertFalse(monitor.shouldAbortHeartbeatCheck(0));
  // sleep longer than recheck and verify should abort unless ignore delay
  Thread.sleep(recheck);
  assertTrue(monitor.shouldAbortHeartbeatCheck(0));
  assertFalse(monitor.shouldAbortHeartbeatCheck(-recheck*3));
  // ensure it resets properly
  monitor.restartHeartbeatStopWatch();
  assertFalse(monitor.shouldAbortHeartbeatCheck(0));
}
Project: big-c    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem,
    final BlockManager blockManager, final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info("Setting heartbeat recheck interval to " + staleInterval
        + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
        + " is less than "
        + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem,
    final BlockManager blockManager, final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info("Setting heartbeat recheck interval to " + staleInterval
        + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
        + " is less than "
        + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
Project: hadoop-plus    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem,
    final BlockManager blockManager, final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info("Setting heartbeat recheck interval to " + staleInterval
        + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
        + " is less than "
        + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
Project: FlexMap    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem,
    final BlockManager blockManager, final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info("Setting heartbeat recheck interval to " + staleInterval
        + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
        + " is less than "
        + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
Project: hops    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
    final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval =
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
          DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval =
      conf.getLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info(
        "Setting heartbeat recheck interval to " + staleInterval + " since " +
            DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
            " is less than " +
            DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
Project: hops    File: NameNodeBlockTokenSecretManager.java
/**
 * Constructor for masters.
 */
public NameNodeBlockTokenSecretManager(long keyUpdateInterval,
    long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
    Namesystem namesystem) throws IOException {
  super(true, keyUpdateInterval, tokenLifetime, blockPoolId,
      encryptionAlgorithm);
  this.namesystem = namesystem;
  this.setSerialNo(new SecureRandom().nextInt());
  if (isLeader()) {
    // TODO[Hooman]: Since the master keeps the serialNo locally, whenever
    // a namenode crashes it should remove all keys from the database.
    this.generateKeys();
  } else {
    retrieveBlockKeys();
  }
}
Project: hadoop-TCP    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem,
    final BlockManager blockManager, final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info("Setting heartbeat recheck interval to " + staleInterval
        + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
        + " is less than "
        + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
Project: hardfs    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem,
    final BlockManager blockManager, final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info("Setting heartbeat recheck interval to " + staleInterval
        + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
        + " is less than "
        + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
Project: hadoop-on-lustre2    File: HeartbeatManager.java
HeartbeatManager(final Namesystem namesystem,
    final BlockManager blockManager, final Configuration conf) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  boolean avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  long recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s

  if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
    this.heartbeatRecheckInterval = staleInterval;
    LOG.info("Setting heartbeat recheck interval to " + staleInterval
        + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
        + " is less than "
        + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
  } else {
    this.heartbeatRecheckInterval = recheckInterval;
  }
}
Project: hadoop    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockManager, final HeartbeatManager hbManager) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  this.hbManager = hbManager;

  executor = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
          .setDaemon(true).build());
  decomNodeBlocks = new TreeMap<>();
  pendingNodes = new LinkedList<>();
}
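
ThreadFactoryBuilder here is Guava's thread-factory builder; it gives the monitor thread a readable name and marks it as a daemon. Below is a self-contained sketch of the same pattern; the Runnable body and the 30-second period are illustrative and not taken from DecommissionManager.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class MonitorSketch {
  public static void main(String[] args) {
    // Same construction pattern as above: one named daemon thread.
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
        new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
            .setDaemon(true).build());
    // Illustrative periodic task; the real manager schedules its own Monitor.
    executor.scheduleAtFixedRate(
        () -> System.out.println("periodic decommission check"),
        0, 30, TimeUnit.SECONDS);
  }
}
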
Project: aliyun-oss-hadoop-fs    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockManager, final HeartbeatManager hbManager) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  this.hbManager = hbManager;

  executor = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
          .setDaemon(true).build());
  decomNodeBlocks = new TreeMap<>();
  pendingNodes = new LinkedList<>();
}
Project: aliyun-oss-hadoop-fs    File: BlockManagerSafeMode.java
BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem,
    Configuration conf) {
  this.blockManager = blockManager;
  this.namesystem = namesystem;
  this.haEnabled = namesystem.isHaEnabled();
  this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
      DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
  if (this.threshold > 1.0) {
    LOG.warn("The threshold value should't be greater than 1, threshold: {}",
        threshold);
  }
  this.datanodeThreshold = conf.getInt(
      DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
      DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
  int minReplication =
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
  // DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY is an expert level setting,
  // setting this lower than the min replication is not recommended
  // and/or dangerous for production setups.
  // When it's unset, safeReplication will use dfs.namenode.replication.min
  this.safeReplication =
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY,
          minReplication);
  // default to safe mode threshold (i.e., don't populate queues before
  // leaving safe mode)
  this.replQueueThreshold =
      conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
          (float) threshold);

  this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);

  LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, threshold);
  LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
      datanodeThreshold);
  LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, extension);
}
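
A minimal sketch (not from the listing) of the safe-mode settings this constructor reads; the helper name safeModeConf and the values are illustrative, while the keys are the DFSConfigKeys constants imported above.

// Hypothetical helper, for illustration only.
static Configuration safeModeConf() {
  Configuration conf = new Configuration();
  // Fraction of blocks that must report before safe mode can be left (warned if > 1).
  conf.setFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, 0.999f);
  // Minimum number of live datanodes required to leave safe mode.
  conf.setInt(DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
  // Extra time (ms) to remain in safe mode after the thresholds are met.
  conf.setInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 30 * 1000);
  return conf;
}
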
Project: aliyun-oss-hadoop-fs    File: TestReplicationPolicy.java
@Test(timeout = 60000)
public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
    throws IOException {
  Namesystem mockNS = mock(Namesystem.class);
  when(mockNS.hasReadLock()).thenReturn(true);

  BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
  UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;

  BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
  BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());

  // Adding QUEUE_UNDER_REPLICATED block
  underReplicatedBlocks.add(block1, 0, 0, 1, 1);

  // Adding QUEUE_UNDER_REPLICATED block
  underReplicatedBlocks.add(block2, 0, 0, 1, 1);

  List<List<BlockInfo>> chosenBlocks;

  // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
  // from QUEUE_VERY_UNDER_REPLICATED.
  chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
  assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);

  bm.setReplication((short)0, (short)1, block1);

  // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
  // from QUEUE_VERY_UNDER_REPLICATED.
  // This block remains and should not be skipped over.
  chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
  assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
Project: big-c    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockManager, final HeartbeatManager hbManager) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  this.hbManager = hbManager;

  executor = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
          .setDaemon(true).build());
  decomNodeBlocks = new TreeMap<>();
  pendingNodes = new LinkedList<>();
}
Project: aliyun-oss-hadoop-fs    File: TestReplicationPolicy.java
@Test(timeout = 60000)
public void
    testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
        throws IOException {
  Namesystem mockNS = mock(Namesystem.class);
  when(mockNS.hasWriteLock()).thenReturn(true);

  BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
  UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;

  long blkID1 = ThreadLocalRandom.current().nextLong();
  if (blkID1 < 0) {
    blkID1 *= -1;
  }
  long blkID2 = ThreadLocalRandom.current().nextLong();
  if (blkID2 < 0) {
    blkID2 *= -1;
  }

  BlockInfo block1 = genBlockInfo(blkID1);
  BlockInfo block2 = genBlockInfo(blkID2);

  // Adding QUEUE_UNDER_REPLICATED block
  underReplicatedBlocks.add(block1, 0, 0, 1, 1);

  // Adding QUEUE_UNDER_REPLICATED block
  underReplicatedBlocks.add(block2, 0, 0, 1, 1);

  List<List<BlockInfo>> chosenBlocks;

  // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
  // from QUEUE_VERY_UNDER_REPLICATED.
  chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
  assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);

  final BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
  final BlockCollection mbc = mock(BlockCollection.class);
  when(mbc.getId()).thenReturn(1000L);
  when(mbc.getLastBlock()).thenReturn(info);
  when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
  when(mbc.isUnderConstruction()).thenReturn(true);
  ContentSummary cs = mock(ContentSummary.class);
  when(cs.getLength()).thenReturn((long)1);
  when(mbc.computeContentSummary(bm.getStoragePolicySuite())).thenReturn(cs);
  info.setBlockCollectionId(1000);
  bm.addBlockCollection(info, mbc);

  DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo(
      dataNodes[0], new DatanodeStorage("s1"))};
  info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
      storageAry);
  DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class);
  DatanodeDescriptor dn = mock(DatanodeDescriptor.class);
  when(dn.isDecommissioned()).thenReturn(true);
  when(storage.getState()).thenReturn(DatanodeStorage.State.NORMAL);
  when(storage.getDatanodeDescriptor()).thenReturn(dn);
  when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
  when(storage.addBlock(any(BlockInfo.class))).thenReturn
      (DatanodeStorageInfo.AddBlockResult.ADDED);
  info.addStorage(storage, info);

  BlockInfo lastBlk = mbc.getLastBlock();
  when(mbc.getLastBlock()).thenReturn(lastBlk, info);

  bm.convertLastBlockToUnderConstruction(mbc, 0L);

  // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
  // from QUEUE_VERY_UNDER_REPLICATED.
  // This block remains and should not be skipped over.
  chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
  assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
Project: hadoop-2.6.0-cdh5.4.3    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockmanager) {
  this.namesystem = namesystem;
  this.blockmanager = blockmanager;
}
Project: hadoop-plus    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockmanager) {
  this.namesystem = namesystem;
  this.blockmanager = blockmanager;
}
Project: hadoop-plus    File: BlockManager.java
public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
    final Configuration conf) throws IOException {
  this.namesystem = namesystem;
  datanodeManager = new DatanodeManager(this, namesystem, conf);
  heartbeatManager = datanodeManager.getHeartbeatManager();
  invalidateBlocks = new InvalidateBlocks(datanodeManager);

  // Compute the map capacity by allocating 2% of total memory
  blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
  blockplacement = BlockPlacementPolicy.getInstance(
      conf, stats, datanodeManager.getNetworkTopology());
  pendingReplications = new PendingReplicationBlocks(conf.getInt(
    DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
    DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);

  blockTokenSecretManager = createBlockTokenSecretManager(conf);

  this.maxCorruptFilesReturned = conf.getInt(
    DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
    DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
  this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
                                        DFSConfigKeys.DFS_REPLICATION_DEFAULT);

  final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 
                               DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
  final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
                               DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
  if (minR <= 0)
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
        + " = " + minR + " <= 0");
  if (maxR > Short.MAX_VALUE)
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
        + " = " + maxR + " > " + Short.MAX_VALUE);
  if (minR > maxR)
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
        + " = " + minR + " > "
        + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
        + " = " + maxR);
  this.minReplication = (short)minR;
  this.maxReplication = (short)maxR;

  this.maxReplicationStreams =
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
  this.replicationStreamsHardLimit =
      conf.getInt(
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
  this.shouldCheckForEnoughRacks =
      conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
          ? false : true;

  this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
  this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);

  this.replicationRecheckInterval = 
    conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;

  this.encryptDataTransfer =
      conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
          DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);

  LOG.info("defaultReplication         = " + defaultReplication);
  LOG.info("maxReplication             = " + maxReplication);
  LOG.info("minReplication             = " + minReplication);
  LOG.info("maxReplicationStreams      = " + maxReplicationStreams);
  LOG.info("shouldCheckForEnoughRacks  = " + shouldCheckForEnoughRacks);
  LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
  LOG.info("encryptDataTransfer        = " + encryptDataTransfer);
}
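
The three IOException checks above enforce 0 < minReplication <= maxReplication <= Short.MAX_VALUE. A small sketch (not from the listing) of a Configuration this constructor would reject, using the same keys:

// Deliberately inconsistent: dfs.namenode.replication.min > dfs.replication.max,
// so the constructor above throws "Unexpected configuration parameters: ...".
Configuration bad = new Configuration();
bad.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 5);
bad.setInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 3);
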
Project: hadoop-plus    File: DatanodeManager.java
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
    final Configuration conf) throws IOException {
  this.namesystem = namesystem;
  this.blockManager = blockManager;

  this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);

  networktopology = NetworkTopology.getInstance(conf);

  this.defaultXferPort = NetUtils.createSocketAddr(
        conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
            DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
  this.defaultInfoPort = NetUtils.createSocketAddr(
        conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
            DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort();
  this.defaultIpcPort = NetUtils.createSocketAddr(
        conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
            DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
  try {
    this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
      conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
  } catch (IOException e) {
    LOG.error("error reading hosts files: ", e);
  }

  this.dnsToSwitchMapping = ReflectionUtils.newInstance(
      conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
          ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);

  // If the dns to switch mapping supports cache, resolve network
  // locations of those hosts in the include list and store the mapping
  // in the cache; so future calls to resolve will be fast.
  if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
    final ArrayList<String> locations = new ArrayList<String>();
    for (Entry entry : hostFileManager.getIncludes()) {
      if (!entry.getIpAddress().isEmpty()) {
        locations.add(entry.getIpAddress());
      }
    }
    dnsToSwitchMapping.resolve(locations);
  }

  final long heartbeatIntervalSeconds = conf.getLong(
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  final int heartbeatRecheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
  this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
      + 10 * 1000 * heartbeatIntervalSeconds;
  final int blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds),
      DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
  this.blockInvalidateLimit = conf.getInt(
      DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
  LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
      + "=" + this.blockInvalidateLimit);

  this.avoidStaleDataNodesForRead = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
  this.avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
  this.ratioUseStaleDataNodesForWrite = conf.getFloat(
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
  Preconditions.checkArgument(
      (ratioUseStaleDataNodesForWrite > 0 && 
          ratioUseStaleDataNodesForWrite <= 1.0f),
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
      " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
      "It should be a positive non-zero float value, not greater than 1.0f.");
}
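
A worked example (plain Java, not part of the listing) of the interval arithmetic above, assuming the stock defaults of a 3-second heartbeat, a 5-minute recheck interval, and a block-invalidate default of 1000:

long heartbeatIntervalSeconds = 3L;                 // dfs.heartbeat.interval default
int heartbeatRecheckInterval = 5 * 60 * 1000;       // 300000 ms
long heartbeatExpireInterval =
    2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;
// = 600000 + 30000 = 630000 ms, i.e. 10.5 minutes before a datanode is treated as dead.
int blockInvalidateLimit = Math.max(20 * (int) heartbeatIntervalSeconds, 1000);
// = max(60, 1000) = 1000 blocks per invalidation command.
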
Project: FlexMap    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockmanager) {
  this.namesystem = namesystem;
  this.blockmanager = blockmanager;
}
Project: hops    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockmanager) {
  this.namesystem = namesystem;
  this.blockmanager = blockmanager;
}
Project: hops    File: DatanodeManager.java
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
    final Configuration conf) throws IOException {
  this.namesystem = namesystem;
  this.blockManager = blockManager;

  this.networktopology = NetworkTopology.getInstance(conf);

  this.heartbeatManager =
      new HeartbeatManager(namesystem, blockManager, conf);

  this.hostsReader =
      new HostsFileReader(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
          conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));

  this.dnsToSwitchMapping = ReflectionUtils.newInstance(
      conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);

  // If the dns to switch mapping supports cache, resolve network
  // locations of those hosts in the include list and store the mapping
  // in the cache; so future calls to resolve will be fast.
  if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
    dnsToSwitchMapping.resolve(new ArrayList<>(hostsReader.getHosts()));
  }

  final long heartbeatIntervalSeconds =
      conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
          DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  final int heartbeatRecheckInterval =
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
          DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
  this.heartbeatExpireInterval =
      2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;
  final int blockInvalidateLimit =
      Math.max(20 * (int) (heartbeatIntervalSeconds),
          DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
  this.blockInvalidateLimit =
      conf.getInt(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY,
          blockInvalidateLimit);
  LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" +
      this.blockInvalidateLimit);

  this.avoidStaleDataNodesForRead = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
  this.avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  this.staleInterval =
      getStaleIntervalFromConf(conf, heartbeatExpireInterval);
  this.ratioUseStaleDataNodesForWrite = conf.getFloat(
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
  Preconditions.checkArgument((ratioUseStaleDataNodesForWrite > 0 &&
          ratioUseStaleDataNodesForWrite <= 1.0f),
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
          " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
          "It should be a positive non-zero float value, not greater than 1.0f.");

  this.storageIdMap = new StorageIdMap();
}
Project: hadoop-TCP    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockmanager) {
  this.namesystem = namesystem;
  this.blockmanager = blockmanager;
}
Project: hadoop-TCP    File: BlockManager.java
public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
    final Configuration conf) throws IOException {
  this.namesystem = namesystem;
  datanodeManager = new DatanodeManager(this, namesystem, conf);
  heartbeatManager = datanodeManager.getHeartbeatManager();
  invalidateBlocks = new InvalidateBlocks(datanodeManager);

  // Compute the map capacity by allocating 2% of total memory
  blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
  blockplacement = BlockPlacementPolicy.getInstance(
      conf, stats, datanodeManager.getNetworkTopology());
  pendingReplications = new PendingReplicationBlocks(conf.getInt(
    DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
    DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);

  blockTokenSecretManager = createBlockTokenSecretManager(conf);

  this.maxCorruptFilesReturned = conf.getInt(
    DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
    DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
  this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
                                        DFSConfigKeys.DFS_REPLICATION_DEFAULT);

  final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 
                               DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
  final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
                               DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
  if (minR <= 0)
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
        + " = " + minR + " <= 0");
  if (maxR > Short.MAX_VALUE)
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
        + " = " + maxR + " > " + Short.MAX_VALUE);
  if (minR > maxR)
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
        + " = " + minR + " > "
        + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
        + " = " + maxR);
  this.minReplication = (short)minR;
  this.maxReplication = (short)maxR;

  this.maxReplicationStreams =
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
  this.replicationStreamsHardLimit =
      conf.getInt(
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
  this.shouldCheckForEnoughRacks =
      conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
          ? false : true;

  this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
  this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);

  this.replicationRecheckInterval = 
    conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;

  this.encryptDataTransfer =
      conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
          DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);

  LOG.info("defaultReplication         = " + defaultReplication);
  LOG.info("maxReplication             = " + maxReplication);
  LOG.info("minReplication             = " + minReplication);
  LOG.info("maxReplicationStreams      = " + maxReplicationStreams);
  LOG.info("shouldCheckForEnoughRacks  = " + shouldCheckForEnoughRacks);
  LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
  LOG.info("encryptDataTransfer        = " + encryptDataTransfer);
}
Project: hadoop-TCP    File: DatanodeManager.java
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
    final Configuration conf) throws IOException {
  this.namesystem = namesystem;
  this.blockManager = blockManager;

  this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);

  networktopology = NetworkTopology.getInstance(conf);

  this.defaultXferPort = NetUtils.createSocketAddr(
        conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
            DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
  this.defaultInfoPort = NetUtils.createSocketAddr(
        conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
            DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT)).getPort();
  this.defaultInfoSecurePort = NetUtils.createSocketAddr(
      conf.get(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY,
          DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort();
  this.defaultIpcPort = NetUtils.createSocketAddr(
        conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
            DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
  try {
    this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
      conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
  } catch (IOException e) {
    LOG.error("error reading hosts files: ", e);
  }

  this.dnsToSwitchMapping = ReflectionUtils.newInstance(
      conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
          ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);

  // If the dns to switch mapping supports cache, resolve network
  // locations of those hosts in the include list and store the mapping
  // in the cache; so future calls to resolve will be fast.
  if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
    final ArrayList<String> locations = new ArrayList<String>();
    for (Entry entry : hostFileManager.getIncludes()) {
      if (!entry.getIpAddress().isEmpty()) {
        locations.add(entry.getIpAddress());
      }
    }
    dnsToSwitchMapping.resolve(locations);
  }

  final long heartbeatIntervalSeconds = conf.getLong(
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  final int heartbeatRecheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
  this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
      + 10 * 1000 * heartbeatIntervalSeconds;
  final int blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds),
      DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
  this.blockInvalidateLimit = conf.getInt(
      DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
  LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
      + "=" + this.blockInvalidateLimit);

  this.avoidStaleDataNodesForRead = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
  this.avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
  this.ratioUseStaleDataNodesForWrite = conf.getFloat(
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
  Preconditions.checkArgument(
      (ratioUseStaleDataNodesForWrite > 0 && 
          ratioUseStaleDataNodesForWrite <= 1.0f),
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
      " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
      "It should be a positive non-zero float value, not greater than 1.0f.");
}
Project: hardfs    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockmanager) {
  this.namesystem = namesystem;
  this.blockmanager = blockmanager;
}
Project: hardfs    File: BlockManager.java
public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
    final Configuration conf) throws IOException {
  this.namesystem = namesystem;
  datanodeManager = new DatanodeManager(this, namesystem, conf);
  heartbeatManager = datanodeManager.getHeartbeatManager();
  invalidateBlocks = new InvalidateBlocks(datanodeManager);

  // Compute the map capacity by allocating 2% of total memory
  blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
  blockplacement = BlockPlacementPolicy.getInstance(
      conf, stats, datanodeManager.getNetworkTopology());
  pendingReplications = new PendingReplicationBlocks(conf.getInt(
    DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
    DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);

  blockTokenSecretManager = createBlockTokenSecretManager(conf);

  this.maxCorruptFilesReturned = conf.getInt(
    DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
    DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
  this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
                                        DFSConfigKeys.DFS_REPLICATION_DEFAULT);

  final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 
                               DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
  final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
                               DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
  if (minR <= 0)
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
        + " = " + minR + " <= 0");
  if (maxR > Short.MAX_VALUE)
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
        + " = " + maxR + " > " + Short.MAX_VALUE);
  if (minR > maxR)
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
        + " = " + minR + " > "
        + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
        + " = " + maxR);
  this.minReplication = (short)minR;
  this.maxReplication = (short)maxR;

  this.maxReplicationStreams =
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
  this.replicationStreamsHardLimit =
      conf.getInt(
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
  this.shouldCheckForEnoughRacks =
      conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
          ? false : true;

  this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
  this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);

  this.replicationRecheckInterval = 
    conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;

  this.encryptDataTransfer =
      conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
          DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);

  LOG.info("defaultReplication         = " + defaultReplication);
  LOG.info("maxReplication             = " + maxReplication);
  LOG.info("minReplication             = " + minReplication);
  LOG.info("maxReplicationStreams      = " + maxReplicationStreams);
  LOG.info("shouldCheckForEnoughRacks  = " + shouldCheckForEnoughRacks);
  LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
  LOG.info("encryptDataTransfer        = " + encryptDataTransfer);
}
Project: hardfs    File: DatanodeManager.java
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
    final Configuration conf) throws IOException {
  this.namesystem = namesystem;
  this.blockManager = blockManager;

  this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);

  networktopology = NetworkTopology.getInstance(conf);

  this.defaultXferPort = NetUtils.createSocketAddr(
        conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
            DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
  this.defaultInfoPort = NetUtils.createSocketAddr(
        conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
            DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT)).getPort();
  this.defaultInfoSecurePort = NetUtils.createSocketAddr(
      conf.get(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY,
          DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort();
  this.defaultIpcPort = NetUtils.createSocketAddr(
        conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
            DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
  try {
    this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
      conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
  } catch (IOException e) {
    LOG.error("error reading hosts files: ", e);
  }

  this.dnsToSwitchMapping = ReflectionUtils.newInstance(
      conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
          ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);

  // If the dns to switch mapping supports cache, resolve network
  // locations of those hosts in the include list and store the mapping
  // in the cache; so future calls to resolve will be fast.
  if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
    final ArrayList<String> locations = new ArrayList<String>();
    for (Entry entry : hostFileManager.getIncludes()) {
      if (!entry.getIpAddress().isEmpty()) {
        locations.add(entry.getIpAddress());
      }
    }
    dnsToSwitchMapping.resolve(locations);
  }

  final long heartbeatIntervalSeconds = conf.getLong(
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  final int heartbeatRecheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
  this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
      + 10 * 1000 * heartbeatIntervalSeconds;
  final int blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds),
      DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
  this.blockInvalidateLimit = conf.getInt(
      DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
  LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
      + "=" + this.blockInvalidateLimit);

  this.avoidStaleDataNodesForRead = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
  this.avoidStaleDataNodesForWrite = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
      DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
  this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
  this.ratioUseStaleDataNodesForWrite = conf.getFloat(
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
  Preconditions.checkArgument(
      (ratioUseStaleDataNodesForWrite > 0 && 
          ratioUseStaleDataNodesForWrite <= 1.0f),
      DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
      " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
      "It should be a positive non-zero float value, not greater than 1.0f.");
}
Project: hadoop-on-lustre2    File: DecommissionManager.java
DecommissionManager(final Namesystem namesystem,
    final BlockManager blockmanager) {
  this.namesystem = namesystem;
  this.blockmanager = blockmanager;
}