/**
 * Prepares the JVM-wide SASL settings used by the tests in this class.
 *
 * <p>Registers the SASL authentication provider, writes a JAAS configuration
 * file ({@code jaas.conf}) with a {@code Server} and a {@code Client} section
 * into a fresh temporary directory, points
 * {@code java.security.auth.login.config} at that file and declares
 * {@code super_duper} as the ZooKeeper super user. The values the three system
 * properties held before are kept in {@code oldAuthProvider},
 * {@code oldLoginConfig} and {@code oldSuperUser} (presumably so a teardown
 * hook can restore them). Finally a digest {@link Id} for {@code jack:jack} is
 * stored in {@code otherDigestUser}.
 *
 * @throws Exception if the temporary directory or the JAAS file cannot be
 *         created, or the digest cannot be generated
 */
@BeforeClass
public static void setupStatic() throws Exception {
    oldAuthProvider = System.setProperty("zookeeper.authProvider.1",
            "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");

    File tmpDir = createTmpDir();
    File saslConfFile = new File(tmpDir, "jaas.conf");
    FileWriter fwriter = new FileWriter(saslConfFile);
    try {
        fwriter.write(""
                + "Server {\n"
                + " org.apache.zookeeper.server.auth.DigestLoginModule required\n"
                + " user_super_duper=\"test\";\n"
                + "};\n"
                + "Client {\n"
                + " org.apache.zookeeper.server.auth.DigestLoginModule required\n"
                + " username=\"super_duper\"\n"
                + " password=\"test\";\n"
                + "};"
                + "\n");
    } finally {
        // Release the file handle even when the write fails.
        fwriter.close();
    }

    oldLoginConfig = System.setProperty("java.security.auth.login.config",
            saslConfFile.getAbsolutePath());
    oldSuperUser = System.setProperty("zookeeper.superUser", "super_duper");
    otherDigestUser = new Id("digest", DigestAuthenticationProvider.generateDigest("jack:jack"));
}
/**
 * Builds the ACL list for the state store's root znode.
 *
 * <p>Each entry of {@code sourceACLs} is copied with
 * {@code CREATE_DELETE_PERMS} removed from its permission bits. One further
 * entry grants {@code CREATE_DELETE_PERMS} to an {@link Id} derived from the
 * digest of this RM's {@code username:password} pair, which is intended to
 * give the current RM exclusive create-delete access.
 *
 * <p>As a side effect, the {@code zkRootNodeUsername} field is set to the RM
 * address resolved from {@code conf}.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set
 * an ACL for the root node.
 *
 * @param conf configuration used to resolve this RM instance's address
 * @param sourceACLs the ACLs used for general ZooKeeper access
 * @return a new list holding the reduced source ACLs followed by the RM entry
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> rootAcls = new ArrayList<ACL>();
  for (ACL source : sourceACLs) {
    int reducedPerms =
        ZKUtil.removeSpecificPerms(source.getPerms(), CREATE_DELETE_PERMS);
    rootAcls.add(new ACL(reducedPerms, source.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);

  String rmCredentials = zkRootNodeUsername + ":" + zkRootNodePassword;
  String rmDigest = DigestAuthenticationProvider.generateDigest(rmCredentials);
  rootAcls.add(new ACL(CREATE_DELETE_PERMS, new Id(zkRootNodeAuthScheme, rmDigest)));
  return rootAcls;
}
/** * Generate a base-64 encoded digest of the idPasswordPair pair * @param idPasswordPair id:password * @return a string that can be used for authentication */ public String digest(String idPasswordPair) throws IOException { if (StringUtils.isEmpty(idPasswordPair) || !isValid(idPasswordPair)) { throw new IOException("Invalid id:password: " + idPasswordPair); } try { return DigestAuthenticationProvider.generateDigest(idPasswordPair); } catch (NoSuchAlgorithmException e) { // unlikely since it is standard to the JVM, but maybe JCE restrictions // could trigger it throw new IOException(e.toString(), e); } }
/**
 * Opens the ZooKeeper session described by {@code properties} and rebuilds
 * the shared {@code acl} list.
 *
 * <p>Connect string, session timeout, user name, password and the
 * {@code isCheckParentPath} flag (default {@code "true"}) are all read from
 * {@code properties}. The session is authenticated with the digest scheme
 * using {@code userName:password}. Afterwards {@code acl} holds exactly two
 * entries: all permissions for that digest identity and read permission for
 * anyone.
 *
 * @param connectionLatch passed to {@code sessionEvent} together with every
 *        watched event of the new session
 * @throws Exception if the session cannot be created or the digest cannot be
 *         generated
 */
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
    final String connectString =
            this.properties.getProperty(keys.zkConnectString.toString());
    final int sessionTimeout =
            Integer.parseInt(this.properties.getProperty(keys.zkSessionTimeout.toString()));
    final Watcher sessionWatcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            sessionEvent(connectionLatch, event);
        }
    };
    zk = new ZooKeeper(connectString, sessionTimeout, sessionWatcher);

    final String user = this.properties.getProperty(keys.userName.toString());
    final String secret = this.properties.getProperty(keys.password.toString());
    final String credentials = user + ":" + secret;

    this.isCheckParentPath = Boolean.parseBoolean(
            this.properties.getProperty(keys.isCheckParentPath.toString(), "true"));

    zk.addAuthInfo("digest", credentials.getBytes());

    // The list is emptied before the digest is computed, exactly as before.
    acl.clear();
    Id owner = new Id("digest", DigestAuthenticationProvider.generateDigest(credentials));
    acl.add(new ACL(ZooDefs.Perms.ALL, owner));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
