@Test public void testClientReconnect() throws IOException, InterruptedException { HostProvider hostProvider = mock(HostProvider.class); when(hostProvider.size()).thenReturn(1); InetSocketAddress inaddr = new InetSocketAddress(1111); when(hostProvider.next(anyLong())).thenReturn(inaddr); ZooKeeper zk = mock(ZooKeeper.class); sc = SocketChannel.open(); ClientCnxnSocketNIO nioCnxn = new MockCnxn(); ClientWatchManager watcher = mock(ClientWatchManager.class); ClientCnxn clientCnxn = new ClientCnxn( "tmp", hostProvider, 5000, zk, watcher, nioCnxn, false); clientCnxn.start(); countDownLatch.await(5000, TimeUnit.MILLISECONDS); Assert.assertTrue(countDownLatch.getCount() == 0); clientCnxn.close(); }
@Test public void testClientReconnect() throws IOException, InterruptedException { HostProvider hostProvider = mock(HostProvider.class); when(hostProvider.size()).thenReturn(1); InetSocketAddress inaddr = new InetSocketAddress("127.0.0.1", 1111); when(hostProvider.next(anyLong())).thenReturn(inaddr); ZooKeeper zk = mock(ZooKeeper.class); when(zk.getClientConfig()).thenReturn(new ZKClientConfig()); sc = SocketChannel.open(); ClientCnxnSocketNIO nioCnxn = new MockCnxn(); ClientWatchManager watcher = mock(ClientWatchManager.class); ClientCnxn clientCnxn = new ClientCnxn( "tmp", hostProvider, 5000, zk, watcher, nioCnxn, false); clientCnxn.start(); countDownLatch.await(5000, TimeUnit.MILLISECONDS); Assert.assertTrue(countDownLatch.getCount() == 0); clientCnxn.close(); }
@Test public void testZooKeeperWithCustomHostProvider() throws IOException, InterruptedException { final int CLIENT_PORT = PortAssignment.unique(); final HostProvider specialHostProvider = new SpecialHostProvider(); int expectedCounter = 3; counter.set(expectedCounter); ZooKeeper zkDefaults = new ZooKeeper("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT, this, false); ZooKeeper zkSpecial = new ZooKeeper("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT, this, false, specialHostProvider); Assert.assertTrue(counter.get() == expectedCounter); zkDefaults.updateServerList("127.0.0.1:" + PortAssignment.unique()); Assert.assertTrue(counter.get() == expectedCounter); zkSpecial.updateServerList("127.0.0.1:" + PortAssignment.unique()); expectedCounter--; Assert.assertTrue(counter.get() == expectedCounter); }
/** * Creates a connection object. The actual network connect doesn't get * established until needed. The start() instance method must be called * subsequent to construction. * * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 * @param hostProvider the list of ZooKeeper servers to connect to * @param sessionTimeout the timeout for connections. * @param zooKeeper the zookeeper object that this connection is related to. * @param watcher watcher for this connection * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty) * @param sessionId session id if re-establishing session * @param sessionPasswd session passwd if re-establishing session * @param canBeReadOnly whether the connection is allowed to go to read-only * mode in case of partitioning * @throws IOException */ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; this.sessionTimeout = sessionTimeout; this.hostProvider = hostProvider; this.chrootPath = chrootPath; connectTimeout = sessionTimeout / hostProvider.size(); readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); }
@Test public void testClientReconnect() throws IOException, InterruptedException { HostProvider hostProvider = mock(HostProvider.class); when(hostProvider.size()).thenReturn(1); InetSocketAddress inaddr = new InetSocketAddress("127.0.0.1", 1111); when(hostProvider.next(anyLong())).thenReturn(inaddr); ZooKeeper zk = mock(ZooKeeper.class); sc = SocketChannel.open(); ClientCnxnSocketNIO nioCnxn = new MockCnxn(); ClientWatchManager watcher = mock(ClientWatchManager.class); ClientCnxn clientCnxn = new ClientCnxn( "tmp", hostProvider, 5000, zk, watcher, nioCnxn, false); clientCnxn.start(); countDownLatch.await(5000, TimeUnit.MILLISECONDS); Assert.assertTrue(countDownLatch.getCount() == 0); clientCnxn.close(); }
@Test public void testNextGoesRound() throws UnknownHostException { HostProvider hostProvider = getHostProvider((byte) 2); InetSocketAddress first = hostProvider.next(0); assertTrue(first instanceof InetSocketAddress); hostProvider.next(0); assertEquals(first, hostProvider.next(0)); }
@Test public void testNextGoesRoundAndSleeps() throws UnknownHostException { byte size = 2; HostProvider hostProvider = getHostProvider(size); while (size > 0) { hostProvider.next(0); --size; } long start = System.currentTimeMillis(); hostProvider.next(1000); long stop = System.currentTimeMillis(); assertTrue(900 <= stop - start); }
@Test public void testNextDoesNotSleepForZero() throws UnknownHostException { byte size = 2; HostProvider hostProvider = getHostProvider(size); while (size > 0) { hostProvider.next(0); --size; } long start = System.currentTimeMillis(); hostProvider.next(0); long stop = System.currentTimeMillis(); assertTrue(5 > stop - start); }
@Test public void testOnConnectDoesNotReset() throws UnknownHostException { HostProvider hostProvider = getHostProvider((byte) 2); InetSocketAddress first = hostProvider.next(0); hostProvider.onConnected(); InetSocketAddress second = hostProvider.next(0); assertNotSame(first, second); }
@Test public void testLiteralIPNoReverseNS() throws Exception { byte size = 30; HostProvider hostProvider = getHostProviderUnresolved(size); for (int i = 0; i < size; i++) { InetSocketAddress next = hostProvider.next(0); assertTrue(next instanceof InetSocketAddress); assertTrue(!next.isUnresolved()); assertTrue("Did not match "+ next.toString(), !next.toString().startsWith("/")); // Do NOT trigger the reverse name service lookup. String hostname = next.getHostName(); // In this case, the hostname equals literal IP address. hostname.equals(next.getAddress().getHostAddress()); } }
public void configMutliCluster(ZooKeeper zk) { if (_servers.size() == 1) { return; } String cluster1 = _servers.get(0); try { if (_servers.size() > 1) { // 强制的声明accessible ReflectionUtils.makeAccessible(clientCnxnField); ReflectionUtils.makeAccessible(hostProviderField); ReflectionUtils.makeAccessible(serverAddressesField); // 添加第二组集群列表 for (int i = 1; i < _servers.size(); i++) { String cluster = _servers.get(i); // 强制获取zk中的地址信息 ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk); HostProvider hostProvider = (HostProvider) ReflectionUtils.getField(hostProviderField, cnxn); List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField, hostProvider); // 添加第二组集群列表 serverAddrs.addAll(new ConnectStringParser(cluster).getServerAddresses()); } } } catch (Exception e) { try { if (zk != null) { zk.close(); } } catch (InterruptedException ie) { // ignore interrupt } throw new ZkException("zookeeper_create_error, serveraddrs=" + cluster1, e); } }
@Test public void testNextGoesRound() { HostProvider hostProvider = getHostProvider((byte) 2); InetSocketAddress first = hostProvider.next(0); assertTrue(first != null); hostProvider.next(0); assertEquals(first, hostProvider.next(0)); }
@Test public void testNextGoesRoundAndSleeps() { byte size = 2; HostProvider hostProvider = getHostProvider(size); while (size > 0) { hostProvider.next(0); --size; } long start = Time.currentElapsedTime(); hostProvider.next(1000); long stop = Time.currentElapsedTime(); assertTrue(900 <= stop - start); }
@Test public void testNextDoesNotSleepForZero() { byte size = 2; HostProvider hostProvider = getHostProvider(size); while (size > 0) { hostProvider.next(0); --size; } long start = Time.currentElapsedTime(); hostProvider.next(0); long stop = Time.currentElapsedTime(); assertTrue(5 > stop - start); }
@Test public void testOnConnectDoesNotReset() { HostProvider hostProvider = getHostProvider((byte) 2); InetSocketAddress first = hostProvider.next(0); hostProvider.onConnected(); InetSocketAddress second = hostProvider.next(0); assertNotSame(first, second); }
@Test public void testLiteralIPNoReverseNS() throws Exception { byte size = 30; HostProvider hostProvider = getHostProviderUnresolved(size); for (int i = 0; i < size; i++) { InetSocketAddress next = hostProvider.next(0); assertTrue(next instanceof InetSocketAddress); assertTrue(!next.isUnresolved()); assertTrue(!next.toString().startsWith("/")); // Do NOT trigger the reverse name service lookup. String hostname = next.getHostString(); // In this case, the hostname equals literal IP address. hostname.equals(next.getAddress().getHostAddress()); } }
@Test public void testNextGoesRound() throws UnknownHostException { HostProvider hostProvider = getHostProvider(2); InetSocketAddress first = hostProvider.next(0); assertTrue(first instanceof InetSocketAddress); hostProvider.next(0); assertEquals(first, hostProvider.next(0)); }
@Test public void testNextGoesRoundAndSleeps() throws UnknownHostException { int size = 2; HostProvider hostProvider = getHostProvider(size); while (size > 0) { hostProvider.next(0); --size; } long start = System.currentTimeMillis(); hostProvider.next(1000); long stop = System.currentTimeMillis(); assertTrue(900 <= stop - start); }
@Test public void testNextDoesNotSleepForZero() throws UnknownHostException { int size = 2; HostProvider hostProvider = getHostProvider(size); while (size > 0) { hostProvider.next(0); --size; } long start = System.currentTimeMillis(); hostProvider.next(0); long stop = System.currentTimeMillis(); assertTrue(5 > stop - start); }