/**
 * Resolve the retry policy implementation from the given string. Defaults to {@link DefaultRetryPolicy}.
 *
 * @param retryPolicy
 *            The name of the retry policy to use. Can be "downgrading" {@link DowngradingConsistencyRetryPolicy} or
 *            "fallthrough" {@link FallthroughRetryPolicy}. Everything else defaults to
 *            {@code Policies.defaultRetryPolicy()}.
 */
public void setRetryPolicy(String retryPolicy) {
    switch (retryPolicy) {
        case "downgrading":
            setRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
            break;
        case "fallthrough":
            setRetryPolicy(FallthroughRetryPolicy.INSTANCE);
            break;
        default:
            setRetryPolicy(Policies.defaultRetryPolicy());
            break;
    }
}
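A minimal usage sketch of the string-based setter above. Attaching it to the CassandraClusterConfig bean from the test further down is an assumption; only the mapping of the three string values comes from the method itself.

CassandraClusterConfig config = new CassandraClusterConfig(); // assumed host class for the setter
config.setRetryPolicy("downgrading");   // -> DowngradingConsistencyRetryPolicy.INSTANCE
config.setRetryPolicy("fallthrough");   // -> FallthroughRetryPolicy.INSTANCE
config.setRetryPolicy("anything else"); // -> Policies.defaultRetryPolicy()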
/**
 * Set the reconnection policy, which defines how often and at what interval the driver retries to re-establish
 * connections.
 *
 * @param policy
 *            The reconnection policy as string: "constant" {@link ConstantReconnectionPolicy} or "exponential"
 *            {@link ExponentialReconnectionPolicy}
 * @param delay
 *            The initial or constant delay
 * @param max
 *            The maximum delay (only required for {@link ExponentialReconnectionPolicy})
 */
public void setReconnectionPolicy(String policy, int delay, int... max) {
    switch (policy) {
        case "constant":
            setReconnectionPolicy(new ConstantReconnectionPolicy(delay));
            break;
        case "exponential":
            setReconnectionPolicy(new ExponentialReconnectionPolicy(delay, max[0]));
            break;
        default:
            setReconnectionPolicy(Policies.defaultReconnectionPolicy());
            break;
    }
}
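A matching sketch for the reconnection setter, under the same assumption about the enclosing config class. Note that the varargs max parameter is only read in the "exponential" branch, so selecting that policy without supplying a max value would fail with an ArrayIndexOutOfBoundsException.

CassandraClusterConfig config = new CassandraClusterConfig(); // assumed host class for the setter
config.setReconnectionPolicy("constant", 1000);               // retry every second; max is ignored
config.setReconnectionPolicy("exponential", 1000, 60000);     // 1 s base delay, capped at 60 s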
@Test
public void shouldCreateClusterWithConfig() throws Exception {
    CassandraServiceInfo info = new CassandraServiceInfo("local", Collections.singletonList("127.0.0.1"), 9142);

    CassandraClusterConfig config = new CassandraClusterConfig();
    config.setCompression(ProtocolOptions.Compression.NONE);
    config.setPoolingOptions(new PoolingOptions().setPoolTimeoutMillis(1234));
    config.setQueryOptions(new QueryOptions());
    config.setProtocolVersion(ProtocolVersion.NEWEST_SUPPORTED);
    config.setLoadBalancingPolicy(new RoundRobinPolicy());
    config.setReconnectionPolicy(new ConstantReconnectionPolicy(1));
    config.setRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
    config.setSocketOptions(new SocketOptions());

    Cluster cluster = creator.create(info, config);

    Configuration configuration = cluster.getConfiguration();
    assertThat(configuration.getProtocolOptions().getCompression(), is(config.getCompression()));
    assertThat(configuration.getQueryOptions(), is(config.getQueryOptions()));
    assertThat(configuration.getSocketOptions(), is(config.getSocketOptions()));

    Policies policies = configuration.getPolicies();
    assertThat(policies.getLoadBalancingPolicy(), is(config.getLoadBalancingPolicy()));
    assertThat(policies.getReconnectionPolicy(), is(config.getReconnectionPolicy()));
    assertThat(policies.getRetryPolicy(), is(config.getRetryPolicy()));
}
private com.datastax.driver.core.Cluster.Builder newCqlDriverBuilder(ConnectionPoolConfiguration poolConfig,
                                                                     MetricRegistry metricRegistry) {
    performHostDiscovery(metricRegistry);

    String[] seeds = _seeds.split(",");
    List<String> contactPoints = Lists.newArrayListWithCapacity(seeds.length);

    // Each seed may be a host name or a host name and port (e.g., "1.2.3.4" or "1.2.3.4:9160"). These need
    // to be converted into host names only.
    for (String seed : seeds) {
        HostAndPort hostAndPort = HostAndPort.fromString(seed);
        seed = hostAndPort.getHostText();
        if (hostAndPort.hasPort()) {
            if (hostAndPort.getPort() == _thriftPort) {
                _log.debug("Seed {} found using RPC port; swapping for native port {}", seed, _cqlPort);
            } else if (hostAndPort.getPort() != _cqlPort) {
                throw new IllegalArgumentException(String.format(
                        "Seed %s found with invalid port %s. The port must match either the RPC (thrift) port %s " +
                        "or the native (CQL) port %s", seed, hostAndPort.getPort(), _thriftPort, _cqlPort));
            }
        }
        contactPoints.add(seed);
    }

    PoolingOptions poolingOptions = new PoolingOptions();
    if (poolConfig.getMaxConnectionsPerHost().or(getMaxConnectionsPerHost()).isPresent()) {
        poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL,
                poolConfig.getMaxConnectionsPerHost().or(getMaxConnectionsPerHost()).get());
    }
    if (poolConfig.getCoreConnectionsPerHost().or(getCoreConnectionsPerHost()).isPresent()) {
        poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL,
                poolConfig.getCoreConnectionsPerHost().or(getCoreConnectionsPerHost()).get());
    }

    SocketOptions socketOptions = new SocketOptions();
    if (poolConfig.getConnectTimeout().or(getConnectTimeout()).isPresent()) {
        socketOptions.setConnectTimeoutMillis(poolConfig.getConnectTimeout().or(getConnectTimeout()).get());
    }
    if (poolConfig.getSocketTimeout().or(getSocketTimeout()).isPresent()) {
        socketOptions.setReadTimeoutMillis(poolConfig.getSocketTimeout().or(getSocketTimeout()).get());
    }

    AuthProvider authProvider = _authenticationCredentials != null
            ? new PlainTextAuthProvider(_authenticationCredentials.getUsername(), _authenticationCredentials.getPassword())
            : AuthProvider.NONE;

    return com.datastax.driver.core.Cluster.builder()
            .addContactPoints(contactPoints.toArray(new String[contactPoints.size()]))
            .withPort(_cqlPort)
            .withPoolingOptions(poolingOptions)
            .withSocketOptions(socketOptions)
            .withRetryPolicy(Policies.defaultRetryPolicy())
            .withAuthProvider(authProvider);
}
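Because the seed-normalization loop is the least obvious part of the method above, here is a self-contained sketch of just that step using Guava's HostAndPort. The port numbers stand in for the _thriftPort and _cqlPort fields and the seed list is made up.

import com.google.common.net.HostAndPort;

public class SeedNormalizationSketch {
    public static void main(String[] args) {
        int thriftPort = 9160; // placeholder for _thriftPort
        int cqlPort = 9042;    // placeholder for _cqlPort
        for (String seed : "10.0.0.1,10.0.0.2:9160,10.0.0.3:9042".split(",")) {
            HostAndPort hostAndPort = HostAndPort.fromString(seed);
            // Contact points keep only the host name; a thrift or CQL port is tolerated, anything else is rejected.
            if (hostAndPort.hasPort() && hostAndPort.getPort() != thriftPort && hostAndPort.getPort() != cqlPort) {
                throw new IllegalArgumentException("Unexpected port on seed " + seed);
            }
            System.out.println(hostAndPort.getHostText());
        }
    }
}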
@BeforeClass
public static void setup() throws ConfigurationException, IOException {
    Schema.instance.clear();

    cassandra = new EmbeddedCassandraService();
    cassandra.start();

    cluster = Cluster.builder().addContactPoint("127.0.0.1")
                     .withRetryPolicy(new LoggingRetryPolicy(Policies.defaultRetryPolicy()))
                     .withPort(DatabaseDescriptor.getNativeTransportPort()).build();
    session = cluster.connect();

    session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication " +
                    "= {'class':'SimpleStrategy', 'replication_factor':1};");
    session.execute("USE " + KEYSPACE);
    session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (" +
                    "key blob," +
                    "value blob," +
                    "PRIMARY KEY (key));");

    // Prepared statements
    getStatement = session.prepare("SELECT value FROM " + TABLE + " WHERE key = ?;");
    getStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
    putStatement = session.prepare("INSERT INTO " + TABLE + " (key, value) VALUES (?, ?);");
    putStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

    // Build a ~500 KB test value from the repeating a..z alphabet, replacing each 'a' with a newline.
    StringBuilder s = new StringBuilder();
    char a = 'a';
    char z = 'z';
    for (int i = 0; i < 500 * 1024; i++) {
        char x = (char) ((i % ((z - a) + 1)) + a);
        if (x == 'a') {
            x = '\n';
        }
        s.append(x);
    }
    VALUE = s.toString();
}
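A sketch of how a test body would typically exercise the prepared statements set up above. Since both columns are blobs, the bound values must be ByteBuffers; the key literal is illustrative, and the fragment additionally needs java.nio.ByteBuffer, java.nio.charset.StandardCharsets and com.datastax.driver.core.Row imports.

ByteBuffer key = ByteBuffer.wrap("some-key".getBytes(StandardCharsets.UTF_8));
ByteBuffer value = ByteBuffer.wrap(VALUE.getBytes(StandardCharsets.UTF_8));

session.execute(putStatement.bind(key, value));           // INSERT runs at LOCAL_QUORUM
Row row = session.execute(getStatement.bind(key)).one();  // SELECT runs at LOCAL_QUORUM
ByteBuffer stored = row.getBytes("value");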
public Policies getPolicies() {
    return policies;
}

public void setPolicies(Policies policies) {
    this.policies = policies;
}
/**
 * @return the policiesBuilder
 */
public Policies.Builder getPoliciesBuilder() {
    return policiesBuilder;
}
public synchronized Cluster buildCluster() {

    ConsistencyLevel defaultConsistencyLevel;
    try {
        defaultConsistencyLevel = cassandraConfig.getDataStaxReadCl();
    } catch (IllegalArgumentException e) {
        logger.error("Unable to parse provided consistency level in property: {}, defaulting to: {}",
                CassandraFig.READ_CL, ConsistencyLevel.LOCAL_QUORUM);
        defaultConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
    }

    LoadBalancingPolicy loadBalancingPolicy;
    if (!cassandraConfig.getLocalDataCenter().isEmpty()) {
        loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder()
                .withLocalDc(cassandraConfig.getLocalDataCenter()).build();
    } else {
        loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder().build();
    }

    final PoolingOptions poolingOptions = new PoolingOptions()
            .setCoreConnectionsPerHost(HostDistance.LOCAL, cassandraConfig.getConnections())
            .setMaxConnectionsPerHost(HostDistance.LOCAL, cassandraConfig.getConnections())
            .setIdleTimeoutSeconds(cassandraConfig.getPoolTimeout() / 1000)
            .setPoolTimeoutMillis(cassandraConfig.getPoolTimeout());

    // Purposely keep the driver's lower-level socket timeouts a couple of seconds above the Cassandra-side timeouts.
    final SocketOptions socketOptions = new SocketOptions()
            .setConnectTimeoutMillis(cassandraConfig.getTimeout())
            .setReadTimeoutMillis(cassandraConfig.getTimeout())
            .setKeepAlive(true);

    final QueryOptions queryOptions = new QueryOptions()
            .setConsistencyLevel(defaultConsistencyLevel)
            .setMetadataEnabled(true); // choose whether to have the driver store metadata such as schema info

    Cluster.Builder datastaxCluster = Cluster.builder()
            .withClusterName(cassandraConfig.getClusterName())
            .addContactPoints(cassandraConfig.getHosts().split(","))
            .withMaxSchemaAgreementWaitSeconds(45)
            .withCompression(ProtocolOptions.Compression.LZ4)
            .withLoadBalancingPolicy(loadBalancingPolicy)
            .withPoolingOptions(poolingOptions)
            .withQueryOptions(queryOptions)
            .withSocketOptions(socketOptions)
            .withReconnectionPolicy(Policies.defaultReconnectionPolicy())
            // Client-side timestamp generation is IMPORTANT; otherwise timestamps for successive writes are left
            // to the server, and network delays, clock skew, etc. can result in bad behavior.
            .withTimestampGenerator(new AtomicMonotonicTimestampGenerator())
            .withProtocolVersion(getProtocolVersion(cassandraConfig.getVersion()));

    // only add auth credentials if they were provided
    if (!cassandraConfig.getUsername().isEmpty() && !cassandraConfig.getPassword().isEmpty()) {
        datastaxCluster.withCredentials(cassandraConfig.getUsername(), cassandraConfig.getPassword());
    }

    return datastaxCluster.build();
}
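A minimal consumption sketch for buildCluster(). The clusterFactory reference stands in for whatever instance of the declaring class the caller holds, and the system.local query is only a connectivity check.

Cluster cluster = clusterFactory.buildCluster(); // hypothetical reference to the declaring instance
Session session = cluster.connect();
try {
    Row row = session.execute("SELECT release_version FROM system.local").one();
    logger.info("Connected to Cassandra {}", row.getString("release_version"));
} finally {
    session.close();
    cluster.close();
}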
/**
 * @param policiesBuilder
 *            the policiesBuilder to set
 */
public void setPoliciesBuilder(Policies.Builder policiesBuilder) {
    this.policiesBuilder = policiesBuilder;
}