/**
 * Derives the root-node ACLs of the store from the ACLs used for general
 * ZooKeeper access.
 *
 * <p>The result contains one entry per element of {@code sourceACLs}, each
 * with {@code CREATE_DELETE_PERMS} stripped, plus a final entry that gives
 * {@code CREATE_DELETE_PERMS} to the digest identity of the current RM
 * ({@code zkRootNodeUsername:zkRootNodePassword}). The intent is that the
 * current RM has exclusive create-delete access on the root node.
 *
 * <p>The {@code zkRootNodeUsername} field is overwritten with the RM address
 * looked up in {@code conf}.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set
 * an ACL for the root node.
 *
 * @param conf configuration from which the RM address is resolved
 * @param sourceACLs ACLs configured for ZooKeeper access
 * @return the newly built root-node ACL list
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> result = new ArrayList<>();
  for (ACL original : sourceACLs) {
    ACL withoutCreateDelete = new ACL(
        ZKUtil.removeSpecificPerms(original.getPerms(), CREATE_DELETE_PERMS),
        original.getId());
    result.add(withoutCreateDelete);
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);

  String digest = DigestAuthenticationProvider.generateDigest(
      zkRootNodeUsername + ":" + zkRootNodePassword);
  Id currentRm = new Id(zkRootNodeAuthScheme, digest);
  result.add(new ACL(CREATE_DELETE_PERMS, currentRm));
  return result;
}
/**
 * Creates the ZooKeeper session configured in {@code properties} and resets
 * the shared {@code acl} list.
 *
 * <p>The session is authenticated with the digest scheme using the configured
 * {@code userName:password}. When the method returns, {@code acl} grants all
 * permissions to that digest identity and read permission to anyone.
 *
 * @param connectionLatch forwarded to {@code sessionEvent} with each watched
 *        event of the session
 * @throws Exception if the session cannot be opened or the digest cannot be
 *         generated
 */
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
    final String quorum = this.properties.getProperty(keys.zkConnectString.toString());
    final int timeoutMs =
            Integer.parseInt(this.properties.getProperty(keys.zkSessionTimeout.toString()));
    final Watcher forwarder = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            sessionEvent(connectionLatch, event);
        }
    };
    zk = new ZooKeeper(quorum, timeoutMs, forwarder);

    final String login = this.properties.getProperty(keys.userName.toString());
    final String passphrase = this.properties.getProperty(keys.password.toString());
    final String loginAndPassphrase = login + ":" + passphrase;
    zk.addAuthInfo("digest", loginAndPassphrase.getBytes());

    // Clear first, then compute the digest: same ordering as the original.
    acl.clear();
    String digest = DigestAuthenticationProvider.generateDigest(loginAndPassphrase);
    acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", digest)));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
