/** * get the last zxid that was logged in the transaction logs * @return the last zxid logged in the transaction logs */ public long getLastLoggedZxid() { File[] files = getLogFiles(logDir.listFiles(), 0); long maxLog=files.length>0? Util.getZxidFromName(files[files.length-1].getName(),"log"):-1; // if a log file is more recent we must scan it to find // the highest zxid long zxid = maxLog; TxnIterator itr = null; try { FileTxnLog txn = new FileTxnLog(logDir); itr = txn.read(maxLog); while (true) { if(!itr.next()) break; TxnHeader hdr = itr.getHeader(); zxid = hdr.getZxid(); } } catch (IOException e) { LOG.warn("Unexpected exception", e); } finally { close(itr); } return zxid; }
@Test public void testTruncationStreamReset() throws Exception { File tmpdir = ClientBase.createTmpDir(); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir); ZKDatabase zkdb = new ZKDatabase(snaplog); for (int i = 1; i <= 100; i++) { append(zkdb, i); } zkdb.truncateLog(1); append(zkdb, 200); zkdb.close(); // verify that the truncation and subsequent append were processed // correctly FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2")); TxnIterator iter = txnlog.read(1); TxnHeader hdr = iter.getHeader(); Record txn = iter.getTxn(); Assert.assertEquals(1, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.next(); hdr = iter.getHeader(); txn = iter.getTxn(); Assert.assertEquals(200, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.close(); ClientBase.recursiveDelete(tmpdir); }
@Test public void testTruncationStreamReset() throws Exception { File tmpdir = ClientBase.createTmpDir(); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir); ZKDatabase zkdb = new ZKDatabase(snaplog); // make sure to snapshot, so that we have something there when // truncateLog reloads the db snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false); for (int i = 1; i <= 100; i++) { append(zkdb, i); } zkdb.truncateLog(1); append(zkdb, 200); zkdb.close(); // verify that the truncation and subsequent append were processed // correctly FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2")); TxnIterator iter = txnlog.read(1); TxnHeader hdr = iter.getHeader(); Record txn = iter.getTxn(); Assert.assertEquals(1, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.next(); hdr = iter.getHeader(); txn = iter.getTxn(); Assert.assertEquals(200, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.close(); ClientBase.recursiveDelete(tmpdir); }
/** * this function restores the server * database after reading from the * snapshots and transaction logs * @param dt the datatree to be restored * @param sessions the sessions to be restored * @param listener the playback listener to run on the * database restoration * @return the highest zxid restored * @throws IOException */ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; while (true) { // iterator points to // the first valid txn when initialized hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error(highestZxid + "(higestZxid) > " + hdr.getZxid() + "(next log) for type " + hdr.getType()); } else { highestZxid = hdr.getZxid(); } try { processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage(), e); } listener.onTxnLoaded(hdr, itr.getTxn()); if (!itr.next()) break; } return highestZxid; }
@Test public void testTruncationStreamReset() throws Exception { File tmpdir = ClientBase.createTmpDir(); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir); ZKDatabase zkdb = new ZKDatabase(snaplog); for (int i = 1; i <= 100; i++) { append(zkdb, i); } zkdb.truncateLog(1); append(zkdb, 200); zkdb.close(); // verify that the truncation and subsequent append were processed // correctly FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2")); TxnIterator iter = txnlog.read(1); TxnHeader hdr = iter.getHeader(); Record txn = iter.getTxn(); Assert.assertEquals(1, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.next(); hdr = iter.getHeader(); txn = iter.getTxn(); Assert.assertEquals(200, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); }
/** * this function restores the server * database after reading from the * snapshots and transaction logs * @param dt the datatree to be restored * @param sessions the sessions to be restored * @param listener the playback listener to run on the * database restoration * @return the highest zxid restored * @throws IOException */ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; while (true) { // iterator points to // the first valid txn when initialized hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error(highestZxid + "(higestZxid) > " + hdr.getZxid() + "(next log) for type " + hdr.getType()); } else { highestZxid = hdr.getZxid(); } try { processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage()); } listener.onTxnLoaded(hdr, itr.getTxn()); if (!itr.next()) break; } return highestZxid; }
/**
 * Closes the given iterator, logging (rather than propagating) any
 * IOException raised while closing. A null iterator is a no-op.
 */
private void close(TxnIterator itr) {
    if (itr == null) {
        return;
    }
    try {
        itr.close();
    } catch (IOException ioe) {
        LOG.warn("Error closing file iterator", ioe);
    }
}
/** * this function restores the server * database after reading from the * snapshots and transaction logs * @param dt the datatree to be restored * @param sessions the sessions to be restored * @param listener the playback listener to run on the * database restoration * @return the highest zxid restored * @throws IOException */ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; while (true) { // iterator points to // the first valid txn when initialized hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error(highestZxid + "(higestZxid) > " + hdr.getZxid() + "(next log) for type " + hdr.getType()); } else { highestZxid = hdr.getZxid(); } try { processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage()); } if (!itr.next()) break; } return highestZxid; }
/** * this function restores[修复] the server * database after reading from the * snapshots and transaction logs * @param dt the datatree to be restored * @param sessions the sessions to be restored * @param listener the playback listener to run on the * database restoration * @return the highest zxid restored * @throws IOException */ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { while (true) { // iterator points to // the first valid txn when initialized hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("{}(higestZxid) > {}(next log) for type {}", new Object[] { highestZxid, hdr.getZxid(), hdr.getType() }); } else { highestZxid = hdr.getZxid(); } try { processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage(), e); } listener.onTxnLoaded(hdr, itr.getTxn()); if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; }
/** * test that all transactions from the Log are loaded, and only once * @throws Exception an exception might be thrown here */ @Test public void testLoad() throws Exception { // setup a single server cluster File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); // generate some transactions that will get logged try { for (int i = 0; i< NUM_MESSAGES; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); // now verify that the FileTxnLog reads every transaction only once File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); FileTxnLog txnLog = new FileTxnLog(logDir); TxnIterator itr = txnLog.read(0); long expectedZxid = 0; long lastZxid = 0; TxnHeader hdr; do { hdr = itr.getHeader(); expectedZxid++; Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid()); Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid)); lastZxid = hdr.getZxid(); }while(itr.next()); Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS)); zks.shutdown(); }
/** * this function restores the server * database after reading from the * snapshots and transaction logs * @param dt the datatree to be restored * @param sessions the sessions to be restored * @param listener the playback listener to run on the * database restoration * @return the highest zxid restored * @throws IOException */ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { long deserializeResult = snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); boolean trustEmptyDB; File initFile = new File(dataDir.getParent(), "initialize"); if (Files.deleteIfExists(initFile.toPath())) { LOG.info("Initialize file found, an empty database will not block voting participation"); trustEmptyDB = true; } else { trustEmptyDB = autoCreateDB; } if (-1L == deserializeResult) { /* this means that we couldn't find any snapshot, so we need to * initialize an empty database (reported in ZOOKEEPER-2325) */ if (txnLog.getLastLoggedZxid() != -1) { throw new IOException( "No snapshot found, but there are log entries. 
" + "Something is broken!"); } if (trustEmptyDB) { /* TODO: (br33d) we should either put a ConcurrentHashMap on restore() * or use Map on save() */ save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false); /* return a zxid of 0, since we know the database is empty */ return 0L; } else { /* return a zxid of -1, since we are possibly missing data */ LOG.warn("Unexpected empty data tree, setting zxid to -1"); dt.lastProcessedZxid = -1L; return -1L; } } TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { while (true) { // iterator points to // the first valid txn when initialized hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType()); } else { highestZxid = hdr.getZxid(); } try { processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage(), e); } listener.onTxnLoaded(hdr, itr.getTxn()); if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; }
/**
 * Wraps the given txn iterator. A null iterator yields an empty
 * (hasNext == false) proposal iterator.
 */
public TxnLogProposalIterator(TxnIterator itr) {
    if (itr == null) {
        return;
    }
    this.itr = itr;
    // there is something to iterate only if the log has a first header
    hasNext = (itr.getHeader() != null);
}
/** * Get proposals from txnlog. Only packet part of proposal is populated. * * @param startZxid the starting zxid of the proposal * @param sizeLimit maximum on-disk size of txnlog to fetch * 0 is unlimited, negative value means disable. * @return list of proposal (request part of each proposal is null) */ public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit) { if (sizeLimit < 0) { LOG.debug("Negative size limit - retrieving proposal via txnlog is disabled"); return TxnLogProposalIterator.EMPTY_ITERATOR; } TxnIterator itr = null; try { itr = snapLog.readTxnLog(startZxid, false); // If we cannot guarantee that this is strictly the starting txn // after a given zxid, we should fail. if ((itr.getHeader() != null) && (itr.getHeader().getZxid() > startZxid)) { LOG.warn("Unable to find proposals from txnlog for zxid: " + startZxid); itr.close(); return TxnLogProposalIterator.EMPTY_ITERATOR; } if (sizeLimit > 0) { long txnSize = itr.getStorageSize(); if (txnSize > sizeLimit) { LOG.info("Txnlog size: " + txnSize + " exceeds sizeLimit: " + sizeLimit); itr.close(); return TxnLogProposalIterator.EMPTY_ITERATOR; } } } catch (IOException e) { LOG.error("Unable to read txnlog from disk", e); try { if (itr != null) { itr.close(); } } catch (IOException ioe) { LOG.warn("Error closing file iterator", ioe); } return TxnLogProposalIterator.EMPTY_ITERATOR; } return new TxnLogProposalIterator(itr); }
/** * test that all transactions from the Log are loaded, and only once * @throws Exception an exception might be thrown here */ @Test public void testLoad() throws Exception { final String hostPort = HOST + PortAssignment.unique(); // setup a single server cluster File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(hostPort.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(hostPort,CONNECTION_TIMEOUT)); ZooKeeper zk = ClientBase.createZKClient(hostPort); // generate some transactions that will get logged try { for (int i = 0; i< NUM_MESSAGES; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(hostPort, CONNECTION_TIMEOUT)); // now verify that the FileTxnLog reads every transaction only once File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); FileTxnLog txnLog = new FileTxnLog(logDir); TxnIterator itr = txnLog.read(0); // Check that storage space return some value FileTxnIterator fileItr = (FileTxnIterator) itr; long storageSize = fileItr.getStorageSize(); LOG.info("Txnlog size: " + storageSize + " bytes"); Assert.assertTrue("Storage size is greater than zero ", (storageSize > 0)); long expectedZxid = 0; long lastZxid = 0; TxnHeader hdr; do { hdr = itr.getHeader(); expectedZxid++; Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid()); Assert.assertTrue("excepting next transaction. 
expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid)); lastZxid = hdr.getZxid(); }while(itr.next()); Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS)); zks.shutdown(); }
/** * test that we fail to load txnlog of a request zxid that is older * than what exist on disk * @throws Exception an exception might be thrown here */ @Test public void testLoadFailure() throws Exception { final String hostPort = HOST + PortAssignment.unique(); // setup a single server cluster File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); // So we have at least 4 logs SyncRequestProcessor.setSnapCount(50); final int PORT = Integer.parseInt(hostPort.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(hostPort,CONNECTION_TIMEOUT)); ZooKeeper zk = ClientBase.createZKClient(hostPort); // generate some transactions that will get logged try { for (int i = 0; i< NUM_MESSAGES; i++) { zk.create("/data-", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); } } finally { zk.close(); } f.shutdown(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(hostPort, CONNECTION_TIMEOUT)); File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); File[] logFiles = FileTxnLog.getLogFiles(logDir.listFiles(), 0); // Verify that we have at least 4 txnlog Assert.assertTrue(logFiles.length > 4); // Delete the first log file, so we will fail to read it back from disk Assert.assertTrue("delete the first log file", logFiles[0].delete()); // Find zxid for the second log long secondStartZxid = Util.getZxidFromName(logFiles[1].getName(), "log"); FileTxnLog txnLog = new FileTxnLog(logDir); TxnIterator itr = txnLog.read(1, false); // Oldest log is already remove, so this should point to the start of // of zxid on the second log Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); itr = txnLog.read(secondStartZxid, false); Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); 
Assert.assertTrue(itr.next()); // Trying to get a second txn on second txnlog give us the // the start of second log, since the first one is removed long nextZxid = itr.getHeader().getZxid(); itr = txnLog.read(nextZxid, false); Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); // Trying to get a first txn on the third give us the // the start of second log, since the first one is removed long thirdStartZxid = Util.getZxidFromName(logFiles[2].getName(), "log"); itr = txnLog.read(thirdStartZxid, false); Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); Assert.assertTrue(itr.next()); nextZxid = itr.getHeader().getZxid(); itr = txnLog.read(nextZxid, false); Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); }
/** * this function restores the server * database after reading from the * snapshots and transaction logs * @param dt the datatree to be restored * @param sessions the sessions to be restored * @param listener the playback listener to run on the * database restoration * @return the highest zxid restored * @throws IOException */ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { while (true) { // iterator points to // the first valid txn when initialized hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("{}(higestZxid) > {}(next log) for type {}", new Object[] { highestZxid, hdr.getZxid(), hdr.getType() }); } else { highestZxid = hdr.getZxid(); } try { processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage(), e); } listener.onTxnLoaded(hdr, itr.getTxn()); if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; }
/** * test that all transactions from the Log are loaded, and only once * @throws Exception an exception might be thrown here */ @Test public void testLoad() throws Exception { // setup a single server cluster File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); // generate some transactions that will get logged try { for (int i = 0; i< NUM_MESSAGES; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); // now verify that the FileTxnLog reads every transaction only once File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); FileTxnLog txnLog = new FileTxnLog(logDir); TxnIterator itr = txnLog.read(0); long expectedZxid = 0; long lastZxid = 0; TxnHeader hdr; do { hdr = itr.getHeader(); expectedZxid++; Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid()); Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid)); lastZxid = hdr.getZxid(); }while(itr.next()); Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS)); }
/**
 * this function restores the server
 * database after reading from the
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // deserialize the on-disk snapshot into the in-memory database
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to
            // the first valid txn when initialized
            // read the next transaction's header
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs
                return dt.lastProcessedZxid;
            }
            // compare the snapshot zxid with the txn-log zxid and keep the larger;
            // an out-of-order txn is logged as an error rather than applied to highestZxid
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(), hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // replay the transaction into the in-memory database
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: " +
                        hdr.getType() + " error: " + e.getMessage(), e);
            }
            // fire the txn-loaded listener callback for this transaction
            listener.onTxnLoaded(hdr, itr.getTxn());
            // advance to the next transaction log entry
            if (!itr.next())
                break;
        }
    } finally {
        // release the log-file handles held by the iterator
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}