/**
 * Prints ten successive shuffles of the given address list to standard out.
 * <p>
 * The hash code of a {@link StaticHostProvider} built from the list is
 * printed first. Each round then seeds a fresh {@link Random} with
 * {@code System.currentTimeMillis() ^ hashCode}, shuffles the list with it
 * and prints the time, the seed and the port of every address in the
 * resulting order.
 *
 * @param inetSocketAddressesList
 *            addresses to shuffle; the list is shuffled in place, so each
 *            round starts from the order left by the previous one
 * @throws Exception
 *             declared by the signature; propagated from the calls made
 *             here if any of them throws
 */
private void internalShuffleMillis(LinkedList<InetSocketAddress> inetSocketAddressesList) throws Exception {
    final int providerHash = new StaticHostProvider(inetSocketAddressesList).hashCode();
    System.out.println(providerHash);

    for (int remaining = 10; remaining > 0; remaining--) {
        final long now = System.currentTimeMillis();
        // The int hash is widened to long before the XOR.
        final long seed = now ^ providerHash;
        final Random random = new Random(seed);

        System.out.print(String.format("currentTime: %s, currentTime ^ hashCode: %s, Result: ", now, seed));
        Collections.shuffle(inetSocketAddressesList, random);
        for (InetSocketAddress address : inetSocketAddressesList) {
            System.out.print(String.format("%s ", address.getPort()));
        }
        System.out.println();
    }
}
/**
 * Prints ten successive shuffles of the given address list to standard out,
 * using {@code System.nanoTime()} as the time source for the seed.
 * <p>
 * The hash code of a {@link StaticHostProvider} built from the list is
 * printed first. Each round then seeds a fresh {@link Random} with
 * {@code System.nanoTime() ^ hashCode}, shuffles the list with it and
 * prints the time, the seed and the port of every address in the resulting
 * order.
 *
 * @param inetSocketAddressesList
 *            addresses to shuffle; the list is shuffled in place, so each
 *            round starts from the order left by the previous one
 * @throws Exception
 *             declared by the signature; propagated from the calls made
 *             here if any of them throws
 */
private void internalShuffleNano(LinkedList<InetSocketAddress> inetSocketAddressesList) throws Exception {
    final int providerHash = new StaticHostProvider(inetSocketAddressesList).hashCode();
    System.out.println(providerHash);

    for (int remaining = 10; remaining > 0; remaining--) {
        final long now = System.nanoTime();
        // The int hash is widened to long before the XOR.
        final long seed = now ^ providerHash;
        final Random random = new Random(seed);

        System.out.print(String.format("currentTime: %s, currentTime ^ hashCode: %s, Result: ", now, seed));
        Collections.shuffle(inetSocketAddressesList, random);
        for (InetSocketAddress address : inetSocketAddressesList) {
            System.out.print(String.format("%s ", address.getPort()));
        }
        System.out.println();
    }
}
/**
 * Builds a {@link StaticHostProvider} over {@code size} addresses of the
 * form {@code 10.10.10.k:(1234 + k)}, added for {@code k} counting down
 * from {@code size} to 1.
 * <p>
 * Addresses are created from raw bytes via
 * {@link InetAddress#getByAddress(byte[])}. If that call throws
 * {@link UnknownHostException}, the error is logged and the test is failed.
 *
 * @param size
 *            number of addresses to generate; also used as the last octet
 *            and port offset of the first address added
 * @return a provider constructed from the generated list
 * @throws UnknownHostException
 *             declared by the signature
 */
private StaticHostProvider getHostProvider(byte size) throws UnknownHostException {
    ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(size);
    for (byte lastOctet = size; lastOctet > 0; --lastOctet) {
        try {
            byte[] rawAddress = {10, 10, 10, lastOctet};
            InetAddress host = InetAddress.getByAddress(rawAddress);
            addresses.add(new InetSocketAddress(host, 1234 + lastOctet));
        } catch (UnknownHostException e) {
            LOG.error("Exception while resolving address", e);
            fail("Failed to resolve address");
        }
    }
    return new StaticHostProvider(addresses);
}
/**
 * Constructs a {@link StaticHostProvider} from two addresses whose host
 * names are {@code "a..."} and {@code "b..."}; the test passes only if an
 * {@link IllegalArgumentException} is thrown.
 */
