@Test public void TestGetResults() throws Exception { /* Delete of a node folowed by an update of the (now) deleted node */ try { zk.multi(Arrays.asList( Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT), Op.delete("/multi", 0), Op.setData("/multi", "Y".getBytes(), 0), Op.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) )); Assert.fail("/multi should have been deleted so setData should have failed"); } catch (KeeperException e) { // '/multi' should never have been created as entire op should fail Assert.assertNull(zk.exists("/multi", null)); for (OpResult r : e.getResults()) { LOG.info("RESULT==> " + r); if (r instanceof ErrorResult) { ErrorResult er = (ErrorResult) r; LOG.info("ERROR RESULT: " + er + " ERR=>" + KeeperException.Code.get(er.getErr())); } } } }
/**
 * Notifies the listener that the operation was aborted, choosing the most
 * specific cause available.
 *
 * @param t        the throwable that aborted the enclosing operation
 * @param opResult the per-operation result, or {@code null} if none is available
 */
@Override
protected void abortOpResult(Throwable t, @Nullable OpResult opResult) {
    // Without a per-op result the original throwable is all we can report.
    if (null == opResult) {
        listener.onAbort(t);
        return;
    }

    assert (opResult instanceof OpResult.ErrorResult);
    int errCode = ((OpResult.ErrorResult) opResult).getErr();

    // An OK code means this op was not the one that failed, so report the
    // original throwable; otherwise surface this op's own error.
    Throwable cause = (KeeperException.Code.OK.intValue() == errCode)
            ? t
            : KeeperException.create(KeeperException.Code.get(errCode));
    listener.onAbort(cause);
}
@Test(timeout = 60000) public void testAbortOpResult() throws Exception { final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final CountDownLatch latch = new CountDownLatch(1); ZKVersionedSetOp versionedSetOp = new ZKVersionedSetOp(mock(Op.class), new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { // no-op } @Override public void onAbort(Throwable t) { exception.set(t); latch.countDown(); } }); KeeperException ke = KeeperException.create(KeeperException.Code.SESSIONEXPIRED); OpResult opResult = new OpResult.ErrorResult(KeeperException.Code.NONODE.intValue()); versionedSetOp.abortOpResult(ke, opResult); latch.await(); assertTrue(exception.get() instanceof KeeperException.NoNodeException); }
/**
 * Runs the given operations as a single multi call, retrying on
 * CONNECTIONLOSS, SESSIONEXPIRED and OPERATIONTIMEOUT before giving up.
 *
 * @param ops the operations to submit; they are passed through
 *            {@code prepareZKMulti} before being sent
 * @return the per-operation results of the successful multi call
 * @throws KeeperException      for any non-retryable error, or as decided by
 *                              {@code retryOrThrow} for retryable ones
 * @throws InterruptedException if interrupted while calling or backing off
 */
public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
    final RetryCounter retries = retryCounterFactory.create();
    final Iterable<Op> preparedOps = prepareZKMulti(ops);

    for (;;) {
        try {
            return zk.multi(preparedOps);
        } catch (KeeperException e) {
            switch (e.code()) {
                case CONNECTIONLOSS:
                case SESSIONEXPIRED:
                case OPERATIONTIMEOUT:
                    retryOrThrow(retries, e, "multi");
                    // Still allowed to retry: back off and consume one attempt.
                    retries.sleepUntilNextRetry();
                    retries.useRetry();
                    break;

                default:
                    throw e;
            }
        }
    }
}
/**
 * Submits the transaction record as an asynchronous multi call.
 *
 * When the call completes, the callback commits the trace, wraps the results
 * (when there are any) and passes a TRANSACTION event to
 * {@code client.processBackgroundOperation}. Any throwable raised while
 * submitting is handed to {@code backgrounding.checkError}.
 *
 * @param operationAndData carries the transaction record to submit
 */
@Override
public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception {
    try {
        final TimeTrace backgroundTrace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");

        AsyncCallback.MultiCallback onMultiComplete = new AsyncCallback.MultiCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> opResults) {
                backgroundTrace.commit();

                // Results may be null; in that case the event carries null too.
                List<CuratorTransactionResult> wrapped = null;
                if (opResults != null) {
                    wrapped = CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData());
                }

                CuratorEvent completion = new CuratorEventImpl(client, CuratorEventType.TRANSACTION, rc, path, null, ctx, null, null, null, null, null, wrapped);
                client.processBackgroundOperation(operationAndData, completion);
            }
        };

        client.getZooKeeper().multi(operationAndData.getData(), onMultiComplete, backgrounding.getContext());
    } catch (Throwable e) {
        backgrounding.checkError(e, null);
    }
}
/**
 * Executes the transaction record synchronously through the retry loop and
 * returns the wrapped results.
 *
 * @param record the transaction record to submit
 * @return the results of the multi call, wrapped by
 *         {@code CuratorTransactionImpl.wrapResults}
 * @throws Exception whatever the retry loop or the multi call throws
 */
