@Before public void setUp() throws Exception { CountDownLatch latch = new CountDownLatch(1); zookeeper = new ZooKeeper("127.0.0.1:2181", 10000, event -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.println("Zookeeper connected."); } else { throw new RuntimeException("Error connecting to zookeeper"); } latch.countDown(); }); latch.await(); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); curatorFramework.start(); AsyncCuratorFramework asyncCuratorFramework = AsyncCuratorFramework.wrap(curatorFramework); logInfoStorage = new LogInfoStorageImpl(asyncCuratorFramework); }
private void connect(TxZookeeperConfig config) { try { zooKeeper = new ZooKeeper(config.getHost(), config.getSessionTimeOut(), watchedEvent -> { if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) { // 放开闸门, wait在connect方法上的线程将被唤醒 COUNT_DOWN_LATCH.countDown(); } }); COUNT_DOWN_LATCH.await(); Stat stat = zooKeeper.exists(rootPath, false); if (stat == null) { zooKeeper.create(rootPath, rootPath.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { throw new TransactionIoException(e); } }
@Override public void process(WatchedEvent event) throws Exception { String topicPath = zkConf.getZKBasePath() + "/topics/" + topic; LOG.info("get zookeeper notification for path={}", topicPath); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { List<String> newQueues = zkClient.getChildren().forPath(topicPath); List<Integer> newQueueIds = new ArrayList<>(); for (String queue : newQueues) { newQueueIds.add(Integer.valueOf(queue)); } List<Integer> oldQueueIds = metadata.getTopicQueues(topic); Collection<Integer> addedQueueIds = CollectionUtils.subtract(newQueueIds, oldQueueIds); Collection<Integer> deletedQueueIds = CollectionUtils.subtract(oldQueueIds, newQueueIds); for (Integer queueId : addedQueueIds) { String queuePath = topicPath + "/" + queueId; String queueData = new String(zkClient.getData().forPath(queuePath)); Integer shardingId = Integer.valueOf(queueData); metadata.addTopicQueue(topic, queueId, shardingId); } metadata.deleteTopicQueue(topic, deletedQueueIds); } zkClient.getChildren() .usingWatcher(new TopicWatcher(topic)) .forPath(topicPath); }
/** * 获取Zookeeper连接 * @return */ public static ZooKeeper getZooKeeperConnection() { try { // Zookeeper连接闭锁cdl final CountDownLatch cdl = new CountDownLatch(1); final ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { // 当连接成功时放开cdl cdl.countDown(); } } }); // cdl阻塞 cdl.await(); return zooKeeper; } catch (Exception e) { e.printStackTrace(); return null; } }
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (rc == KeeperException.Code.NONODE.intValue()) { // we can just ignore because the child watcher takes care of this return; } if (rc != KeeperException.Code.OK.intValue()) { zk.getData(myNode, (Watcher)ctx, this, ctx); } int currVer = stat.getVersion(); if (currVer != lastVer) { String parts[] = new String(data).split(" ", 2); myInstance.configure(parts[1]); lastVer = currVer; } }
private void updateZnodeData(Stat stat, Watcher watcher) { if (isDestroyed()) { return; } Znode znode = getData(); ZooKeeperConnection zooKeeperConnection = getZooKeeperConnection(); String path = znode.getPath(); byte[] data; try { if (watcher != null) { data = zooKeeperConnection.getData(path, watcher, stat); } else { data = zooKeeperConnection.getData(path, false, stat); } znode.setData(data); znode.setDataReadable(true); } catch (Exception e) { znode.setDataReadable(false); } }
public synchronized void addWatch(String path, Watcher watcher) { HashSet<Watcher> list = watchTable.get(path); if (list == null) { // don't waste memory if there are few watches on a node // rehash when the 4th entry is added, doubling size thereafter // seems like a good compromise list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); HashSet<String> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); }
@Override public synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) throws Exception { if (forExpire) { // a hack... couldn't find a way to trigger expired event. WatchedEvent expriredEvent = new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null); super.processWatchEvent(zk, expriredEvent); forExpire = false; syncBarrier.await(); } else { super.processWatchEvent(zk, event); } }
public List<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { if (stat != null) { n.copyStat(stat); } ArrayList<String> children; Set<String> childs = n.getChildren(); if (childs != null) { children = new ArrayList<String>(childs.size()); children.addAll(childs); } else { children = new ArrayList<String>(0); } if (watcher != null) { childWatches.addWatch(path, watcher); } return children; } }
@Test(timeout = 60000) public void testRootWatchTriggered() throws Exception { class MyWatcher implements Watcher{ boolean fired=false; public void process(WatchedEvent event) { if(event.getPath().equals("/")) fired=true; } } MyWatcher watcher=new MyWatcher(); // set a watch on the root node dt.getChildren("/", new Stat(), watcher); // add a new node, should trigger a watch dt.createNode("/xyz", new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1); Assert.assertFalse("Root node watch not triggered",!watcher.fired); }
private static ZooKeeper connectZooKeeper(String ensemble) throws IOException, KeeperException, InterruptedException { final CountDownLatch latch = new CountDownLatch(1); ZooKeeper zkc = new ZooKeeper(HOSTPORT, 3600, new Watcher() { public void process(WatchedEvent event) { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { latch.countDown(); } } }); if (!latch.await(10, TimeUnit.SECONDS)) { throw new IOException("Zookeeper took too long to connect"); } return zkc; }
/** * Gets the status of a node in ZooKeeper * * @param p_path * the node path * @param p_watcher * the watcher * @return true if the node exists, fals eotherwise * @throws ZooKeeperException * if ZooKeeper could not accessed */ public Stat getStatus(final String p_path, final Watcher p_watcher) throws ZooKeeperException { Stat ret; assert p_path != null; try { if (m_zookeeper == null) { connect(); } if (!p_path.isEmpty()) { ret = m_zookeeper.exists(m_path + '/' + p_path, p_watcher); } else { ret = m_zookeeper.exists(m_path, p_watcher); } } catch (final KeeperException | InterruptedException e) { throw new ZooKeeperException("Could not access ZooKeeper", e); } return ret; }
private void watchNode(final ZooKeeper zk) { try { List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() { public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { watchNode(zk); } } }); List<String> dataList = new ArrayList<>(); for (String node : nodeList) { byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null); dataList.add(new String(bytes)); } LOGGER.debug("node data: {}", dataList); this.dataList = dataList; LOGGER.debug("Service discovery triggered updating connected server node."); updateConnectedServer(); } catch (Exception e) { LOGGER.error("", e); } }
public ZookeeperRegistry(URL url, ZkClient zkClient) { super(url); this.zkClient = zkClient; IZkStateListener zkStateListener = new IZkStateListener() { @Override public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { // do nothing } @Override public void handleNewSession() throws Exception { logger.info("zkRegistry get new session notify."); } @Override public void handleSessionEstablishmentError(Throwable throwable) throws Exception { } }; this.zkClient.subscribeStateChanges(zkStateListener); }
@Override public void run() { try { Watcher updateWatcher = new DataUpdateWatcher(client, PathVarConst.QUOTECONF_PATH, null); while (true) { // 注册数据更新事件 if (null != this.client.getZooKeeper().exists(PathVarConst.QUOTECONF_PATH, null)) { byte[] data = this.client.getZooKeeper().getData(PathVarConst.QUOTECONF_PATH, updateWatcher, null); System.out.println("接收到的数据为: " + new String(data)); Thread.sleep(10000); } } } catch (Exception e) { e.printStackTrace(); } }
synchronized void addWatch(String path, Watcher watcher) { Set<Watcher> list = watchTable.get(path); if (list == null) { // don't waste memory if there are few watches on a node // rehash when the 4th entry is added, doubling size thereafter // seems like a good compromise list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); Set<String> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); }
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception { zk = new ZooKeeper(this.properties.getProperty(keys.zkConnectString .toString()), Integer.parseInt(this.properties .getProperty(keys.zkSessionTimeout.toString())), new Watcher() { public void process(WatchedEvent event) { sessionEvent(connectionLatch, event); } }); String authString = this.properties.getProperty(keys.userName.toString()) + ":"+ this.properties.getProperty(keys.password.toString()); this.isCheckParentPath = Boolean.parseBoolean(this.properties.getProperty(keys.isCheckParentPath.toString(),"true")); zk.addAuthInfo("digest", authString.getBytes()); acl.clear(); acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(authString)))); acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE)); }
public List<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { if (stat != null) { n.copyStat(stat); } List<String> children=new ArrayList<String>(n.getChildren()); if (watcher != null) { childWatches.addWatch(path, watcher); } return children; } }
public List<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { if (stat != null) { n.copyStat(stat); } List<String> children = new ArrayList<String>(n.getChildren()); if (watcher != null) { childWatches.addWatch(path, watcher); } return children; } }
public boolean removeWatch(String path, WatcherType type, Watcher watcher) { boolean removed = false; switch (type) { case Children: removed = this.childWatches.removeWatcher(path, watcher); break; case Data: removed = this.dataWatches.removeWatcher(path, watcher); break; case Any: if (this.childWatches.removeWatcher(path, watcher)) { removed = true; } if (this.dataWatches.removeWatcher(path, watcher)) { removed = true; } break; } return removed; }
@Override public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { int count = 0; do { try { return super.getData(path, watcher, stat); } catch (KeeperException.ConnectionLossException e) { LoggerFactory.getLogger().warn( "ZooKeeper connection lost. Trying to reconnect."); } } while (!closed && (limit == -1 || count++ < limit)); return null; }
@Override public void process(WatchedEvent event) throws Exception { String path = zkConf.getZKBasePath() + "/consumers/" + consumerGroup + "/ids"; LOG.info("get zookeeper notification for path={}", path); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { updateConsumerIds(consumerGroup); } zkClient.getChildren() .usingWatcher(new ConsumerWatcher(consumerGroup)) .forPath(path); }
@Override public void process(WatchedEvent event) throws Exception { String shardingPath = zkConf.getZKBasePath() + "/brokers/" + shardingId; LOG.info("get zookeeper notification for path={}", shardingPath); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { List<String> newBrokerAddressList = zkClient.getChildren().forPath(shardingPath); List<String> oldBrokerAddressList = metadata.getBrokerAddressList(shardingId); Collection<String> addedBrokerAddressList = CollectionUtils.subtract(newBrokerAddressList, oldBrokerAddressList); Collection<String> deletedBrokerAddressList = CollectionUtils.subtract(oldBrokerAddressList, newBrokerAddressList); if (addedBrokerAddressList.size() > 0) { if (metadata.getBrokerMap().get(shardingId) != null) { metadata.getBrokerMap().get(shardingId).addEndPoint(addedBrokerAddressList); } else { BrokerClient brokerClient = new BrokerClient(newBrokerAddressList, rpcClientOptions); BrokerClient old = metadata.getBrokerMap().putIfAbsent(shardingId, brokerClient); if (old != null) { old.getRpcClient().stop(); metadata.getBrokerMap().get(shardingId).addEndPoint(old.getAddressList()); } } } if (deletedBrokerAddressList.size() > 0) { metadata.getBrokerMap().get(shardingId).removeEndPoint(deletedBrokerAddressList); } } zkClient.getChildren() .usingWatcher(new BrokerShardingWather(shardingId)) .forPath(shardingPath); }
public synchronized void connect(final long maxMsToWaitUntilConnected, Watcher watcher) { if (_eventThread != null) { return; } boolean started = false; try { getEventLock().lockInterruptibly(); setShutdownTrigger(false); _eventThread = new ZkEventThread(_connection.getServers()); _eventThread.start();//这样的 线程很可能会直接退回 _connection.connect(watcher); logger.debug("Awaiting connection to Zookeeper server: " + maxMsToWaitUntilConnected); if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) { throw new ZkTimeoutException(String.format("Unable to connect to zookeeper server[%s] within timeout %dms", _connection.getServers(), maxMsToWaitUntilConnected)); } started = true; } catch (InterruptedException e) { States state = _connection.getZookeeperState(); throw new IllegalStateException("Not connected with zookeeper server yet. Current state is " + state); } finally { getEventLock().unlock(); // we should close the zookeeper instance, otherwise it would keep // on trying to connect if (!started) { close(); } } }
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNodeV1 n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { n.copyStat(stat); if (watcher != null) { dataWatches.addWatch(path, watcher); } return n.data; } }
/** * 连接zookeeper * * @author gaoxianglong */ public void init() { try { zkClient = new ZooKeeper(zkAddress, zkSessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { KeeperState state = event.getState(); switch (state) { case SyncConnected: countDownLatch.countDown(); logger.info("connection zookeeper success"); break; case Disconnected: logger.warn("zookeeper connection is disconnected"); break; case Expired: logger.error("zookeeper session expired"); break; case AuthFailed: logger.error("authentication failure"); break; default: break; } } }); countDownLatch.await(); } catch (Exception e) { logger.error("error", e); } }
private Stat updateZnodeStat(Watcher watcher) { if (isDestroyed()) { return null; } Znode znode = getData(); String path = znode.getPath(); ZooKeeperConnection zooKeeperConnection = getZooKeeperConnection(); Stat stat = null; try { if (watcher != null) { stat = zooKeeperConnection.exists(path, watcher); } else { stat = zooKeeperConnection.exists(path, false); } znode.setStat(stat); } catch (Exception e) { } if (stat == null) { destroy(); } return stat; }
public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException { Stat stat = new Stat(); DataNodeV1 n = nodes.get(path); if (watcher != null) { dataWatches.addWatch(path, watcher); } if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { n.copyStat(stat); return stat; } }
public ArrayList<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNodeV1 n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { ArrayList<String> children = new ArrayList<String>(); children.addAll(n.children); if (watcher != null) { childWatches.addWatch(path, watcher); } return children; } }
public synchronized int size(){ int result = 0; for(Set<Watcher> watches : watchTable.values()) { result += watches.size(); } return result; }
public synchronized void removeWatcher(Watcher watcher) { HashSet<String> paths = watch2Paths.remove(watcher); if (paths == null) { return; } for (String p : paths) { HashSet<Watcher> list = watchTable.get(p); if (list != null) { list.remove(watcher); if (list.size() == 0) { watchTable.remove(p); } } } }
protected ZooKeeper createClient(Watcher watcher, CountDownLatch latch) throws IOException, InterruptedException { ZooKeeper zk = new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher); if(!latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){ Assert.fail("Unable to connect to server"); } return zk; }
@Override public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { int count = 0; do { try { return super.exists(path, watcher); } catch (KeeperException.ConnectionLossException e) { LoggerFactory.getLogger().warn( "ZooKeeper connection lost. Trying to reconnect."); } } while (!closed && (limit == -1 || count++ < limit)); return null; }