/** * get data set by the active leader * * @return data set by the active instance * @throws ActiveNotFoundException * when there is no active leader * @throws KeeperException * other zookeeper operation errors * @throws InterruptedException * @throws IOException * when ZooKeeper connection could not be established */ public synchronized byte[] getActiveData() throws ActiveNotFoundException, KeeperException, InterruptedException, IOException { try { if (zkClient == null) { createConnection(); } Stat stat = new Stat(); return getDataWithRetries(zkLockFilePath, false, stat); } catch(KeeperException e) { Code code = e.code(); if (isNodeDoesNotExist(code)) { // handle the commonly expected cases that make sense for us throw new ActiveNotFoundException(); } else { throw e; } } }
/**
 * Blocks until the next ZooKeeper event arrives or the timeout elapses.
 *
 * On timeout the error is logged, {@code zk} is closed and a
 * CONNECTIONLOSS {@link KeeperException} is raised.
 *
 * @param connectionTimeoutMs zookeeper connection timeout in milliseconds
 * @throws KeeperException if the connection attempt times out. This will
 *           be a ZooKeeper ConnectionLoss exception code.
 * @throws IOException if interrupted while waiting (or while closing
 *           {@code zk}); the thread's interrupt flag is restored first
 */
private void waitForZKConnectionEvent(int connectionTimeoutMs)
    throws KeeperException, IOException {
  try {
    final boolean eventArrived =
        hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS);
    if (eventArrived) {
      return;
    }
    LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
        + connectionTimeoutMs + " milliseconds");
    zk.close();
    throw KeeperException.create(Code.CONNECTIONLOSS);
  } catch (InterruptedException ie) {
    // Preserve the interrupt status for callers further up the stack.
    Thread.currentThread().interrupt();
    throw new IOException(
        "Interrupted when connecting to zookeeper server", ie);
  }
}
@Test public void testQuitElectionRemovesBreadcrumbNode() throws Exception { mockNoPriorActive(); elector.joinElection(data); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); // Writes its own active info Mockito.verify(mockZK, Mockito.times(1)).create( Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(data), Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT)); mockPriorActive(data); elector.quitElection(false); // Deletes its own active data Mockito.verify(mockZK, Mockito.times(1)).delete( Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(0)); }
/**
 * Verifies that more than 3 network error retries result in a fatal error:
 * four consecutive CONNECTIONLOSS stat results must produce exactly one
 * {@code notifyFatalError} call with the expected message.
 */
@Test
public void testStatNodeRetry() {
  elector.joinElection(data);

  final int connectionLoss = Code.CONNECTIONLOSS.intValue();
  for (int attempt = 0; attempt < 4; attempt++) {
    // The (Stat) cast selects the stat-callback overload of processResult.
    elector.processResult(connectionLoss, ZK_LOCK_NAME, mockZK, (Stat) null);
  }

  Mockito.verify(mockApp).notifyFatalError(
      "Received stat error from Zookeeper. code:CONNECTIONLOSS. "
          + "Not retrying further znode monitoring connection errors.");
}
/** * We create a perfectly valid 'exists' request, except that the opcode is wrong. * @return * @throws Exception */ @Test public void testNonExistingOpCode() throws Exception { TestableZooKeeper zk = createClient(); final String path = "/m1"; RequestHeader h = new RequestHeader(); h.setType(888); // This code does not exists ExistsRequest request = new ExistsRequest(); request.setPath(path); request.setWatch(false); ExistsResponse response = new ExistsResponse(); ReplyHeader r = zk.submitRequest(h, request, response, null); Assert.assertEquals(r.getErr(), Code.UNIMPLEMENTED.intValue()); try { zk.exists("/m1", false); fail("The connection should have been closed"); } catch (KeeperException.ConnectionLossException expected) { } }
private void createRootDir(final String rootPath) throws Exception { // For root dirs, we shouldn't use the doMulti helper methods new ZKAction<String>() { @Override public String run() throws KeeperException, InterruptedException { try { return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT); } catch (KeeperException ke) { if (ke.code() == Code.NODEEXISTS) { LOG.debug(rootPath + "znode already exists!"); return null; } else { throw ke; } } } }.runWithRetries(); }
/**
 * Set the ACL for the node of the given path if such a node exists and the
 * given aclVersion matches the acl version of the node. Return the stat of
 * the node.
 *
 * A KeeperException with error code KeeperException.NoNode will be thrown
 * if no node with the given path exists.
 *
 * A KeeperException with error code KeeperException.BadVersion will be
 * thrown if the given aclVersion does not match the node's aclVersion.
 *
 * The path is validated and the ACL checked before the request is sent;
 * the chroot-prefixed path goes to the server while exceptions carry the
 * client-side path.
 *
 * @param path the given path for the node
 * @param acl the given acl for the node
 * @param aclVersion the given acl version of the node
 * @return the stat of the node.
 * @throws InterruptedException If the server transaction is interrupted.
 * @throws KeeperException If the server signals an error with a non-zero
 *         error code.
 * @throws org.apache.zookeeper.KeeperException.InvalidACLException If the
 *         acl is invalid.
 * @throws IllegalArgumentException if an invalid path is specified
 */
public Stat setACL(final String path, List<ACL> acl, int aclVersion)
        throws KeeperException, InterruptedException {
    PathUtils.validatePath(path);
    validateACL(acl);
    final String serverPath = prependChroot(path);

    RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.setACL);

    SetACLRequest setAclRequest = new SetACLRequest();
    setAclRequest.setPath(serverPath);
    setAclRequest.setAcl(acl);
    setAclRequest.setVersion(aclVersion);

    SetACLResponse setAclResponse = new SetACLResponse();
    ReplyHeader reply =
            cnxn.submitRequest(header, setAclRequest, setAclResponse, null);

    final int errorCode = reply.getErr();
    if (errorCode == 0) {
        return setAclResponse.getStat();
    }
    throw KeeperException.create(KeeperException.Code.get(errorCode), path);
}
/**
 * Builds and submits a remove-watches request for {@code path}.
 *
 * The path is validated, a {@code WatchDeregistration} is created for the
 * client-side path, and the request (built by
 * {@code getRemoveWatchesRequest} for the chroot-prefixed path) is
 * submitted together with that deregistration.
 *
 * @param opCode operation code placed in the request header
 * @param path client-side path of the node
 * @param watcher watcher handed to the deregistration
 * @param watcherType type of watcher to remove
 * @param local flag handed to the deregistration
 * @throws InterruptedException propagated from request submission
 * @throws KeeperException if the reply carries a non-zero error code
 */
private void removeWatches(int opCode, String path, Watcher watcher,
        WatcherType watcherType, boolean local)
        throws InterruptedException, KeeperException {
    PathUtils.validatePath(path);
    final String serverPath = prependChroot(path);

    WatchDeregistration deregistration = new WatchDeregistration(
            path, watcher, watcherType, local, watchManager);

    RequestHeader header = new RequestHeader();
    header.setType(opCode);
    Record request = getRemoveWatchesRequest(opCode, watcherType, serverPath);

    ReplyHeader reply =
            cnxn.submitRequest(header, request, null, null, deregistration);
    final int errorCode = reply.getErr();
    if (errorCode != 0) {
        throw KeeperException.create(
                KeeperException.Code.get(errorCode), path);
    }
}
/**
 * Test helper that removes a watch through either the synchronous or the
 * asynchronous client API, depending on {@code useAsync}.
 *
 * In async mode a {@code MyCallback} built from {@code rc} and
 * {@code path} is passed to the client, {@code matches()} is asserted on
 * it, and a non-OK callback result code is rethrown as a
 * {@link KeeperException}. In sync mode {@code rc} is not used.
 *
 * @param zk client to issue the request on
 * @param path path of the watched node
 * @param watcher watcher to remove
 * @param watcherType type of watcher to remove
 * @param local forwarded unchanged to {@code zk.removeWatches}
 * @param rc code handed to the {@code MyCallback} constructor (async only)
 */
private void removeWatches(ZooKeeper zk, String path, Watcher watcher,
        WatcherType watcherType, boolean local, KeeperException.Code rc)
        throws InterruptedException, KeeperException {
    LOG.info(
            "Sending removeWatches req using zk {} path: {} type: {} watcher: {} ",
            new Object[] { zk, path, watcherType, watcher });

    if (!useAsync) {
        zk.removeWatches(path, watcher, watcherType, local);
        return;
    }

    MyCallback callback = new MyCallback(rc.intValue(), path);
    zk.removeWatches(path, watcher, watcherType, local, callback, null);
    Assert.assertTrue("Didn't succeeds removeWatch operation",
            callback.matches());
    if (callback.rc != KeeperException.Code.OK.intValue()) {
        throw KeeperException.create(KeeperException.Code.get(callback.rc));
    }
}
/** * Test verifies WatcherType.Any - removes only the configured child watcher * function */ @Test(timeout = 90000) public void testRemoveAnyChildWatcher() throws Exception { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); MyWatcher w1 = new MyWatcher("/node1", 2); MyWatcher w2 = new MyWatcher("/node1", 1); LOG.info("Adding data watcher {} on path {}", new Object[] { w1, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w1)); // Add multiple child watches LOG.info("Adding child watcher {} on path {}", new Object[] { w1, "/node1" }); zk2.getChildren("/node1", w2); LOG.info("Adding child watcher {} on path {}", new Object[] { w2, "/node1" }); zk2.getChildren("/node1", w1); removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK); Assert.assertTrue("Didn't remove child watcher", w2.matches()); Assert.assertEquals("Didn't find child watcher", 1, zk2 .getChildWatches().size()); Assert.assertEquals("Didn't find data watcher", 1, zk2 .getDataWatches().size()); removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK); Assert.assertTrue("Didn't remove watchers", w1.matches()); }
/**
 * Verify that if a given watcher doesn't exist, the server properly
 * returns an error code for it.
 *
 * In our Java client implementation, we check that a given watch exists at
 * two points:
 *
 * 1) before submitting the RemoveWatches request
 * 2) after a successful server response, when the watcher needs to be
 *    removed
 *
 * Since this can be racy (i.e. a watch can fire while a RemoveWatches
 * request is in-flight), we need to verify that the watch was actually
 * removed (i.e. from ZKDatabase and DataTree) and return NOWATCHER if
 * needed.
 *
 * Also, other implementations might not do a client side check before
 * submitting a RemoveWatches request. If we don't do a server side check,
 * we would just return ZOK even if no watch was removed.
 *
 * The client created here is closed in a finally block so that it is
 * released even when waiting for the connection or an assertion fails.
 * NOTE(review): this assumes MyZooKeeper exposes ZooKeeper's close() -
 * confirm against its declaration.
 */
@Test(timeout = 90000)
public void testNoWatcherServerException()
        throws InterruptedException, IOException, TimeoutException {
    CountdownWatcher watcher = new CountdownWatcher();
    MyZooKeeper zk = new MyZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher);
    try {
        boolean nw = false;

        watcher.waitForConnected(CONNECTION_TIMEOUT);

        try {
            zk.removeWatches("/nowatchhere", watcher, WatcherType.Data,
                    false);
        } catch (KeeperException nwe) {
            // Any other error code leaves nw false and fails the test below.
            if (nwe.code().intValue() == Code.NOWATCHER.intValue()) {
                nw = true;
            }
        }

        Assert.assertTrue("Server didn't return NOWATCHER",
                zk.getRemoveWatchesRC() == Code.NOWATCHER.intValue());
        Assert.assertTrue("NoWatcherException didn't happen", nw);
    } finally {
        zk.close();
    }
}
/**
 * Validates that the deprecated constant {@code Code.Ok} is still usable
 * as a {@code case} label. There were issues found with switch statements,
 * which need compile time constants, so the value of this test lies in the
 * method compiling; whether the case body runs depends on the value of
 * {@code Code.Ok}, which is not visible here.
 */
@Test
@SuppressWarnings("deprecation")
public void testDeprecatedCodeOkInSwitch() {
    int candidate = 1;
    switch (candidate) {
        case Code.Ok:
            Assert.assertTrue(true);
            break;
    }
}
/**
 * Issues an asynchronous setACL with {@code version + 1} and expects it to
 * be reported as BADVERSION with no stat.
 *
 * A separate {@code StringCB} runs {@code verifyCreate()} first
 * (presumably so the node exists - confirm against StringCB). The expected
 * fields are set before {@code toString()} is captured as the callback
 * context, then {@code verify()} is called.
 */
public void verifySetACLFailure_BadVersion() {
    new StringCB(zk).verifyCreate();

    // Expected outcome of the call below.
    stat = null;
    rc = Code.BADVERSION;

    final String context = toString();
    zk.setACL(path, acl, version + 1, this, context);
    verify();
}
/** * verify that successful znode create result becomes active and monitoring is * started */ @Test public void testCreateNodeResultBecomeActive() throws Exception { mockNoPriorActive(); elector.joinElection(data); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); verifyExistCall(1); // monitor callback verifies the leader is ephemeral owner of lock but does // not call becomeActive since its already active Stat stat = new Stat(); stat.setEphemeralOwner(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat); // should not call neutral mode/standby/active Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); // another joinElection not called. Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); // no new monitor called verifyExistCall(1); }
/** * verify becomeStandby is not called if already in standby */ @Test public void testSuccessiveStandbyCalls() { elector.joinElection(data); // make the object go into the monitoring standby state elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); verifyExistCall(1); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME); // notify node deletion // monitoring should be setup again after event is received Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); elector.processWatchEvent(mockZK, mockEvent); // is standby. no need to notify anything now Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); // another joinElection called. Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); // lost election elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); // still standby. so no need to notify again Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); // monitor is set again verifyExistCall(2); }
/**
 * Issues an asynchronous create after a separate {@code Create2CB} has run
 * {@code verifyCreate()} and expects NODEEXISTS with no name and no stat.
 *
 * The expected fields are set before {@code toString()} is captured as the
 * callback context, then {@code verify()} is called.
 */
public void verifyCreateFailure_NodeExists() {
    new Create2CB(zk).verifyCreate();

    // Expected outcome of the call below.
    stat = null;
    name = null;
    rc = Code.NODEEXISTS;

    final String context = toString();
    zk.create(path, data, acl, flags, this, context);
    verify();
}
/**
 * Verifies that a NODEEXISTS create result (no retry involved) moves the
 * elector to standby exactly once and starts monitoring with a single
 * exists call.
 */
@Test
public void testCreateNodeResultBecomeStandby() {
  elector.joinElection(data);

  final int nodeExistsRc = Code.NODEEXISTS.intValue();
  elector.processResult(nodeExistsRc, ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME);

  Mockito.verify(mockApp).becomeStandby();
  verifyExistCall(1);
}
/**
 * ACL callback: records the returned ACL list and stat on this object,
 * then forwards the decoded result code, path and context to the
 * superclass handler.
 *
 * @param rc raw integer result code, converted with {@code Code.get}
 * @param path path reported by the callback
 * @param ctx context object reported by the callback
 * @param acl ACL list to record
 * @param stat stat to record
 */
public void processResult(int rc, String path, Object ctx,
        List<ACL> acl, Stat stat) {
    this.acl = acl;
    this.stat = stat;

    final Code resultCode = Code.get(rc);
    super.processResult(resultCode, path, ctx);
}
/**
 * Verifies that an error in the exists() callback results in a fatal
 * error: a RUNTIMEINCONSISTENCY stat result must not enter neutral mode
 * and must report exactly one fatal error with the expected message.
 */
@Test
public void testStatNodeError() {
  elector.joinElection(data);

  final int inconsistencyRc = Code.RUNTIMEINCONSISTENCY.intValue();
  // The (Stat) cast selects the stat-callback overload of processResult.
  elector.processResult(inconsistencyRc, ZK_LOCK_NAME, mockZK, (Stat) null);

  Mockito.verify(mockApp, Mockito.never()).enterNeutralMode();
  Mockito.verify(mockApp).notifyFatalError(
      "Received stat error from Zookeeper. code:RUNTIMEINCONSISTENCY");
}
/** * verify becomeStandby is not called if already in standby */ @Test public void testSuccessiveStandbyCalls() { elector.joinElection(data); // make the object go into the monitoring standby state elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); verifyExistCall(1); Assert.assertTrue(elector.isMonitorLockNodePending()); Stat stat = new Stat(); stat.setEphemeralOwner(0L); Mockito.when(mockZK.getSessionId()).thenReturn(1L); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat); Assert.assertFalse(elector.isMonitorLockNodePending()); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME); // notify node deletion // monitoring should be setup again after event is received Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); elector.processWatchEvent(mockZK, mockEvent); // is standby. no need to notify anything now Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); // another joinElection called. Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); // lost election elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); // still standby. so no need to notify again Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); // monitor is set again verifyExistCall(2); }
/**
 * Verifies that deleting a znode that still has a child fails with
 * NOTEMPTY, and that deleting the child first and then the parent
 * succeeds. The client is closed in a finally block so it is released even
 * when an operation or assertion fails.
 */
@Test
public void testDeleteWithChildren() throws Exception {
    ZooKeeper zk = createClient();
    try {
        zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        try {
            zk.delete("/parent", -1);
            // Message corrected: the expected failure is NOTEMPTY, not a
            // "not equals" error.
            Assert.fail("Should have received a not empty message");
        } catch (KeeperException e) {
            Assert.assertEquals(KeeperException.Code.NOTEMPTY, e.code());
        }
        zk.delete("/parent/child", -1);
        zk.delete("/parent", -1);
    } finally {
        zk.close();
    }
}
/**
 * Paranoid check that the {@code Code} enum can be used in a switch
 * statement: switching on {@code Code.OK} must reach the {@code OK} case.
 */
@Test
public void testCodeOKInSwitch() {
    Code candidate = Code.OK;
    switch (candidate) {
        case OK:
            Assert.assertTrue(true);
            break;
    }
}
/**
 * Common callback tail: records the result code and path, stores the
 * context (cast to {@code String}) in {@code expected}, and finally counts
 * down {@code latch}.
 *
 * The count-down is deliberately the last statement, after all three
 * fields have been written.
 *
 * @param rc result code to record
 * @param path path to record
 * @param ctx callback context; must be a {@code String} (or null),
 *            otherwise the cast throws ClassCastException
 */
public void processResult(Code rc, String path, Object ctx) {
    this.rc = rc;
    this.path = path;
    this.expected = (String) ctx;

    latch.countDown();
}
/**
 * Issues an asynchronous create for {@code path + "/bar"} without creating
 * anything beforehand and expects NONODE with no name.
 *
 * Note that {@code path} itself is updated to the "/bar" child path before
 * {@code toString()} is captured as the callback context.
 */
public void verifyCreateFailure_NoNode() {
    // Expected outcome of the call below.
    name = null;
    rc = Code.NONODE;

    path += "/bar";
    final String context = toString();
    zk.create(path, data, acl, flags, this, context);
    verify();
}
/**
 * Issues an asynchronous create for {@code path + "/bar"} after a separate
 * {@code StringCB} has run {@code verifyCreateEphemeral()}, and expects
 * NOCHILDRENFOREPHEMERALS with no name.
 *
 * Note that {@code path} itself is updated to the "/bar" child path before
 * {@code toString()} is captured as the callback context.
 */
public void verifyCreateFailure_NoChildForEphemeral() {
    new StringCB(zk).verifyCreateEphemeral();

    // Expected outcome of the call below.
    name = null;
    rc = Code.NOCHILDRENFOREPHEMERALS;

    path += "/bar";
    final String context = toString();
    zk.create(path, data, acl, flags, this, context);
    verify();
}
/**
 * Issues an asynchronous delete of {@code path} and expects NOTEMPTY.
 *
 * Beforehand one {@code StringCB} is used for two {@code create()} calls:
 * once with its initial path and once after its path has been set to
 * {@code path + "/bar"}.
 */
public void verifyDeleteFailure_NotEmpty() {
    final String childPath = path + "/bar";

    StringCB creator = new StringCB(zk);
    creator.create();
    creator.setPath(childPath);
    creator.create();

    // Expected outcome of the delete below.
    rc = Code.NOTEMPTY;

    final String context = toString();
    zk.delete(path, version, this, context);
    verify();
}
/**
 * Children callback: records the returned child list (an empty list when
 * {@code null} is reported), sorts it in place, then forwards the decoded
 * result code, path and context to the superclass handler.
 *
 * A non-null list passed in is sorted itself, not copied. The
 * {@code stat} argument is not used by this method.
 *
 * @param rc raw integer result code, converted with {@code Code.get}
 * @param path path reported by the callback
 * @param ctx context object reported by the callback
 * @param children child names, may be {@code null}
 * @param stat ignored
 */
public void processResult(int rc, String path, Object ctx,
        List<String> children, Stat stat) {
    if (children == null) {
        this.children = new ArrayList<String>();
    } else {
        this.children = children;
    }
    Collections.sort(this.children);

    final Code resultCode = Code.get(rc);
    super.processResult(resultCode, path, ctx);
}
/**
 * Data callback: records the returned data and stat on this object, then
 * forwards the decoded result code, path and context to the superclass
 * handler.
 *
 * @param rc raw integer result code, converted with {@code Code.get}
 * @param path path reported by the callback
 * @param ctx context object reported by the callback
 * @param data data to record (stored by reference, not copied)
 * @param stat stat to record
 */
public void processResult(int rc, String path, Object ctx,
        byte[] data, Stat stat) {
    this.data = data;
    this.stat = stat;

    final Code resultCode = Code.get(rc);
    super.processResult(resultCode, path, ctx);
}
/**
 * Issues an asynchronous setData with {@code version + 1} and expects it
 * to be reported as BADVERSION with no stat.
 *
 * A separate {@code StringCB} runs {@code verifyCreate()} first
 * (presumably so the node exists - confirm against StringCB). The expected
 * fields are set before {@code toString()} is captured as the callback
 * context, then {@code verify()} is called.
 */
public void verifySetDataFailure_BadVersion() {
    new StringCB(zk).verifyCreate();

    // Expected outcome of the call below.
    stat = null;
    rc = Code.BADVERSION;

    final String context = toString();
    zk.setData(path, data, version + 1, this, context);
    verify();
}
/**
 * Issues an asynchronous getACL without creating anything beforehand and
 * expects NONODE with no ACL and no stat.
 *
 * The {@code stat} field has just been cleared, so {@code null} is what
 * gets passed as the stat argument of {@code getACL}.
 */
public void verifyGetACLFailure_NoNode() {
    // Expected outcome of the call below.
    rc = Code.NONODE;
    acl = null;
    stat = null;

    final String context = toString();
    zk.getACL(path, stat, this, context);
    verify();
}
T runWithRetries() throws Exception { int retry = 0; while (true) { try { return runWithCheck(); } catch (KeeperException.NoAuthException nae) { if (HAUtil.isHAEnabled(getConfig())) { // NoAuthException possibly means that this store is fenced due to // another RM becoming active. Even if not, // it is safer to assume we have been fenced throw new StoreFencedException(); } } catch (KeeperException ke) { if (ke.code() == Code.NODEEXISTS) { LOG.info("znode already exists!"); return null; } if (hasDeleteNodeOp && ke.code() == Code.NONODE) { LOG.info("znode has already been deleted!"); return null; } LOG.info("Exception while executing a ZK operation.", ke); if (shouldRetry(ke.code()) && ++retry < numRetries) { LOG.info("Retrying operation on ZK. Retry no. " + retry); Thread.sleep(zkRetryInterval); createConnection(); continue; } LOG.info("Maxed out ZK retries. Giving up!"); throw ke; } } }
private void finishPacket(Packet p) { int err = p.replyHeader.getErr(); if (p.watchRegistration != null) { p.watchRegistration.register(err); } // Add all the removed watch events to the event queue, so that the // clients will be notified with 'Data/Child WatchRemoved' event type. if (p.watchDeregistration != null) { Map<EventType, Set<Watcher>> materializedWatchers = null; try { materializedWatchers = p.watchDeregistration.unregister(err); for (Entry<EventType, Set<Watcher>> entry : materializedWatchers .entrySet()) { Set<Watcher> watchers = entry.getValue(); if (watchers.size() > 0) { queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey()); // ignore connectionloss when removing from local // session p.replyHeader.setErr(Code.OK.intValue()); } } } catch (KeeperException.NoWatcherException nwe) { p.replyHeader.setErr(nwe.code().intValue()); } catch (KeeperException ke) { p.replyHeader.setErr(ke.code().intValue()); } } if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }
/**
 * Builds a {@code WatchedEvent} for the given path and event type and
 * queues it on the event thread for the supplied watchers.
 *
 * The event's session state is {@code Disconnected} when {@code err} is
 * SESSIONEXPIRED or CONNECTIONLOSS, and {@code SyncConnected} otherwise.
 *
 * @param clientPath path placed in the event
 * @param err error code used only to pick the session state
 * @param materializedWatchers watchers to queue the event for
 * @param eventType type placed in the event
 */
void queueEvent(String clientPath, int err,
        Set<Watcher> materializedWatchers, EventType eventType) {
    final boolean disconnected =
            err == KeeperException.Code.SESSIONEXPIRED.intValue()
            || err == KeeperException.Code.CONNECTIONLOSS.intValue();
    final KeeperState sessionState = disconnected
            ? Event.KeeperState.Disconnected
            : KeeperState.SyncConnected;

    eventThread.queueEvent(
            new WatchedEvent(eventType, sessionState, clientPath),
            materializedWatchers);
}