private List<CuratorTransactionResult> forOperationsInForeground(final CuratorMultiTransactionRecord record) throws Exception {
    TimeTrace foregroundTrace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Foreground");

    // The actual ZooKeeper call, re-invoked by the retry loop as needed.
    Callable<List<OpResult>> submitMulti = new Callable<List<OpResult>>() {
        @Override
        public List<OpResult> call() throws Exception {
            return client.getZooKeeper().multi(record);
        }
    };
    List<OpResult> opResults = RetryLoop.callWithRetry(client.getZookeeperClient(), submitMulti);

    // The trace is committed only when the call succeeded.
    foregroundTrace.commit();
    return CuratorTransactionImpl.wrapResults(client, opResults, record);
}
/**
 * Commits the transaction through the retry loop and returns the wrapped
 * results. A transaction can be committed only once.
 *
 * @return one wrapped result per operation in the transaction
 * @throws IllegalStateException if the transaction was already committed, or
 *                               if the number of results differs from the
 *                               number of operations submitted
 * @throws Exception             whatever the retry loop or the operation throws
 */
@Override
public Collection<CuratorTransactionResult> commit() throws Exception {
    Preconditions.checkState(!isCommitted, "transaction already committed");
    isCommitted = true;

    // The actual operation, re-invoked by the retry loop as needed.
    Callable<List<OpResult>> submit = new Callable<List<OpResult>>() {
        @Override
        public List<OpResult> call() throws Exception {
            return doOperation();
        }
    };
    List<OpResult> opResults = RetryLoop.callWithRetry(client.getZookeeperClient(), submit);

    if (opResults.size() == transaction.metadataSize()) {
        return wrapResults(client, opResults, transaction);
    }

    // Every submitted operation must have exactly one result.
    throw new IllegalStateException(String.format("Result size (%d) doesn't match input size (%d)", opResults.size(), transaction.metadataSize()));
}
/**
 * Tests if a multiop submitted to a non-leader propagates to the leader properly
 * (see ZOOKEEPER-1124).
 *
 * The test works as follows. It has a client connect to a follower and submit a multiop
 * to the follower. It then verifies that the multiop successfully gets committed by
 * reading back each created node.
 *
 * Without the fix in ZOOKEEPER-1124, this fails with a ConnectionLoss KeeperException.
 *
 * The client and the quorum are released in {@code finally} blocks so that a failing
 * run does not leave them behind.
 */
