/** * serialize the datatree and session into the file snapshot * @param dt the datatree to be serialized * @param sessions the sessions to be serialized * @param snapShot the file to store snapshot into */ public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot) throws IOException { if (!close) { OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot)); CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32()); //CheckedOutputStream cout = new CheckedOutputStream() OutputArchive oa = BinaryOutputArchive.getArchive(crcOut); FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId); serialize(dt,sessions,oa, header); long val = crcOut.getChecksum().getValue(); oa.writeLong(val, "val"); oa.writeString("/", "path"); sessOS.flush(); crcOut.close(); sessOS.close(); } }
/**
 * Read the session table and then the data tree from an input archive.
 * <p>
 * The archive is expected to start with an int "count" followed by that
 * many (long "id", int "timeout") pairs; each pair is stored into
 * {@code sessions}. The remainder is handed to
 * {@link DataTree#deserialize} under the tag "tree".
 *
 * @param dt the datatree to populate
 * @param ia the archive to read from
 * @param sessions map that receives session id to timeout entries
 * @throws IOException if reading from the archive fails
 */
public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions)
        throws IOException {
    final int sessionCount = ia.readInt("count");
    for (int remaining = sessionCount; remaining > 0; remaining--) {
        final long sessionId = ia.readLong("id");
        final int timeout = ia.readInt("timeout");
        sessions.put(sessionId, timeout);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "loadData --- session in archive: " + sessionId
                            + " with timeout: " + timeout);
        }
    }
    dt.deserialize(ia, "tree");
}
/**
 * Fetch the ACL list of the node at the given path, optionally together
 * with its stat.
 * <p>
 * A KeeperException with error code KeeperException.NoNode will be thrown
 * if no node with the given path exists.
 *
 * @param path
 *                the given path for the node
 * @param stat
 *                receives a copy of the node's stat; may be null, in
 *                which case the stat is not copied
 * @return the ACL array of the given node.
 * @throws InterruptedException If the server transaction is interrupted.
 * @throws KeeperException If the server signals an error with a non-zero error code.
 * @throws IllegalArgumentException if an invalid path is specified
 */
