/** * Set up the request processors for an Observer: * firstProcesor->commitProcessor->finalProcessor */ @Override protected void setupRequestProcessors() { // We might consider changing the processor behaviour of // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getObserver())); syncProcessor.start(); }
@Test public void testPRequest() throws Exception { File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); zks.sessionTracker = new MySessionTracker(); PrepRequestProcessor processor = new PrepRequestProcessor(zks, new MyRequestProcessor()); Request foo = new Request(null, 1l, 1, OpCode.create, ByteBuffer.allocate(3), null); processor.pRequest(foo); }
/** * Set up the request processors for an Observer: * firstProcesor->commitProcessor->finalProcessor */ @Override protected void setupRequestProcessors() { // We might consider changing the processor behaviour of // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); /* * Observer should write to disk, so that the it won't request * too old txn from the leader which may lead to getting an entire * snapshot. * * However, this may degrade performance as it has to write to disk * and do periodic snapshot which may double the memory requirements */ if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } }
@Test public void testPRequest() throws Exception { File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); zks.sessionTracker = new MySessionTracker(); PrepRequestProcessor processor = new PrepRequestProcessor(zks, new MyRequestProcessor()); Request foo = new Request(null, 1l, 1, OpCode.create, ByteBuffer.allocate(3), null); processor.pRequest(foo); testEnd.await(5, java.util.concurrent.TimeUnit.SECONDS); f.shutdown(); zks.shutdown(); }
public void testDisconnectedAddAuth() throws Exception { File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(1000); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); NIOServerCnxn.Factory f = new NIOServerCnxn.Factory( new InetSocketAddress(PORT)); f.startup(zks); LOG.info("starting up the zookeeper server .. waiting"); assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); try { zk.addAuthInfo("digest", "pat:test".getBytes()); zk.setACL("/", Ids.CREATOR_ALL_ACL, -1); } finally { zk.close(); } f.shutdown(); assertTrue("waiting for server down", ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT)); }
@Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower())); syncProcessor.start(); }
/** * Set up the request processors for an Observer: * firstProcesor->commitProcessor->finalProcessor */ @Override protected void setupRequestProcessors() { // We might consider changing the processor behaviour of // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); /* * Observer should write to disk, so that the it won't request * too old txn from the leader which may lead to getting an entire * snapshot. * * However, this may degrade performance as it has to write to disk * and do periodic snapshot which may double the memory requirements */ if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } }
public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) { this.zks = zks; this.nextProcessor = nextProcessor; AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader()); syncProcessor = new SyncRequestProcessor(zks, ackProcessor); }
/** * test the purge * @throws Exception an exception might be thrown here */ @Test public void testPurge() throws Exception { tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); try { for (int i = 0; i< 2000; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); zks.getTxnLogFactory().close(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); // now corrupt the snapshot PurgeTxnLog.purge(tmpDir, tmpDir, 3); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpDir, tmpDir); List<File> listLogs = snaplog.findNRecentSnapshots(4); int numSnaps = 0; for (File ff: listLogs) { if (ff.getName().startsWith("snapshot")) { numSnaps++; } } Assert.assertTrue("exactly 3 snapshots ", (numSnaps == 3)); snaplog.close(); zks.shutdown(); }
/** * test the snapshot * @throws Exception an exception could be expected */ @Test public void testSnapshot() throws Exception { File snapDir = new File(testData, "invalidsnap"); ZooKeeperServer zks = new ZooKeeperServer(snapDir, snapDir, 3000); SyncRequestProcessor.setSnapCount(1000); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); LOG.info("starting up the zookeeper server .. waiting"); Assert.assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this); try { // we know this from the data files // this node is the last node in the snapshot Assert.assertTrue(zk.exists("/9/9/8", false) != null); } finally { zk.close(); } f.shutdown(); zks.shutdown(); Assert.assertTrue("waiting for server down", ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT)); }
@Test public void testDisconnectedAddAuth() throws Exception { File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(1000); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); try { LOG.info("starting up the zookeeper server .. waiting"); Assert.assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); try { zk.addAuthInfo("digest", "pat:test".getBytes()); zk.setACL("/", Ids.CREATOR_ALL_ACL, -1); } finally { zk.close(); } } finally { f.shutdown(); zks.shutdown(); Assert.assertTrue("waiting for server down", ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT)); } }
/** * test the upgrade * @throws Exception */ @Test public void testUpgrade() throws Exception { File upgradeDir = new File(testData, "upgrade"); UpgradeMain upgrade = new UpgradeMain(upgradeDir, upgradeDir); upgrade.runUpgrade(); ZooKeeperServer zks = new ZooKeeperServer(upgradeDir, upgradeDir, 3000); SyncRequestProcessor.setSnapCount(1000); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); LOG.info("starting up the zookeeper server .. waiting"); Assert.assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); Stat stat = zk.exists("/", false); List<String> children = zk.getChildren("/", false); Collections.sort(children); for (int i = 0; i < 10; i++) { Assert.assertTrue("data tree sanity check", ("test-" + i).equals(children.get(i))); } //try creating one node zk.create("/upgrade", "upgrade".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // check if its there if (zk.exists("/upgrade", false) == null) { Assert.assertTrue(false); } zk.close(); // bring down the server f.shutdown(); Assert.assertTrue("waiting for server down", ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT)); }
/** * test the purge * @throws Exception an exception might be thrown here */ @Test public void testPurge() throws Exception { tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); ZooKeeper zk = ClientBase.createZKClient(HOSTPORT); try { for (int i = 0; i< 2000; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); zks.getTxnLogFactory().close(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); // now corrupt the snapshot PurgeTxnLog.purge(tmpDir, tmpDir, 3); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpDir, tmpDir); List<File> listLogs = snaplog.findNRecentSnapshots(4); int numSnaps = 0; for (File ff: listLogs) { if (ff.getName().startsWith("snapshot")) { numSnaps++; } } Assert.assertTrue("exactly 3 snapshots ", (numSnaps == 3)); snaplog.close(); zks.shutdown(); }
/** * test the snapshot * @throws Exception an exception could be expected */ @Test public void testSnapshot() throws Exception { File snapDir = new File(testData, "invalidsnap"); ZooKeeperServer zks = new ZooKeeperServer(snapDir, snapDir, 3000); SyncRequestProcessor.setSnapCount(1000); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); LOG.info("starting up the zookeeper server .. waiting"); Assert.assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); ZooKeeper zk = ClientBase.createZKClient(HOSTPORT); try { // we know this from the data files // this node is the last node in the snapshot Assert.assertTrue(zk.exists("/9/9/8", false) != null); } finally { zk.close(); } f.shutdown(); zks.shutdown(); Assert.assertTrue("waiting for server down", ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT)); }
@Test public void testDisconnectedAddAuth() throws Exception { File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(1000); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); try { LOG.info("starting up the zookeeper server .. waiting"); Assert.assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); ZooKeeper zk = ClientBase.createZKClient(HOSTPORT); try { zk.addAuthInfo("digest", "pat:test".getBytes()); zk.setACL("/", Ids.CREATOR_ALL_ACL, -1); } finally { zk.close(); } } finally { f.shutdown(); zks.shutdown(); Assert.assertTrue("waiting for server down", ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT)); } }