@Test
public void testMultiToFollower() throws Exception {
    QuorumUtil qu = new QuorumUtil(1);
    CountdownWatcher watcher = new CountdownWatcher();
    qu.startQuorum();
    try {
        // Locate the peer whose leader field is set.
        int index = 1;
        while (qu.getPeer(index).peer.leader == null) {
            index++;
        }

        // Connect to a peer that is not the leader: peer 2 if the leader is peer 1, else peer 1.
        ZooKeeper zk = new ZooKeeper(
                "127.0.0.1:" + qu.getPeer((index == 1) ? 2 : 1).peer.getClientPort(),
                ClientBase.CONNECTION_TIMEOUT, watcher);
        try {
            watcher.waitForConnected(CONNECTION_TIMEOUT);

            zk.multi(Arrays.asList(
                    Op.create("/multi0", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.create("/multi1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
            ));

            // Reading each node back fails the test (via an exception) if the multi was not applied.
            zk.getData("/multi0", false, null);
            zk.getData("/multi1", false, null);
            zk.getData("/multi2", false, null);
        } finally {
            zk.close();
        }
    } finally {
        qu.tearDown();
    }
}
/**
 * Runs a multi call either synchronously or, when {@code useAsync} is set,
 * through the asynchronous API while blocking until its callback fires.
 *
 * @param zk  the client to issue the call on
 * @param ops the operations to submit
 * @return the per-operation results
 * @throws KeeperException      if the call fails; in async mode the exception
 *                              is built from the callback's return code
 * @throws InterruptedException if interrupted while waiting
 */
private List<OpResult> multi(ZooKeeper zk, Iterable<Op> ops) throws KeeperException, InterruptedException {
    if (!useAsync) {
        return zk.multi(ops);
    }

    final MultiResult outcome = new MultiResult();
    MultiCallback recorder = new MultiCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, List<OpResult> opResults) {
            // Publish the outcome and wake the waiting caller.
            synchronized (outcome) {
                outcome.rc = rc;
                outcome.results = opResults;
                outcome.finished = true;
                outcome.notifyAll();
            }
        }
    };
    zk.multi(ops, recorder, null);

    // Block until the callback has recorded a result.
    synchronized (outcome) {
        while (!outcome.finished) {
            outcome.wait();
        }
    }

    if (KeeperException.Code.OK.intValue() == outcome.rc) {
        return outcome.results;
    }
    throw KeeperException.create(KeeperException.Code.get(outcome.rc));
}
/**
 * Commits a transaction either synchronously or, when {@code useAsync} is
 * set, through the asynchronous API while blocking until its callback fires.
 *
 * @param txn the transaction to commit
 * @return the per-operation results
 * @throws KeeperException      if the commit fails; in async mode the
 *                              exception is built from the callback's return code
 * @throws InterruptedException if interrupted while waiting
 */
private List<OpResult> commit(Transaction txn) throws KeeperException, InterruptedException {
    if (!useAsync) {
        return txn.commit();
    }

    final MultiResult outcome = new MultiResult();
    MultiCallback recorder = new MultiCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, List<OpResult> opResults) {
            // Publish the outcome and wake the waiting caller.
            synchronized (outcome) {
                outcome.rc = rc;
                outcome.results = opResults;
                outcome.finished = true;
                outcome.notifyAll();
            }
        }
    };
    txn.commit(recorder, null);

    // Block until the callback has recorded a result.
    synchronized (outcome) {
        while (!outcome.finished) {
            outcome.wait();
        }
    }

    if (KeeperException.Code.OK.intValue() == outcome.rc) {
        return outcome.results;
    }
    throw KeeperException.create(KeeperException.Code.get(outcome.rc));
}
/**
 * Exercises {@code equals} on an {@link OpResult}.
 *
 * @param expected a result that must compare equal to {@code value}
 * @param value    the result under test
 * @param near     a result that must NOT compare equal to {@code value}
 */
private void opEquals(OpResult expected, OpResult value, OpResult near) {
    // Equal to itself, unequal to an unrelated object and to the near miss.
    assertEquals(value, value);
    assertFalse(value.equals(new Object()));
    assertFalse(value.equals(near));

    // Unequal to a result of a different concrete type.
    OpResult differentType;
    if (value instanceof CreateResult) {
        differentType = new ErrorResult(1);
    } else {
        differentType = new CreateResult("nope2");
    }
    assertFalse(value.equals(differentType));

    assertTrue(value.equals(expected));
}
/**
 * Creates two TTL nodes (TTL arguments 100 and 200) and one plain persistent
 * node in a single multi, then drives the container manager with a fake
 * elapsed value and checks which nodes remain after each pass.
 */