/**
 * Builds the ACL list used when creating nodes.
 *
 * <p>The list holds a single entry granting {@code Perms.CREATE} to the
 * identity formed from {@code PropertiesDynLoading.authScheme} and the digest
 * of {@code PropertiesDynLoading.accessKey}.
 *
 * @return a new, mutable list containing the create-only ACL
 * @throws IllegalStateException if the digest cannot be computed; the method
 *         no longer falls back to a completely open ACL in that case
 */
public List<ACL> getCreateNodeAcls() {
    List<ACL> listAcls = new ArrayList<ACL>(3);
    try {
        Id id = new Id(PropertiesDynLoading.authScheme,
                DigestAuthenticationProvider.generateDigest(PropertiesDynLoading.accessKey));
        listAcls.add(new ACL(Perms.CREATE, id));
    } catch (NoSuchAlgorithmException e) {
        // Fail closed: handing out OPEN_ACL_UNSAFE here would silently create
        // world-writable nodes whenever the digest algorithm is missing.
        throw new IllegalStateException("Unable to compute the digest for the create-node ACL", e);
    }
    return listAcls;
}
/**
 * Builds a {@link CuratorFramework} client for the configured ZooKeeper
 * ensemble and refreshes the shared {@code acl} list.
 *
 * <p>The client retries 5 times with 5000 ms between attempts, authenticates
 * with the digest scheme using {@code ZK_USER_NAME:ZK_PASSWORD}, and is
 * returned without {@code start()} having been called on it here.
 *
 * <p>On success {@code acl} grants all permissions to the digest identity and
 * read permission to anyone. If the digest cannot be computed the error is
 * logged and {@code acl} is left as it was.
 *
 * @return the built client
 */
public static CuratorFramework create() {
    RetryNTimes retryPolicy = new RetryNTimes(5, 5000);
    String authString = Constants.ZK_USER_NAME + ":" + Constants.ZK_PASSWORD;
    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(Constants.ZK_CONNECT_STRING)
            .retryPolicy(retryPolicy)
            .connectionTimeoutMs(Constants.ZOO_KEEPER_TIMEOUT)
            .sessionTimeoutMs(Constants.ZOO_KEEPER_TIMEOUT * 3)
            .authorization("digest", authString.getBytes())
            .build();
    try {
        // Compute the digest before touching the shared list so that a
        // failure cannot leave it emptied.
        String digest = DigestAuthenticationProvider.generateDigest(authString);
        acl.clear();
        acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", digest)));
        acl.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
    } catch (NoSuchAlgorithmException e) {
        // The logger already records the stack trace; no printStackTrace().
        LOGGER.error("ZKUtil-->>create() error,", e);
    }
    return client;
}
/**
 * Computes the ACLs to place on the store's root node.
 *
 * <p>All entries of {@code sourceACLs} are carried over with
 * {@code CREATE_DELETE_PERMS} removed. A trailing entry then assigns
 * {@code CREATE_DELETE_PERMS} to the digest identity built from the RM
 * address and the password supplied by {@code resourceManager}, with the
 * intent that the current RM has exclusive create-delete access.
 *
 * <p>The {@code zkRootNodeUsername} field is updated with the RM address
 * resolved from {@code conf}.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set
 * an ACL for the root node.
 *
 * @param conf configuration used to look up the RM address
 * @param sourceACLs ACLs used for ZooKeeper access
 * @return the constructed root-node ACL list
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> rootNodeAcls = new ArrayList<>();
  for (ACL entry : sourceACLs) {
    int permsWithoutCreateDelete =
        ZKUtil.removeSpecificPerms(entry.getPerms(), CREATE_DELETE_PERMS);
    rootNodeAcls.add(new ACL(permsWithoutCreateDelete, entry.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);

  String rmSecret = resourceManager.getZkRootNodePassword();
  String rmDigest =
      DigestAuthenticationProvider.generateDigest(zkRootNodeUsername + ":" + rmSecret);
  rootNodeAcls.add(new ACL(CREATE_DELETE_PERMS, new Id(zkRootNodeAuthScheme, rmDigest)));
  return rootNodeAcls;
}
/**
 * Checks that a model spec carrying a digest ACL is enforced: after the
 * initial set, an update through the unauthenticated client is expected to
 * fail, while the same kind of update through a client authorized as
 * {@code test:test} is expected to succeed.
 */
