/**
 * Utility method to fetch the ZK auth info from the configuration.
 *
 * <p>The value of {@code YarnConfiguration.RM_ZK_AUTH} is passed through
 * {@code ZKUtil.resolveConfIndirection} and, when the resolved value is
 * non-null, parsed with {@code ZKUtil.parseAuth}.
 *
 * @param conf configuration to read the auth setting from
 * @return the parsed auth entries, or an empty list when the resolved
 *         setting is {@code null}
 * @throws Exception any failure raised while resolving or parsing the
 *         setting; an error is logged and the exception is rethrown as-is
 */
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
    throws Exception {
  String authSetting = conf.get(YarnConfiguration.RM_ZK_AUTH);
  try {
    authSetting = ZKUtil.resolveConfIndirection(authSetting);
    // Nothing configured: report "no auth" rather than null.
    if (authSetting == null) {
      return Collections.emptyList();
    }
    return ZKUtil.parseAuth(authSetting);
  } catch (Exception e) {
    LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
    throw e;
  }
}
/**
 * Given the {@link Configuration} and the {@link ACL}s used (zkAcl) for
 * ZooKeeper access, construct the {@link ACL}s for the store's root node.
 * Every entry of the source list is copied with the create-delete
 * permissions removed, and one additional entry grants the current RM
 * exclusive create-delete access.
 *
 * <p>As a side effect this assigns {@code zkRootNodeUsername} from the RM
 * address configured for this RM instance.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set
 * ACL for the root node.
 *
 * @param conf configuration used to look up this RM's address
 * @param sourceACLs ACLs used for general ZooKeeper access
 * @return a new list holding the restricted copies followed by the RM's
 *         create-delete entry
 * @throws NoSuchAlgorithmException if the digest for the RM's identity
 *         cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> rootAcls = new ArrayList<ACL>();

  // Everyone allowed by the source ACLs keeps their access minus
  // create/delete.
  for (ACL source : sourceACLs) {
    int restrictedPerms =
        ZKUtil.removeSpecificPerms(source.getPerms(), CREATE_DELETE_PERMS);
    rootAcls.add(new ACL(restrictedPerms, source.getId()));
  }

  // The RM's identity is "<rm address>:<root node password>", digested.
  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
  String digest = DigestAuthenticationProvider.generateDigest(
      zkRootNodeUsername + ":" + zkRootNodePassword);
  rootAcls.add(
      new ACL(CREATE_DELETE_PERMS, new Id(zkRootNodeAuthScheme, digest)));

  return rootAcls;
}
@Test(timeout = 20000) public void testInvalidZKAclConfiguration() { TestZKClient zkClientTester = new TestZKClient(); YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*"); try { zkClientTester.getRMStateStore(conf); fail("ZKRMStateStore created with bad ACL"); } catch (ZKUtil.BadAclFormatException bafe) { // expected behavior } catch (Exception e) { String error = "Incorrect exception on BadAclFormat"; LOG.error(error, e); fail(error); } }
/**
 * Given the {@link Configuration} and the {@link ACL}s used (zkAcl) for
 * ZooKeeper access, construct the {@link ACL}s for the store's root node.
 * In the constructed list, each source entry is reproduced without the
 * create-delete permissions, while the current RM receives exclusive
 * create-delete access through one extra entry appended at the end.
 *
 * <p>This method also stores the RM address configured for this RM instance
 * in {@code zkRootNodeUsername}.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set
 * ACL for the root node.
 *
 * @param conf configuration used to look up this RM's address
 * @param sourceACLs ACLs used for general ZooKeeper access
 * @return the newly built root-node ACL list
 * @throws NoSuchAlgorithmException if the digest for the RM's identity
 *         cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  // Resolve the RM identity first; it does not depend on the source ACLs.
  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
  String rmCredentials = zkRootNodeUsername + ":" + zkRootNodePassword;

  List<ACL> result = new ArrayList<>();
  for (ACL original : sourceACLs) {
    int permsWithoutCreateDelete = ZKUtil.removeSpecificPerms(
        original.getPerms(), CREATE_DELETE_PERMS);
    result.add(new ACL(permsWithoutCreateDelete, original.getId()));
  }

  Id rmIdentity = new Id(zkRootNodeAuthScheme,
      DigestAuthenticationProvider.generateDigest(rmCredentials));
  result.add(new ACL(CREATE_DELETE_PERMS, rmIdentity));
  return result;
}
private void createConnection() throws Exception { // Curator connection CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); builder = builder.connectString(zkHostPort) .connectionTimeoutMs(zkSessionTimeout) .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval)); // Set up authorization based on fencing scheme List<AuthInfo> authInfos = new ArrayList<>(); for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth())); } if (useDefaultFencingScheme) { byte[] defaultFencingAuth = (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes( Charset.forName("UTF-8")); authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth)); } builder = builder.authorization(authInfos); // Connect to ZK curatorFramework = builder.build(); curatorFramework.start(); }
/**
 * Given the {@link Configuration} and the {@link ACL}s used (zkAcl) for
 * ZooKeeper access, construct the {@link ACL}s for the store's root node.
 * Each source entry is carried over with the create-delete permissions
 * removed; a final entry gives the current RM exclusive create-delete
 * access. The RM's identity is the digest of its configured address joined
 * with the root-node password obtained from {@code resourceManager}.
 *
 * <p>As a side effect this assigns {@code zkRootNodeUsername}.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set
 * ACL for the root node.
 *
 * @param conf configuration used to look up this RM's address
 * @param sourceACLs ACLs used for general ZooKeeper access
 * @return the newly built root-node ACL list
 * @throws NoSuchAlgorithmException if the digest for the RM's identity
 *         cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> aclsForRoot = new ArrayList<>();

  // Shared users: same identity, create/delete stripped.
  for (ACL shared : sourceACLs) {
    int strippedPerms =
        ZKUtil.removeSpecificPerms(shared.getPerms(), CREATE_DELETE_PERMS);
    aclsForRoot.add(new ACL(strippedPerms, shared.getId()));
  }

  // This RM: the only identity holding create/delete.
  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
  String idAndPassword =
      zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword();
  Id thisRm = new Id(zkRootNodeAuthScheme,
      DigestAuthenticationProvider.generateDigest(idAndPassword));
  aclsForRoot.add(new ACL(CREATE_DELETE_PERMS, thisRm));

  return aclsForRoot;
}
/**
 * Exercises {@code ZKUtil.resolveConfIndirection} with a null value, a
 * plain value, an "@file" reference to an existing file, and an "@file"
 * reference to a missing file.
 */
