/** * The asynchronous version of create. * * @see #create(String, byte[], List, CreateMode) */ public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); EphemeralType.validateTTL(createMode, -1); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); ReplyHeader r = new ReplyHeader(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); request.setAcl(acl); cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
/** * The Asynchronous version of create. The request doesn't actually until * the asynchronous callback is called. * * @see #create(String, byte[], List, CreateMode) */ public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); ReplyHeader r = new ReplyHeader(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); request.setAcl(acl); cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
/** * The asynchronous version of create. * * @see #create(String, byte[], List, CreateMode) */ public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); ReplyHeader r = new ReplyHeader(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); request.setAcl(acl); cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
public void testCreate() { class CreateCallback implements StringCallback { @Override public void processResult(int rc, String path, Object ctx, String name) { LOG.info("code->{} path->{}", rc, path); switch (Code.get(rc)) { case CONNECTIONLOSS: // TODO re-create break; case OK: break; case NODEEXISTS: break; default: LOG.error("error code->{} path->{}", rc, path); } } } if (zk != null) zk.create("/test", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new CreateCallback(), null); }
@Test public void testSync() throws Exception { try { LOG.info("Starting ZK:" + (new Date()).toString()); opsCount = new CountDownLatch(limit); ZooKeeper zk = createClient(); LOG.info("Beginning test:" + (new Date()).toString()); for(int i = 0; i < 50; i++) zk.create("/test" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (StringCallback)this, results); for(int i = 50; i < 100; i++) { zk.create("/test" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (Create2Callback)this, results); } zk.sync("/test", this, results); for(int i = 0; i < 100; i++) zk.delete("/test" + i, 0, this, results); for(int i = 0; i < 100; i++) zk.getChildren("/", new NullWatcher(), (ChildrenCallback)this, results); for(int i = 0; i < 100; i++) zk.getChildren("/", new NullWatcher(), (Children2Callback)this, results); LOG.info("Submitted all operations:" + (new Date()).toString()); if(!opsCount.await(10000, TimeUnit.MILLISECONDS)) Assert.fail("Haven't received all confirmations" + opsCount.getCount()); for(int i = 0; i < limit ; i++){ Assert.assertEquals(0, (int) results.get(i)); } } catch (IOException e) { System.out.println(e.toString()); } }
/** * Pre-creating bookkeeper metadata path in zookeeper. */ private void prepareBookKeeperEnv() throws IOException { // create bookie available path in zookeeper if it doesn't exists final String zkAvailablePath = conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH, BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT); final CountDownLatch zkPathLatch = new CountDownLatch(1); final AtomicBoolean success = new AtomicBoolean(false); StringCallback callback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { if (KeeperException.Code.OK.intValue() == rc || KeeperException.Code.NODEEXISTS.intValue() == rc) { LOG.info("Successfully created bookie available path : " + zkAvailablePath); success.set(true); } else { KeeperException.Code code = KeeperException.Code.get(rc); LOG.error("Error : " + KeeperException.create(code, path).getMessage() + ", failed to create bookie available path : " + zkAvailablePath); } zkPathLatch.countDown(); } }; ZkUtils.asyncCreateFullPathOptimistic(zkc, zkAvailablePath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null); try { if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS) || !success.get()) { throw new IOException("Couldn't create bookie available path :" + zkAvailablePath + ", timed out " + zkc.getSessionTimeout() + " millis"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException( "Interrupted when creating the bookie available path : " + zkAvailablePath, e); } }
@Override public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode, final StringCallback cb, final Object ctx) { if (stopped) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); return; } final Set<Watcher> toNotifyCreate = Sets.newHashSet(); toNotifyCreate.addAll(watchers.get(path)); final Set<Watcher> toNotifyParent = Sets.newHashSet(); final String parent = path.substring(0, path.lastIndexOf("/")); if (!parent.isEmpty()) { toNotifyParent.addAll(watchers.get(parent)); } watchers.removeAll(path); executor.execute(() -> { mutex.lock(); if (getProgrammedFailStatus()) { mutex.unlock(); cb.processResult(failReturnCode.intValue(), path, ctx, null); } else if (stopped) { mutex.unlock(); cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); } else if (tree.containsKey(path)) { mutex.unlock(); cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null); } else if (!parent.isEmpty() && !tree.containsKey(parent)) { mutex.unlock(); cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); } else { tree.put(path, Pair.create(data, 0)); mutex.unlock(); cb.processResult(0, path, ctx, null); toNotifyCreate.forEach( watcher -> watcher.process( new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, path))); toNotifyParent.forEach( watcher -> watcher.process( new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent))); } }); }
/** * Pre-creating bookkeeper metadata path in zookeeper. */ private void prepareBookKeeperEnv() throws IOException { // create bookie available path in zookeeper if it doesn't exists final String zkAvailablePath = conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH, BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT); final CountDownLatch zkPathLatch = new CountDownLatch(1); final AtomicBoolean success = new AtomicBoolean(false); StringCallback callback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { if (KeeperException.Code.OK.intValue() == rc || KeeperException.Code.NODEEXISTS.intValue() == rc) { LOG.info("Successfully created bookie available path : " + zkAvailablePath); success.set(true); } else { KeeperException.Code code = KeeperException.Code.get(rc); LOG.error("Error : " + KeeperException.create(code, path).getMessage() + ", failed to create bookie available path : " + zkAvailablePath); } zkPathLatch.countDown(); } }; ZkUtils.createFullPathOptimistic(zkc, zkAvailablePath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null); try { if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS) || !success.get()) { throw new IOException("Couldn't create bookie available path :" + zkAvailablePath + ", timed out " + zkc.getSessionTimeout() + " millis"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException( "Interrupted when creating the bookie available path : " + zkAvailablePath, e); } }
/** * Visits the subtree with root as given path and calls the passed callback with each znode * found during the search. It performs a depth-first, pre-order traversal of the tree. * <p> * <b>Important:</b> This is <i>not an atomic snapshot</i> of the tree ever, but the * state as it exists across multiple RPCs from zkClient to the ensemble. * For practical purposes, it is suggested to bring the clients to the ensemble * down (i.e. prevent writes to pathRoot) to 'simulate' a snapshot behavior. */ public static void visitSubTreeDFS(ZooKeeper zk, final String path, boolean watch, StringCallback cb) throws KeeperException, InterruptedException { PathUtils.validatePath(path); zk.getData(path, watch, null); cb.processResult(Code.OK.intValue(), path, null, path); visitSubTreeDFSHelper(zk, path, watch, cb); }