/**
 * Creates the Cassandra {@link Session} bean.
 *
 * <p>The connection attempt is wrapped in {@code Retriable} so that it is retried on
 * {@link NoHostAvailableException}, using the delay taken from the supplied properties.
 * After connecting, {@code initDb} is run and a warning is logged if the cluster
 * metadata reports that the schema is not in agreement.
 *
 * @param properties Cassandra settings; supplies the connect retry delay and is passed to {@code initDb}
 * @param cluster    cluster to connect to
 * @return the connected session
 * @throws Exception propagated from the retriable call or from {@code initDb}
 */
@Bean
public Session createSession(CassandraProperties properties, Cluster cluster) throws Exception {
    Session connected = Retriable.wrap(cluster::connect)
            .withErrorMessage("Cannot connect to cassandra cluster")
            .retryOn(NoHostAvailableException.class)
            .withDelaySec(properties.getConnectDelaySec())
            .call();

    initDb(properties, connected);

    // Only a warning: the session is still returned when agreement has not been reached.
    boolean schemaAgreed = connected.getCluster().getMetadata().checkSchemaAgreement();
    if (!schemaAgreed) {
        log.warn("SCHEMA IS NOT IN AGREEMENT!!!");
    }
    return connected;
}
@SuppressWarnings("unused") private void connectToCassaCluster(String address) { PoolingOptions poolingOptions = new PoolingOptions() .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); Iterator<String> it = getAllPossibleLocalIps().iterator(); logger.debug("Iterating through possible ips:"+getAllPossibleLocalIps()); while (it.hasNext()) { try { cluster = Cluster.builder() .withPort(9042) .withPoolingOptions(poolingOptions) .withoutMetrics() .addContactPoint(address) .build(); //cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(Integer.MAX_VALUE); Metadata metadata = cluster.getMetadata(); logger.debug("Connected to cluster:"+metadata.getClusterName()+" at address:"+address); session = cluster.connect(); break; } catch (NoHostAvailableException e) { address = it.next(); } } }
private void connectToCassaCluster(){ Iterator<String> it = getAllPossibleLocalIps().iterator(); String address= "localhost"; logger.debug("Connecting to cassa cluster: Iterating through possible ips:"+getAllPossibleLocalIps()); while(it.hasNext()){ try { cluster = Cluster.builder().withPort(9042).addContactPoint(address).build(); //cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(Integer.MAX_VALUE); Metadata metadata = cluster.getMetadata(); logger.debug("Connected to cassa cluster "+metadata.getClusterName()+" at "+address); /* for ( Host host : metadata.getAllHosts() ) { .out.printf("Datacenter: %s; Host broadcast: %s; Rack: %s\n", host.getDatacenter(), host.getBroadcastAddress(), host.getRack()); }*/ session = cluster.connect(); break; } catch (NoHostAvailableException e) { address= it.next(); } } }
private void connectToCassaCluster(){ Iterator<String> it = getAllPossibleLocalIps().iterator(); String address= "localhost"; // logger.debug("Connecting to cassa cluster: Iterating through possible ips:"+getAllPossibleLocalIps()); while(it.hasNext()){ try { cluster = Cluster.builder().withPort(9042).addContactPoint(address).build(); //cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(Integer.MAX_VALUE); Metadata metadata = cluster.getMetadata(); // logger.debug("Connected to cassa cluster "+metadata.getClusterName()+" at "+address); /* for ( Host host : metadata.getAllHosts() ) { System.out.printf("Datacenter: %s; Host broadcast: %s; Rack: %s\n", host.getDatacenter(), host.getBroadcastAddress(), host.getRack()); }*/ session = cluster.connect(); break; } catch (NoHostAvailableException e) { address= it.next(); } } }
@Test public void testClusterHintsPollerWhenNodeDown() throws UnknownHostException { ClusterHintsPoller clusterHintsPoller = new ClusterHintsPoller(); Session mockSession = mock(Session.class); Cluster mockCluster = mock(Cluster.class); Metadata mockMetadata = mock(Metadata.class); when(mockCluster.getMetadata()).thenReturn(mockMetadata); when(mockCluster.getClusterName()).thenReturn("test-cluster"); Host node1 = mock(Host.class); when(node1.getAddress()).thenReturn(InetAddress.getByName("127.0.0.1")); Host node2 = mock(Host.class); when(node2.getAddress()).thenReturn(InetAddress.getByName("127.0.0.2")); Host node3 = mock(Host.class); when(node3.getAddress()).thenReturn(InetAddress.getByName("127.0.0.3")); when(mockSession.getCluster()).thenReturn(mockCluster); // The first node queried is down when(mockSession.execute(any(Statement.class))).thenThrow(new NoHostAvailableException(ImmutableMap.<InetSocketAddress, Throwable>of())); when(mockMetadata.getAllHosts()).thenReturn(ImmutableSet.of(node1, node2, node3)); HintsPollerResult actualResult = clusterHintsPoller.getOldestHintsInfo(mockSession); // Make sure HintsPollerResult fails assertFalse(actualResult.areAllHostsPolling(), "Result should show hosts failing"); assertEquals(actualResult.getHostFailure(), ImmutableSet.of(InetAddress.getByName("127.0.0.1")), "Node 1 should return with host failure"); }
/**
 * Opening a sink whose cluster builder points at 127.0.0.1 is expected to fail with
 * {@link NoHostAvailableException} (declared via the {@code expected} attribute).
 */
@Test(expected = NoHostAvailableException.class)
public void testHostNotFoundErrorHandling() throws Exception {
    // Builder that targets the loopback address with JMX reporting and metrics disabled.
    ClusterBuilder loopbackBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder
                    .addContactPoint("127.0.0.1")
                    .withoutJMXReporting()
                    .withoutMetrics().build();
        }
    };

    // Minimal sink: send() is never expected to be reached in this test.
    CassandraSinkBase sink = new CassandraSinkBase(loopbackBuilder) {
        @Override
        public ListenableFuture send(Object value) {
            return null;
        }
    };

    sink.open(new Configuration());
}
public <T> T executeWithSession(String schemaName, SessionCallable<T> sessionCallable) { NoHostAvailableException lastException = null; for (int i = 0; i < 2; i++) { Session session = getSession(schemaName); try { return sessionCallable.executeWithSession(session); } catch (NoHostAvailableException e) { lastException = e; // Something happened with our client connection. We need to // re-establish the connection using our contact points. sessionBySchema.asMap().remove(schemaName, session); } } throw lastException; }
/**
 * Checks that a session can be opened against the configured cluster.
 *
 * <p>A throwaway cluster is obtained from {@code getCluster()}, a session is opened and
 * immediately closed, and the cluster is closed by the try-with-resources statement.
 * Any of the caught failures is logged and recorded as a {@code CASSANDRA_05} config issue.
 *
 * @param issues list that receives a config issue when the cluster cannot be reached
 * @return {@code true} if the probe succeeded, {@code false} otherwise
 */
private boolean checkCassandraReachable(List<ConfigIssue> issues) {
    try (Cluster validationCluster = getCluster()) {
        validationCluster.connect().close();
        return true;
    } catch (NoHostAvailableException | AuthenticationException | IllegalStateException | StageException e) {
        Target.Context context = getContext();
        LOG.error(Errors.CASSANDRA_05.getMessage(), e.toString(), e);
        ConfigIssue unreachable = context.createConfigIssue(
                Groups.CASSANDRA.name(),
                CONTACT_NODES_LABEL,
                Errors.CASSANDRA_05,
                e.toString()
        );
        issues.add(unreachable);
        return false;
    }
}
/**
 * Creates a one-node CCM cluster named "test" and opens a driver session against it.
 *
 * <p>Resets the {@code erroredOut} and {@code schemaCreated} flags first. If no host can be
 * reached, {@code erroredOut} is set, each per-host error is logged, and the failure is
 * rethrown wrapped in a {@link RuntimeException}.
 */
public void createCluster() {
    erroredOut = false;
    schemaCreated = false;
    cassandraCluster = CCMBridge.create("test", 1);
    try {
        Builder configured = configure(Cluster.builder());
        cluster = configured.addContactPoints(IP_PREFIX + '1').build();
        session = cluster.connect();
    } catch (NoHostAvailableException e) {
        erroredOut = true;
        for (Map.Entry<InetSocketAddress, Throwable> hostError : e.getErrors().entrySet()) {
            logger.info("Error connecting to " + hostError.getKey() + ": " + hostError.getValue());
        }
        throw new RuntimeException(e);
    }
}
/**
 * Records the document's hash in Cassandra (when remembering is enabled) and then passes
 * the document on via {@code superSendToNext}.
 *
 * <p>Failure to reach Cassandra does not stop the document from being forwarded; it is
 * logged, with its stack trace, unless the application is shutting down.
 *
 * @param doc the document to record and forward
 */
@Override
public void sendToNext(Document doc) {
    if (isRemembering()) {
        try {
            Session session = getCassandra().getSession();
            PreparedStatement preparedQuery = getCassandra().getPreparedQuery(UPDATE_HASH_U);
            BoundStatement bind = preparedQuery.bind(doc.getHash(), doc.getId(), doc.getSourceScannerName());
            session.execute(bind);
        } catch (NoHostAvailableException e) {
            if (!Main.isShuttingDown()) {
                // Pass the exception as the throwable argument so the stack trace is kept,
                // instead of concatenating its toString() into the message.
                log.error("Could not contact our internal Cassandra!!!", e);
            }
        }
    }
    superSendToNext(doc);
}
static Session createSession() throws Exception { Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1") // long read timeout is sometimes needed on slow travis ci machines .withSocketOptions(new SocketOptions().setReadTimeoutMillis(30000)) .withQueryOptions(getQueryOptions()) .build(); Session session = cluster.connect(); session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION =" + " { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); session.execute("CREATE TABLE IF NOT EXISTS test.users" + " (id int PRIMARY KEY, fname text, lname text)"); try { session.execute("TRUNCATE test.users"); } catch (NoHostAvailableException e) { // sometimes slow, so give it a second chance session.execute("TRUNCATE test.users"); } for (int i = 0; i < 10; i++) { session.execute("INSERT INTO test.users (id, fname, lname) VALUES (" + i + ", 'f" + i + "', 'l" + i + "')"); } return session; }
@Test public void testBadConnection() { CassandraConnection cc = new CassandraConnection(); cc.setProperty("contactPoints", "127.1.1.1"); // cc.setProperty("keyspace", "testks"); cc.setProperty("sessionName", "testsession"); Boolean exeptionCaught=false; try { cc.testStarted(); } catch (NoHostAvailableException e) { exeptionCaught = true; } assertTrue(exeptionCaught, "NoHostAvailable did not occur."); cc.testEnded(); }
/**
 * Builds a Cassandra cluster from the supplied properties and opens a session on it.
 *
 * <p>If a non-blank keyspace is configured the session is bound to it; otherwise an
 * unbound session is opened. Native batch support is recorded as available when the
 * negotiated protocol version is greater than 1.
 *
 * @param dataService the owning data service
 * @param configId    identifier of this configuration
 * @param properties  configuration properties, including the optional keyspace
 * @param odataEnable whether OData is enabled for this configuration
 * @throws DataServiceFault with {@code CONNECTION_UNAVAILABLE_ERROR} if no Cassandra host
 *                          can be reached
 */
public CassandraConfig(DataService dataService, String configId, Map<String, String> properties,
                       boolean odataEnable) throws DataServiceFault {
    super(dataService, configId, DataSourceTypes.CASSANDRA, properties, odataEnable);
    Builder builder = Cluster.builder();
    this.populateSettings(builder, properties);
    String keyspace = properties.get(DBConstants.Cassandra.KEYSPACE);
    this.cluster = builder.build();
    try {
        if (keyspace != null && !keyspace.trim().isEmpty()) {
            this.session = this.cluster.connect(keyspace);
        } else {
            this.session = this.cluster.connect();
        }
        this.nativeBatchRequestsSupported = this.session.getCluster().
                getConfiguration().getProtocolOptions().getProtocolVersion().toInt() > 1;
    } catch (NoHostAvailableException e) {
        // The config object is never handed to a caller on this path, so nobody else can
        // close the cluster; release it here before reporting the failure.
        this.cluster.close();
        throw new DataServiceFault(e, DBConstants.FaultCodes.CONNECTION_UNAVAILABLE_ERROR, e.getMessage());
    }
}
@Override public synchronized Session getApplicationSession(){ // always grab cluster from getCluster() in case it was prematurely closed if ( applicationSession == null || applicationSession.isClosed() ){ int retries = 3; int retryCount = 0; while ( retryCount < retries){ try{ retryCount++; applicationSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationKeyspace() ) ); break; }catch(NoHostAvailableException e){ if(retryCount == retries){ throw e; } try { Thread.sleep(1000); } catch (InterruptedException ie) { // swallow } } } } return applicationSession; }
@Override public synchronized Session getApplicationLocalSession(){ // always grab cluster from getCluster() in case it was prematurely closed if ( queueMessageSession == null || queueMessageSession.isClosed() ){ int retries = 3; int retryCount = 0; while ( retryCount < retries){ try{ retryCount++; queueMessageSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace() ) ); break; }catch(NoHostAvailableException e){ if(retryCount == retries){ throw e; } try { Thread.sleep(1000); } catch (InterruptedException ie) { // swallow } } } } return queueMessageSession; }
/**
 * Matches a {@link NoHostAvailableException} that carries exactly one per-host error,
 * provided that error satisfies {@code expectedFirstErrorMatcher}.
 *
 * @param item candidate object
 * @return {@code true} only for a single-error {@code NoHostAvailableException} whose
 *         error is accepted by the delegate matcher
 */
@Override
public boolean matches(Object item) {
    if (!(item instanceof NoHostAvailableException)) {
        return false;
    }
    NoHostAvailableException noHost = (NoHostAvailableException) item;
    if (noHost.getErrors().size() != 1) {
        return false;
    }
    Throwable onlyError = noHost.getErrors().values().iterator().next();
    return expectedFirstErrorMatcher.matches(onlyError);
}
@Test public void testRejectAndAcceptAfter() throws Exception { Collection<BoundDataCenter> datacenters = server.getCluster().getDataCenters(); BoundDataCenter dc = datacenters.iterator().next(); Iterator<BoundNode> nodeIterator = dc.getNodes().iterator(); BoundNode node = nodeIterator.next(); Scope scope = new Scope(server.getCluster().getId(), dc.getId(), node.getId()); HttpTestResponse delete = server.delete("/listener/" + scope + "?after=" + 3 + "&type=" + "unbind"); assertThat(delete.response.statusCode()).isEqualTo(200); // First try try (com.datastax.driver.core.Cluster driverCluster = defaultBuilder().addContactPointsWithPorts((InetSocketAddress) node.getAddress()).build()) { driverCluster.init(); } // Second try try (com.datastax.driver.core.Cluster driverCluster = defaultBuilder().addContactPointsWithPorts((InetSocketAddress) node.getAddress()).build()) { driverCluster.init(); } // Now it should be rejected try (com.datastax.driver.core.Cluster driverCluster = defaultBuilder().addContactPointsWithPorts((InetSocketAddress) node.getAddress()).build()) { driverCluster.init(); } catch (NoHostAvailableException e) { } HttpTestResponse accept = server.put("/listener/" + scope); assertThat(accept.response.statusCode()).isEqualTo(200); // Now it should go back to normal try (com.datastax.driver.core.Cluster driverCluster = defaultBuilder().addContactPointsWithPorts((InetSocketAddress) node.getAddress()).build()) { driverCluster.init(); } }
/**
 * Creates the Cassandra {@link Cluster} bean.
 *
 * <p>Cluster creation is delegated to {@code doCreateCluster(properties)} and wrapped in
 * {@code Retriable}, configured to retry on {@link NoHostAvailableException} and
 * {@link UnknownHostException} with the delay taken from the properties.
 *
 * @param properties Cassandra settings; supplies the connect retry delay
 * @return the created cluster
 */
@Bean
public Cluster createCluster(CassandraProperties properties) {
    return Retriable.wrap(() -> doCreateCluster(properties))
            .withErrorMessage("Cannot connect to cassandra cluster")
            // Both "no host reachable" and "host name not resolvable" trigger a retry.
            .retryOn(NoHostAvailableException.class, UnknownHostException.class)
            .withDelaySec(properties.getConnectDelaySec())
            .call();
}
/**
 * With the cluster closed, invoking the bound statement must fail with
 * {@link NoHostAvailableException} and exactly one span must still be recorded.
 */
@Test
public void reportsSpanOnTransportException() throws Exception {
    // Closing the cluster up front makes the statement invocation fail.
    cluster.close();

    try {
        invokeBoundStatement();
        failBecauseExceptionWasNotThrown(NoHostAvailableException.class);
    } catch (NoHostAvailableException expected) {
        // This is the failure the test is looking for.
    }

    assertThat(spans).hasSize(1);
}
/**
 * {@code check()} on a {@code Cassandra3Storage} built with the contact point 1.1.1.1 must
 * return a failed result carrying a {@link NoHostAvailableException}, not throw it.
 */
@Test
public void check_failsInsteadOfThrowing() {
    CheckResult outcome =
            Cassandra3Storage.builder()
                    .contactPoints("1.1.1.1")
                    .build()
                    .check();

    // The failure is reported through the result object.
    assertThat(outcome.ok).isFalse();
    assertThat(outcome.exception).isInstanceOf(NoHostAvailableException.class);
}
/**
 * {@code check()} on a {@code CassandraStorage} built with the contact point 1.1.1.1 must
 * return a failed result carrying a {@link NoHostAvailableException}, not throw it.
 */
@Test
public void check_failsInsteadOfThrowing() {
    CheckResult outcome =
            CassandraStorage.builder()
                    .contactPoints("1.1.1.1")
                    .build()
                    .check();

    // The failure is reported through the result object.
    assertThat(outcome.ok).isFalse();
    assertThat(outcome.exception).isInstanceOf(NoHostAvailableException.class);
}
/** * Returns true if the exception is one which indicates that the frame size may be too large, false otherwise. */ private static boolean isAdaptiveException(Throwable t) { if (t instanceof FrameTooLongException) { return true; } if (t instanceof NoHostAvailableException) { // If the issue on every host is adaptive then the exception is adaptive Collection<Throwable> hostExceptions = ((NoHostAvailableException) t).getErrors().values(); return !hostExceptions.isEmpty() && hostExceptions.stream().allMatch(AdaptiveResultSet::isAdaptiveException); } return false; }
/**
 * Executes the statement page by page and puts each {@code ResultSet} on {@code queue}
 * until the driver reports no further paging state.
 *
 * <p>When no host is available the loop sleeps for one second and retries the same page.
 * This now also applies to the very first page; previously a failure there ended the loop
 * without fetching anything. Interruption during the sleep is no longer swallowed: it
 * propagates to the caller like an interruption of {@code queue.put}.
 *
 * @param boundStatement statement to execute; its consistency level, fetch size and
 *                       paging state are set by this method
 * @throws InterruptedException if the thread is interrupted while queueing a page or
 *                              while waiting to retry
 */
private void fetchLoop(BoundStatement boundStatement) throws InterruptedException {
    boundStatement.setConsistencyLevel(scanConsistencyLevel);
    boundStatement.setFetchSize(Integer.parseInt(properties.get("fetch_size")));

    String currentPageInfo = null;
    boolean morePages = true;
    while (morePages) {
        try {
            LOG.debug("Hitting..." + currentPageInfo + "...");
            if (currentPageInfo != null) {
                boundStatement.setPagingState(PagingState.fromString(currentPageInfo));
            }
            ResultSet rs = session.execute(boundStatement);
            LOG.debug("Pushed to queue");
            queue.put(rs);

            PagingState nextPage = rs.getExecutionInfo().getPagingState();
            currentPageInfo = (nextPage == null) ? null : nextPage.toString();
            // Only a successfully fetched page may end the loop.
            morePages = currentPageInfo != null;
        } catch (NoHostAvailableException e) {
            LOG.warn("No host available exception... going to sleep for 1 sec");
            // currentPageInfo is unchanged, so the same page is retried after the pause.
            Thread.sleep(1000);
        }
        LOG.debug("Finished while loop");
    }
}
/**
 * Connects to Cassandra on localhost, starting an embedded instance when none is reachable.
 *
 * <p>The first connection attempt uses the builder's default port. If it fails with
 * {@link NoHostAvailableException}, a {@code cassandra.yml} is located, the
 * {@code cassandra.config} system property is pointed at it, an
 * {@code EmbeddedCassandraService} is started, and the connection is retried on port 9142.
 *
 * @throws IllegalArgumentException if no {@code cassandra.yml} can be found
 */
private void startServerIfNotRunning() throws IOException, TTransportException, InterruptedException {
    try {
        session = new Cluster.Builder()
                .addContactPoints("localhost")
                .withProtocolVersion(ProtocolVersion.V3)
                .build().connect();
        return;
    } catch (NoHostAvailableException e) {
        // Nothing is listening yet; fall through and start the embedded server.
    }

    String yamlLocation = findPathForCassandraYaml("./cassandra.yml");
    if (yamlLocation == null || yamlLocation.isEmpty()) {
        yamlLocation = findPathForCassandraYaml("./secret-store-api/target/test-classes/cassandra.yml");
        if (yamlLocation == null || yamlLocation.isEmpty()) {
            throw new IllegalArgumentException("Could not find a cassandra.yml");
        }
    }

    System.setProperty("cassandra.config", "file://" + yamlLocation);
    EmbeddedCassandraService embedded = new EmbeddedCassandraService();
    embedded.start();

    session = new Cluster.Builder()
            .addContactPoints("localhost")
            .withPort(9142)
            .withProtocolVersion(ProtocolVersion.V3)
            .build().connect();
}
/**
 * Runs a traced health-check query at consistency level {@code ALL} and summarises the
 * outcome as a status code.
 *
 * @return {@code 2} if no host was available, {@code 1} if the retry policy recorded a
 *         decision (hosts without trace events are logged in that case), {@code 0} otherwise
 */
public int healthCheck() {
    final Statement health = QueryBuilder.select().all().from(HEALTHCHECK_KEYSPACE_NAME, "healthcheck")
            .where(eq("healthkey", "healthy"));
    health.setConsistencyLevel(ConsistencyLevel.ALL);
    health.enableTracing();
    cluster.register(new LoggingLatencyTracker());

    QueryTrace trace;
    try {
        final ResultSet rows = session.execute(health);
        trace = rows.getExecutionInfo().getQueryTrace();
    } catch (NoHostAvailableException e) {
        LOG.error("No hosts available", e);
        return 2;
    }

    if (retryPolicy.getLastDecision() == null) {
        return 0;
    }

    LOG.warn("Could not query all hosts");
    if (trace != null) {
        // Start from every known host and strike out each one that produced a trace event.
        final Set<InetAddress> silentHosts = new HashSet<>(hosts.size());
        for (Host known : hosts) {
            silentHosts.add(known.getSocketAddress().getAddress());
        }
        for (QueryTrace.Event event : trace.getEvents()) {
            silentHosts.remove(event.getSource());
            LOG.debug("description={} elapsed={} source={} micros={}",
                    event.getDescription(),
                    millis2Date(event.getTimestamp()),
                    event.getSource(),
                    event.getSourceElapsedMicros());
        }
        if (!silentHosts.isEmpty()) {
            LOG.error("Missing log entries from these hosts: {}", silentHosts);
        }
    }
    return 1;
}
/** * Checks if Cassandra table absence error occur. * * @param e Exception to check. * @return {@code true} in case of table absence error. */ public static boolean isTableAbsenceError(Throwable e) { while (e != null) { if (e instanceof InvalidQueryException && (TABLE_EXIST_ERROR1.matcher(e.getMessage()).matches() || KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() || KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches())) return true; if (e instanceof NoHostAvailableException && ((NoHostAvailableException) e).getErrors() != null) { NoHostAvailableException ex = (NoHostAvailableException)e; for (Map.Entry<InetSocketAddress, Throwable> entry : ex.getErrors().entrySet()) { //noinspection ThrowableResultOfMethodCallIgnored Throwable error = entry.getValue(); if (error instanceof DriverException && (error.getMessage().contains(TABLE_EXIST_ERROR2) || KEYSPACE_EXIST_ERROR3.matcher(error.getMessage()).matches())) return true; } } e = e.getCause(); } return false; }
/**
 * Checks whether the exception, or any exception in its cause chain, is a
 * {@link NoHostAvailableException} or a {@code ReadTimeoutException}.
 *
 * @param e Exception to check.
 * @return {@code true} in case of a host availability error.
 */
public static boolean isHostsAvailabilityError(Throwable e) {
    for (Throwable cur = e; cur != null; cur = cur.getCause()) {
        boolean unavailable = cur instanceof NoHostAvailableException || cur instanceof ReadTimeoutException;

        if (unavailable)
            return true;
    }

    return false;
}
/**
 * Sets the document's status and logs the status message with the document id and source
 * scanner name placed in the logging {@code ThreadContext}.
 *
 * <p>If logging fails with {@code AppenderLoggingException} or
 * {@link NoHostAvailableException}, the failure is logged with its stack trace unless the
 * application is shutting down. The thread context is always cleared afterwards.
 *
 * @param status        new status for the document; its marker is used for the log entry
 * @param document      document being reported on
 * @param message       log message pattern
 * @param messageParams parameters for the message pattern
 */
private void reportDocStatus(Status status, Document document, String message, Object... messageParams) {
    try {
        ThreadContext.put(JesterJAppender.JJ_INGEST_DOCID, document.getId());
        ThreadContext.put(JesterJAppender.JJ_INGEST_SOURCE_SCANNER, document.getSourceScannerName());
        document.setStatus(status);
        log.info(status.getMarker(), message, messageParams);
    } catch (AppenderLoggingException | NoHostAvailableException e) {
        if (!Main.isShuttingDown()) {
            // Pass the exception as the throwable argument so the stack trace is kept,
            // instead of concatenating its toString() into the message.
            log.error("Could not contact our internal Cassandra!!!", e);
        }
    } finally {
        ThreadContext.clearAll();
    }
}
private void dropTableIfExists(String tableName) throws Exception { Stopwatch stopwatch = Stopwatch.createStarted(); while (stopwatch.elapsed(SECONDS) < 60) { try { session.execute("drop table if exists " + tableName); return; } catch (NoHostAvailableException e) { logger.debug(e.getMessage(), e); } Thread.sleep(1000); } // try one last time and let exception bubble up session.execute("drop table if exists " + tableName); }
/**
 * Blocks until a connection to Cassandra at 127.0.0.1 succeeds, retrying once a second
 * for as long as no host is available.
 *
 * <p>Each attempt uses its own {@code Cluster}, which is closed on every exit path,
 * including when {@code connect()} fails with something other than
 * {@link NoHostAvailableException}.
 *
 * @throws InterruptedException if the thread is interrupted while waiting to retry
 */
private static void waitForCassandra() throws InterruptedException {
    while (true) {
        // try-with-resources closes the cluster before the catch block runs, so the
        // close-then-sleep order of the original is preserved.
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build()) {
            cluster.connect();
            return;
        } catch (NoHostAvailableException e) {
            Thread.sleep(1000);
        }
    }
}
/**
 * Creates a one-node CCM cluster named "test" and opens a driver session against it.
 *
 * <p>Resets the {@code erroredOut} and {@code schemaCreated} flags first. If no host can be
 * reached, {@code erroredOut} is set, each per-host error is logged, and the failure is
 * rethrown wrapped in a {@link RuntimeException}.
 */
public static void createCluster() {
    erroredOut = false;
    schemaCreated = false;
    cassandraCluster = CCMBridge.create("test", 1);
    try {
        String firstNode = IP_PREFIX + '1';
        cluster = Cluster.builder().addContactPoints(firstNode).build();
        session = cluster.connect();
    } catch (NoHostAvailableException e) {
        erroredOut = true;
        for (Map.Entry<InetSocketAddress, Throwable> hostError : e.getErrors().entrySet()) {
            logger.info("Error connecting to " + hostError.getKey() + ": " + hostError.getValue());
        }
        throw new RuntimeException(e);
    }
}
/**
 * Wraps a CCM bridge together with a driver cluster and session connected to its first node.
 *
 * <p>If no host can be reached, each per-host error is logged and the failure is rethrown
 * wrapped in a {@link RuntimeException}.
 *
 * @param cassandraCluster the CCM bridge backing this cluster
 * @param builder          driver builder; the first node's address is added as contact point
 * @param totalNodes       not used by this constructor body
 */
private CCMCluster(CCMBridge cassandraCluster, Cluster.Builder builder, int totalNodes) {
    this.cassandraCluster = cassandraCluster;
    try {
        String firstNode = IP_PREFIX + '1';
        this.cluster = builder.addContactPoints(firstNode).build();
        this.session = cluster.connect();
    } catch (NoHostAvailableException e) {
        for (Map.Entry<InetSocketAddress, Throwable> hostError : e.getErrors().entrySet()) {
            logger.info("Error connecting to " + hostError.getKey() + ": " + hostError.getValue());
        }
        throw new RuntimeException(e);
    }
}
/** * * A session holds connections to a Cassandra cluster, allowing it to be * queried. Each session maintains multiple connections to the cluster * nodes, provides policies to choose which node to use for each query * (round-robin on all nodes of the cluster by default), and handles retries * for failed queries (when it makes sense), etc... * * Session instances are thread-safe and usually a single instance is enough * per application. As a given session can only be "logged" into one * keyspace at a time (where the "logged" keyspace is the one used by * queries that don't explicitly use a fully qualified table name), it can * make sense to create one session per keyspace used. This is however not * necessary when querying multiple keyspaces since it is always possible to * use a single session with fully qualified table names in queries. * * @return the Cassandra session */ public Session getSession() throws Exception { if (getCluster().isClosed()) { throw new Exception(this.getBeanName() + ":getSession: cluster bean has been closed"); } // wait to acquire the session lock (default wait time is 10 seconds). if (!sessionLock.tryLock(getSessionLockWaitTime(), TimeUnit.MILLISECONDS)) { throw new Exception( this.getBeanName() + ":getSession: timed out attempting to acquire Cassandra session"); } try { // session may have already existed if (session != null && !session.isClosed()) { return session; } else if (session != null) { throw new Exception(this.getBeanName() + ":getSession: Cassandra session has been closed"); } // session does not exist, so create one try { session = getCluster().connect(getKeyspace()); } catch (NoHostAvailableException exc) { LOG.error(getBeanName() + ":unable to connect Cassandra during bean initialization, msg = " + exc.getMessage()); throw exc; } } finally { sessionLock.unlock(); } return session; }
/**
 * Persists the person by binding its first name, last name, age and interesting dates to
 * {@code storeStatement} and executing the result.
 *
 * @param person the person to store
 * @throws UnableToSavePersonException if no Cassandra host is available
 */
@Override
public void storePerson(Person person) {
    try {
        session.execute(
                storeStatement.bind(
                        person.getFirstName(),
                        person.getLastName(),
                        person.getAge(),
                        person.getInterestingDates()));
    } catch (NoHostAvailableException noHost) {
        throw new UnableToSavePersonException();
    }
}
public CassandraStorage(String host, int port) throws Exception { LOG.debug("Connecting to Cassandra Storage @ [{}:{}]", host, port); try { cluster = Cluster.builder().addContactPoints(host).withPort(port) .build(); } catch (NoHostAvailableException e) { throw new RuntimeException(e); } }
public ResultSet executeSync(Statement cql) { if(logCql) { logger.debug("Executing QueryBuilder Query: {}", cql.toString()); } //just run a normal execute without a prepared statement try { return session.execute(cql); } catch(NoHostAvailableException e) { throw new RhombusTimeoutException(e); } catch(QueryExecutionException e2) { throw new RhombusTimeoutException(e2); } }
/**
 * Drains every statement from the given iterators into a single unlogged batch and
 * executes it.
 *
 * @param statementIterators sources of the statements to batch; each is consumed fully
 * @throws RhombusTimeoutException wrapping a {@link NoHostAvailableException} or
 *                                 {@code QueryExecutionException} raised by the driver
 */
public void executeBatch(List<CQLStatementIterator> statementIterators) {
    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
    for (CQLStatementIterator statements : statementIterators) {
        while (statements.hasNext()) {
            batch.add(getBoundStatement(session, statements.next()));
        }
    }

    try {
        session.execute(batch);
    } catch (NoHostAvailableException noHost) {
        throw new RhombusTimeoutException(noHost);
    } catch (QueryExecutionException queryFailure) {
        throw new RhombusTimeoutException(queryFailure);
    }
}
private static void connect() { Set<InetSocketAddress> dbHosts = ioconfig.getUniqueBinaryTransportHostsAsInetSocketAddresses(); int readTimeoutMaxRetries = ioconfig.getReadTimeoutMaxRetries(); int writeTimeoutMaxRetries = ioconfig.getWriteTimeoutMaxRetries(); int unavailableMaxRetries = ioconfig.getUnavailableMaxRetries(); CodecRegistry codecRegistry = new CodecRegistry(); cluster = Cluster.builder() .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(ioconfig.getDatacenterName()).build(), false)) .withPoolingOptions(getPoolingOptions()) .withRetryPolicy(new RetryNTimes(readTimeoutMaxRetries, writeTimeoutMaxRetries, unavailableMaxRetries)) .withCodecRegistry(codecRegistry) .withSocketOptions(getSocketOptions()) .addContactPointsWithPorts(dbHosts) .build(); QueryLogger queryLogger = QueryLogger.builder() .withConstantThreshold(5000) .build(); cluster.register(queryLogger); if ( LOG.isDebugEnabled() ) { logDebugConnectionInfo(); } try { session = cluster.connect( CassandraModel.QUOTED_KEYSPACE ); } catch (NoHostAvailableException e){ // TODO: figure out how to bubble this up throw new RuntimeException(e); } }