/** * The asynchronous version of create. * * @see #create(String, byte[], List, CreateMode) */ public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); EphemeralType.validateTTL(createMode, -1); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); ReplyHeader r = new ReplyHeader(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); request.setAcl(acl); cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
/** * In the following test, we verify that committed requests are processed * even when queuedRequests never gets empty. We add 10 committed request * and use infinite queuedRequests. We verify that the committed request was * processed. */ @Test(timeout = 1000) public void noStarvationOfNonLocalCommittedRequestsTest() throws Exception { final String path = "/noStarvationOfCommittedRequests"; processor.queuedRequests = new MockRequestsQueue(); Set<Request> nonLocalCommits = new HashSet<Request>(); for (int i = 0; i < 10; i++) { Request nonLocalCommitReq = newRequest( new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, 51, i + 1); processor.committedRequests.add(nonLocalCommitReq); nonLocalCommits.add(nonLocalCommitReq); } for (int i = 0; i < 10; i++) { processor.initThreads(defaultSizeOfThreadPool); processor.stoppedMainLoop = true; processor.run(); } Assert.assertTrue("commit request was not processed", processedRequests.containsAll(nonLocalCommits)); }
/** * The Asynchronous version of create. The request doesn't actually until * the asynchronous callback is called. * * @see #create(String, byte[], List, CreateMode) */ public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); ReplyHeader r = new ReplyHeader(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); request.setAcl(acl); cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
/** * The asynchronous version of create. * * @see #create(String, byte[], List, CreateMode) */ public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); ReplyHeader r = new ReplyHeader(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); request.setAcl(acl); cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
/** * The asynchronous version of create. * * @see #create(String, byte[], List, CreateMode) */ public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); ReplyHeader r = new ReplyHeader(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); request.setAcl(acl); cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
/** * The asynchronous version of create. * * @see #create(String, byte[], List, CreateMode, Stat) */ public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, Create2Callback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create2); CreateRequest request = new CreateRequest(); Create2Response response = new Create2Response(); ReplyHeader r = new ReplyHeader(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); request.setAcl(acl); cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
public void sendWriteRequest() throws Exception { ByteArrayOutputStream boas = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); CreateRequest createReq = new CreateRequest( "/session" + Long.toHexString(sessionId) + "-" + (++nodeId), new byte[0], Ids.OPEN_ACL_UNSAFE, 1); createReq.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); Request req = new Request(null, sessionId, ++cxid, OpCode.create, bb, new ArrayList<Id>()); zks.getFirstProcessor().processRequest(req); }
private Request makeCreateRequest(String path, long sessionId) throws IOException { ByteArrayOutputStream boas = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<Id>()); }
/** * When we create ephemeral node, we need to check against global * session, so the leader never accept request from an expired session * (that we no longer track) * * This is not the same as SessionInvalidationTest since session * is not in closing state */ public void testCreateEphemeral(boolean localSessionEnabled) throws Exception { if (localSessionEnabled) { qu.enableLocalSession(true); } qu.startAll(); QuorumPeer leader = qu.getLeaderQuorumPeer(); ZooKeeper zk = ClientBase.createZKClient(qu.getConnectString(leader)); CreateRequest createRequest = new CreateRequest("/impossible", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); // Mimic sessionId generated by follower's local session tracker long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer() .getServerId(); long fakeSessionId = (sid << 56) + 1; LOG.info("Fake session Id: " + Long.toHexString(fakeSessionId)); Request request = new Request(null, fakeSessionId, 0, OpCode.create, bb, new ArrayList<Id>()); // Submit request directly to leader leader.getActiveServer().submitRequest(request); // Make sure that previous request is finished zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = zk.exists("/impossible", null); Assert.assertEquals("Node from fake session get created", null, stat); }
/** * When local session is enabled, leader will allow persistent node * to be create for unknown session */ @Test public void testCreatePersistent() throws Exception { qu.enableLocalSession(true); qu.startAll(); QuorumPeer leader = qu.getLeaderQuorumPeer(); ZooKeeper zk = ClientBase.createZKClient(qu.getConnectString(leader)); CreateRequest createRequest = new CreateRequest("/success", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); // Mimic sessionId generated by follower's local session tracker long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer() .getServerId(); long locallSession = (sid << 56) + 1; LOG.info("Local session Id: " + Long.toHexString(locallSession)); Request request = new Request(null, locallSession, 0, OpCode.create, bb, new ArrayList<Id>()); // Submit request directly to leader leader.getActiveServer().submitRequest(request); // Make sure that previous request is finished zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = zk.exists("/success", null); Assert.assertTrue("Request from local sesson failed", stat != null); }
private EventType checkType(Record response) { if (response == null) { return EventType.other; } else if (response instanceof ConnectRequest) { return EventType.write; } else if (response instanceof CreateRequest) { return EventType.write; } else if (response instanceof DeleteRequest) { return EventType.write; } else if (response instanceof SetDataRequest) { return EventType.write; } else if (response instanceof SetACLRequest) { return EventType.write; } else if (response instanceof SetMaxChildrenRequest) { return EventType.write; } else if (response instanceof SetSASLRequest) { return EventType.write; } else if (response instanceof SetWatches) { return EventType.write; } else if (response instanceof SyncRequest) { return EventType.write; } else if (response instanceof ExistsRequest) { return EventType.read; } else if (response instanceof GetDataRequest) { return EventType.read; } else if (response instanceof GetMaxChildrenRequest) { return EventType.read; } else if (response instanceof GetACLRequest) { return EventType.read; } else if (response instanceof GetChildrenRequest) { return EventType.read; } else if (response instanceof GetChildren2Request) { return EventType.read; } else if (response instanceof GetSASLRequest) { return EventType.read; } else { return EventType.other; } }
public Request checkUpgradeSession(Request request) throws IOException, KeeperException { // If this is a request for a local session and it is to // create an ephemeral node, then upgrade the session and return // a new session request for the leader. // This is called by the request processor thread (either follower // or observer request processor), which is unique to a learner. // So will not be called concurrently by two threads. if (request.type != OpCode.create || !upgradeableSessionTracker.isLocalSession(request.sessionId)) { return null; } CreateRequest createRequest = new CreateRequest(); request.request.rewind(); ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); request.request.rewind(); CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (!createMode.isEphemeral()) { return null; } // Uh oh. We need to upgrade before we can proceed. if (!self.isLocalSessionsUpgradingEnabled()) { throw new KeeperException.EphemeralOnLocalSessionException(); } return makeUpgradeRequest(request.sessionId); }
public void sendWriteRequest() throws Exception { ByteArrayOutputStream boas = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); CreateRequest createReq = new CreateRequest( "/session" + Long.toHexString(sessionId) + "-" + (++nodeId), new byte[0], Ids.OPEN_ACL_UNSAFE, 1); createReq.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); Request req = new Request(null, sessionId, ++cxid, OpCode.create, bb, new ArrayList<Id>()); zks.firstProcessor.processRequest(req); }
/** * When local session is enabled, leader will allow persistent node * to be create for unknown session */ @Test public void testCreatePersistent() throws Exception { QuorumUtil qu = new QuorumUtil(1); qu.enableLocalSession(true); qu.startAll(); QuorumPeer leader = qu.getLeaderQuorumPeer(); ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader), CONNECTION_TIMEOUT, this); CreateRequest createRequest = new CreateRequest("/success", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); // Mimic sessionId generated by follower's local session tracker long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer() .getServerId(); long locallSession = (sid << 56) + 1; LOG.info("Local session Id: " + Long.toHexString(locallSession)); Request request = new Request(null, locallSession, 0, OpCode.create, bb, new ArrayList<Id>()); // Submit request directly to leader leader.getActiveServer().submitRequest(request); // Make sure that previous request is finished zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = zk.exists("/success", null); Assert.assertTrue("Request from local sesson failed", stat != null); }
@Override public Record toRequestRecord() { return new CreateRequest(getPath(), data, acl, flags); }
/** * Test solution for ZOOKEEPER-1208. Verify that operations are not * accepted after a close session. * * We're using our own marshalling here in order to force an operation * after the session is closed (ZooKeeper.class will not allow this). Also * by filling the pipe with operations it increases the likelyhood that * the server will process the create before FinalRequestProcessor * removes the session from the tracker. */ @Test public void testCreateAfterCloseShouldFail() throws Exception { for (int i = 0; i < 10; i++) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); // open a connection boa.writeInt(44, "len"); ConnectRequest conReq = new ConnectRequest(0, 0, 30000, 0, new byte[16]); conReq.serialize(boa, "connect"); // close connection boa.writeInt(8, "len"); RequestHeader h = new RequestHeader(1, ZooDefs.OpCode.closeSession); h.serialize(boa, "header"); // create ephemeral znode boa.writeInt(52, "len"); // We'll fill this in later RequestHeader header = new RequestHeader(2, OpCode.create); header.serialize(boa, "header"); CreateRequest createReq = new CreateRequest("/foo" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, 1); createReq.serialize(boa, "request"); baos.close(); System.out.println("Length:" + baos.toByteArray().length); String hp[] = hostPort.split(":"); Socket sock = new Socket(hp[0], Integer.parseInt(hp[1])); InputStream resultStream = null; try { OutputStream outstream = sock.getOutputStream(); byte[] data = baos.toByteArray(); outstream.write(data); outstream.flush(); resultStream = sock.getInputStream(); byte[] b = new byte[10000]; int len; while ((len = resultStream.read(b)) >= 0) { // got results System.out.println("gotlen:" + len); } } finally { if (resultStream != null) { resultStream.close(); } sock.close(); } } ZooKeeper zk = createClient(); Assert.assertEquals(1, zk.getChildren("/", false).size()); zk.close(); }
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException { if (deserialize) { ByteBufferInputStream.byteBuffer2Record(request.request, record); } int flags; String path; List<ACL> acl; byte[] data; long ttl; if (type == OpCode.createTTL) { CreateTTLRequest createTtlRequest = (CreateTTLRequest)record; flags = createTtlRequest.getFlags(); path = createTtlRequest.getPath(); acl = createTtlRequest.getAcl(); data = createTtlRequest.getData(); ttl = createTtlRequest.getTtl(); } else { CreateRequest createRequest = (CreateRequest)record; flags = createRequest.getFlags(); path = createRequest.getPath(); acl = createRequest.getAcl(); data = createRequest.getData(); ttl = -1; } CreateMode createMode = CreateMode.fromFlag(flags); validateCreateRequest(path, createMode, request, ttl); String parentPath = validatePathForCreate(path, request.sessionId); List<ACL> listACL = fixupACL(path, request.authInfo, acl); ChangeRecord parentRecord = getRecordForPath(parentPath); checkACL(zks, request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL); int parentCVersion = parentRecord.stat.getCversion(); if (createMode.isSequential()) { path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try { if (getRecordForPath(path) != null) { throw new KeeperException.NodeExistsException(path); } } catch (KeeperException.NoNodeException e) { // ignore this one } boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL; if (ephemeralParent) { throw new KeeperException.NoChildrenForEphemeralsException(path); } int newCversion = parentRecord.stat.getCversion()+1; if (type == OpCode.createContainer) { request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion)); } else if (type == OpCode.createTTL) { request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl)); } else { request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion)); } StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) { s.setEphemeralOwner(request.sessionId); } parentRecord = parentRecord.duplicate(request.getHdr().getZxid()); parentRecord.childCount++; parentRecord.stat.setCversion(newCversion); addChangeRecord(parentRecord); addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL)); }
/** * In the following test, we add a write request followed by several read * requests of the same session, and we verify several things - 1. The write * is not processed until commit arrives. 2. Once the write is processed, * all the read requests are processed as well. 3. All read requests are * executed after the write, before any other write, along with new reads. */ @Test public void processAllFollowingUncommittedAfterFirstCommitTest() throws Exception { final String path = "/testUncommittedFollowingCommited"; Set<Request> shouldBeInPending = new HashSet<Request>(); Set<Request> shouldBeProcessedAfterPending = new HashSet<Request>(); Request writeReq = newRequest( new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, 0x1, 1); processor.queuedRequests.add(writeReq); shouldBeInPending.add(writeReq); for (int readReqId = 2; readReqId <= 5; ++readReqId) { Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, readReqId); processor.queuedRequests.add(readReq); shouldBeInPending.add(readReq); shouldBeProcessedAfterPending.add(readReq); } processor.initThreads(defaultSizeOfThreadPool); processor.stoppedMainLoop = true; processor.run(); Assert.assertTrue("Processed without waiting for commit", processedRequests.isEmpty()); Assert.assertTrue("Did not handled all of queuedRequests' requests", processor.queuedRequests.isEmpty()); shouldBeInPending .removeAll(processor.pendingRequests.get(writeReq.sessionId)); for (Request r : shouldBeInPending) { LOG.error("Should be in pending " + r); } Assert.assertTrue( "Not all requests moved to pending from queuedRequests", shouldBeInPending.isEmpty()); processor.committedRequests.add(writeReq); processor.stoppedMainLoop = true; processor.run(); processor.initThreads(defaultSizeOfThreadPool); Thread.sleep(500); Assert.assertTrue("Did not process committed request", processedRequests.peek() == writeReq); Assert.assertTrue("Did not process following read request", processedRequests.containsAll(shouldBeProcessedAfterPending)); Assert.assertTrue("Did not process committed request", processor.committedRequests.isEmpty()); Assert.assertTrue("Did not process committed request", processor.pendingRequests.isEmpty()); }
/** * In the following test, we verify that we can handle the case that we got a commit * of a request we never seen since the session that we just established. This can happen * when a session is just established and there is request waiting to be committed in the * session queue but it sees a commit for a request that belongs to the previous connection. */ @Test(timeout = 5000) public void noCrashOnCommittedRequestsOfUnseenRequestTest() throws Exception { final String path = "/noCrash/OnCommittedRequests/OfUnseenRequestTest"; final int numberofReads = 10; final int sessionid = 0x123456; final int firstCXid = 0x100; int readReqId = firstCXid; processor.stoppedMainLoop = true; HashSet<Request> localRequests = new HashSet<Request>(); // queue the blocking write request to queuedRequests Request firstCommittedReq = newRequest( new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, sessionid, readReqId++); processor.queuedRequests.add(firstCommittedReq); localRequests.add(firstCommittedReq); // queue read requests to queuedRequests for (; readReqId <= numberofReads+firstCXid; ++readReqId) { Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionid, readReqId); processor.queuedRequests.add(readReq); localRequests.add(readReq); } //run once Assert.assertTrue(processor.queuedRequests.containsAll(localRequests)); processor.initThreads(defaultSizeOfThreadPool); processor.run(); Thread.sleep(1000); //We verify that the processor is waiting for the commit Assert.assertTrue(processedRequests.isEmpty()); // We add a commit that belongs to the same session but with smaller cxid, // i.e., commit of an update from previous connection of this session. Request preSessionCommittedReq = newRequest( new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, sessionid, firstCXid - 2); processor.committedRequests.add(preSessionCommittedReq); processor.committedRequests.add(firstCommittedReq); processor.run(); Thread.sleep(1000); //We verify that the commit processor processed the old commit prior to the newer messages Assert.assertTrue(processedRequests.peek() == preSessionCommittedReq); processor.run(); Thread.sleep(1000); //We verify that the commit processor handle all messages. Assert.assertTrue(processedRequests.containsAll(localRequests)); }
/** * In the following test, we verify if we handle the case in which we get a commit * for a request that has higher Cxid than the one we are waiting. This can happen * when a session connection is lost but there is a request waiting to be committed in the * session queue. However, since the session has moved, new requests can get to * the leader out of order. Hence, the commits can also arrive "out of order" w.r.t. cxid. * We should commit the requests according to the order we receive from the leader, i.e., wait for the relevant commit. */ @Test(timeout = 5000) public void noCrashOnOutofOrderCommittedRequestTest() throws Exception { final String path = "/noCrash/OnCommittedRequests/OfUnSeenRequestTest"; final int sessionid = 0x123456; final int lastCXid = 0x100; final int numberofReads = 10; int readReqId = lastCXid; processor.stoppedMainLoop = true; HashSet<Request> localRequests = new HashSet<Request>(); // queue the blocking write request to queuedRequests Request orphanCommittedReq = newRequest( new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, sessionid, lastCXid); processor.queuedRequests.add(orphanCommittedReq); localRequests.add(orphanCommittedReq); // queue read requests to queuedRequests for (; readReqId <= numberofReads+lastCXid; ++readReqId) { Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionid, readReqId); processor.queuedRequests.add(readReq); localRequests.add(readReq); } //run once processor.initThreads(defaultSizeOfThreadPool); processor.run(); Thread.sleep(1000); //We verify that the processor is waiting for the commit Assert.assertTrue(processedRequests.isEmpty()); // We add a commit that belongs to the same session but with larger cxid, // i.e., commit of an update from the next connection of this session. Request otherSessionCommittedReq = newRequest( new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, sessionid, lastCXid+10); processor.committedRequests.add(otherSessionCommittedReq); processor.committedRequests.add(orphanCommittedReq); processor.run(); Thread.sleep(1000); //We verify that the commit processor processed the old commit prior to the newer messages Assert.assertTrue(processedRequests.size() == 1); Assert.assertTrue(processedRequests.contains(otherSessionCommittedReq)); processor.run(); Thread.sleep(1000); //We verify that the commit processor handle all messages. Assert.assertTrue(processedRequests.containsAll(localRequests)); }
/** * When we create ephemeral node, we need to check against global * session, so the leader never accept request from an expired session * (that we no longer track) * * This is not the same as SessionInvalidationTest since session * is not in closing state */ public void testCreateEphemeral(boolean localSessionEnabled) throws Exception { QuorumUtil qu = new QuorumUtil(1); if (localSessionEnabled) { qu.enableLocalSession(true); } qu.startAll(); QuorumPeer leader = qu.getLeaderQuorumPeer(); ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader), CONNECTION_TIMEOUT, this); CreateRequest createRequest = new CreateRequest("/impossible", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); // Mimic sessionId generated by follower's local session tracker long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer() .getServerId(); long fakeSessionId = (sid << 56) + 1; LOG.info("Fake session Id: " + Long.toHexString(fakeSessionId)); Request request = new Request(null, fakeSessionId, 0, OpCode.create, bb, new ArrayList<Id>()); // Submit request directly to leader leader.getActiveServer().submitRequest(request); // Make sure that previous request is finished zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = zk.exists("/impossible", null); Assert.assertEquals("Node from fake session get created", null, stat); }