/**
 * Converts a comma-separated string of endpoints into a list of socket addresses.
 *
 * <p>The string is parsed by {@code ConnectStringParser}; each parsed server is copied into a
 * new {@link InetSocketAddress} built from its host name and port.
 *
 * @param connectionString a comma-separated list of servers
 * @return a new {@link ArrayList} holding one address per parsed server
 * @throws IllegalArgumentException if any exception is raised while parsing the string or
 *                                  building the addresses; the original exception is the cause
 */
public static List<InetSocketAddress> toList(final String connectionString) {
    try {
        final List<InetSocketAddress> result = new ArrayList<>();
        for (final InetSocketAddress parsed : new ConnectStringParser(connectionString).getServerAddresses()) {
            final InetSocketAddress copy = new InetSocketAddress(parsed.getHostName(), parsed.getPort());
            result.add(copy);
        }
        return result;
    } catch (Exception e) {
        throw new IllegalArgumentException("Cannot parse hosts", e);
    }
}
/**
 * Creates a ZooMap from the builder's settings and connects its client.
 *
 * <p>If the connection string contains a chroot path, a temporary client is first connected
 * to the bare server list (without the chroot), used to create the chroot path, and closed
 * again. Afterwards the main client is created from the full connection string, started, and
 * — when {@code root} is not empty — used to create the root containers.
 *
 * @param builder supplies the connection string, the root path and the settings passed on to
 *                {@code newCuratorFrameworkClient}
 */
