private void connect(String seeds) { if (getWithSSL()) { LOGGER.info("SSL mode enabled"); try { SSLOptions sslOptions = new SSLOptions(SSLContext.getDefault(), CIPHERS); builder = Cluster.builder().withSSL(sslOptions); } catch (NoSuchAlgorithmException e) { LOGGER.error("Unable to setup SSL Options for Cassandra"); } } String[] contactPoints = seeds.split(","); for (String contactPoint : contactPoints) { LOGGER.info("Adding Cassandra contact point " + contactPoint); builder.addContactPoints(contactPoint); } cluster = builder.build(); Metadata metadata = cluster.getMetadata(); for (Host host : metadata.getAllHosts()) { LOGGER.info("Datacenter "+ host.getDatacenter() + "Host " + host.getAddress() + "Rack " + host.getRack()); session = cluster.connect(); } }
@Test public void testVnodeSupport() throws Exception { // Validate that peers as appropriately discovered when connecting to a node and vnodes are // assigned. try (BoundCluster boundCluster = server.register(ClusterSpec.builder().withNumberOfTokens(256).withNodes(3, 3, 3)); Cluster driverCluster = defaultBuilder(boundCluster).build()) { driverCluster.init(); // Should be 9 hosts assertThat(driverCluster.getMetadata().getAllHosts()).hasSize(9); Set<Token> allTokens = new HashSet<>(); for (Host host : driverCluster.getMetadata().getAllHosts()) { assertThat(host.getTokens()).hasSize(256); allTokens.addAll(host.getTokens()); } // Should be 256*9 unique tokens. assertThat(allTokens).hasSize(256 * 9); } }
@Override public ResultSet<CassandraDBContext> execute(Query<CassandraDBContext> query) throws QueryExecutionException { try (Cluster cassandraConnection = buildConnection()) { final Metadata metadata = cassandraConnection.getMetadata(); System.out.printf("Connected to cluster: %s", metadata.getClusterName()); for (final Host host : metadata.getAllHosts()) { System.out.printf("Datacenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack()); } try (Session session = cassandraConnection.connect()) { String queryToExecute = query.getQuery(); System.out.println(queryToExecute); com.datastax.driver.core.ResultSet resultSet = session.execute(queryToExecute); printResultSet(resultSet); ExecutionInfo executionInfo = resultSet.getExecutionInfo(); System.out.println(executionInfo); } } // There isn't any resultset for these use-case return new CassandraResultSet(); }
static Session connect() { String contactPoint = "localhost"; String keySpace = "ks1"; if(session == null) { PoolingOptions poolingOptions = new PoolingOptions().setConnectionsPerHost(HostDistance.REMOTE, 1, 4); cluster = Cluster.builder().addContactPoint(contactPoint).withPoolingOptions(poolingOptions) .withCompression(Compression.SNAPPY).build(); cluster.init(); for (Host host : cluster.getMetadata().getAllHosts()) { System.out.printf("Address: %s, Rack: %s, Datacenter: %s, Tokens: %s\n", host.getAddress(), host.getDatacenter(), host.getRack(), host.getTokens()); } session = cluster.connect(keySpace); } return session; }
static Session connect() { String contactPoint = "localhost"; String keySpace = "ks1"; if(session == null) { RetryPolicy retryPolicy = new CustomRetryPolicy(3, 3, 2); cluster = Cluster.builder().addContactPoint(contactPoint) .withRetryPolicy(retryPolicy).build(); cluster.init(); for (Host host : cluster.getMetadata().getAllHosts()) { System.out.printf("Address: %s, Rack: %s, Datacenter: %s, Tokens: %s\n", host.getAddress(), host.getDatacenter(), host.getRack(), host.getTokens()); } } return session; }
static Session connect() { String contactPoint = "localhost"; String keySpace = "ks1"; if(session == null) { DCAwareRoundRobinPolicy dcAwarePolicy = new DCAwareRoundRobinPolicy.Builder().build(); LoadBalancingPolicy policy = new TokenAwarePolicy(dcAwarePolicy); cluster = Cluster.builder().addContactPoint(contactPoint) .withLoadBalancingPolicy(policy).build(); cluster.init(); for (Host host : cluster.getMetadata().getAllHosts()) { System.out.printf("Address: %s, Rack: %s, Datacenter: %s, Tokens: %s\n", host.getAddress(), host.getDatacenter(), host.getRack(), host.getTokens()); } } return session; }
@Override public void update(Host host, Statement statement, Exception e, long nanos) { if (!(statement instanceof NamedBoundStatement)) return; Span span = cache.remove(statement); if (span == null) { if (statement.isTracing()) { LOG.warn("{} not in the cache eventhough tracing is on", statement); } return; } span.setDuration(nanos / 1000); // TODO: allow client tracer to end with duration Endpoint local = span.getAnnotations().get(0).host; // TODO: expose in brave long endTs = span.getTimestamp() + span.getDuration(); if (e != null) { span.addToBinary_annotations(BinaryAnnotation.create("cql.error", e.getMessage(), local)); } else { span.addToAnnotations(Annotation.create(endTs, "cr", local)); } int ipv4 = ByteBuffer.wrap(host.getAddress().getAddress()).getInt(); Endpoint endpoint = Endpoint.create("cassandra", ipv4, host.getSocketAddress().getPort()); span.addToBinary_annotations(BinaryAnnotation.address("sa", endpoint)); collector.collect(span); }
@Override public void update(Host host, Statement statement, Exception e, long nanos) { if (!(statement instanceof BoundStatement)) return; Span span = cache.remove(statement); if (span == null) { if (statement.isTracing()) { LOG.warn("{} not in the cache eventhough tracing is on", statement); } return; } span.setDuration(nanos / 1000); // TODO: allow client tracer to end with duration Endpoint local = span.getAnnotations().get(0).host; // TODO: expose in brave long endTs = span.getTimestamp() + span.getDuration(); span.addToAnnotations(Annotation.create(endTs, "cr", local)); if (e != null) { span.addToBinary_annotations(BinaryAnnotation.create(Constants.ERROR, e.getMessage(), local)); } int ipv4 = ByteBuffer.wrap(host.getAddress().getAddress()).getInt(); Endpoint endpoint = Endpoint.create("cassandra3", ipv4, host.getSocketAddress().getPort()); span.addToBinary_annotations(BinaryAnnotation.address("sa", endpoint)); collector.collect(span); }
@Test public void testClusterHintsPollerWhenNodeDown() throws UnknownHostException { ClusterHintsPoller clusterHintsPoller = new ClusterHintsPoller(); Session mockSession = mock(Session.class); Cluster mockCluster = mock(Cluster.class); Metadata mockMetadata = mock(Metadata.class); when(mockCluster.getMetadata()).thenReturn(mockMetadata); when(mockCluster.getClusterName()).thenReturn("test-cluster"); Host node1 = mock(Host.class); when(node1.getAddress()).thenReturn(InetAddress.getByName("127.0.0.1")); Host node2 = mock(Host.class); when(node2.getAddress()).thenReturn(InetAddress.getByName("127.0.0.2")); Host node3 = mock(Host.class); when(node3.getAddress()).thenReturn(InetAddress.getByName("127.0.0.3")); when(mockSession.getCluster()).thenReturn(mockCluster); // The first node queried is down when(mockSession.execute(any(Statement.class))).thenThrow(new NoHostAvailableException(ImmutableMap.<InetSocketAddress, Throwable>of())); when(mockMetadata.getAllHosts()).thenReturn(ImmutableSet.of(node1, node2, node3)); HintsPollerResult actualResult = clusterHintsPoller.getOldestHintsInfo(mockSession); // Make sure HintsPollerResult fails assertFalse(actualResult.areAllHostsPolling(), "Result should show hosts failing"); assertEquals(actualResult.getHostFailure(), ImmutableSet.of(InetAddress.getByName("127.0.0.1")), "Node 1 should return with host failure"); }
private ArgumentMatcher<Statement> getHostStatementMatcher(final Host host, final String query) throws Exception { return new ArgumentMatcher<Statement>() { @Override public boolean matches(Object argument) { SelectedHostStatement statement = (SelectedHostStatement) argument; return ((SimpleStatement)statement.getStatement()).getQueryString().equals(query) && Objects.equals(statement.getHostCordinator().getAddress(), host.getAddress()); } @Override public void describeTo(Description description) { description.appendText(format("query:%s host:%s", query, host.getAddress().toString())); } }; }
private Result pingAll() { try { StringBuilder message = new StringBuilder(); OperationResult<CqlStatementResult> astyanaxResult = pingAstyanax(); message.append("Astyanax: ").append(astyanaxResult.getHost()).append(" ") .append(astyanaxResult.getLatency(TimeUnit.MICROSECONDS)).append("us"); if (astyanaxResult.getAttemptsCount() != 1) { message.append(", ").append(astyanaxResult.getAttemptsCount()).append(" attempts"); } Stopwatch cqlTimer = Stopwatch.createStarted(); ResultSet cqlResult = pingCql(); long queryDurationMicros = cqlTimer.elapsed(TimeUnit.MICROSECONDS); Host host = cqlResult.getExecutionInfo().getQueriedHost(); message.append(" | CQL: ").append(host).append(" ").append(queryDurationMicros).append("us"); return Result.healthy(message.toString()); } catch (Throwable t) { return Result.unhealthy(t); } }
public void sendMutation(Mutation mutation) { for (PartitionUpdate partition : mutation.getPartitionUpdates()) { Set<Host> replicas = cluster.getMetadata().getReplicas(mutation.getKeyspaceName(), partition.partitionKey().getKey()); // in case theres multiple partitions in this mutation, with topology changes we cant assume can send // them in batches so break them up. Mutation toSend = new Mutation(mutation.getKeyspaceName(), partition.partitionKey()); toSend.add(partition); for(Host h : replicas) { InetAddress target = h.getBroadcastAddress(); StorageConnection conn = connections.get(target); if(conn == null) { conn = connections.computeIfAbsent(target, host -> { StorageConnection c = new StorageConnection(host); c.connect(); return c; }); } try { conn.enqueue(toSend.createMessage(), idGen.incrementAndGet()); } catch (IOException e) { e.printStackTrace(); } } } }
/** * Constructor * * @param nodes a list of one or more Cassandra nodes to connect to. Note * that not all Cassandra nodes in the cluster need be * supplied; one will suffice however if that node is * unavailable the connection attempt will fail, even if the * others are available. */ public StoreConnection(List<String> nodes) { Cluster.Builder builder = Cluster.builder(); for (String node : nodes) { builder.addContactPoint(node); } cluster = builder.build(); Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s%n", metadata.getClusterName()); for (Host host : metadata.getAllHosts()) { System.out.printf("Datacenter: %s; Host: %s; Rack: %s%n", host.getDatacenter(), host.getAddress(), host.getRack()); } session = cluster.connect(); }
@Override public Iterator<Host> newQueryPlan(String keyspace, Statement statement) { List<Host> local = new ArrayList<>(1); List<Host> remote = new ArrayList<>(liveReplicaHosts.size()); for (Host liveReplicaHost : liveReplicaHosts) { if (isLocalHost(liveReplicaHost)) { local.add(liveReplicaHost); } else { remote.add(liveReplicaHost); } } Collections.shuffle(remote); logger.debug("Using the following hosts order for the new query plan: {} | {}", local, remote); return Iterators.concat(local.iterator(), remote.iterator()); }
public Set<String> resolveAllPermitted(StressSettings settings) { Set<String> r = new HashSet<>(); switch (settings.mode.api) { case THRIFT_SMART: case JAVA_DRIVER_NATIVE: if (!isWhiteList) { for (Host host : settings.getJavaDriverClient().getCluster().getMetadata().getAllHosts()) r.add(host.getAddress().getHostName()); break; } case THRIFT: case SIMPLE_NATIVE: for (InetAddress address : resolveAllSpecified()) r.add(address.getHostName()); } return r; }
@Test public void testToHostAddressList() throws Exception { Set<Host> hosts = ImmutableSet.<Host>of( new TestHost( new InetSocketAddress( InetAddress.getByAddress(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 }), 3000)), new TestHost(new InetSocketAddress(InetAddress.getByAddress(new byte[] {1, 2, 3, 4}), 3000))); HostAddressFactory hostAddressFactory = new HostAddressFactory(); List<HostAddress> list = hostAddressFactory.toHostAddressList(hosts); assertEquals(list.toString(), "[[102:304:506:708:90a:b0c:d0e:f10], 1.2.3.4]"); }
void check() throws ClusterUnhealthyException { log.debug("Checking cluster health"); Set<Host> allHosts = cluster.getMetadata().getAllHosts(); List<InetAddress> unhealthyHosts = allHosts .stream() .filter(host -> !host.isUp()) .map(Host::getAddress) .collect(Collectors.toList()); if (!unhealthyHosts.isEmpty()) { throw new ClusterUnhealthyException("Cluster not healthy, the following hosts are down: " + unhealthyHosts); } log.debug("All hosts healthy: {}", allHosts); }
@Override public Iterator<Host> newQueryPlan(String keyspace, Statement statement) { List<Host> local = new ArrayList<>(1); List<Host> remote = new ArrayList<>(liveReplicaHosts.size()); for (Host liveReplicaHost : liveReplicaHosts) { if (isLocalHost(liveReplicaHost)) { local.add(liveReplicaHost); } else { remote.add(liveReplicaHost); } } Collections.shuffle(remote); logger.trace("Using the following hosts order for the new query plan: {} | {}", local, remote); return Iterators.concat(local.iterator(), remote.iterator()); }
@Override public Map<InetAddress, Collection<Range<Token>>> getEndpointRanges() { HashMap<InetAddress, Collection<Range<Token>>> map = new HashMap<>(); for (TokenRange range : metadata.getTokenRanges()) { Range<Token> tr = new Range<Token>(getToken(range.getStart()), getToken(range.getEnd())); for (Host host : metadata.getReplicas(getKeyspace(), range)) { Collection<Range<Token>> c = map.get(host.getAddress()); if (c == null) { c = new ArrayList<>(); map.put(host.getAddress(), c); } c.add(tr); } } return map; }
private void logReplicaBatchMap(String name, Map<Set<Host>, Deque<BatchStatement>> map) { if (logger.isDebugEnabled()) { StringBuilder sb = new StringBuilder(name); sb.append(": Size: ").append(map.size()); sb.append(". Replicas: |"); for (Entry<Set<Host>, Deque<BatchStatement>> entry : map.entrySet()) { for (Host host : entry.getKey()) { sb.append(host.getAddress().toString()).append(","); } sb.append(":"); for (BatchStatement bs : entry.getValue()) { sb.append(bs.size()).append(","); } sb.append("|"); } logger.debug(sb.toString()); } }
private boolean availableInFlightSlots(Statement st) { boolean available = false; Iterator<Host> hostIterator = loadBalancingPolicy.newQueryPlan(session.getLoggedKeyspace(), st); hostIter: while(hostIterator.hasNext()) { Host host = hostIterator.next(); int inFlightQueries = session.getState().getInFlightQueries(host); switch(loadBalancingPolicy.distance(host)) { case LOCAL: if(inFlightQueries < maxInFlightLocal) { available = true; break hostIter; } break; case REMOTE: if(inFlightQueries < maxInFlightRemote) { available = true; break hostIter; } break; default: // IGNORED is something we're not going to write to break; } } return available; }
private Client get(ByteBuffer pk) { Set<Host> hosts = metadata.getReplicas(keyspace, pk); int count = roundrobin.incrementAndGet() % hosts.size(); if (count < 0) count = -count; Iterator<Host> iter = hosts.iterator(); while (count > 0 && iter.hasNext()) iter.next(); Host host = iter.next(); ConcurrentLinkedQueue<Client> q = cache.get(host); if (q == null) { ConcurrentLinkedQueue<Client> newQ = new ConcurrentLinkedQueue<Client>(); q = cache.putIfAbsent(host, newQ); if (q == null) q = newQ; } Client tclient = q.poll(); if (tclient != null) return tclient; return new Client(settings.getRawThriftClient(host.getAddress().getHostAddress()), host); }
private Client get(ByteBuffer pk) { Set<Host> hosts = metadata.getReplicas(metadata.quote(keyspace), pk); int pos = roundrobin.incrementAndGet() % hosts.size(); if (pos < 0) pos = -pos; Host host = Iterators.get(hosts.iterator(), pos); ConcurrentLinkedQueue<Client> q = cache.get(host); if (q == null) { ConcurrentLinkedQueue<Client> newQ = new ConcurrentLinkedQueue<Client>(); q = cache.putIfAbsent(host, newQ); if (q == null) q = newQ; } Client tclient = q.poll(); if (tclient != null) return tclient; return new Client(settings.getRawThriftClient(host.getAddress().getHostAddress()), host); }
public void logExecutionInfo(String prefix, ExecutionInfo executionInfo) { if (executionInfo != null) { StringBuilder msg = new StringBuilder("\n" + prefix); msg.append(String.format("\nHost (queried): %s\n", executionInfo.getQueriedHost().toString())); for (Host host : executionInfo.getTriedHosts()) { msg.append(String.format("Host (tried): %s\n", host.toString())); } QueryTrace queryTrace = executionInfo.getQueryTrace(); if (queryTrace != null) { msg.append(String.format("Trace id: %s\n\n", queryTrace.getTraceId())); msg.append(String.format("%-80s | %-12s | %-20s | %-12s\n", "activity", "timestamp", "source", "source_elapsed")); msg.append(String.format("---------------------------------------------------------------------------------+--------------+----------------------+--------------\n")); for (QueryTrace.Event event : queryTrace.getEvents()) { msg.append(String.format("%80s | %12s | %20s | %12s\n", event.getDescription(), format.format(event.getTimestamp()), event.getSource(), event.getSourceElapsedMicros())); } LOG.info(msg.toString()); } else { LOG.warn("Query Trace is null\n" + msg); } } else { LOG.warn("Null execution info"); } }
public void logCluster(Cluster cluster) { try { if (cluster != null && !cluster.isClosed()) { String clusterName = cluster.getClusterName(); Metadata metadata = cluster.getMetadata(); Set<Host> allHosts = metadata.getAllHosts(); StringBuilder b = new StringBuilder("\nCassandra Cluster '" + clusterName + "' details (via native client driver) are :"); for (Host host : allHosts) { b.append(ClusterProbe.prettyHost(host)); } LOG.info(b.toString()); } else { LOG.warn("Null or closed cluster"); } } catch (Throwable t) { } }
private void write(String query, Object... values) { logger.debug("query = {} : values = {}", query, values); PreparedStatement stmt = writeStatementCache.getUnchecked(query); BoundStatement bind = stmt.bind(values); ResultSet rs = session.execute(bind); ExecutionInfo executionInfo = rs.getExecutionInfo(); Host queriedHost = executionInfo.getQueriedHost(); logger.debug("queried host = {}", queriedHost); if (tracingEnabled) { QueryTrace queryTrace = executionInfo.getQueryTrace(); if (queryTrace != null) { if (logger.isDebugEnabled()) { logger.debug("{}", toString(queryTrace)); } } } }
@Test(groups = {"system"}) public void testMetadata() throws Exception { Metadata metadata = cluster.getMetadata(); assertTrue(metadata.getClusterName().length() > 0); if (LOG.isDebugEnabled()) { LOG.debug(String.format("Connected to cluster: %s\n", metadata.getClusterName())); } assertTrue(metadata.getAllHosts().size() > 0); for (Host host : metadata.getAllHosts()) { assertTrue(host.getDatacenter().length() > 0); assertNotNull(host.getAddress()); assertTrue(host.getRack().length() > 0); if (LOG.isDebugEnabled()) { LOG.debug(String.format("Datacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack())); } } }
@Before public void setUp() throws Exception { Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); for (Host host : metadata.getAllHosts()) { System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); } session = cluster.connect(); repository.createKeyspace(); repository.createAoisTable(); repository.createAclsTable(); repository.createChilrenTable(); SecurityContextHolder.getContext().setAuthentication( new UsernamePasswordAuthenticationToken(sid1, "password", Arrays.asList(new SimpleGrantedAuthority[] { new SimpleGrantedAuthority( ROLE_ADMIN) }))); }
@Before public void setUp() throws Exception { Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); for (Host host : metadata.getAllHosts()) { System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); } session = cluster.connect(); service.createKeyspace(); service.createAoisTable(); service.createAclsTable(); service.createChilrenTable(); SecurityContextHolder.getContext().setAuthentication( new UsernamePasswordAuthenticationToken(sid1, "password", Arrays.asList(new SimpleGrantedAuthority[] { new SimpleGrantedAuthority( ROLE_ADMIN) }))); }
@Before public void setUp() throws Exception { Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); for (Host host : metadata.getAllHosts()) { System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); } session = cluster.connect(); service.createKeyspace(); service.createAoisTable(); service.createAclsTable(); service.createChilrenTable(); SecurityContextHolder.getContext().setAuthentication( new UsernamePasswordAuthenticationToken(sid1, "password", Arrays.asList(new SimpleGrantedAuthority[] { new SimpleGrantedAuthority( "ROLE_USER") }))); }
@Test public void testClusterDiscovery() throws Exception { // Validate that peers as appropriately discovered when connecting to a node. try (BoundCluster boundCluster = server.register(ClusterSpec.builder().withNodes(3, 3, 3)); Cluster driverCluster = defaultBuilder(boundCluster).build()) { BoundDataCenter dc0 = boundCluster.getDataCenters().iterator().next(); driverCluster.init(); // Should be 9 hosts assertThat(driverCluster.getMetadata().getAllHosts()).hasSize(9); // Connect and ensure pools are created to local dc hosts. Session session = driverCluster.connect(); // Verify hosts connected to are only those in the local DC. Collection<SocketAddress> connectedHosts = session .getState() .getConnectedHosts() .stream() .map(Host::getSocketAddress) .collect(Collectors.toList()); Collection<SocketAddress> dcHosts = dc0.getNodes().stream().map(BoundNode::getAddress).collect(Collectors.toList()); assertThat(connectedHosts).hasSameElementsAs(dcHosts); } }
private static Cluster getCluster(){ if(cluster==null){ synchronized (SessionManager.class) { if(cluster==null){ PoolingOptions poolingOptions = new PoolingOptions(); poolingOptions .setMaxRequestsPerConnection(HostDistance.REMOTE, max) .setMaxRequestsPerConnection(HostDistance.LOCAL,max) .setMaxQueueSize(max*10) .setCoreConnectionsPerHost(HostDistance.LOCAL, 1) .setMaxConnectionsPerHost( HostDistance.LOCAL, 2) .setCoreConnectionsPerHost(HostDistance.REMOTE, 1) .setMaxConnectionsPerHost( HostDistance.REMOTE, 2); SocketOptions socketOptions = new SocketOptions(); socketOptions.setConnectTimeoutMillis(60000); socketOptions.setReadTimeoutMillis(60000); cluster = Cluster.builder().addContactPoint(url).withPoolingOptions(poolingOptions).withSocketOptions(socketOptions).build(); Metadata metadata = cluster.getMetadata(); Set<Host> allHosts = metadata.getAllHosts(); for(Host host:allHosts){ System.out.println("host:"+host.getAddress()); } } } } return cluster; }
/** * Log the information related to the hosts in this cluster. */ private void logClusterInfo() { LOG.info("*** Cassandra Cluster host information ***"); for (Host host : clusterMetadata.getAllHosts()) { LOG.info("Datacenter: {}; Host: {}; Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack()); } LOG.info("*** END Cassandra Cluster host information ***"); }
@Test public void loadBalancing_defaultsToRoundRobin() { RoundRobinPolicy policy = toRoundRobinPolicy(Cassandra3Storage.builder().build()); Host foo = mock(Host.class); when(foo.getDatacenter()).thenReturn("foo"); Host bar = mock(Host.class); when(bar.getDatacenter()).thenReturn("bar"); policy.init(mock(Cluster.class), asList(foo, bar)); assertThat(policy.distance(foo)).isEqualTo(HostDistance.LOCAL); assertThat(policy.distance(bar)).isEqualTo(HostDistance.LOCAL); }