@Test(expected = IllegalArgumentException.class)
public void testTwoInvalidHostAddresses() {
    ArrayList<InetSocketAddress> invalidAddresses = new ArrayList<InetSocketAddress>();
    for (String host : new String[] {"a...", "b..."}) {
        invalidAddresses.add(new InetSocketAddress(host, 2181));
    }
    new StaticHostProvider(invalidAddresses);
}
/**
 * Adds the host name {@code "a..."} to a one-element server list, builds a
 * {@link StaticHostProvider} from it and asserts that two consecutive
 * {@code next(0)} calls return equal addresses.
 */
@Test
public void testOneInvalidHostAddresses() {
    Collection<InetSocketAddress> addresses = getServerAddresses((byte) 1);
    addresses.add(new InetSocketAddress("a...", 2181));

    StaticHostProvider provider = new StaticHostProvider(addresses);
    InetSocketAddress first = provider.next(0);
    InetSocketAddress second = provider.next(0);

    assertEquals(second, first);
}
/**
 * Builds a {@link StaticHostProvider} over {@code size} addresses of the
 * form {@code 10.10.10.k:1234}, added for {@code k} counting down from
 * {@code size} to 1.
 *
 * @param size
 *            number of addresses to generate; also the last octet of the
 *            first address added
 * @return a provider constructed from the generated list
 * @throws UnknownHostException
 *             declared by the signature
 */
private StaticHostProvider getHostProvider(int size) throws UnknownHostException {
    ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(size);
    for (int lastOctet = size; lastOctet > 0; lastOctet--) {
        String host = "10.10.10." + lastOctet;
        addresses.add(new InetSocketAddress(host, 1234));
    }
    return new StaticHostProvider(addresses);
}
/**
 * Constructs a {@link StaticHostProvider} from two addresses whose host
 * names are {@code "a"} and {@code "b"}; the test passes only if an
 * {@link IllegalArgumentException} is thrown.
 */
@Test(expected = IllegalArgumentException.class)
public void testTwoInvalidHostAddresses() {
    ArrayList<InetSocketAddress> invalidAddresses = new ArrayList<InetSocketAddress>();
    for (String host : new String[] {"a", "b"}) {
        invalidAddresses.add(new InetSocketAddress(host, 2181));
    }
    new StaticHostProvider(invalidAddresses);
}
/**
 * Adds the host name {@code "a"} to a one-element server list, builds a
 * {@link StaticHostProvider} from it and asserts that two consecutive
 * {@code next(0)} calls return equal addresses.
 */
