/** * verify becomeStandby is not called if already in standby */ @Test public void testSuccessiveStandbyCalls() { elector.joinElection(data); // make the object go into the monitoring standby state elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); verifyExistCall(1); Assert.assertTrue(elector.isMonitorLockNodePending()); Stat stat = new Stat(); stat.setEphemeralOwner(0L); Mockito.when(mockZK.getSessionId()).thenReturn(1L); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat); Assert.assertFalse(elector.isMonitorLockNodePending()); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME); // notify node deletion // monitoring should be setup again after event is received Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); elector.processWatchEvent(mockZK, mockEvent); // is standby. no need to notify anything now Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); // another joinElection called. Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); // lost election elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); // still standby. so no need to notify again Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); // monitor is set again verifyExistCall(2); }
/** * joinElection(..) should happen only after SERVICE_HEALTHY. */ @Test public void testBecomeActiveBeforeServiceHealthy() throws Exception { mockNoPriorActive(); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.None); // session expired should enter safe mode // But for first time, before the SERVICE_HEALTY i.e. appData is set, // should not enter the election. Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired); elector.processWatchEvent(mockZK, mockEvent); // joinElection should not be called. Mockito.verify(mockZK, Mockito.times(0)).create(ZK_LOCK_NAME, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); }
private void startConnect() throws IOException { state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { addr = hostProvider.next(1000); } setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); if (ZooKeeperSaslClient.isEnabled()) { try { String principalUserName = System.getProperty( ZK_SASL_CLIENT_USERNAME, "zookeeper"); zooKeeperSaslClient = new ZooKeeperSaslClient( principalUserName+"/"+addr.getHostName()); } catch (LoginException e) { // An authentication error occurred when the SASL client tried to initialize: // for Kerberos this means that the client failed to authenticate with the KDC. // This is different from an authentication error that occurs during communication // with the Zookeeper server, which is handled below. LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } } logStartConnect(addr); clientCnxnSocket.connect(addr); }
/** * Callback invoked by the ClientCnxnSocket once a connection has been * established. * * @param _negotiatedSessionTimeout * @param _sessionId * @param _sessionPasswd * @param isRO * @throws IOException */ void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException { negotiatedSessionTimeout = _negotiatedSessionTimeout; if (negotiatedSessionTimeout <= 0) { state = States.CLOSED; eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); eventThread.queueEventOfDeath(); String warnInfo; warnInfo = "Unable to reconnect to ZooKeeper service, session 0x" + Long.toHexString(sessionId) + " has expired"; LOG.warn(warnInfo); throw new SessionExpiredException(warnInfo); } if (!readOnly && isRO) { LOG.error("Read/write client got connected to read-only server"); } readTimeout = negotiatedSessionTimeout * 2 / 3; connectTimeout = negotiatedSessionTimeout / hostProvider.size(); hostProvider.onConnected(); sessionId = _sessionId; sessionPasswd = _sessionPasswd; state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED; seenRwServerBefore |= !isRO; LOG.info("Session establishment complete on server " + clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x" + Long.toHexString(sessionId) + ", negotiated timeout = " + negotiatedSessionTimeout + (isRO ? " (READ-ONLY mode)" : "")); KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, eventState, null)); }
public void process(WatchedEvent event) { super.process(event); if (event.getType() != Event.EventType.None) { try { events.put(event); } catch (InterruptedException e) { LOG.warn("ignoring interrupt during event.put"); } } }
void queueEvent(String clientPath, int err, Set<Watcher> materializedWatchers, EventType eventType) { KeeperState sessionState = KeeperState.SyncConnected; if (KeeperException.Code.SESSIONEXPIRED.intValue() == err || KeeperException.Code.CONNECTIONLOSS.intValue() == err) { sessionState = Event.KeeperState.Disconnected; } WatchedEvent event = new WatchedEvent(eventType, sessionState, clientPath); eventThread.queueEvent(event, materializedWatchers); }
public void process(WatchedEvent event) { super.process(event); if (event.getType() != Event.EventType.None) { timeOfLastWatcherInvocation = System.currentTimeMillis(); try { events.put(event); } catch (InterruptedException e) { LOG.warn("ignoring interrupt during event.put"); } } }
/** * verify becomeStandby is not called if already in standby */ @Test public void testSuccessiveStandbyCalls() { elector.joinElection(data); // make the object go into the monitoring standby state elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); verifyExistCall(1); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME); // notify node deletion // monitoring should be setup again after event is received Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); elector.processWatchEvent(mockZK, mockEvent); // is standby. no need to notify anything now Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); // another joinElection called. Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); // lost election elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); // still standby. so no need to notify again Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); // monitor is set again verifyExistCall(2); }
private void startConnect() throws IOException { state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { addr = hostProvider.next(1000); } setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); try { zooKeeperSaslClient = new ZooKeeperSaslClient("zookeeper/"+addr.getHostName()); } catch (LoginException e) { // An authentication error occurred when the SASL client tried to initialize: // for Kerberos this means that the client failed to authenticate with the KDC. // This is different from an authentication error that occurs during communication // with the Zookeeper server, which is handled below. LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } logStartConnect(addr); clientCnxnSocket.connect(addr); }
/** * Callback invoked by the ClientCnxnSocket once a connection has been * established. * * @param _negotiatedSessionTimeout * @param _sessionId * @param _sessionPasswd * @param isRO * @throws IOException */ void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException { negotiatedSessionTimeout = _negotiatedSessionTimeout; if (negotiatedSessionTimeout <= 0) { state = States.CLOSED; eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); eventThread.queueEventOfDeath(); throw new SessionExpiredException( "Unable to reconnect to ZooKeeper service, session 0x" + Long.toHexString(sessionId) + " has expired"); } if (!readOnly && isRO) { LOG.error("Read/write client got connected to read-only server"); } readTimeout = negotiatedSessionTimeout * 2 / 3; connectTimeout = negotiatedSessionTimeout / hostProvider.size(); hostProvider.onConnected(); sessionId = _sessionId; sessionPasswd = _sessionPasswd; state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED; seenRwServerBefore |= !isRO; LOG.info("Session establishment complete on server " + clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x" + Long.toHexString(sessionId) + ", negotiated timeout = " + negotiatedSessionTimeout + (isRO ? " (READ-ONLY mode)" : "")); KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, eventState, null)); }
private void startConnect() throws IOException { state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { addr = hostProvider.next(1000); } setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); try { zooKeeperSaslClient = new ZooKeeperSaslClient("zookeeper/" + addr.getHostName()); } catch (LoginException e) { // An authentication error occurred when the SASL client tried to initialize: // for Kerberos this means that the client failed to authenticate with the KDC. // This is different from an authentication error that occurs during communication // with the Zookeeper server, which is handled below. LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } logStartConnect(addr); clientCnxnSocket.connect(addr); }