private ZooMap(Builder builder) {
    this.connectionString = builder.connectionString;

    final ConnectStringParser parser = new ConnectStringParser(connectionString);
    final String chrootPath = parser.getChrootPath();
    if (chrootPath != null) {
        // Build "host:port,host:port" explicitly instead of relying on
        // InetSocketAddress.toString(): since JDK 14 an unresolved address is rendered as
        // "host/<unresolved>:port", which is not a usable connect string.
        final String serversWithoutChroot = parser.getServerAddresses().stream()
                .map(address -> address.getHostString() + ":" + address.getPort())
                .collect(Collectors.joining(","));

        // The temporary client is closed by try-with-resources before the main client exists.
        try (final CuratorFramework chrootClient = newCuratorFrameworkClient(builder, serversWithoutChroot)) {
            startAndBlock(chrootClient);
            tryIt(() -> chrootClient.createContainers(chrootPath));
        }
    }

    client = newCuratorFrameworkClient(builder, connectionString);
    this.root = builder.root;
    startAndBlock(client);
    if (!root.isEmpty()) {
        tryIt(() -> client.createContainers(root));
    }
}
/** * if ZkConnectString contains namespace path at the end, but it does not exist we should fail * @param zkConnect - connect string * @param zkClient - zkClient object to talk to the ZK */ public static void validateZkNameSpace(String zkConnect, ZkClient zkClient) { ConnectStringParser parser = new ConnectStringParser(zkConnect); String path = parser.getChrootPath(); if (Strings.isNullOrEmpty(path)) { return; // no namespace path } LOG.info("connectString = " + zkConnect + "; path =" + path); // if namespace specified (path above) but "/" does not exists, we will fail if (!zkClient.exists("/")) { throw new SamzaException("Zookeeper namespace: " + path + " does not exist for zk at " + zkConnect); } }
/**
 * Creates a monitor and reads its ZooKeeper settings from the connection's configuration:
 * the parent znode, the session timeout and the list of quorum servers.
 *
 * <p>All constructor arguments are forwarded unchanged to the superclass constructor.
 *
 * @param connection          connection whose configuration is read
 * @param monitorTargets      forwarded to the superclass
 * @param useRegExp           forwarded to the superclass
 * @param sink                forwarded to the superclass
 * @param executor            forwarded to the superclass
 * @param treatFailureAsError forwarded to the superclass
 */
protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
    StdOutSink sink, ExecutorService executor, boolean treatFailureAsError) {
  super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
  Configuration configuration = connection.getConfiguration();
  znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
  timeout = configuration
      .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
  ConnectStringParser parser =
      new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
  hosts = Lists.newArrayList();
  for (InetSocketAddress server : parser.getServerAddresses()) {
    // Store a plain "host:port". InetSocketAddress.toString() is not a stable format: since
    // JDK 14 an unresolved address prints as "host/<unresolved>:port".
    hosts.add(server.getHostString() + ":" + server.getPort());
  }
}
/**
 * Builds node watchers from the default ZK_SERVERS value and checks that exactly three
 * watchers come back and that their "host:port" addresses, joined with commas, equal the
 * original string.
 */
@Test
public void should_get_a_list_of_brokers_watcher_from_comma_separated_zk_string() {
    final String expectedZkHosts = PropertyNames.ZK_SERVERS.getDefaultValue();
    final ConnectStringParser connectString =
            new ConnectStringParser(PropertyNames.ZK_SERVERS.getDefaultValue());
    final int pollingDelay =
            Integer.parseInt(PropertyNames.ZK_NODE_POLL_DELAY_TIME_MS.getDefaultValue());
    final int pollingInitialDelay =
            Integer.parseInt(PropertyNames.ZK_NODE_POLL_INITIAL_DELAY_TIME_MS.getDefaultValue());
    final ZKMonitorCallback callback = new TestCallback();

    final List<ZKNodeWatcher> watchers = ZKNodeWatchersBuilder.build(
            connectString.getServerAddresses(), callback, pollingDelay, pollingInitialDelay);
    Assert.assertTrue(watchers.size() == 3);

    // Re-assemble "host:port,host:port,..."; every entry is followed by a comma.
    final StringBuilder joined = new StringBuilder();
    for (final ZKNodeWatcher watcher : watchers) {
        final String hostAndPort =
                watcher.getZKNodeAddress().getHostName() + ":" + watcher.getZKNodeAddress().getPort();
        joined.append(hostAndPort).append(",");
    }

    // Drop the trailing comma before comparing.
    final String actualZkHosts = joined.substring(0, joined.length() - 1);
    Assert.assertTrue(actualZkHosts.equals(expectedZkHosts));
}
/**
 * Parses a single server directly followed by a chroot path and hands the parser, together
 * with the expected path, to {@code assertChrootPath}.
 */
@Test
public void testSingleServerChrootPath() {
    final String expectedChroot = "/hallo/welt";
    final String singleServer = "10.10.10.1";

    final ConnectStringParser parser = new ConnectStringParser(singleServer + expectedChroot);

    assertChrootPath(expectedChroot, parser);
}
/**
 * Parses two comma-separated servers followed by a chroot path and hands the parser, together
 * with the expected path, to {@code assertChrootPath}.
 */
@Test
public void testMultipleServersChrootPath() {
    final String expectedChroot = "/hallo/welt";
    final String serverList = "10.10.10.1,10.10.10.2";

    final ConnectStringParser parser = new ConnectStringParser(serverList + expectedChroot);

    assertChrootPath(expectedChroot, parser);
}
/**
 * Parses two servers given without ports and checks that the host names come back in the
 * same order.
 */
@Test
public void testParseServersWithoutPort() {
    final String[] expectedHosts = {"10.10.10.1", "10.10.10.2"};
    final ConnectStringParser parser =
            new ConnectStringParser(expectedHosts[0] + "," + expectedHosts[1]);

    for (int i = 0; i < expectedHosts.length; i++) {
        Assert.assertEquals(expectedHosts[i], parser.getServerAddresses().get(i).getHostName());
    }
}
/**
 * Parses two servers given as host:port and checks that both the host names and the ports
 * come back in the same order.
 */
@Test
public void testParseServersWithPort() {
    final String[] expectedHosts = {"10.10.10.1", "10.10.10.2"};
    final int[] expectedPorts = {112, 110};
    final ConnectStringParser parser = new ConnectStringParser(
            expectedHosts[0] + ":" + expectedPorts[0] + "," + expectedHosts[1] + ":" + expectedPorts[1]);

    // Hosts first, then ports.
    for (int i = 0; i < expectedHosts.length; i++) {
        Assert.assertEquals(expectedHosts[i], parser.getServerAddresses().get(i).getHostName());
    }
    for (int i = 0; i < expectedPorts.length; i++) {
        Assert.assertEquals(expectedPorts[i], parser.getServerAddresses().get(i).getPort());
    }
}
public void configMutliCluster(ZooKeeper zk) { if (_servers.size() == 1) { return; } String cluster1 = _servers.get(0); try { if (_servers.size() > 1) { // 强制的声明accessible ReflectionUtils.makeAccessible(clientCnxnField); ReflectionUtils.makeAccessible(hostProviderField); ReflectionUtils.makeAccessible(serverAddressesField); // 添加第二组集群列表 for (int i = 1; i < _servers.size(); i++) { String cluster = _servers.get(i); // 强制获取zk中的地址信息 ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk); HostProvider hostProvider = (HostProvider) ReflectionUtils.getField(hostProviderField, cnxn); List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField, hostProvider); // 添加第二组集群列表 serverAddrs.addAll(new ConnectStringParser(cluster).getServerAddresses()); } } } catch (Exception e) { try { if (zk != null) { zk.close(); } } catch (InterruptedException ie) { // ignore interrupt } throw new ZkException("zookeeper_create_error, serveraddrs=" + cluster1, e); } }
/**
 * Parses two servers given without ports and checks that the host strings come back in the
 * same order.
 */
@Test
public void testParseServersWithoutPort() {
    final String firstHost = "10.10.10.1";
    final String secondHost = "10.10.10.2";

    final ConnectStringParser parsed = new ConnectStringParser(firstHost + "," + secondHost);

    Assert.assertEquals(firstHost, parsed.getServerAddresses().get(0).getHostString());
    Assert.assertEquals(secondHost, parsed.getServerAddresses().get(1).getHostString());
}
/**
 * Parses two servers given as host:port and checks that both the host strings and the ports
 * come back in the same order.
 */
@Test
public void testParseServersWithPort() {
    final String firstHost = "10.10.10.1";
    final String secondHost = "10.10.10.2";
    final int firstPort = 112;
    final int secondPort = 110;

    final ConnectStringParser parsed = new ConnectStringParser(
            firstHost + ":" + firstPort + "," + secondHost + ":" + secondPort);

    Assert.assertEquals(firstHost, parsed.getServerAddresses().get(0).getHostString());
    Assert.assertEquals(secondHost, parsed.getServerAddresses().get(1).getHostString());
    Assert.assertEquals(firstPort, parsed.getServerAddresses().get(0).getPort());
    Assert.assertEquals(secondPort, parsed.getServerAddresses().get(1).getPort());
}