@Test
public void testAcl() throws NoSuchAlgorithmException {
    final String credentials = "test:test";

    Id writer = new Id("digest", DigestAuthenticationProvider.generateDigest(credentials));
    List<ACL> aclList = Collections.singletonList(new ACL(ZooDefs.Perms.WRITE, writer));
    ModelSpec<TestModel> aclModelSpec = ModelSpec
            .builder(modelSpec.path(), modelSpec.serializer())
            .withAclList(aclList)
            .build();

    // Unauthenticated client: the first write creates the node, the update must be rejected.
    ModeledFramework<TestModel> client = ModeledFramework.wrap(async, aclModelSpec);
    TestModel initial = new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101));
    complete(client.set(initial));

    TestModel rejected = new TestModel("John", "Galt", "Galt's Gulch", 54, BigInteger.valueOf(88));
    complete(client.update(rejected),
            (unused, failure) -> Assert.assertNotNull(failure, "Should've gotten an auth failure"));

    // Authenticated client: the update must go through.
    try (CuratorFramework authCurator = CuratorFrameworkFactory.builder()
            .connectString(server.getConnectString())
            .retryPolicy(new RetryOneTime(1))
            .authorization("digest", credentials.getBytes())
            .build()) {
        authCurator.start();
        ModeledFramework<TestModel> authClient =
                ModeledFramework.wrap(AsyncCuratorFramework.wrap(authCurator), aclModelSpec);

        TestModel accepted = new TestModel("John", "Galt", "Galt's Gulch", 42, BigInteger.valueOf(66));
        complete(authClient.update(accepted),
                (unused, failure) -> Assert.assertNull(failure, "Should've succeeded"));
    }
}
/**
 * Returns the digest string for a user name and password.
 *
 * @param username the user name
 * @param password the password
 * @return the digest of {@code username:password}, or {@code null} when
 *         either argument is empty or the digest cannot be generated
 */
public static String getDigest(String username, String password) {
    String digest = null;
    if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
        try {
            digest = DigestAuthenticationProvider.generateDigest(username + ":" + password);
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
    }
    return digest;
}
/**
 * Creates the task node with a digest-protected ACL and reads it back.
 *
 * <p>Connects to {@code localhost:2181}, so a ZooKeeper server is expected
 * to be listening there. The session is closed when the test finishes,
 * whether it passes or fails.
 *
 * @throws Exception if connecting, creating or reading the node fails
 */
