/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
  namesystem.startCommonServices(conf, haContext);
  registerNNSMXBean();
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  rpcServer.start();
  plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
      ServicePlugin.class);
  for (ServicePlugin p: plugins) {
    try {
      p.start(this);
    } catch (Throwable t) {
      LOG.warn("ServicePlugin " + p + " could not be started", t);
    }
  }
  LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
  if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
        + rpcServer.getServiceRpcAddress());
  }
}
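// The plugin loop above instantiates whatever classes are listed under
// DFS_NAMENODE_PLUGINS_KEY ("dfs.namenode.plugins") and hands each one the
// NameNode instance. A minimal sketch of such a plugin follows; the class
// name and log messages are hypothetical, while the start()/stop()/close()
// contract comes from org.apache.hadoop.util.ServicePlugin.
public class LoggingNameNodePlugin implements org.apache.hadoop.util.ServicePlugin {
  @Override
  public void start(Object service) {
    // "service" is the NameNode passed in by startCommonServices()
    System.out.println("Plugin started for " + service.getClass().getSimpleName());
  }

  @Override
  public void stop() {
    System.out.println("Plugin stopped");
  }

  @Override
  public void close() throws java.io.IOException {
    stop();
  }
}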
/**
 * Register a Backup name-node, verifying that it belongs
 * to the correct namespace, and adding it to the set of
 * active journals if necessary.
 *
 * @param bnReg registration of the new BackupNode
 * @param nnReg registration of this NameNode
 * @throws IOException if the namespace IDs do not match
 */
void registerBackupNode(NamenodeRegistration bnReg,
    NamenodeRegistration nnReg) throws IOException {
  writeLock();
  try {
    if(getFSImage().getStorage().getNamespaceID()
        != bnReg.getNamespaceID())
      throw new IOException("Incompatible namespaceIDs: "
          + " Namenode namespaceID = "
          + getFSImage().getStorage().getNamespaceID() + "; "
          + bnReg.getRole() + " node namespaceID = "
          + bnReg.getNamespaceID());
    if (bnReg.getRole() == NamenodeRole.BACKUP) {
      getFSImage().getEditLog().registerBackupNode(bnReg, nnReg);
    }
  } finally {
    writeUnlock();
  }
}
@Override // NamenodeProtocol
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
    throws IOException {
  checkNNStartup();
  namesystem.checkSuperuserPrivilege();
  verifyRequest(registration);
  if(!nn.isRole(NamenodeRole.NAMENODE))
    throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");

  CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
      null);
  if (cacheEntry != null && cacheEntry.isSuccess()) {
    return (NamenodeCommand) cacheEntry.getPayload();
  }
  NamenodeCommand ret = null;
  try {
    ret = namesystem.startCheckpoint(registration, nn.setRegistration());
  } finally {
    RetryCache.setState(cacheEntry, ret != null, ret);
  }
  return ret;
}
/**
 * Create (or find if already exists) an edit output stream, which
 * streams journal records (edits) to the specified backup node.<br>
 *
 * The new BackupNode will start receiving edits the next time this
 * NameNode's logs roll.
 *
 * @param bnReg the backup node registration information.
 * @param nnReg this (active) name-node registration.
 * @throws IOException
 */
synchronized void registerBackupNode(
    NamenodeRegistration bnReg, // backup node
    NamenodeRegistration nnReg) // active name-node
    throws IOException {
  if(bnReg.isRole(NamenodeRole.CHECKPOINT))
    return; // checkpoint node does not stream edits

  JournalManager jas = findBackupJournal(bnReg);
  if (jas != null) {
    // already registered
    LOG.info("Backup node " + bnReg + " re-registers");
    return;
  }

  LOG.info("Registering new backup node: " + bnReg);
  BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
  synchronized(journalSetLock) {
    journalSet.add(bjm, false);
  }
}
/**
 * Test that FSNamesystem#clear clears all leases.
 */
@Test
public void testFSNamespaceClearLeases() throws Exception {
  Configuration conf = new HdfsConfiguration();
  File nameDir = new File(MiniDFSCluster.getBaseDirectory(), "name");
  conf.set(DFS_NAMENODE_NAME_DIR_KEY, nameDir.getAbsolutePath());

  NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
  DFSTestUtil.formatNameNode(conf);
  FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
  LeaseManager leaseMan = fsn.getLeaseManager();
  leaseMan.addLease("client1", "importantFile");
  assertEquals(1, leaseMan.countLease());
  fsn.clear();
  leaseMan = fsn.getLeaseManager();
  assertEquals(0, leaseMan.countLease());
}
@Test
public void testConvertNamenodeRegistration() {
  StorageInfo info = getStorageInfo(NodeType.NAME_NODE);
  NamenodeRegistration reg = new NamenodeRegistration("address:999",
      "http:1000", info, NamenodeRole.NAMENODE);
  NamenodeRegistrationProto regProto = PBHelper.convert(reg);
  NamenodeRegistration reg2 = PBHelper.convert(regProto);
  assertEquals(reg.getAddress(), reg2.getAddress());
  assertEquals(reg.getClusterID(), reg2.getClusterID());
  assertEquals(reg.getCTime(), reg2.getCTime());
  assertEquals(reg.getHttpAddress(), reg2.getHttpAddress());
  assertEquals(reg.getLayoutVersion(), reg2.getLayoutVersion());
  assertEquals(reg.getNamespaceID(), reg2.getNamespaceID());
  assertEquals(reg.getRegistrationID(), reg2.getRegistrationID());
  assertEquals(reg.getRole(), reg2.getRole());
  assertEquals(reg.getVersion(), reg2.getVersion());
}
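// The getStorageInfo helper is not part of this snippet. Below is a minimal
// sketch of what the test assumes, with arbitrary illustrative field values;
// it presumes a StorageInfo constructor taking (layoutVersion, namespaceID,
// clusterID, cTime, NodeType). The later variant of this test calls a
// getStorageInfo() overload without the NodeType argument.
private static StorageInfo getStorageInfo(NodeType type) {
  // layoutVersion, namespaceID, clusterID, cTime, nodeType -- test values only
  return new StorageInfo(1, 2, "cid", 3, type);
}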
/**
 * Test that FSNamesystem#clear clears all leases.
 */
@Test
public void testFSNamespaceClearLeases() throws Exception {
  Configuration conf = new HdfsConfiguration();
  File nameDir = new File(MiniDFSCluster.getBaseDirectory(), "name");
  conf.set(DFS_NAMENODE_NAME_DIR_KEY, nameDir.getAbsolutePath());

  NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
  DFSTestUtil.formatNameNode(conf);
  FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
  LeaseManager leaseMan = fsn.getLeaseManager();
  leaseMan.addLease("client1", fsn.getFSDirectory().allocateNewInodeId());
  assertEquals(1, leaseMan.countLease());
  fsn.clear();
  leaseMan = fsn.getLeaseManager();
  assertEquals(0, leaseMan.countLease());
}
/**
 * Set up the mock context.
 *
 * - extension is always needed (default period is {@link #EXTENSION} ms)
 * - datanode threshold is always reached via mock
 * - safe block is 0 and it needs {@link #BLOCK_THRESHOLD} to reach threshold
 * - write/read lock is always held by current thread
 *
 * @throws IOException
 */
@Before
public void setupMockCluster() throws IOException {
  Configuration conf = new HdfsConfiguration();
  conf.setDouble(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
      THRESHOLD);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY,
      EXTENSION);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
      DATANODE_NUM);

  FSNamesystem fsn = mock(FSNamesystem.class);
  Mockito.doReturn(true).when(fsn).hasWriteLock();
  Mockito.doReturn(true).when(fsn).hasReadLock();
  Mockito.doReturn(true).when(fsn).isRunning();
  NameNode.initMetrics(conf, NamenodeRole.NAMENODE);

  bm = spy(new BlockManager(fsn, conf));
  dn = spy(bm.getDatanodeManager());
  Whitebox.setInternalState(bm, "datanodeManager", dn);
  // the datanode threshold is always met
  when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);

  bmSafeMode = new BlockManagerSafeMode(bm, fsn, conf);
}
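// A hedged sketch of how a test might exercise the fixture built above. It
// assumes BlockManagerSafeMode exposes its package-private activate(long) and
// isInSafeMode() methods to tests in the same package, and that BLOCK_TOTAL is
// a constant defined alongside THRESHOLD and EXTENSION; the test name is
// illustrative.
@Test
public void testSafeModeIsEnteredOnActivate() {
  // Before activation the manager should not report safe mode.
  assertFalse(bmSafeMode.isInSafeMode());

  // Activating with a non-zero block total, while zero blocks have been
  // reported safe, should keep the namespace in safe mode until the
  // configured threshold is met.
  bmSafeMode.activate(BLOCK_TOTAL);
  assertTrue(bmSafeMode.isInSafeMode());
}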
/**
 * Initialize name-node.
 *
 * @param conf the configuration
 */
protected void initialize(Configuration conf) throws IOException {
  UserGroupInformation.setConfiguration(conf);
  loginAsNameNodeUser(conf);

  NameNode.initMetrics(conf, this.getRole());

  if (NamenodeRole.NAMENODE == role) {
    startHttpServer(conf);
    validateConfigurationSettingsOrAbort(conf);
  }
  loadNamesystem(conf);

  rpcServer = createRpcServer(conf);
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  } else {
    validateConfigurationSettingsOrAbort(conf);
  }

  startCommonServices(conf);
}
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
  namesystem.startCommonServices(conf, haContext);
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  rpcServer.start();
  plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
      ServicePlugin.class);
  for (ServicePlugin p: plugins) {
    try {
      p.start(this);
    } catch (Throwable t) {
      LOG.warn("ServicePlugin " + p + " could not be started", t);
    }
  }
  LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
  if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
        + rpcServer.getServiceRpcAddress());
  }
}
/**
 * Create (or find if already exists) an edit output stream, which
 * streams journal records (edits) to the specified backup node.<br>
 *
 * The new BackupNode will start receiving edits the next time this
 * NameNode's logs roll.
 *
 * @param bnReg the backup node registration information.
 * @param nnReg this (active) name-node registration.
 * @throws IOException
 */
synchronized void registerBackupNode(
    NamenodeRegistration bnReg, // backup node
    NamenodeRegistration nnReg) // active name-node
    throws IOException {
  if(bnReg.isRole(NamenodeRole.CHECKPOINT))
    return; // checkpoint node does not stream edits

  JournalManager jas = findBackupJournal(bnReg);
  if (jas != null) {
    // already registered
    LOG.info("Backup node " + bnReg + " re-registers");
    return;
  }

  LOG.info("Registering new backup node: " + bnReg);
  BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
  journalSet.add(bjm, true);
}
@Test
public void testConvertNamenodeRegistration() {
  StorageInfo info = getStorageInfo();
  NamenodeRegistration reg = new NamenodeRegistration("address:999",
      "http:1000", info, NamenodeRole.NAMENODE);
  NamenodeRegistrationProto regProto = PBHelper.convert(reg);
  NamenodeRegistration reg2 = PBHelper.convert(regProto);
  assertEquals(reg.getAddress(), reg2.getAddress());
  assertEquals(reg.getClusterID(), reg2.getClusterID());
  assertEquals(reg.getCTime(), reg2.getCTime());
  assertEquals(reg.getHttpAddress(), reg2.getHttpAddress());
  assertEquals(reg.getLayoutVersion(), reg2.getLayoutVersion());
  assertEquals(reg.getNamespaceID(), reg2.getNamespaceID());
  assertEquals(reg.getRegistrationID(), reg2.getRegistrationID());
  assertEquals(reg.getRole(), reg2.getRole());
  assertEquals(reg.getVersion(), reg2.getVersion());
}