@Test
public void testOneInvalidHostAddresses() {
    Collection<InetSocketAddress> addresses = getServerAddresses((byte) 1);
    addresses.add(new InetSocketAddress("a", 2181));

    StaticHostProvider provider = new StaticHostProvider(addresses);
    InetSocketAddress first = provider.next(0);
    InetSocketAddress second = provider.next(0);

    assertEquals(second, first);
}
/**
 * Creates a ZooKeeper client from a connection string: a comma separated
 * list of host:port pairs, one per ZooKeeper server.
 * <p>
 * Session establishment is asynchronous. The constructor initiates the
 * connection to the server and returns immediately - potentially (usually)
 * before the session is fully established. The watcher argument is
 * notified of any changes in state; such a notification can arrive at any
 * point before or after the constructor call has returned.
 * <p>
 * The new client picks an arbitrary server from the connectString and
 * attempts to connect to it. If establishing that connection fails,
 * another server in the connect string is tried (the order is
 * non-deterministic, as the list is randomly shuffled), until a connection
 * is established. The client keeps trying until the session is explicitly
 * closed.
 * <p>
 * Added in 3.2.0: an optional "chroot" suffix may be appended to the
 * connection string. Client commands are then run with all paths
 * interpreted relative to this root (similar to the unix chroot command).
 *
 * @param connectString
 *            comma separated host:port pairs, each corresponding to a zk
 *            server, e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 *            With the optional chroot suffix the example would look like
 *            "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a", where
 *            the client is rooted at "/app/a" and all paths are relative
 *            to this root - i.e. getting/setting/etc. "/foo/bar" results
 *            in operations being run on "/app/a/foo/bar" (from the server
 *            perspective).
 * @param sessionTimeout
 *            session timeout in milliseconds
 * @param watcher
 *            a watcher object which will be notified of state changes, may
 *            also be notified for node events
 * @param canBeReadOnly
 *            (added in 3.4) whether the created client is allowed to go to
 *            read-only mode in case of partitioning. Read-only mode
 *            basically means that if the client can't find any majority
 *            servers but there is a partitioned server it can reach, it
 *            connects to that one in read-only mode, i.e. read requests
 *            are allowed while write requests are not. It keeps seeking a
 *            majority in the background.
 *
 * @throws IOException
 *             in cases of network failure
 * @throws IllegalArgumentException
 *             if an invalid chroot path is specified
 */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    final String startupMessage = "Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher;
    LOG.info(startupMessage);

    // The default watcher is installed before the connection object is created and started.
    watchManager.defaultWatcher = watcher;

    final ConnectStringParser parsedConnectString = new ConnectStringParser(connectString);
    final HostProvider servers = new StaticHostProvider(parsedConnectString.getServerAddresses());

    cnxn = new ClientCnxn(
            parsedConnectString.getChrootPath(),
            servers,
            sessionTimeout,
            this,
            watchManager,
            getClientCnxnSocket(),
            canBeReadOnly);
    cnxn.start();
}
/**
 * Builds a {@link StaticHostProvider} from whatever
 * {@code getUnresolvedServerAddresses(size)} returns.
 * <p>
 * NOTE(review): going by the helper's name, the addresses are presumably
 * left unresolved; confirm against {@code getUnresolvedServerAddresses}.
 *
 * @param size
 *            passed straight through to {@code getUnresolvedServerAddresses}
 * @return a provider constructed from the returned addresses
 * @throws UnknownHostException
 *             declared by the signature
 */
private StaticHostProvider getHostProviderUnresolved(byte size) throws UnknownHostException { return new StaticHostProvider(getUnresolvedServerAddresses(size)); }
/**
 * Creates the default {@link HostProvider} for a connection string: the
 * string is parsed with {@link ConnectStringParser} and the parsed server
 * addresses are wrapped in a {@link StaticHostProvider}.
 *
 * @param connectString
 *            connection string handed to {@code ConnectStringParser}
 * @return a {@code StaticHostProvider} over the parsed server addresses
 */