@Test
public void testCreateTask() throws Exception {
    final String taskPath = "/uncode/schedule/task/taskObj#print";
    ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);
    try {
        List<ACL> acls = new ArrayList<ACL>();
        zk.addAuthInfo("digest", "ScheduleAdmin:password".getBytes());
        acls.add(new ACL(ZooDefs.Perms.ALL,
                new Id("digest", DigestAuthenticationProvider.generateDigest("ScheduleAdmin:password"))));
        acls.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
        zk.create(taskPath, new byte[0], acls, CreateMode.PERSISTENT);
        zk.getData(taskPath, false, null);
    } finally {
        // Without this the session and its client threads outlive the test.
        zk.close();
    }
}
/**
 * Builds the ACL list used for administrative access.
 *
 * <p>The list holds a single entry granting {@code Perms.ALL} to the identity
 * formed from {@code PropertiesDynLoading.authScheme} and the digest of
 * {@code PropertiesDynLoading.accessKey}.
 *
 * @return a new, mutable list containing the admin ACL
 * @throws IllegalStateException if the digest cannot be computed; the method
 *         no longer falls back to a completely open ACL in that case
 */
public List<ACL> getAdminAcls() {
    List<ACL> listAcls = new ArrayList<ACL>(3);
    try {
        Id id = new Id(PropertiesDynLoading.authScheme,
                DigestAuthenticationProvider.generateDigest(PropertiesDynLoading.accessKey));
        listAcls.add(new ACL(Perms.ALL, id));
    } catch (NoSuchAlgorithmException e) {
        // Fail closed: returning OPEN_ACL_UNSAFE would give everyone the
        // permissions that are meant to be reserved for the administrator.
        throw new IllegalStateException("Unable to compute the digest for the admin ACL", e);
    }
    return listAcls;
}
/** * 授权访问 * Zookeeper对权限的控制是节点级别的,而且不继承,即对父节点设置权限,其子节点不继承父节点的权限 * Zookeeper提供了几种认证方式 * world:有个单一的ID,anyone,表示任何人。 * auth:不使用任何ID,表示任何通过验证的用户(是通过ZK验证的用户?连接到此ZK服务器的用户?)。 * digest:使用 用户名:密码 字符串生成MD5哈希值作为ACL标识符ID。权限的验证通过直接发送用户名密码字符串的方式完成, * ip:使用客户端主机ip地址作为一个ACL标识符,ACL表达式是以 addr/bits 这种格式表示的。ZK服务器会将addr的前bits位与客户端地址的前bits位来进行匹配验证权限。 * @param zooKeeper */ private static List<ACL> getACL(ZkClient zooKeeper) throws Exception { // 配置两个用户admin有读写权限,gao有读的权限 String userOne = "admin:admin"; String userTwo = "gao:gao"; // zooKeeper.addAuthInfo("digest",userOne.getBytes("UTF-8")); Id idOne = new Id("digest", DigestAuthenticationProvider.generateDigest(userOne)); Id idTwo = new Id("digest", DigestAuthenticationProvider.generateDigest(userTwo)); // 读 ACL acl = new ACL(ZooDefs.Perms.ALL, idOne); // 写 ACL aclRead = new ACL(ZooDefs.Perms.READ, idTwo); List<ACL> acls = Arrays.asList(acl, aclRead); return acls; }
/**
 * Connects to ZooKeeper at {@code host:port} and prepares the digest ACL.
 *
 * <p>Every session event is handed to each registered watcher in
 * {@code watchers}; anything a watcher throws is logged and does not stop
 * the remaining watchers. The session is authenticated with the digest
 * scheme using {@code uid:secret}, the matching digest is stored in
 * {@code secretDigest}, and {@code acls} becomes a single-entry list granting
 * all permissions to that digest identity.
 *
 * @throws Exception if the connection cannot be created or the digest cannot
 *         be generated
 */
public void init() throws Exception {
    LOG.info("ZC: init()");

    final String connectString = host + ":" + port;
    client = new ZooKeeper(connectString, sessionTimeout, event ->
            watchers.forEach(watcher -> {
                try {
                    watcher.process(event);
                } catch (Throwable failure) {
                    LOG.error(failure.getMessage(), failure);
                }
            }));

    final String credentials = uid + ":" + secret;
    client.addAuthInfo("digest", credentials.getBytes());

    secretDigest = DigestAuthenticationProvider.generateDigest(credentials);
    Id owner = new Id("digest", secretDigest);
    acls = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, owner));
}
/**
 * Opens a ZooKeeper session to {@code zkConnectString} and resets the shared
 * {@code acl} list.
 *
 * <p>The session is authenticated with the digest scheme using
 * {@code userName:password}. Afterwards {@code acl} grants all permissions
 * to that digest identity and read permission to anyone.
 *
 * @param connectionLatch handed to {@code sessionEvent} with every watched
 *        event of the session
 * @throws Exception if the session cannot be created or the digest cannot be
 *         generated
 */
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
    final Watcher sessionWatcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            sessionEvent(connectionLatch, event);
        }
    };
    zk = new ZooKeeper(zkConnectString, zkSessionTimeout, sessionWatcher);

    final String credentials = userName + ":" + password;
    zk.addAuthInfo("digest", credentials.getBytes());

    // Emptying the list precedes the digest computation, as in the original.
    acl.clear();
    Id owner = new Id("digest", DigestAuthenticationProvider.generateDigest(credentials));
    acl.add(new ACL(ZooDefs.Perms.ALL, owner));
    acl.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
}
/**
 * Builds a client configured to create the root directory with a digest ACL
 * for {@code USERNAME:PASSWORD}, initializes it and queries the root
 * directory.
 *
 * <p>The client is destroyed even when the query throws, so a failing run
 * does not leave an initialized client behind.
 *
 * @throws Exception if the digest cannot be generated or any client call
 *         fails
 */