@Test
public void testMulti() throws IOException, KeeperException, InterruptedException {
    Op ttlCreate = Op.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_WITH_TTL, 100);
    Op ttlSequentialCreate = Op.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, 200);
    Op plainCreate = Op.create("/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    List<Op> batch = Arrays.asList(ttlCreate, ttlSequentialCreate, plainCreate);
    List<OpResult> results = zk.multi(batch);

    // The sequential node's real path is only known from its create result.
    OpResult.CreateResult sequentialResult = (OpResult.CreateResult) results.get(1);
    String sequentialPath = sequentialResult.getPath();

    final AtomicLong fakeElapsed = new AtomicLong(0);
    ContainerManager containerManager = newContainerManager(fakeElapsed);

    // Elapsed 0: nothing should be removed.
    containerManager.checkContainers();
    Assert.assertNotNull("node should not have been deleted yet", zk.exists("/a", false));
    Assert.assertNotNull("node should not have been deleted yet", zk.exists(sequentialPath, false));
    Assert.assertNotNull("node should never be deleted", zk.exists("/c", false));

    // Elapsed 110: only "/a" should be gone.
    fakeElapsed.set(110);
    containerManager.checkContainers();
    Assert.assertNull("node should have been deleted", zk.exists("/a", false));
    Assert.assertNotNull("node should not have been deleted yet", zk.exists(sequentialPath, false));
    Assert.assertNotNull("node should never be deleted", zk.exists("/c", false));

    // Elapsed 210: the sequential node should be gone too; "/c" stays.
    fakeElapsed.set(210);
    containerManager.checkContainers();
    Assert.assertNull("node should have been deleted", zk.exists("/a", false));
    Assert.assertNull("node should have been deleted", zk.exists(sequentialPath, false));
    Assert.assertNotNull("node should never be deleted", zk.exists("/c", false));
}
@Test public void ephemeralCreateMultiOpTest() throws KeeperException, InterruptedException, IOException { final ZooKeeper zk = createClient(); String data = "test"; String path = "/ephemeralcreatemultiop"; zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); QuorumZooKeeperServer server = getConnectedServer(zk.getSessionId()); Assert.assertNotNull("unable to find server interlocutor", server); UpgradeableSessionTracker sessionTracker = (UpgradeableSessionTracker)server.getSessionTracker(); Assert.assertFalse("session already global", sessionTracker.isGlobalSession(zk.getSessionId())); List<OpResult> multi = null; try { multi = zk.multi(Arrays.asList( Op.setData(path, data.getBytes(), 0), Op.create(path + "/e", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL), Op.create(path + "/p", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT), Op.create(path + "/q", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) )); } catch (KeeperException.SessionExpiredException e) { // the scenario that inspired this unit test Assert.fail("received session expired for a session promotion in a multi-op"); } Assert.assertNotNull(multi); Assert.assertEquals(4, multi.size()); Assert.assertEquals(data, new String(zk.getData(path + "/e", false, null))); Assert.assertEquals(data, new String(zk.getData(path + "/p", false, null))); Assert.assertEquals(data, new String(zk.getData(path + "/q", false, null))); Assert.assertTrue("session not promoted", sessionTracker.isGlobalSession(zk.getSessionId())); }
/**
 * Submits an asynchronous multi that creates and then deletes {@code /multi},
 * with this object as the callback, and verifies the recorded outcome: an OK
 * return code, a create result first and a delete result second.
 *
 * The method blocks in {@code latch_await()} before asserting (presumably
 * until the callback has fired; confirm against that helper).
 */
public void verifyMulti() {
    List<Op> ops = Arrays.asList(
            Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
            Op.delete("/multi", -1));
    zk.multi(ops, this, null);
    latch_await();

    // assertEquals takes (expected, actual); the expected value goes first so
    // that a failure message reports the two values the right way round.
    Assert.assertEquals(KeeperException.Code.OK.intValue(), this.rc);
    Assert.assertTrue(this.opResults.get(0) instanceof OpResult.CreateResult);
    Assert.assertTrue(this.opResults.get(1) instanceof OpResult.DeleteResult);
}
/**
 * Submits an asynchronous multi whose second operation deletes
 * {@code /nonexist1}, and verifies that all three recorded results are
 * {@link OpResult.ErrorResult} instances.
 */
public void verifyMultiFailure_AllErrorResult() {
    List<Op> batch = Arrays.asList(
            Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
            Op.delete("/nonexist1", -1),
            Op.setData("/multi", "test".getBytes(), -1));
    zk.multi(batch, this, null);
    latch_await();

    // One result per submitted operation, each of which must be an error.
    for (int i = 0; i < batch.size(); i++) {
        Assert.assertTrue(this.opResults.get(i) instanceof OpResult.ErrorResult);
    }
}
/**
 * Submits an asynchronous multi that creates {@code /multi} and then deletes
 * {@code /nonexist1}, and verifies that the first recorded result is an
 * error and that {@code /multi} does not exist afterwards.
 */
public void verifyMultiFailure_NoSideEffect() throws KeeperException, InterruptedException {
    List<Op> batch = Arrays.asList(
            Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
            Op.delete("/nonexist1", -1));
    zk.multi(batch, this, null);
    latch_await();

    OpResult createOutcome = this.opResults.get(0);
    Assert.assertTrue(createOutcome instanceof OpResult.ErrorResult);

    // The create must not have taken effect.
    Assert.assertNull(zk.exists("/multi", false));
}
/**
 * Runs the given operations as a single multi call inside a trace span,
 * retrying on CONNECTIONLOSS, SESSIONEXPIRED and OPERATIONTIMEOUT before
 * giving up.
 *
 * @param ops the operations to submit; they are passed through
 *            {@code prepareZKMulti} before being sent
 * @return the per-operation results of the successful multi call
 * @throws KeeperException      for any non-retryable error, or as decided by
 *                              {@code retryOrThrow} for retryable ones
 * @throws InterruptedException if interrupted while calling or backing off
 */
public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
    TraceScope span = Trace.startSpan("RecoverableZookeeper.multi");
    try {
        final RetryCounter retries = retryCounterFactory.create();
        final Iterable<Op> preparedOps = prepareZKMulti(ops);

        for (;;) {
            try {
                return checkZk().multi(preparedOps);
            } catch (KeeperException e) {
                switch (e.code()) {
                    case CONNECTIONLOSS:
                    case SESSIONEXPIRED:
                    case OPERATIONTIMEOUT:
                        retryOrThrow(retries, e, "multi");
                        break;

                    default:
                        throw e;
                }
            }
            // Only reached for a retryable error: back off before the next attempt.
            retries.sleepUntilNextRetry();
        }
    } finally {
        if (span != null) {
            span.close();
        }
    }
}
/**
 * Tests if a multiop submitted to a non-leader propagates to the leader properly
 * (see ZOOKEEPER-1124).
 *
 * The test works as follows. It has a client connect to a follower and submit a multiop
 * to the follower. It then verifies that the multiop successfully gets committed by
 * reading back each created node.
 *
 * Without the fix in ZOOKEEPER-1124, this fails with a ConnectionLoss KeeperException.
 *
 * The client is closed and the quorum torn down in {@code finally} blocks so that
 * the servers started here do not outlive the test, even when it fails.
 */
@Test
public void testMultiToFollower() throws Exception {
    QuorumUtil qu = new QuorumUtil(1);
    CountdownWatcher watcher = new CountdownWatcher();
    qu.startQuorum();
    try {
        // Locate the peer whose leader field is set.
        int index = 1;
        while (qu.getPeer(index).peer.leader == null) {
            index++;
        }

        // Connect to a peer that is not the leader: peer 2 if the leader is peer 1, else peer 1.
        ZooKeeper zk = new ZooKeeper(
                "127.0.0.1:" + qu.getPeer((index == 1) ? 2 : 1).peer.getClientPort(),
                ClientBase.CONNECTION_TIMEOUT, watcher);
        try {
            watcher.waitForConnected(CONNECTION_TIMEOUT);

            zk.multi(Arrays.asList(
                    Op.create("/multi0", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.create("/multi1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
            ));

            // Reading each node back fails the test (via an exception) if the multi was not applied.
            zk.getData("/multi0", false, null);
            zk.getData("/multi1", false, null);
            zk.getData("/multi2", false, null);
        } finally {
            zk.close();
        }
    } finally {
        // Shut down the quorum started above.
        qu.tearDown();
    }
}