public void processRequest(Request request) { if (!finished) { // Before sending the request, check if the request requires a // global session and what we have is a local session. If so do // an upgrade. Request upgradeRequest = null; try { upgradeRequest = zks.checkUpgradeSession(request); } catch (KeeperException ke) { if (request.getHdr() != null) { request.getHdr().setType(OpCode.error); request.setTxn(new ErrorTxn(ke.code().intValue())); } request.setException(ke); LOG.info("Error creating upgrade request", ke); } catch (IOException ie) { LOG.error("Unexpected error in upgrade", ie); } if (upgradeRequest != null) { queuedRequests.add(upgradeRequest); } queuedRequests.add(request); } }
/**
 * Queues the request; per the original contract it is then processed in FIFO
 * order. When {@code zks.checkUpgradeSession} yields an upgrade request, that
 * one is queued immediately ahead of {@code request}. Nothing is queued once
 * {@code finished} is set.
 *
 * @param request the request to queue
 */
public void processRequest(Request request) {
    if (finished) {
        return;
    }

    Request sessionUpgrade = null;
    try {
        sessionUpgrade = zks.checkUpgradeSession(request);
    } catch (KeeperException keeperError) {
        boolean hasHeader = request.getHdr() != null;
        if (hasHeader) {
            // Rewrite the request as an error transaction carrying the failure code.
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(keeperError.code().intValue()));
        }
        request.setException(keeperError);
        LOG.info("Error creating upgrade request", keeperError);
    } catch (IOException ioError) {
        // Logged only; the request is still queued below.
        LOG.error("Unexpected error in upgrade", ioError);
    }

    if (sessionUpgrade != null) {
        queuedRequests.add(sessionUpgrade);
    }
    queuedRequests.add(request);
}
@Override public void processRequest(Request request) throws RequestProcessorException { // Check if this is a local session and we are trying to create // an ephemeral node, in which case we upgrade the session Request upgradeRequest = null; try { upgradeRequest = lzks.checkUpgradeSession(request); } catch (KeeperException ke) { if (request.getHdr() != null) { LOG.debug("Updating header"); request.getHdr().setType(OpCode.error); request.setTxn(new ErrorTxn(ke.code().intValue())); } request.setException(ke); LOG.info("Error creating upgrade request " + ke.getMessage()); } catch (IOException ie) { LOG.error("Unexpected error in upgrade", ie); } if (upgradeRequest != null) { nextProcessor.processRequest(upgradeRequest); } nextProcessor.processRequest(request); }
/**
 * Appends one error transaction whose zxid is {@code makeZxid(3, 3)} to a fresh
 * transaction log, then checks that after {@code getLastLoggedZxid()} the peer
 * reports accepted and current epoch 3 and that both epoch files under
 * {@code version-2} contain 3.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    final int expectedEpoch = 3;

    // createTempFile only reserves a unique name; swap the file for a directory.
    File workDir = File.createTempFile("test", ".dir", testData);
    workDir.delete();
    workDir.mkdir();
    try {
        FileTxnSnapLog snapLog = new FileTxnSnapLog(workDir, workDir);
        File versionDir = new File(workDir, "version-2");
        versionDir.mkdir();

        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader header = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn errorTxn = new ErrorTxn(1);
        Request request = new Request(null, 1, 1, ZooDefs.OpCode.error,
                ByteBuffer.wrap(Util.marshallTxnEntry(header, errorTxn)), null);
        request.hdr = header;
        request.txn = errorTxn;
        snapLog.append(request);
        snapLog.commit();

        QuorumPeer peer = new QuorumPeer();
        peer.setZKDatabase(new ZKDatabase(snapLog));
        peer.setTxnFactory(snapLog);
        // Called for its side effects; the returned zxid is not inspected.
        peer.getLastLoggedZxid();

        Assert.assertEquals(expectedEpoch, peer.getAcceptedEpoch());
        Assert.assertEquals(expectedEpoch, peer.getCurrentEpoch());

        File currentEpochFile = new File(versionDir, QuorumPeer.CURRENT_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(currentEpochFile)));

        File acceptedEpochFile = new File(versionDir, QuorumPeer.ACCEPTED_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(acceptedEpochFile)));
    } finally {
        recursiveDelete(workDir);
    }
}
/**
 * Feeds a create request backed by an empty 3-byte buffer through
 * {@code PrepRequestProcessor.pRequest} and expects the resulting transaction
 * to be an {@code ErrorTxn} with the MARSHALLINGERROR code, and the downstream
 * processor to have been reached within five seconds.
 */
@Test
public void testPRequest() throws Exception {
    pLatch = new CountDownLatch(1);
    processor = new PrepRequestProcessor(zks, new MyRequestProcessor());

    Request malformedCreate =
            new Request(null, 1L, 1, OpCode.create, ByteBuffer.allocate(3), null);
    processor.pRequest(malformedCreate);

    ErrorTxn expected = new ErrorTxn(KeeperException.Code.MARSHALLINGERROR.intValue());
    Assert.assertEquals("Request should have marshalling error", expected, outcome.txn);

    boolean reachedNextProcessor = pLatch.await(5, TimeUnit.SECONDS);
    Assert.assertTrue("request hasn't been processed in chain", reachedNextProcessor);
}
/**
 * Saves an empty snapshot, appends one error transaction with zxid
 * {@code makeZxid(3, 3)}, and checks that after {@code getLastLoggedZxid()} the
 * peer reports accepted and current epoch 3 and that both epoch files under
 * {@code version-2} contain 3.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    final int expectedEpoch = 3;

    // createTempFile only reserves a unique name; swap the file for a directory.
    File workDir = File.createTempFile("test", ".dir", testData);
    workDir.delete();
    workDir.mkdir();
    try {
        FileTxnSnapLog snapLog = new FileTxnSnapLog(workDir, workDir);
        File versionDir = new File(workDir, "version-2");
        versionDir.mkdir();

        // Snapshot of an empty tree with no sessions, written before the txn is appended.
        snapLog.save(new DataTree(), new ConcurrentHashMap<Long, Integer>(), false);

        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader header = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn errorTxn = new ErrorTxn(1);
        Request request = new Request(1, 1, ZooDefs.OpCode.error, header, errorTxn, zxid);
        snapLog.append(request);
        snapLog.commit();

        QuorumPeer peer = new QuorumPeer();
        peer.setZKDatabase(new ZKDatabase(snapLog));
        peer.setTxnFactory(snapLog);
        // Called for its side effects; the returned zxid is not inspected.
        peer.getLastLoggedZxid();

        Assert.assertEquals(expectedEpoch, peer.getAcceptedEpoch());
        Assert.assertEquals(expectedEpoch, peer.getCurrentEpoch());

        File currentEpochFile = new File(versionDir, QuorumPeer.CURRENT_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(currentEpochFile)));

        File acceptedEpochFile = new File(versionDir, QuorumPeer.ACCEPTED_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(acceptedEpochFile)));
    } finally {
        TestUtils.deleteFileRecursively(workDir);
    }
}
/**
 * Sends a create request whose payload is an empty 3-byte buffer through
 * {@code PrepRequestProcessor.pRequest}. The transaction on the outcome must be
 * an {@code ErrorTxn} carrying MARSHALLINGERROR, and the next processor in the
 * chain must have been invoked within five seconds.
 */
@Test
public void testPRequest() throws Exception {
    pLatch = new CountDownLatch(1);
    processor = new PrepRequestProcessor(zks, new MyRequestProcessor());

    Request malformedCreate =
            new Request(null, 1L, 1, OpCode.create, ByteBuffer.allocate(3), null);
    processor.pRequest(malformedCreate);

    ErrorTxn expected = new ErrorTxn(KeeperException.Code.MARSHALLINGERROR.intValue());
    Assert.assertEquals("Request should have marshalling error", expected, outcome.getTxn());

    boolean reachedNextProcessor = pLatch.await(5, TimeUnit.SECONDS);
    Assert.assertTrue("request hasn't been processed in chain", reachedNextProcessor);
}
/**
 * Appends one error transaction with zxid {@code makeZxid(3, 3)} and checks,
 * using a peer from {@code QuorumPeer.testingQuorumPeer()}, that after
 * {@code getLastLoggedZxid()} the accepted and current epochs are 3 both in
 * memory and in the epoch files under {@code version-2}.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    final int expectedEpoch = 3;

    // createTempFile only reserves a unique name; swap the file for a directory.
    File workDir = File.createTempFile("test", ".dir", testData);
    workDir.delete();
    workDir.mkdir();
    try {
        FileTxnSnapLog snapLog = new FileTxnSnapLog(workDir, workDir);
        File versionDir = new File(workDir, "version-2");
        versionDir.mkdir();

        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader header = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn errorTxn = new ErrorTxn(1);
        Request request = new Request(null, 1, 1, ZooDefs.OpCode.error,
                ByteBuffer.wrap(Util.marshallTxnEntry(header, errorTxn)), null);
        request.hdr = header;
        request.txn = errorTxn;
        snapLog.append(request);
        snapLog.commit();

        QuorumPeer peer = QuorumPeer.testingQuorumPeer();
        peer.setZKDatabase(new ZKDatabase(snapLog));
        peer.setTxnFactory(snapLog);
        // Called for its side effects; the returned zxid is not inspected.
        peer.getLastLoggedZxid();

        Assert.assertEquals(expectedEpoch, peer.getAcceptedEpoch());
        Assert.assertEquals(expectedEpoch, peer.getCurrentEpoch());

        File currentEpochFile = new File(versionDir, QuorumPeer.CURRENT_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(currentEpochFile)));

        File acceptedEpochFile = new File(versionDir, QuorumPeer.ACCEPTED_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(acceptedEpochFile)));
    } finally {
        recursiveDelete(workDir);
    }
}
/**
 * Appends one error transaction with zxid {@code makeZxid(3, 3)} to a fresh
 * transaction log and checks that after {@code getLastLoggedZxid()} the peer's
 * accepted and current epochs are 3, both in memory and in the epoch files
 * under {@code version-2}.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    final int expectedEpoch = 3;

    // createTempFile only reserves a unique name; swap the file for a directory.
    File workDir = File.createTempFile("test", ".dir", testData);
    workDir.delete();
    workDir.mkdir();
    try {
        FileTxnSnapLog snapLog = new FileTxnSnapLog(workDir, workDir);
        File versionDir = new File(workDir, "version-2");
        versionDir.mkdir();

        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader header = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn errorTxn = new ErrorTxn(1);
        Request request = new Request(1, 1, ZooDefs.OpCode.error, header, errorTxn, zxid);
        snapLog.append(request);
        snapLog.commit();

        QuorumPeer peer = new QuorumPeer();
        peer.setZKDatabase(new ZKDatabase(snapLog));
        peer.setTxnFactory(snapLog);
        // Called for its side effects; the returned zxid is not inspected.
        peer.getLastLoggedZxid();

        Assert.assertEquals(expectedEpoch, peer.getAcceptedEpoch());
        Assert.assertEquals(expectedEpoch, peer.getCurrentEpoch());

        File currentEpochFile = new File(versionDir, QuorumPeer.CURRENT_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(currentEpochFile)));

        File acceptedEpochFile = new File(versionDir, QuorumPeer.ACCEPTED_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(acceptedEpochFile)));
    } finally {
        recursiveDelete(workDir);
    }
}
/**
 * Appends one error transaction with zxid {@code makeZxid(3, 3)} to a
 * transaction log in a scratch directory under the default temp location, then
 * checks that after {@code getLastLoggedZxid()} the peer's accepted and current
 * epochs are 3, both in memory and in the epoch files under {@code version-2}.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    final int expectedEpoch = 3;

    // createTempFile only reserves a unique name; swap the file for a directory.
    File workDir = File.createTempFile("test", ".dir");
    workDir.delete();
    workDir.mkdir();
    try {
        FileTxnSnapLog snapLog = new FileTxnSnapLog(workDir, workDir);
        File versionDir = new File(workDir, "version-2");
        versionDir.mkdir();

        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader header = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn errorTxn = new ErrorTxn(1);
        Request request = new Request(null, 1, 1, ZooDefs.OpCode.error,
                ByteBuffer.wrap(Util.marshallTxnEntry(header, errorTxn)), null);
        request.hdr = header;
        request.txn = errorTxn;
        snapLog.append(request);
        snapLog.commit();

        QuorumPeer peer = new QuorumPeer();
        peer.setZKDatabase(new ZKDatabase(snapLog));
        peer.setTxnFactory(snapLog);
        // Called for its side effects; the returned zxid is not inspected.
        peer.getLastLoggedZxid();

        Assert.assertEquals(expectedEpoch, peer.getAcceptedEpoch());
        Assert.assertEquals(expectedEpoch, peer.getCurrentEpoch());

        File currentEpochFile = new File(versionDir, QuorumPeer.CURRENT_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(currentEpochFile)));

        File acceptedEpochFile = new File(versionDir, QuorumPeer.ACCEPTED_EPOCH_FILENAME);
        Assert.assertEquals(expectedEpoch,
                Integer.parseInt(readContentsOfFile(acceptedEpochFile)));
    } finally {
        recursiveDelete(workDir);
    }
}
@SuppressWarnings("unchecked") public ProcessTxnResult processTxn(TxnHeader header, Record txn) { ProcessTxnResult rc = new ProcessTxnResult(); String debug = ""; try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; if (rc.zxid > lastProcessedZxid) { lastProcessedZxid = rc.zxid; } switch (header.getType()) { case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; debug = "Create transaction for " + createTxn.getPath(); createNode(createTxn.getPath(), createTxn.getData(), createTxn .getAcl(), createTxn.getEphemeral() ? header .getClientId() : 0, header.getZxid(), header.getTime()); rc.path = createTxn.getPath(); break; case OpCode.delete: DeleteTxn deleteTxn = (DeleteTxn) txn; debug = "Delete transaction for " + deleteTxn.getPath(); deleteNode(deleteTxn.getPath()); break; case OpCode.setData: SetDataTxn setDataTxn = (SetDataTxn) txn; debug = "Set data for transaction for " + setDataTxn.getPath(); rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(), header.getZxid(), header .getTime()); break; case OpCode.setACL: SetACLTxn setACLTxn = (SetACLTxn) txn; debug = "Set ACL for transaction for " + setACLTxn.getPath(); rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion()); break; case OpCode.closeSession: killSession(header.getClientId()); break; case OpCode.error: ErrorTxn errTxn = (ErrorTxn) txn; rc.err = errTxn.getErr(); break; } } catch (KeeperException e) { // These are expected errors since we take a lazy snapshot if (initialized || (e.code() != Code.NONODE && e.code() != Code.NODEEXISTS)) { LOG.warn("Failed:" + debug, e); } } return rc; }
/**
 * Test stub: asserts that the incoming request's {@code txn} equals an
 * {@code ErrorTxn} carrying the MARSHALLINGERROR code.
 *
 * @param request the request handed down by the processor under test
 */
@Override
public void processRequest(Request request) {
    ErrorTxn expected = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
    Assert.assertEquals("Request should have marshalling error", expected, request.txn);
}
/**
 * Applies one transaction to the tree and reports the outcome.
 *
 * <p>The result is seeded from the header (client id, cxid, zxid, type) with
 * {@code err} set to 0, and {@code lastProcessedZxid} is advanced when the
 * header's zxid is larger. The header type selects the operation: create,
 * delete, setData, setACL, closeSession or error. Other types change nothing
 * beyond the seeded fields.
 *
 * <p>A {@link KeeperException} from the operation is not propagated: it is
 * logged at DEBUG and its code is stored in the result's {@code err}.
 *
 * @param header transaction header supplying ids, zxid, time and type
 * @param txn    transaction payload; cast to the record type matching the header type
 * @return the populated result; for create and delete, {@code path} is set
 *         before the operation runs, so it is present even when that fails
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
    ProcessTxnResult result = new ProcessTxnResult();
    String debug = "";
    try {
        result.clientId = header.getClientId();
        result.cxid = header.getCxid();
        result.zxid = header.getZxid();
        result.type = header.getType();
        result.err = 0;
        if (result.zxid > lastProcessedZxid) {
            lastProcessedZxid = result.zxid;
        }

        switch (header.getType()) {
            case OpCode.create: {
                CreateTxn create = (CreateTxn) txn;
                debug = "Create transaction for " + create.getPath();
                result.path = create.getPath();
                createNode(
                        create.getPath(),
                        create.getData(),
                        create.getAcl(),
                        create.getEphemeral() ? header.getClientId() : 0,
                        create.getParentCVersion(),
                        header.getZxid(),
                        header.getTime());
                break;
            }
            case OpCode.delete: {
                DeleteTxn delete = (DeleteTxn) txn;
                debug = "Delete transaction for " + delete.getPath();
                result.path = delete.getPath();
                deleteNode(delete.getPath(), header.getZxid());
                break;
            }
            case OpCode.setData: {
                SetDataTxn dataUpdate = (SetDataTxn) txn;
                debug = "Set data for transaction for " + dataUpdate.getPath();
                result.stat = setData(
                        dataUpdate.getPath(),
                        dataUpdate.getData(),
                        dataUpdate.getVersion(),
                        header.getZxid(),
                        header.getTime());
                break;
            }
            case OpCode.setACL: {
                SetACLTxn aclUpdate = (SetACLTxn) txn;
                debug = "Set ACL for transaction for " + aclUpdate.getPath();
                result.stat = setACL(
                        aclUpdate.getPath(), aclUpdate.getAcl(), aclUpdate.getVersion());
                break;
            }
            case OpCode.closeSession: {
                killSession(header.getClientId(), header.getZxid());
                break;
            }
            case OpCode.error: {
                ErrorTxn error = (ErrorTxn) txn;
                result.err = error.getErr();
                break;
            }
        }
    } catch (KeeperException e) {
        LOG.debug("Failed: " + debug, e);
        // Report the failure to the caller through the result instead of throwing.
        result.err = e.code().intValue();
    }
    return result;
}
/**
 * Test stub: asserts that the incoming request's {@code txn} equals an
 * {@code ErrorTxn} carrying the MARSHALLINGERROR code, then counts down
 * {@code testEnd}. The count-down is skipped when the assertion fails.
 *
 * @param request the request handed down by the processor under test
 */
@Override
public void processRequest(Request request) {
    ErrorTxn expected = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
    Assert.assertEquals("Request should have marshalling error", expected, request.txn);
    testEnd.countDown();
}