public List<ACL> getACL(final String path, Stat stat)
        throws KeeperException, InterruptedException {
    PathUtils.validatePath(path);

    RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.getACL);

    // The request carries the chroot-prefixed path; errors report the client path.
    GetACLRequest aclRequest = new GetACLRequest();
    aclRequest.setPath(prependChroot(path));

    GetACLResponse aclResponse = new GetACLResponse();
    ReplyHeader reply = cnxn.submitRequest(header, aclRequest, aclResponse, null);
    if (reply.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(reply.getErr()), path);
    }

    if (stat != null) {
        DataTree.copyStat(aclResponse.getStat(), stat);
    }
    return aclResponse.getAcl();
}
/** * serialize the datatree and session into the file snapshot * @param dt the datatree to be serialized * @param sessions the sessions to be serialized * @param snapShot the file to store snapshot into * @param fsync sync the file immediately after write */ public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync) throws IOException { if (!close) { try (CheckedOutputStream crcOut = new CheckedOutputStream(new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) : new FileOutputStream(snapShot)), new Adler32())) { //CheckedOutputStream cout = new CheckedOutputStream() OutputArchive oa = BinaryOutputArchive.getArchive(crcOut); FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId); serialize(dt, sessions, oa, header); long val = crcOut.getChecksum().getValue(); oa.writeLong(val, "val"); oa.writeString("/", "path"); crcOut.flush(); } } }
/**
 * Print watch information from the server's data tree to {@code pw}.
 * <p>
 * If the server is not running, {@code ZK_NOT_SERVING} is printed instead.
 * Otherwise the command code in {@code len} selects the output: a watch
 * summary for {@code wchsCmd}, or a watch dump whose boolean flag is true
 * only for {@code wchpCmd}. A blank line follows the dump.
 */
@Override
public void commandRun() {
    if (!isZKServerRunning()) {
        pw.println(ZK_NOT_SERVING);
        return;
    }

    DataTree tree = zkServer.getZKDatabase().getDataTree();
    if (len == FourLetterCommands.wchsCmd) {
        tree.dumpWatchesSummary(pw);
    } else {
        boolean isWchp = (len == FourLetterCommands.wchpCmd);
        tree.dumpWatches(pw, isWchp);
    }
    pw.println();
}
private void attemptAutoCreateDb(File dataDir, File snapDir, Map<Long, Integer> sessions, String priorAutocreateDbValue, String autoCreateValue, long expectedValue) throws IOException { sessions.clear(); System.setProperty(FileTxnSnapLog.ZOOKEEPER_DB_AUTOCREATE, autoCreateValue); FileTxnSnapLog fileTxnSnapLog = new FileTxnSnapLog(dataDir, snapDir); try { long zxid = fileTxnSnapLog.restore(new DataTree(), sessions, new FileTxnSnapLog.PlayBackListener() { @Override public void onTxnLoaded(TxnHeader hdr, Record rec) { // empty by default } }); Assert.assertEquals("unexpected zxid", expectedValue, zxid); } finally { if (priorAutocreateDbValue == null) { System.clearProperty(FileTxnSnapLog.ZOOKEEPER_DB_AUTOCREATE); } else { System.setProperty(FileTxnSnapLog.ZOOKEEPER_DB_AUTOCREATE, priorAutocreateDbValue); } } }
/**
 * Return the ACL and stat of the node of the given path.
 * <p>
 * A KeeperException with error code KeeperException.NoNode will be thrown
 * if no node with the given path exists.
 *
 * @param path
 *                the given path for the node
 * @param stat
 *                the stat of the node will be copied to this parameter if
 *                not null.
 * @return the ACL array of the given node.
 * @throws InterruptedException If the server transaction is interrupted.
 * @throws KeeperException If the server signals an error with a non-zero error code.
 * @throws IllegalArgumentException if an invalid path is specified
 */
public List<ACL> getACL(final String path, Stat stat)
        throws KeeperException, InterruptedException {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.getACL);
    GetACLRequest request = new GetACLRequest();
    request.setPath(serverPath);
    GetACLResponse response = new GetACLResponse();
    ReplyHeader r = cnxn.submitRequest(h, request, response, null);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
    }
    // Callers that only want the ACL may pass a null stat; skip the copy
    // in that case instead of handing null to copyStat.
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getAcl();
}
/**
 * Print watch information from the server's data tree to {@code pw}.
 * <p>
 * When {@code zkServer} is null, {@code ZK_NOT_SERVING} is printed and
 * nothing else happens. Otherwise the command code in {@code len} picks
 * the output: a watch summary for {@code wchsCmd}, or a watch dump whose
 * boolean flag is true only for {@code wchpCmd}. A blank line is printed
 * after the dump.
 */
@Override
public void commandRun() {
    if (zkServer == null) {
        pw.println(ZK_NOT_SERVING);
        return;
    }

    DataTree watchSource = zkServer.getZKDatabase().getDataTree();
    if (len == FourLetterCommands.wchsCmd) {
        watchSource.dumpWatchesSummary(pw);
    } else {
        boolean isWchp = (len == FourLetterCommands.wchpCmd);
        watchSource.dumpWatches(pw, isWchp);
    }
    pw.println();
}
/** * For ZOOKEEPER-1046. Verify if cversion and pzxid if incremented * after create/delete failure during restore. */ @Test public void testTxnFailure() throws Exception { long count = 1; File tmpDir = ClientBase.createTmpDir(); FileTxnSnapLog logFile = new FileTxnSnapLog(tmpDir, tmpDir); DataTree dt = new DataTree(); dt.createNode("/test", new byte[0], null, 0, -1, 1, 1); for (count = 1; count <= 3; count++) { dt.createNode("/test/" + count, new byte[0], null, 0, -1, count, System.currentTimeMillis()); } DataNode zk = dt.getNode("/test"); // Make create to fail, then verify cversion. LOG.info("Attempting to create " + "/test/" + (count - 1)); doOp(logFile, OpCode.create, "/test/" + (count - 1), dt, zk); // Make delete fo fail, then verify cversion. // this doesn't happen anymore, we only set the cversion on create // LOG.info("Attempting to delete " + "/test/" + (count + 1)); // doOp(logFile, OpCode.delete, "/test/" + (count + 1), dt, zk); }
public static void deserializeSnapshot(DataTree dt,InputArchive ia, Map<Long, Integer> sessions) throws IOException { // 读取session个数 int count = ia.readInt("count"); while (count > 0) { // session id long id = ia.readLong("id"); // session超时时间 int to = ia.readInt("timeout"); // 解析session集合 sessions.put(id, to); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "loadData --- session in archive: " + id + " with timeout: " + to); } count--; } // 反序列化tree dt.deserialize(ia, "tree"); }
/**
 * Build a new-format datanode from an old-format one.
 * <p>
 * The stat is converted with {@code convertStat}, the ACL argument is
 * obtained from {@code dt.getACL(oldDataNode)}, and the old node's data
 * and children are passed through to the new node as they are.
 *
 * @param dt the new datatree
 * @param parent the parent of the datanode to be constructed
 * @param oldDataNode the old datanode
 * @return the new datanode
 */
private DataNode convertDataNode(DataTree dt, DataNode parent, DataNodeV1 oldDataNode) {
    StatPersisted convertedStat = convertStat(oldDataNode.stat);
    DataNode converted = new DataNode(
            parent,
            oldDataNode.data,
            dt.getACL(oldDataNode),
            convertedStat);
    converted.setChildren(oldDataNode.children);
    return converted;
}
/**
 * Walk the old datatree depth-first from {@code path} and add a converted
 * node to the new datatree for every node visited.
 * <p>
 * The root is addressed by the empty path and is converted with a null
 * parent; for any other path the parent is looked up in the new tree
 * using everything before the last '/'.
 *
 * @param dataTree the new datatree to be constructed
 * @param path the path to start with; a null path is ignored
 */
private void recurseThroughDataTree(DataTree dataTree, String path) {
    if (path == null) {
        return;
    }
    DataNodeV1 oldNode = oldDataTree.getNode(path);
    HashSet<String> childNames = oldNode.children;

    DataNode parentNode = null;
    if (!"".equals(path)) {
        String parentPath = path.substring(0, path.lastIndexOf('/'));
        parentNode = dataTree.getNode(parentPath);
    }

    DataNode convertedNode = convertDataNode(dataTree, parentNode, oldNode);
    dataTree.addDataNode(path, convertedNode);

    if (childNames != null) {
        for (String childName : childNames) {
            recurseThroughDataTree(dataTree, path + "/" + childName);
        }
    }
}
private DataTree convertThisSnapShot() throws IOException { // create a datatree DataTree dataTree = new DataTree(); DataNodeV1 oldDataNode = oldDataTree.getNode(""); if (oldDataNode == null) { //should never happen LOG.error("Upgrading from an empty snapshot."); } recurseThroughDataTree(dataTree, ""); dataTree.lastProcessedZxid = oldDataTree.lastProcessedZxid; return dataTree; }
/** * run the upgrade * @throws IOException */ public void runUpgrade() throws IOException { if (!dataDir.exists()) { throw new IOException(dataDir + " does not exist"); } if (!snapShotDir.exists()) { throw new IOException(snapShotDir + " does not exist"); } // create the bkup directorya createAllDirs(); //copy all the files for backup try { copyFiles(dataDir, bkupdataDir, "log"); copyFiles(snapShotDir, bkupsnapShotDir, "snapshot"); } catch(IOException io) { LOG.error("Failed in backing up."); throw io; } //evrything is backed up // read old database and create // an old snapshot UpgradeSnapShotV1 upgrade = new UpgradeSnapShotV1(bkupdataDir, bkupsnapShotDir); LOG.info("Creating new data tree"); DataTree dt = upgrade.getNewDataTree(); FileTxnSnapLog filesnapLog = new FileTxnSnapLog(dataDir, snapShotDir); LOG.info("snapshotting the new datatree"); filesnapLog.save(dt, upgrade.getSessionWithTimeOuts()); //done saving. LOG.info("Upgrade is complete"); }
/**
 * Restore a datatree and its sessions from an input archive.
 * <p>
 * The file header is read first and its magic value is checked against
 * {@code SNAP_MAGIC} before anything else is read.
 *
 * @param dt the datatree to be deserialized into
 * @param sessions the sessions to be filled up
 * @param ia the input archive to restore from
 * @throws IOException if the magic value does not match or reading fails
 */
public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia)
        throws IOException {
    FileHeader fileHeader = new FileHeader();
    fileHeader.deserialize(ia, "fileheader");

    boolean magicMatches = fileHeader.getMagic() == SNAP_MAGIC;
    if (!magicMatches) {
        throw new IOException("mismatching magic headers "
                + fileHeader.getMagic()
                + " !=  " + FileSnap.SNAP_MAGIC);
    }
    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}
/** * serialize the datatree and sessions * @param dt the datatree to be serialized * @param sessions the sessions to be serialized * @param oa the output archive to serialize into * @param header the header of this snapshot * @throws IOException */ protected void serialize(DataTree dt,Map<Long, Integer> sessions, OutputArchive oa, FileHeader header) throws IOException { // this is really a programmatic error and not something that can // happen at runtime if(header==null) throw new IllegalStateException( "Snapshot's not open for writing: uninitialized header"); header.serialize(oa, "fileheader"); SerializeUtils.serializeSnapshot(dt,oa,sessions); }
/**
 * Save the datatree and the sessions into a snapshot file.
 * <p>
 * The file lives in {@code snapDir} and its name is derived from the
 * tree's {@code lastProcessedZxid} via {@code Util.makeSnapshotName}.
 *
 * @param dataTree the datatree to be serialized onto disk
 * @param sessionsWithTimeouts the session timeouts to be
 * serialized onto disk
 * @throws IOException if writing the snapshot fails
 */
public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
        throws IOException {
    final long zxid = dataTree.lastProcessedZxid;
    final String snapshotName = Util.makeSnapshotName(zxid);
    final File target = new File(snapDir, snapshotName);
    LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(zxid), target);
    snapLog.serialize(dataTree, sessionsWithTimeouts, target);
}
/**
 * Write the session table and then the data tree to an output archive.
 * <p>
 * Layout written: an int "count", then for each session a long "id" and
 * an int "timeout", then the tree under the tag "tree".
 *
 * @param dt the datatree to serialize
 * @param oa the archive to write to
 * @param sessions session id to timeout map to serialize
 * @throws IOException if writing to the archive fails
 */
public static void serializeSnapshot(DataTree dt, OutputArchive oa, Map<Long, Integer> sessions)
        throws IOException {
    // Work from a private copy of the map; presumably so the count written
    // matches the entries even if the caller's map changes meanwhile.
    HashMap<Long, Integer> snapshot = new HashMap<Long, Integer>(sessions);
    oa.writeInt(snapshot.size(), "count");
    for (Entry<Long, Integer> session : snapshot.entrySet()) {
        long sessionId = session.getKey();
        int timeout = session.getValue();
        oa.writeLong(sessionId, "id");
        oa.writeInt(timeout, "timeout");
    }
    dt.serialize(oa, "tree");
}
/** * Return the data and the stat of the node of the given path. * <p> * If the watch is non-null and the call is successful (no exception is * thrown), a watch will be left on the node with the given path. The watch * will be triggered by a successful operation that sets data on the node, or * deletes the node. * <p> * A KeeperException with error code KeeperException.NoNode will be thrown * if no node with the given path exists. * * @param path the given path * @param watcher explicit watcher * @param stat the stat of the node * @return the data of the node * @throws KeeperException If the server signals an error with a non-zero error code * @throws InterruptedException If the server transaction is interrupted. * @throws IllegalArgumentException if an invalid path is specified */ public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { //注册watcher到中 wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); //III: ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getData(); }
/** * For the given znode path return the stat and children list. * <p> * If the watch is non-null and the call is successful (no exception is thrown), * a watch will be left on the node with the given path. The watch willbe * triggered by a successful operation that deletes the node of the given * path or creates/delete a child under the node. * <p> * The list of children returned is not sorted and no guarantee is provided * as to its natural or lexical order. * <p> * A KeeperException with error code KeeperException.NoNode will be thrown * if no node with the given path exists. * * @since 3.3.0 * * @param path * @param watcher explicit watcher * @param stat stat of the znode designated by path * @return an unordered array of children of the node with the given path * @throws InterruptedException If the server transaction is interrupted. * @throws KeeperException If the server signals an error with a non-zero error code. * @throws IllegalArgumentException if an invalid path is specified */ public List<String> getChildren(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new ChildWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getChildren2); GetChildren2Request request = new GetChildren2Request(); request.setPath(serverPath); request.setWatch(watcher != null); GetChildren2Response response = new GetChildren2Response(); ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getChildren(); }
/**
 * For ZOOKEEPER-1755 - Test race condition when taking dumpEphemerals and
 * removing the session related ephemerals from DataTree structure.
 * <p>
 * A background thread calls {@code dumpEphemerals} in a loop while the
 * test thread kills the sessions; the test passes only if the background
 * thread never caught an exception.
 */
@Test(timeout = 60000)
public void testDumpEphemerals() throws Exception {
    int count = 1000;
    long session = 1000;
    long zxid = 2000;
    final DataTree dataTree = new DataTree();
    LOG.info("Create {} zkclient sessions and its ephemeral nodes", count);
    createEphemeralNode(session, dataTree, count);
    final AtomicBoolean exceptionDuringDumpEphemerals = new AtomicBoolean(false);
    final AtomicBoolean running = new AtomicBoolean(true);
    Thread thread = new Thread() {
        @Override
        public void run() {
            PrintWriter pwriter = new PrintWriter(new StringWriter());
            try {
                while (running.get()) {
                    dataTree.dumpEphemerals(pwriter);
                }
            } catch (Exception e) {
                LOG.error("Received exception while dumpEphemerals!", e);
                exceptionDuringDumpEphemerals.set(true);
            }
        }
    };
    thread.start();
    try {
        LOG.debug("Killing {} zkclient sessions and its ephemeral nodes", count);
        killZkClientSession(session, zxid, dataTree, count);
    } finally {
        // Always stop and reap the dumper thread, even if killing the
        // sessions throws, so it does not keep spinning after the test.
        running.set(false);
        thread.join();
    }
    Assert.assertFalse("Should not have got exception while dumpEphemerals!",
            exceptionDuringDumpEphemerals.get());
}