/**
 * Dispatches a node-level watch event to the child listeners and/or data
 * listeners registered for the event's path.
 *
 * <p>Creation and deletion events are delivered to both listener kinds;
 * children-changed events go to child listeners only and data-changed
 * events go to data listeners only.
 *
 * @param event the watch event whose path and type drive the dispatch
 */
private void processDataOrChildChange(WatchedEvent event) {
    final String path = event.getPath();
    final EventType type = event.getType();
    final boolean createdOrDeleted =
            type == EventType.NodeCreated || type == EventType.NodeDeleted;

    if (createdOrDeleted || type == EventType.NodeChildrenChanged) {
        Set<IZkChildListener> forChildren = _childListener.get(path);
        boolean hasChildListeners = forChildren != null && !forChildren.isEmpty();
        if (hasChildListeners) {
            fireChildChangedEvents(path, forChildren);
        }
    }

    if (createdOrDeleted || type == EventType.NodeDataChanged) {
        Set<IZkDataListener> forData = _dataListener.get(path);
        boolean hasDataListeners = forData != null && !forData.isEmpty();
        if (hasDataListeners) {
            fireDataChangedEvents(path, forData);
        }
    }
}
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
/**
 * Handles a watch event for {@code nodePath}: passes this watcher to
 * {@code exists()} again and starts an election when the node was deleted.
 * Any exception is logged and not propagated.
 *
 * @param event the watch event delivered by the client
 */
@Override
public void process(WatchedEvent event) {
    ZooKeeper client = zookeeperConnManager.getZkClient();
    try {
        // Re-register this watcher on the node.
        client.exists(nodePath, this);

        switch (event.getType()) {
            case NodeDeleted:
                election();
                break;
            default:
                // Other event types need no action here.
                break;
        }
    } catch (Exception ex) {
        log.error("error", ex);
    }
}
/**
 * Handles a watch event for {@code nodePath}: fetches the children again
 * (passing this watcher) and, on a children-changed event, logs how many
 * child names start with {@code "agent"}. Any exception is logged and not
 * propagated.
 *
 * @param event the watch event delivered by the client
 */
@Override
public void process(WatchedEvent event) {
    ZooKeeper client = zookeeperConnManager.getZkClient();
    try {
        // Re-register this watcher on the node while reading its children.
        List<String> childNames = client.getChildren(nodePath, this);

        switch (event.getType()) {
            case NodeChildrenChanged:
                log.info("当前注册中心内的成功注册的agent数量-->"
                        + childNames.stream().filter(name -> name.startsWith("agent")).count());
                break;
            default:
                // Other event types need no action here.
                break;
        }
    } catch (Exception ex) {
        log.error("error", ex);
    }
}
/**
 * Registers an exists-watch through one client, shuts down quorum member 1,
 * creates the node through a second client, restarts member 1 and expects
 * the watcher to receive {@code NodeCreated} once it is connected again.
 */
@Test
public void testNodeCreated() throws Exception {
    QuorumUtil quorum = new QuorumUtil(1);
    quorum.startAll();

    EventsWatcher eventsWatcher = new EventsWatcher();
    ZooKeeper watchingClient = createClient(quorum, 1, eventsWatcher);
    ZooKeeper mutatingClient = createClient(quorum, 2);

    String path = "/test1-created";
    watchingClient.exists(path, eventsWatcher);

    // Create the node while member 1 is down.
    quorum.shutdown(1);
    mutatingClient.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    quorum.start(1);

    eventsWatcher.waitForConnected(TIMEOUT * 1000L);
    eventsWatcher.assertEvent(TIMEOUT, EventType.NodeCreated);

    quorum.shutdownAll();
}
/**
 * Counts down the latch (when one is set) on a {@code SyncConnected} state
 * and queues every event whose type is not {@code None}.
 *
 * @param event the watch event delivered by the client
 */
public void process(WatchedEvent event) {
    boolean syncConnected = event.getState() == KeeperState.SyncConnected;
    if (syncConnected && latch != null) {
        latch.countDown();
    }

    if (event.getType() != EventType.None) {
        try {
            events.put(event);
        } catch (InterruptedException e) {
            Assert.assertTrue("interruption unexpected", false);
        }
    }
}
/**
 * Registers a children-watch through one client, shuts down quorum member 1,
 * adds a child through a second client, restarts member 1 and expects the
 * watcher to receive {@code NodeChildrenChanged} once it is connected again.
 */