@Test
public void testConfIndirection() throws IOException {
  // Null and plain strings come back unchanged.
  assertNull(ZKUtil.resolveConfIndirection(null));
  assertEquals("x", ZKUtil.resolveConfIndirection("x"));

  // An "@path" reference resolves to the content written to that file.
  TEST_FILE.getParentFile().mkdirs();
  Files.write("hello world", TEST_FILE, Charsets.UTF_8);
  String existingRef = "@" + TEST_FILE.getAbsolutePath();
  assertEquals("hello world", ZKUtil.resolveConfIndirection(existingRef));

  // A reference to a missing file must raise FileNotFoundException whose
  // message starts with the referenced path.
  String missingRef = "@" + BOGUS_FILE;
  try {
    ZKUtil.resolveConfIndirection(missingRef);
    fail("Did not throw for non-existent file reference");
  } catch (FileNotFoundException notFound) {
    assertTrue(notFound.getMessage().startsWith(BOGUS_FILE));
  }
}
private void initZK() throws HadoopIllegalArgumentException, IOException, KeeperException { zkQuorum = conf.get(ZK_QUORUM_KEY); int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT); // Parse ACLs from configuration. String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT); zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf); if (zkAcls.isEmpty()) { zkAcls = Ids.CREATOR_ALL_ACL; } // Parse authentication from configuration. String zkAuthConf = conf.get(ZK_AUTH_KEY); zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); List<ZKAuthInfo> zkAuths; if (zkAuthConf != null) { zkAuths = ZKUtil.parseAuth(zkAuthConf); } else { zkAuths = Collections.emptyList(); } // Sanity check configuration. Preconditions.checkArgument(zkQuorum != null, "Missing required configuration '%s' for ZooKeeper quorum", ZK_QUORUM_KEY); Preconditions.checkArgument(zkTimeout > 0, "Invalid ZK session timeout %s", zkTimeout); int maxRetryNum = conf.getInt( CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY, CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT); elector = new ActiveStandbyElector(zkQuorum, zkTimeout, getParentZnode(), zkAcls, zkAuths, new ElectorCallbacks(), maxRetryNum); }
/**
 * Initializes this service: reads the ZooKeeper settings from the
 * configuration, creates the {@link ActiveStandbyElector} stored in
 * {@code elector} (including the configured maximum number of ZK operation
 * retries), ensures the election parent znode exists, and reports a fatal
 * error when that znode is not considered safe for this cluster id.
 *
 * @param conf service configuration; wrapped in a {@link YarnConfiguration}
 *        when it is not one already
 * @throws YarnRuntimeException if {@code YarnConfiguration.RM_ZK_ADDRESS}
 *         is not set
 * @throws Exception any other failure raised during initialization
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  if (!(conf instanceof YarnConfiguration)) {
    conf = new YarnConfiguration(conf);
  }

  final String quorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
  if (quorum == null) {
    throw new YarnRuntimeException("Embedded automatic failover "
        + "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS
        + " is not set");
  }

  // Identity of this RM within the cluster.
  final String localRmId = HAUtil.getRMHAId(conf);
  final String cluster = YarnConfiguration.getClusterId(conf);
  localActiveNodeInfo = createActiveNodeInfo(cluster, localRmId);

  // The election znode lives at <base path>/<cluster id>.
  final String basePath = conf.get(
      YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
      YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
  final String electionZNode = basePath + "/" + cluster;

  final long sessionTimeout = conf.getLong(
      YarnConfiguration.RM_ZK_TIMEOUT_MS,
      YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

  final List<ACL> acls = RMZKUtils.getZKAcls(conf);
  final List<ZKUtil.ZKAuthInfo> auths = RMZKUtils.getZKAuths(conf);

  final int opRetries = conf.getInt(
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
  elector = new ActiveStandbyElector(quorum, (int) sessionTimeout,
      electionZNode, acls, auths, this, opRetries);

  elector.ensureParentZNode();
  if (!isParentZnodeSafe(cluster)) {
    notifyFatalError(electionZNode + " znode has invalid data! "
        + "Might need formatting!");
  }

  super.serviceInit(conf);
}
/** * Utility method to fetch the ZK ACLs from the configuration */ public static List<ACL> getZKAcls(Configuration conf) throws Exception { // Parse authentication from configuration. String zkAclConf = conf.get(YarnConfiguration.RM_ZK_ACL, YarnConfiguration.DEFAULT_RM_ZK_ACL); try { zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); return ZKUtil.parseACLs(zkAclConf); } catch (Exception e) { LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL); throw e; } }
private synchronized void createConnection() throws IOException, InterruptedException { closeZkClients(); for (int retries = 0; retries < numRetries && zkClient == null; retries++) { try { activeZkClient = getNewZooKeeper(); zkClient = activeZkClient; for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth()); } if (useDefaultFencingScheme) { zkClient.addAuthInfo(zkRootNodeAuthScheme, (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(Charset.forName("UTF-8"))); } } catch (IOException ioe) { // Retry in case of network failures LOG.info("Failed to connect to the ZooKeeper on attempt - " + (retries + 1)); ioe.printStackTrace(); } } if (zkClient == null) { LOG.error("Unable to connect to Zookeeper"); throw new YarnRuntimeException("Unable to connect to Zookeeper"); } ZKRMStateStore.this.notifyAll(); LOG.info("Created new ZK connection"); }
/**
 * Parse an ACL list. The configuration string is first passed through
 * {@link ZKUtil#resolveConfIndirection(String)}, so it may be an indirect
 * reference rather than the ACL text itself.
 *
 * @param zkAclConf configuration string
 * @return an ACL list
 * @throws IOException on a bad ACL parse; the
 *         {@code ZKUtil.BadAclFormatException} is kept as the cause
 */
public List<ACL> parseACLs(String zkAclConf) throws IOException {
  try {
    final String resolvedAclText = ZKUtil.resolveConfIndirection(zkAclConf);
    return ZKUtil.parseACLs(resolvedAclText);
  } catch (ZKUtil.BadAclFormatException badFormat) {
    final String message = "Parsing " + zkAclConf + " :" + badFormat;
    throw new IOException(message, badFormat);
  }
}
/**
 * Initializes this service: reads the ZooKeeper settings from the
 * configuration, creates the {@link ActiveStandbyElector} stored in
 * {@code elector}, ensures the election parent znode exists, and reports a
 * fatal error when that znode is not considered safe for this cluster id.
 *
 * @param conf service configuration; wrapped in a {@link YarnConfiguration}
 *        when it is not one already
 * @throws YarnRuntimeException if {@code YarnConfiguration.RM_ZK_ADDRESS}
 *         is not set
 * @throws Exception any other failure raised during initialization
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  if (!(conf instanceof YarnConfiguration)) {
    conf = new YarnConfiguration(conf);
  }

  final String quorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
  if (quorum == null) {
    throw new YarnRuntimeException("Embedded automatic failover "
        + "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS
        + " is not set");
  }

  // Identity of this RM within the cluster.
  final String localRmId = HAUtil.getRMHAId(conf);
  final String cluster = YarnConfiguration.getClusterId(conf);
  localActiveNodeInfo = createActiveNodeInfo(cluster, localRmId);

  // The election znode lives at <base path>/<cluster id>.
  final String basePath = conf.get(
      YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
      YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
  final String electionZNode = basePath + "/" + cluster;

  final long sessionTimeout = conf.getLong(
      YarnConfiguration.RM_ZK_TIMEOUT_MS,
      YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

  final List<ACL> acls = RMZKUtils.getZKAcls(conf);
  final List<ZKUtil.ZKAuthInfo> auths = RMZKUtils.getZKAuths(conf);

  elector = new ActiveStandbyElector(quorum, (int) sessionTimeout,
      electionZNode, acls, auths, this);

  elector.ensureParentZNode();
  if (!isParentZnodeSafe(cluster)) {
    notifyFatalError(electionZNode + " znode has invalid data! "
        + "Might need formatting!");
  }

  super.serviceInit(conf);
}
private synchronized void createConnection() throws IOException, InterruptedException { closeZkClients(); for (int retries = 0; retries < numRetries && zkClient == null; retries++) { try { activeZkClient = getNewZooKeeper(); zkClient = activeZkClient; for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth()); } if (useDefaultFencingScheme) { zkClient.addAuthInfo(zkRootNodeAuthScheme, (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes()); } } catch (IOException ioe) { // Retry in case of network failures LOG.info("Failed to connect to the ZooKeeper on attempt - " + (retries + 1)); ioe.printStackTrace(); } } if (zkClient == null) { LOG.error("Unable to connect to Zookeeper"); throw new YarnRuntimeException("Unable to connect to Zookeeper"); } ZKRMStateStore.this.notifyAll(); LOG.info("Created new ZK connection"); }
private void initZK() throws HadoopIllegalArgumentException, IOException, KeeperException { zkQuorum = conf.get(ZK_QUORUM_KEY); int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT); // Parse ACLs from configuration. String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT); zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf); if (zkAcls.isEmpty()) { zkAcls = Ids.CREATOR_ALL_ACL; } // Parse authentication from configuration. String zkAuthConf = conf.get(ZK_AUTH_KEY); zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); List<ZKAuthInfo> zkAuths; if (zkAuthConf != null) { zkAuths = ZKUtil.parseAuth(zkAuthConf); } else { zkAuths = Collections.emptyList(); } // Sanity check configuration. Preconditions.checkArgument(zkQuorum != null, "Missing required configuration '%s' for ZooKeeper quorum", ZK_QUORUM_KEY); Preconditions.checkArgument(zkTimeout > 0, "Invalid ZK session timeout %s", zkTimeout); elector = new ActiveStandbyElector(zkQuorum, zkTimeout, getParentZnode(), zkAcls, zkAuths, new ElectorCallbacks()); }