private void testZookeeperClientWithRootCreation() throws Exception {
    String digest = DigestAuthenticationProvider.generateDigest(
            String.format("%s:%s", USERNAME, PASSWORD));
    List<ACL> acl = Arrays.asList(new ACL(ZooDefs.Perms.ALL, new Id("digest", digest)));

    ZookeeperClient zkClient =
            new ZookeeperClientBuilder(connectionString, USERNAME, PASSWORD, ROOT_DIR)
                    .withRootCreation(acl)
                    .build();
    zkClient.init();
    try {
        zkClient.exists(ROOT_DIR);
    } finally {
        zkClient.destroy();
    }
}
/** * Generate a base-64 encoded digest of the idPasswordPair pair * @param idPasswordPair id:password * @return a string that can be used for authentication */ public String digest(String idPasswordPair) throws IOException { if (StringUtils.isEmpty(idPasswordPair) || !isValid(idPasswordPair)) { throw new IOException("Invalid id:password"); } try { return DigestAuthenticationProvider.generateDigest(idPasswordPair); } catch (NoSuchAlgorithmException e) { // unlikely since it is standard to the JVM, but maybe JCE restrictions // could trigger it throw new IOException(e.toString(), e); } }
/**
 * Creates the ZooKeeper ACL list for a user.
 *
 * <p>When both {@code user} and {@code password} are non-null, the result
 * grants all permissions only to the digest identity of
 * {@code user:password}. Otherwise
 * {@link org.apache.zookeeper.ZooDefs.Ids#OPEN_ACL_UNSAFE} is returned,
 * meaning everybody has permission.
 *
 * @param user the user name, may be {@code null}
 * @param password the password, may be {@code null}
 * @return a new list with the single digest ACL, or the open ACL
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
public static List<ACL> createACL(String user, String password) throws NoSuchAlgorithmException {
    if (user == null || password == null) {
        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
    }
    String credentials = user + ":" + password;
    Id owner = new Id("digest", DigestAuthenticationProvider.generateDigest(credentials));
    return Lists.newArrayList(new ACL(ZooDefs.Perms.ALL, owner));
}
@Test public void testACL() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).setTickTime(1000).build(); zkServer.startAndWait(); try { String userPass = "user:pass"; String digest = DigestAuthenticationProvider.generateDigest(userPass); // Creates two zkclients ZKClientService zkClient = ZKClientService.Builder .of(zkServer.getConnectionStr()) .addAuthInfo("digest", userPass.getBytes()) .build(); zkClient.startAndWait(); ZKClientService noAuthClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); noAuthClient.startAndWait(); // Create a node that is readable by all client, but admin for the creator String path = "/testacl"; zkClient.create(path, "test".getBytes(), CreateMode.PERSISTENT, ImmutableList.of( new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE), new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS) )).get(); // Verify the ACL ACLData aclData = zkClient.getACL(path).get(); Assert.assertEquals(2, aclData.getACL().size()); ACL acl = aclData.getACL().get(1); Assert.assertEquals(ZooDefs.Perms.ALL, acl.getPerms()); Assert.assertEquals("digest", acl.getId().getScheme()); Assert.assertEquals(digest, acl.getId().getId()); Assert.assertArrayEquals("test".getBytes(), noAuthClient.getData(path).get().getData()); // When tries to write using the no-auth zk client, it should fail. try { noAuthClient.setData(path, "test2".getBytes()).get(); Assert.fail(); } catch (ExecutionException e) { Assert.assertTrue(e.getCause() instanceof KeeperException.NoAuthException); } // Change ACL to make it open for all zkClient.setACL(path, ImmutableList.of(new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.ANYONE_ID_UNSAFE))).get(); // Write again with the non-auth client, now should succeed. noAuthClient.setData(path, "test2".getBytes()).get(); noAuthClient.stopAndWait(); zkClient.stopAndWait(); } finally { zkServer.stopAndWait(); } }
public CuratorFramework createAndStartCurator(Configuration conf) throws Exception { String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS); if (zkHostPort == null) { throw new YarnRuntimeException( YarnConfiguration.RM_ZK_ADDRESS + " is not configured."); } int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES, YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES); int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS); // set up zk auths List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf); List<AuthInfo> authInfos = new ArrayList<>(); for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth())); } if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance( YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) { String zkRootNodeUsername = HAUtil .getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, conf); byte[] defaultFencingAuth = (zkRootNodeUsername + ":" + zkRootNodePassword) .getBytes(Charset.forName("UTF-8")); authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(), defaultFencingAuth)); } CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(zkHostPort) .sessionTimeoutMs(zkSessionTimeout) .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval)) .authorization(authInfos).build(); client.start(); return client; }
@Test public void testACL() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).setTickTime(1000).build(); zkServer.startAndWait(); try { String userPass = "user:pass"; String digest = DigestAuthenticationProvider.generateDigest(userPass); // Creates two zkclients ZKClientService zkClient = ZKClientService.Builder .of(zkServer.getConnectionStr()) .addAuthInfo("digest", userPass.getBytes()) .build(); zkClient.startAndWait(); ZKClientService noAuthClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); noAuthClient.startAndWait(); // Create a node that is readable by all client, but admin for the creator String path = "/testacl"; zkClient.create(path, "test".getBytes(), CreateMode.PERSISTENT, ImmutableList.of( new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE), new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS) )).get(); // Verify the ACL ACLData aclData = zkClient.getACL(path).get(); Assert.assertEquals(2, aclData.getACL().size()); ACL acl = aclData.getACL().get(1); Assert.assertEquals(ZooDefs.Perms.ALL, acl.getPerms()); Assert.assertEquals("digest", acl.getId().getScheme()); Assert.assertEquals(digest, acl.getId().getId()); Assert.assertEquals("test", new String(noAuthClient.getData(path).get().getData())); // When tries to write using the no-auth zk client, it should fail. try { noAuthClient.setData(path, "test2".getBytes()).get(); Assert.fail(); } catch (ExecutionException e) { Assert.assertTrue(e.getCause() instanceof KeeperException.NoAuthException); } // Change ACL to make it open for all zkClient.setACL(path, ImmutableList.of(new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.ANYONE_ID_UNSAFE))).get(); // Write again with the non-auth client, now should succeed. noAuthClient.setData(path, "test2".getBytes()).get(); noAuthClient.stopAndWait(); zkClient.stopAndWait(); } finally { zkServer.stopAndWait(); } }