@Test
public void testNodeChildrenChanged() throws Exception {
    QuorumUtil quorum = new QuorumUtil(1);
    quorum.startAll();

    EventsWatcher eventsWatcher = new EventsWatcher();
    ZooKeeper watchingClient = createClient(quorum, 1, eventsWatcher);
    ZooKeeper mutatingClient = createClient(quorum, 2);

    String parent = "/test-children-changed";
    watchingClient.create(parent, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    watchingClient.getChildren(parent, eventsWatcher);

    // Add the child while member 1 is down.
    quorum.shutdown(1);
    mutatingClient.create(parent + "/children-1", new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    quorum.start(1);

    eventsWatcher.waitForConnected(TIMEOUT * 1000L);
    eventsWatcher.assertEvent(TIMEOUT, EventType.NodeChildrenChanged);

    quorum.shutdownAll();
}
/**
 * Re-adds this watcher on {@code nodePath} (unless the node was deleted)
 * and then forwards the event's path and type name to the node listener.
 * Does nothing once {@code closed} is set. Failures while re-adding the
 * watcher are logged and do not prevent the listener callback.
 *
 * @param event the watch event delivered by the client
 */
public void process(WatchedEvent event) {
    if (closed) {
        return;
    }

    try {
        boolean nodeDeleted = event.getType() == EventType.NodeDeleted;
        if (!nodeDeleted) {
            // NOTE(review): exists() goes through `zooKeeper` while getChildren()
            // goes through `zookeeper` — confirm both fields are intended.
            Stat stat = zooKeeper.exists(nodePath, this);
            if (stat != null) {
                zookeeper.getChildren(nodePath, this);
            }
        }
    } catch (Exception ex) {
        LoggerFactory.getLogger().error(
                "Error occured re-adding node watcherfor node " + nodePath, ex);
    }

    String typeName = event.getType().name();
    nodeListener.processEvent(event.getPath(), typeName, null);
}
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) { if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); final Set<Watcher> watchers; if (materializedWatchers == null) { // materialize the watchers based on the event watchers = watcher.materialize(event.getState(), event.getType(), event.getPath()); } else { watchers = new HashSet<Watcher>(); watchers.addAll(materializedWatchers); } WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); }
/**
 * Re-adds this watcher on {@code nodePath} (unless the node was deleted)
 * and then forwards the event's path and type name to the node listener.
 * Does nothing once {@code closed} is set. Failures while re-adding the
 * watcher are logged and do not prevent the listener callback.
 *
 * @param event the watch event delivered by the client
 */
public void process(WatchedEvent event) {
    if (closed) {
        return;
    }

    try {
        boolean nodeDeleted = event.getType() == EventType.NodeDeleted;
        if (!nodeDeleted) {
            // NOTE(review): exists() goes through `zooKeeper` while getChildren()
            // goes through `zookeeper` — confirm both fields are intended.
            Stat stat = zooKeeper.exists(nodePath, this);
            if (stat != null) {
                zookeeper.getChildren(nodePath, this);
            }
        }
    } catch (Exception ex) {
        LoggerFactory.getLogger().error(
                "Error occurred re-adding node watcherfor node " + nodePath, ex);
    }

    String typeName = event.getType().name();
    nodeListener.processEvent(event.getPath(), typeName, null);
}
/**
 * Checks that the shell command {@code removewatches <path> -c} removes a
 * child watch: the command must succeed and the client must report no
 * remaining child watches afterwards.
 */
@Test(timeout = 30000)
public void testRemoveNodeChildrenChangedWatches() throws Exception {
    final String nodePath = "/testnode1";

    List<EventType> expectedEvents = new ArrayList<EventType>();
    expectedEvents.add(EventType.ChildWatchRemoved);
    MyWatcher myWatcher = new MyWatcher(nodePath, expectedEvents, 1);

    zk.create(nodePath, "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    LOG.info("Adding child changed watcher");
    zk.getChildren(nodePath, myWatcher);

    String cmdstring = "removewatches /testnode1 -c";
    LOG.info("Remove watchers using shell command : {}", cmdstring);
    zkMain.cl.parseCommand(cmdstring);
    boolean commandSucceeded = zkMain.processZKCmd(zkMain.cl);
    Assert.assertTrue("Removewatches cmd fails to remove child watches", commandSucceeded);

    // NOTE(review): the result of matches() is not checked here — confirm
    // whether it should be asserted.
    myWatcher.matches();

    Assert.assertEquals(
            "Failed to remove child watches : " + zk.getChildWatches(),
            0,
            zk.getChildWatches().size());
}
/**
 * Replaces the data of the node at {@code path}, updates its stat fields
 * and triggers the data watches for the path.
 *
 * @param path    path of the node to update
 * @param data    new payload for the node
 * @param version version to record in the node's stat
 * @param zxid    value to record as the node's modification zxid
 * @param time    value to record as the node's modification time
 * @return a copy of the node's stat taken after the update
 * @throws KeeperException.NoNodeException if no node exists at {@code path}
 */
public Stat setData(String path, byte data[], int version, long zxid, long time)
        throws KeeperException.NoNodeException {
    DataNodeV1 node = nodes.get(path);
    if (node == null) {
        throw new KeeperException.NoNodeException();
    }

    Stat snapshot = new Stat();
    synchronized (node) {
        node.data = data;
        node.stat.setMtime(time);
        node.stat.setMzxid(zxid);
        node.stat.setVersion(version);
        node.copyStat(snapshot);
    }

    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return snapshot;
}
/**
 * Sets the {@code changed} flag and wakes all threads waiting on this
 * object when a data-changed event arrives; other events are ignored.
 *
 * @param event the watch event delivered by the client
 */
@Override
public void process(WatchedEvent event) {
    if (event.getType() != EventType.NodeDataChanged) {
        return;
    }
    synchronized (this) {
        changed = true;
        notifyAll();
    }
}
@Test public void testDefaultWatcherAutoResetWithChroot() throws Exception { ZooKeeper zk1 = createClient(); zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); MyWatcher watcher = new MyWatcher(); ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1"); zk2.getChildren("/", true ); // this call shouldn't trigger any error or watch zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // this should trigger the watch zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertNotNull(e); Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); Assert.assertEquals("/", e.getPath()); zk2.getChildren("/", true ); stopServer(); watcher.waitForDisconnected(3000); startServer(); watcher.waitForConnected(3000); // this should trigger the watch zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertNotNull(e); Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); Assert.assertEquals("/", e.getPath()); }
/**
 * Delegates to the superclass, then records the invocation time and queues
 * the event for every event whose type is not {@code None}.
 *
 * @param event the watch event delivered by the client
 */
public void process(WatchedEvent event) {
    super.process(event);

    if (event.getType() == Event.EventType.None) {
        return;
    }

    timeOfLastWatcherInvocation = System.currentTimeMillis();
    try {
        events.put(event);
    } catch (InterruptedException ignored) {
        LOG.warn("ignoring interrupt during event.put");
    }
}
public void queueEvent(WatchedEvent event) { if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); // materialize the watchers based on the event WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); }
@Test public void testIntConversion() { // Ensure that we can convert all valid integers to EventTypes EnumSet<EventType> allTypes = EnumSet.allOf(EventType.class); for(EventType et : allTypes) { Assert.assertEquals(et, EventType.fromInt( et.getIntValue() ) ); } }
/**
 * Checks that a node deleted and re-created while quorum member 1 is down
 * is reported as {@code NodeDataChanged} after reconnecting, first for a
 * watch set with {@code getData} and then for a watch set with
 * {@code exists}.
 */
@Test
public void testNodeDataChanged() throws Exception {
    QuorumUtil qu = new QuorumUtil(1);
    qu.startAll();

    EventsWatcher watcher = new EventsWatcher();
    ZooKeeper zk1 = createClient(qu, 1, watcher);
    ZooKeeper zk2 = createClient(qu, 2);

    String path = "/test-changed";
    zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    // Round 1: watch set through getData().
    zk1.getData(path, watcher, null);
    qu.shutdown(1);
    zk2.delete(path, -1);
    zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    qu.start(1);
    // Use the same reconnect timeout as the second round below; the bare
    // TIMEOUT value used here previously was 1000x shorter.
    watcher.waitForConnected(TIMEOUT * 1000L);
    watcher.assertEvent(TIMEOUT, EventType.NodeDataChanged);

    // Round 2: watch set through exists().
    zk1.exists(path, watcher);
    qu.shutdown(1);
    zk2.delete(path, -1);
    zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    qu.start(1);
    watcher.waitForConnected(TIMEOUT * 1000L);
    watcher.assertEvent(TIMEOUT, EventType.NodeDataChanged);

    qu.shutdownAll();
}
private void startConnect() throws IOException { state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { addr = hostProvider.next(1000); } setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); if (ZooKeeperSaslClient.isEnabled()) { try { String principalUserName = System.getProperty( ZK_SASL_CLIENT_USERNAME, "zookeeper"); zooKeeperSaslClient = new ZooKeeperSaslClient( principalUserName+"/"+addr.getHostName()); } catch (LoginException e) { // An authentication error occurred when the SASL client tried to initialize: // for Kerberos this means that the client failed to authenticate with the KDC. // This is different from an authentication error that occurs during communication // with the Zookeeper server, which is handled below. LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } } logStartConnect(addr); clientCnxnSocket.connect(addr); }
/**
 * Callback invoked by the ClientCnxnSocket once a connection has been
 * established.
 *
 * <p>A non-positive negotiated timeout is treated as session expiry: the
 * state becomes {@code CLOSED}, an {@code Expired} event and the event of
 * death are queued, and a {@code SessionExpiredException} is thrown.
 * Otherwise the timeouts, session id and password are stored, the state is
 * updated, and a connected event is queued.
 *
 * @param _negotiatedSessionTimeout session timeout negotiated for this
 *                                  connection; values {@code <= 0} mean the
 *                                  session has expired
 * @param _sessionId                session id to store
 * @param _sessionPasswd            session password to store
 * @param isRO                      whether the connection is in read-only mode
 * @throws IOException as {@code SessionExpiredException} when the
 *                     negotiated timeout is not positive
 */
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
        byte[] _sessionPasswd, boolean isRO) throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout;

    if (negotiatedSessionTimeout <= 0) {
        state = States.CLOSED;

        WatchedEvent expired = new WatchedEvent(
                Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.Expired,
                null);
        eventThread.queueEvent(expired);
        eventThread.queueEventOfDeath();

        // sessionId still holds the previous value at this point.
        String warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
                + Long.toHexString(sessionId) + " has expired";
        LOG.warn(warnInfo);
        throw new SessionExpiredException(warnInfo);
    }

    if (isRO && !readOnly) {
        LOG.error("Read/write client got connected to read-only server");
    }

    readTimeout = negotiatedSessionTimeout * 2 / 3;
    connectTimeout = negotiatedSessionTimeout / hostProvider.size();
    hostProvider.onConnected();
    sessionId = _sessionId;
    sessionPasswd = _sessionPasswd;

    if (isRO) {
        state = States.CONNECTEDREADONLY;
    } else {
        state = States.CONNECTED;
    }
    seenRwServerBefore |= !isRO;

    String readOnlySuffix = isRO ? " (READ-ONLY mode)" : "";
    LOG.info("Session establishment complete on server "
            + clientCnxnSocket.getRemoteSocketAddress()
            + ", sessionid = 0x" + Long.toHexString(sessionId)
            + ", negotiated timeout = " + negotiatedSessionTimeout
            + readOnlySuffix);

    KeeperState eventState;
    if (isRO) {
        eventState = KeeperState.ConnectedReadOnly;
    } else {
        eventState = KeeperState.SyncConnected;
    }
    eventThread.queueEvent(
            new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}
/**
 * Prints the event to standard error and, for events of type {@code None},
 * updates the {@code connected} flag (true only for {@code SyncConnected})
 * and wakes all threads waiting on this object.
 *
 * @param event the watch event delivered by the client
 */
public void process(WatchedEvent event) {
    System.err.println(event);
    synchronized (this) {
        if (event.getType() != EventType.None) {
            return;
        }
        boolean nowConnected = event.getState() == KeeperState.SyncConnected;
        connected = nowConnected;
        notifyAll();
    }
}
/**
 * For events of type {@code None}, updates the {@code connected} flag
 * (true only for {@code SyncConnected}) and wakes all threads waiting on
 * this object; other events are ignored.
 *
 * @param event the watch event delivered by the client
 */
public void process(WatchedEvent event) {
    if (event.getType() != Watcher.Event.EventType.None) {
        return;
    }
    synchronized (this) {
        boolean nowConnected =
                event.getState() == Watcher.Event.KeeperState.SyncConnected;
        connected = nowConnected;
        notifyAll();
    }
}
/**
 * Delegates to the superclass, then queues every event whose type is not
 * {@code None}. An interrupt while queueing is logged and otherwise ignored.
 *
 * @param event the watch event delivered by the client
 */
public void process(WatchedEvent event) {
    super.process(event);

    if (event.getType() == EventType.None) {
        return;
    }

    try {
        events.put(event);
    } catch (InterruptedException ignored) {
        LOG.warn("ignoring interrupt during event.put");
    }
}
/**
 * Checks that the wrapper obtained from a {@code WatchedEvent} carries the
 * integer values of the event's type and state, and the same path.
 */
@Test
public void testConvertingToEventWrapper() {
    final EventType type = EventType.NodeCreated;
    final KeeperState state = KeeperState.Expired;
    final String path = "blah";

    WatcherEvent wrapper = new WatchedEvent(type, state, path).getWrapper();

    Assert.assertEquals(type.getIntValue(), wrapper.getType());
    Assert.assertEquals(state.getIntValue(), wrapper.getState());
    Assert.assertEquals(path, wrapper.getPath());
}
/**
 * Delegates to the superclass, then queues every event whose type is not
 * {@code None}. An interrupt while queueing is logged and otherwise ignored.
 *
 * @param event the watch event delivered by the client
 */
public void process(WatchedEvent event) {
    super.process(event);

    if (event.getType() == Event.EventType.None) {
        return;
    }

    try {
        events.put(event);
    } catch (InterruptedException ignored) {
        LOG.warn("ignoring interrupt during event.put");
    }
}
@Test public void testChildWatcherAutoResetWithChroot() throws Exception { ZooKeeper zk1 = createClient(); zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); MyWatcher watcher = new MyWatcher(); ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1"); zk2.getChildren("/", true ); // this call shouldn't trigger any error or watch zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // this should trigger the watch zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertNotNull(e); Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); Assert.assertEquals("/", e.getPath()); MyWatcher childWatcher = new MyWatcher(); zk2.getChildren("/", childWatcher); stopServer(); watcher.waitForDisconnected(3000); startServer(); watcher.waitForConnected(3000); // this should trigger the watch zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertNotNull(e); Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); Assert.assertEquals("/", e.getPath()); }
/**
 * Runs the superclass setup, resets the list of expected event types, and
 * creates two clients ({@code client} and {@code lsnr}), each with its own
 * single-count latch and {@code SimpleWatcher}.
 */
@Override
public void setUp() throws Exception {
    super.setUp();

    expected = new ArrayList<EventType>();

    // Primary client.
    client_latch = new CountDownLatch(1);
    client_dwatch = new SimpleWatcher(client_latch);
    client = createClient(client_dwatch, client_latch);

    // Listener client.
    lsnr_latch = new CountDownLatch(1);
    lsnr_dwatch = new SimpleWatcher(lsnr_latch);
    lsnr = createClient(lsnr_dwatch, lsnr_latch);
}
/**
 * Delegates to the superclass, then queues every event whose type is not
 * {@code None} into {@code dataEvents}. An interrupt while queueing is
 * logged and otherwise ignored.
 *
 * @param event the watch event delivered by the client
 */
@Override
public void process(WatchedEvent event) {
    super.process(event);

    if (event.getType() == Event.EventType.None) {
        return;
    }

    try {
        dataEvents.put(event);
    } catch (InterruptedException ignored) {
        LOG.warn("ignoring interrupt during EventsWatcher process");
    }
}
/**
 * Waits up to {@code timeout} milliseconds for the next queued event and
 * asserts that one arrived and that it has the expected type. If the wait
 * is interrupted, a warning is logged and no assertion is made.
 *
 * @param timeout   maximum time to wait, in milliseconds
 * @param eventType the event type the next queued event must have
 */
public void assertEvent(long timeout, EventType eventType) {
    final WatchedEvent received;
    try {
        received = dataEvents.poll(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ignored) {
        LOG.warn("ignoring interrupt during EventsWatcher assertEvent");
        return;
    }

    Assert.assertNotNull("do not receive a " + eventType, received);
    Assert.assertEquals(eventType, received.getType());
}