private static HostProvider createDefaultHostProvider(String connectString) {
    ConnectStringParser parser = new ConnectStringParser(connectString);
    return new StaticHostProvider(parser.getServerAddresses());
}
@Test public void testNoCurrentHostDuringNormalMode() throws UnknownHostException { // Start with 9 servers and 10000 clients boolean disconnectRequired; StaticHostProvider[] hostProviderArray = new StaticHostProvider[numClients]; InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients]; int[] numClientsPerHost = new int[9]; // initialization for (int i = 0; i < numClients; i++) { hostProviderArray[i] = getHostProvider((byte) 9); if (i >= (numClients / 2)) { curHostForEachClient[i] = hostProviderArray[i].next(0); } else { // its supposed to be the first server on serverList. // we'll set it later, see below (*) curHostForEachClient[i] = null; } } // remove hosts 7 and 8 (the last two in a list of 9 hosts) Collection<InetSocketAddress> newList = getServerAddresses((byte) 7); for (int i = 0; i < numClients; i++) { // tests the case currentHost == null && lastIndex == -1 // calls next for clients with index < numClients/2 disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]); if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0); else if (curHostForEachClient[i] == null) { // (*) setting it to what it should be curHostForEachClient[i] = hostProviderArray[i] .getServerAtIndex(0); } numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++; // sets lastIndex, resets reconfigMode hostProviderArray[i].onConnected(); } for (int i = 0; i < 7; i++) { assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 7)); assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 7)); numClientsPerHost[i] = 0; // prepare for next test } assertTrue(numClientsPerHost[7] == 0); assertTrue(numClientsPerHost[8] == 0); // add back server 7 newList = getServerAddresses((byte) 8); for (int i = 0; i < numClients; i++) { InetSocketAddress myServer = (i < (numClients / 2)) ? 
null : curHostForEachClient[i]; // tests the case currentHost == null && lastIndex >= 0 disconnectRequired = hostProviderArray[i].updateServerList(newList, myServer); if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0); numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++; hostProviderArray[i].onConnected(); } for (int i = 0; i < 8; i++) { assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8)); assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8)); } }
@Test public void testReconfigDuringReconfigMode() throws UnknownHostException { // Start with 9 servers and 10000 clients boolean disconnectRequired; StaticHostProvider[] hostProviderArray = new StaticHostProvider[numClients]; InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients]; int[] numClientsPerHost = new int[9]; // initialization for (int i = 0; i < numClients; i++) { hostProviderArray[i] = getHostProvider((byte) 9); curHostForEachClient[i] = hostProviderArray[i].next(0); } // remove hosts 7 and 8 (the last two in a list of 9 hosts) Collection<InetSocketAddress> newList = getServerAddresses((byte) 7); for (int i = 0; i < numClients; i++) { // sets reconfigMode hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]); } // add back servers 7 and 8 while still in reconfigMode (we didn't call // next) newList = getServerAddresses((byte) 9); for (int i = 0; i < numClients; i++) { InetSocketAddress myServer = (i < (numClients / 2)) ? null : curHostForEachClient[i]; // for i < (numClients/2) this tests the case currentHost == null && // reconfigMode = true // for i >= (numClients/2) this tests the case currentHost!=null && // reconfigMode = true disconnectRequired = hostProviderArray[i].updateServerList(newList, myServer); if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0); else { // currentIndex was set by the call to updateServerList, which // called next curHostForEachClient[i] = hostProviderArray[i] .getServerAtCurrentIndex(); } numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++; hostProviderArray[i].onConnected(); } for (int i = 0; i < 9; i++) { assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9)); assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9)); } }
/**
 * Builds a {@link StaticHostProvider} from {@code getServerAddresses(size)}
 * and a seed drawn from {@code r.nextLong()}.
 *
 * @param size
 *            passed straight through to {@code getServerAddresses}
 * @return a provider constructed from those addresses and that seed value
 */
private StaticHostProvider getHostProvider(byte size) {
    // Addresses are fetched before the seed is drawn, as in the single-expression form.
    Collection<InetSocketAddress> addresses = getServerAddresses(size);
    return new StaticHostProvider(addresses, r.nextLong());
}
/**
 * Builds a {@link StaticHostProvider} from whatever
 * {@code getUnresolvedServerAddresses(size)} returns, passing
 * {@code r.nextLong()} as the second constructor argument.
 * <p>
 * NOTE(review): going by the helper's name, the addresses are presumably
 * left unresolved, and the long value is presumably a randomness seed;
 * confirm against {@code getUnresolvedServerAddresses} and the
 * {@code StaticHostProvider} constructor.
 *
 * @param size
 *            passed straight through to {@code getUnresolvedServerAddresses}
 * @return a provider constructed from the returned addresses
 * @throws UnknownHostException
 *             declared by the signature
 */
private StaticHostProvider getHostProviderUnresolved(byte size) throws UnknownHostException { return new StaticHostProvider(getUnresolvedServerAddresses(size), r.nextLong()); }