@Override public void startComponent() { if (cluster == null) { // Configure and build up the Cassandra cluster. cluster = Cluster.builder() .withClusterName(clusterName) .withPort(port) .withRetryPolicy(DefaultRetryPolicy.INSTANCE) // TokenAware requires query has routing info (e.g. BoundStatement with all PK value bound). .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())) .addContactPoints(contactPoints.toArray(new String[contactPoints.size()])) .build(); // Register any codecs. cluster.getConfiguration().getCodecRegistry() .register(new CassandraEnumCodec<>(AccessMode.class, AccessMode.getValueMap())) .register(new CassandraEnumCodec<>(Direction.class, Direction.getValueMap())) .register(new CassandraEnumCodec<>(SourceEntity.Type.class, SourceEntity.Type.getValueMap())); // Create a session. manager = new MappingManager(cluster.connect()); } }
@Override public RetryDecision onReadTimeout( Statement stmt, ConsistencyLevel cl, int required, int received, boolean retrieved, int retry) { if (retry > 1) { try { Thread.sleep(100); } catch (InterruptedException expected) { } } return null != stmt && stmt.isIdempotent() ? retry < 10 ? RetryDecision.retry(cl) : RetryDecision.rethrow() : DefaultRetryPolicy.INSTANCE.onReadTimeout(stmt, cl, required, received, retrieved, retry); }
@JsonCreator public DatastaxMetricModule( @JsonProperty("id") Optional<String> id, @JsonProperty("groups") Optional<Groups> groups, @JsonProperty("seeds") Optional<Set<String>> seeds, @JsonProperty("schema") Optional<SchemaModule> schema, @JsonProperty("configure") Optional<Boolean> configure, @JsonProperty("fetchSize") Optional<Integer> fetchSize, @JsonProperty("readTimeout") Optional<Duration> readTimeout, @JsonProperty("consistencyLevel") Optional<ConsistencyLevel> consistencyLevel, @JsonProperty("retryPolicy") Optional<RetryPolicy> retryPolicy, @JsonProperty("authentication") Optional<DatastaxAuthentication> authentication ) { this.id = id; this.groups = groups.orElseGet(Groups::empty).or("heroic"); this.seeds = convert(seeds.orElse(DEFAULT_SEEDS)); this.schema = schema.orElseGet(NextGenSchemaModule.builder()::build); this.configure = configure.orElse(DEFAULT_CONFIGURE); this.fetchSize = fetchSize.orElse(DEFAULT_FETCH_SIZE); this.readTimeout = readTimeout.orElse(DEFAULT_READ_TIMEOUT); this.consistencyLevel = consistencyLevel.orElse(ConsistencyLevel.ONE); this.retryPolicy = retryPolicy.orElse(DefaultRetryPolicy.INSTANCE); this.authentication = authentication.orElseGet(DatastaxAuthentication.None::new); }
protected void createCache(Map<String, String> mapParams) throws Exception { final Cluster.Builder bluePrint = Cluster.builder().withClusterName("BluePrint") .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)) .withRetryPolicy(DefaultRetryPolicy.INSTANCE) .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy())) .addContactPoint(mapParams.get("cassandra.server.ip.address")).withPort(9042); cache1 = mache(String.class, CassandraTestEntity.class) .cachedBy(guava()) .storedIn(cassandra() .withCluster(bluePrint) .withKeyspace(mapParams.get("keyspace.name")) .withSchemaOptions(SchemaOptions.CREATE_SCHEMA_IF_NEEDED) .build()) .withMessaging(kafka() .withKafkaMqConfig(KafkaMqConfigBuilder.builder() .withZkHost(mapParams.get("kafka.connection")) .build()) .withTopic(mapParams.get("kafka.topic")) .build()) .macheUp(); }
@Test public void testRetryPolicyParsing() throws Exception { String retryPolicyStr = "DefaultRetryPolicy"; System.out.println(retryPolicyStr); assertTrue(Utils.parseRetryPolicy(retryPolicyStr) instanceof DefaultRetryPolicy); System.out.println("===================="); retryPolicyStr = "DowngradingConsistencyRetryPolicy"; System.out.println(retryPolicyStr); assertTrue(Utils.parseRetryPolicy(retryPolicyStr) instanceof DowngradingConsistencyRetryPolicy); System.out.println("===================="); retryPolicyStr = "FallthroughRetryPolicy"; System.out.println(retryPolicyStr); assertTrue(Utils.parseRetryPolicy(retryPolicyStr) instanceof FallthroughRetryPolicy); System.out.println("===================="); }
private Builder populateRetrytPolicy(Map<String, String> properties, Builder builder) throws DataServiceFault { String retryPolicy = properties.get(DBConstants.Cassandra.RETRY_POLICY); if (retryPolicy != null) { if ("DefaultRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE); } else if ("DowngradingConsistencyRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE); } else if ("FallthroughRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE); } else if ("LoggingDefaultRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)); } else if ("LoggingDowngradingConsistencyRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)); } else if ("LoggingFallthroughRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE)); } else { throw new DataServiceFault("Invalid Cassandra retry policy: " + retryPolicy); } } return builder; }
@Test public void cassandraSessionWithConfiguration() throws Exception { ApplicationContext testContext = getTestApplicationContext( "cloud-cassandra-with-config.xml", createService("my-service")); Cluster cluster = testContext.getBean("cassandra-full-config", getConnectorType()); assertNotNull(cluster.getConfiguration().getSocketOptions()); assertEquals(15000, cluster.getConfiguration().getSocketOptions().getConnectTimeoutMillis()); assertTrue(DefaultRetryPolicy.class.isAssignableFrom( cluster.getConfiguration().getPolicies().getRetryPolicy().getClass())); assertTrue(RoundRobinPolicy.class.isAssignableFrom(cluster.getConfiguration() .getPolicies().getLoadBalancingPolicy().getClass())); assertTrue(ConstantReconnectionPolicy.class.isAssignableFrom(cluster .getConfiguration().getPolicies().getReconnectionPolicy().getClass())); }
/** * @param ip String * @param port String * @param poolingOptions PoolingOptions * @return Cluster Cluster */ private static Cluster createCluster(String ip, String port, PoolingOptions poolingOptions) { return Cluster.builder().addContactPoint(ip).withPort(Integer.parseInt(port)) .withProtocolVersion(ProtocolVersion.V3).withRetryPolicy(DefaultRetryPolicy.INSTANCE) .withTimestampGenerator(new AtomicMonotonicTimestampGenerator()) .withPoolingOptions(poolingOptions).build(); }
public static void main(String[] args){ Cluster cluster; Session session; cluster = Cluster .builder() .addContactPoint("127.0.0.1") .withRetryPolicy(DefaultRetryPolicy.INSTANCE) //Other option: DowngradingConsistencyRetryPolicy .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())) .build(); session = cluster.connect("demo"); PreparedStatement statement = session.prepare("INSERT INTO user (id, name) VALUES (?, ?)"); Statement boundStatement = statement .bind(1, "user 1") .enableTracing(); long startTime = System.currentTimeMillis(); ResultSet resultSet = session.execute(boundStatement); long duration = System.currentTimeMillis() - startTime; System.out.format("Time taken: %d", duration); ExecutionInfo executionInfo = resultSet.getExecutionInfo(); printQueryTrace(executionInfo.getQueryTrace()); cluster.close(); }
public void init() { LOG.info("Starting Cassandra connector initialization."); Cluster.Builder builder = Cluster.builder() .addContactPoints(hosts) .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectionDelayMs)) .withRetryPolicy(DefaultRetryPolicy.INSTANCE) .withCompression(ProtocolOptions.Compression.LZ4) .withSocketOptions(new SocketOptions() .setReceiveBufferSize(receiverBufferSize) .setSendBufferSize(senderBufferSize)) .withPort(port); if (poolingOptions != null) { int procs = Runtime.getRuntime().availableProcessors(); poolingOptions .setConnectionsPerHost(HostDistance.LOCAL, procs, procs * 2) .setConnectionsPerHost(HostDistance.REMOTE, (procs / 2), procs * 2) .setPoolTimeoutMillis(poolTimeoutMillis) .setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection) .setMaxRequestsPerConnection(HostDistance.REMOTE, maxRequestsPerConnection); builder.withPoolingOptions(poolingOptions); } if (!Strings.isNullOrEmpty(username)) { builder.withCredentials(username, password); } cluster = builder.build(); session = cluster.connect(keyspace); }
@Override public RetryDecision onWriteTimeout( Statement stmt, ConsistencyLevel cl, WriteType type, int required, int received, int retry) { return null != stmt && stmt.isIdempotent() ? RetryDecision.retry(cl) : DefaultRetryPolicy.INSTANCE.onWriteTimeout(stmt, cl, type, required, received, retry); }
@Test public void returnsDefaultRetryPolicyInstance() throws Exception { final DefaultRetryPolicyFactory factory = new DefaultRetryPolicyFactory(); final DefaultRetryPolicy policy = (DefaultRetryPolicy) factory.build(); assertThat(policy).isSameAs(DefaultRetryPolicy.INSTANCE); }
private Cluster.Builder populateRetrytPolicy(Properties properties, Cluster.Builder builder) { String retryPolicy = properties.getProperty(CassandraStoreParameters.RETRY_POLICY); if (retryPolicy != null) { switch (retryPolicy) { case "DefaultRetryPolicy": builder = builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE); break; case "DowngradingConsistencyRetryPolicy": builder = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE); break; case "FallthroughRetryPolicy": builder = builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE); break; case "LoggingDefaultRetryPolicy": builder = builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)); break; case "LoggingDowngradingConsistencyRetryPolicy": builder = builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)); break; case "LoggingFallthroughRetryPolicy": builder = builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE)); break; default: LOG.error("Unsupported retry policy : {} ", retryPolicy); break; } } return builder; }
/** * Creates a {@link SessionManager} instance. Sub-class my override this * method to customized its own {@link SessionManager}. * * @return */ protected SessionManager createSessionManager() { SessionManager sm = new SessionManager(); // sm.setRetryPolicy(new // LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)); sm.setRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)); sm.setSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(10000, 3)); sm.init(); return sm; }
public CassandraSessionManaged build(Environment environment, String localDc) { PoolingOptions poolingOptions = new PoolingOptions(); poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, 3, 5) .setConnectionsPerHost(HostDistance.REMOTE, 1, 2); final DCAwareRoundRobinPolicy.Builder builder = DCAwareRoundRobinPolicy.builder(); if (localDc != null) { builder.withLocalDc(localDc); } QueryOptions queryOptions = new QueryOptions(); queryOptions.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL); final Cluster cluster = Cluster .builder() .withRetryPolicy(DefaultRetryPolicy.INSTANCE) .withReconnectionPolicy(new ExponentialReconnectionPolicy(10L, 1000L)) .withQueryOptions(queryOptions) .withLoadBalancingPolicy(new TokenAwarePolicy(builder.build())) .addContactPoints(getContactPoints().stream().toArray(String[]::new)) .withPort(getPort()) .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(1000, 2)) .withPoolingOptions(poolingOptions) .build(); cluster.getConfiguration().getCodecRegistry() .register(InstantCodec.instance); Session session = cluster.connect(getKeySpace()); CassandraSessionManaged cassandraSessionManaged = new CassandraSessionManaged(cluster, session); environment.lifecycle().manage(cassandraSessionManaged); return cassandraSessionManaged; }
@PostConstruct public void initialize() { String cassandraHosts = env.getProperty("ea.cassandra.hosts","localhost:9042"); String cassandraClusterName = env.getProperty("ea.cassandra.cluster","ElasticActorsCluster"); String cassandraKeyspaceName = env.getProperty("ea.cassandra.keyspace","\"ElasticActors\""); Integer cassandraPort = env.getProperty("ea.cassandra.port", Integer.class, 9042); Set<String> hostSet = StringUtils.commaDelimitedListToSet(cassandraHosts); String[] contactPoints = new String[hostSet.size()]; int i=0; for (String host : hostSet) { if(host.contains(":")) { contactPoints[i] = host.substring(0,host.indexOf(":")); } else { contactPoints[i] = host; } i+=1; } PoolingOptions poolingOptions = new PoolingOptions(); poolingOptions.setHeartbeatIntervalSeconds(60); poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, 2, env.getProperty("ea.cassandra.maxActive",Integer.class,Runtime.getRuntime().availableProcessors() * 3)); poolingOptions.setPoolTimeoutMillis(2000); Cluster cassandraCluster = Cluster.builder().withClusterName(cassandraClusterName) .addContactPoints(contactPoints) .withPort(cassandraPort) .withLoadBalancingPolicy(new RoundRobinPolicy()) .withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)) .withPoolingOptions(poolingOptions) .withReconnectionPolicy(new ConstantReconnectionPolicy(env.getProperty("ea.cassandra.retryDownedHostsDelayInSeconds",Integer.class,1) * 1000)) .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM)).build(); this.cassandraSession = cassandraCluster.connect(cassandraKeyspaceName); }
protected void prepareStatements() throws Exception { prepare(CQL_UPD_WORKFLOW_INSTANCE_NOT_WAITING); prepare(CQL_UPD_WORKFLOW_INSTANCE_WAITING); prepare(CQL_DEL_WORKFLOW_INSTANCE_WAITING); prepare(CQL_SEL_WORKFLOW_INSTANCE); prepare(CQL_UPD_WORKFLOW_INSTANCE_STATE); prepare(CQL_INS_EARLY_RESPONSE); prepare(CQL_DEL_EARLY_RESPONSE); prepare(CQL_SEL_EARLY_RESPONSE); prepare(CQL_UPD_WORKFLOW_INSTANCE_STATE_AND_RESPONSE_MAP); prepare(CQL_INS_WFI_ID); prepare(CQL_DEL_WFI_ID); prepare(CQL_SEL_WFI_ID_ALL, DefaultRetryPolicy.INSTANCE); }
/** * 描述: 初始化配置 * 时间: 2017年11月15日 上午11:25:07 * @author yi.zhang * @param servers 服务地址 * @param keyspace 命名空间 * @param username 账号 * @param password 密码 */ public void init(String servers,String keyspace,String username,String password) { try { // socket 链接配置 SocketOptions socket = new SocketOptions(); socket.setKeepAlive(true); socket.setReceiveBufferSize(1024* 1024); socket.setSendBufferSize(1024* 1024); socket.setConnectTimeoutMillis(5 * 1000); socket.setReadTimeoutMillis(1000); //设置连接池 PoolingOptions pool = new PoolingOptions(); // pool.setMaxRequestsPerConnection(HostDistance.LOCAL, 32); // pool.setMaxRequestsPerConnection(HostDistance.REMOTE, 32); // pool.setCoreConnectionsPerHost(HostDistance.LOCAL, 2); // pool.setCoreConnectionsPerHost(HostDistance.REMOTE, 2); // pool.setMaxConnectionsPerHost(HostDistance.LOCAL, 4); // pool.setMaxConnectionsPerHost(HostDistance.REMOTE, 4); pool.setHeartbeatIntervalSeconds(60); pool.setIdleTimeoutSeconds(120); pool.setPoolTimeoutMillis(5 * 1000); List<InetSocketAddress> saddress = new ArrayList<InetSocketAddress>(); if (servers != null && !"".equals(servers)) { for (String server : servers.split(",")) { String[] address = server.split(":"); String ip = address[0]; int port = 9042; if (address != null && address.length > 1) { port = Integer.valueOf(address[1]); } saddress.add(new InetSocketAddress(ip, port)); } } InetSocketAddress[] addresses = new InetSocketAddress[saddress.size()]; saddress.toArray(addresses); Builder builder = Cluster.builder(); builder.withSocketOptions(socket); // 设置压缩方式 builder.withCompression(ProtocolOptions.Compression.LZ4); // 负载策略 // DCAwareRoundRobinPolicy loadBalance = DCAwareRoundRobinPolicy.builder().withLocalDc("localDc").withUsedHostsPerRemoteDc(2).allowRemoteDCsForLocalConsistencyLevel().build(); // builder.withLoadBalancingPolicy(loadBalance); // 重试策略 builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE); builder.withPoolingOptions(pool); builder.addContactPointsWithPorts(addresses); builder.withCredentials(username, password); Cluster cluster = builder.build(); if (keyspace != null && !"".equals(keyspace)) { session = cluster.connect(keyspace); } else { session = cluster.connect(); } mapping = new MappingManager(session); } catch (Exception e) { logger.error("-----Cassandra Config init Error-----", e); } }
@Override public RetryDecision onUnavailable(Statement stmt, ConsistencyLevel cl, int required, int aliveReplica, int retry) { return DefaultRetryPolicy.INSTANCE.onUnavailable(stmt, cl, required, aliveReplica, retry == 1 ? 0 : retry); }
@Override public RetryDecision onRequestError(Statement stmt, ConsistencyLevel cl, DriverException ex, int nbRetry) { return DefaultRetryPolicy.INSTANCE.onRequestError(stmt, cl, ex, nbRetry); }
@Override public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) { return DefaultRetryPolicy.INSTANCE.onReadTimeout(statement, cl, requiredResponses, receivedResponses, dataRetrieved, nbRetry); }
@Override public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) { return DefaultRetryPolicy.INSTANCE.onWriteTimeout(statement, cl, writeType, requiredAcks, receivedAcks, nbRetry); }
@Override public RetryPolicy build() { return DefaultRetryPolicy.INSTANCE; }
@Override public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) { return DefaultRetryPolicy.INSTANCE.onWriteTimeout(statement, cl, writeType, receivedAcks, receivedAcks, nbRetry); }
@Override public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) { return DefaultRetryPolicy.INSTANCE.onUnavailable(statement, cl, requiredReplica, aliveReplica, nbRetry); }
/** * @param ip String * @param port String * @param userName String * @param password String * @param poolingOptions PoolingOptions * @return Cluster Cluster */ private static Cluster createCluster(String ip, String port, String userName, String password, PoolingOptions poolingOptions) { return Cluster.builder().addContactPoint(ip).withPort(Integer.parseInt(port)) .withProtocolVersion(ProtocolVersion.V3).withRetryPolicy(DefaultRetryPolicy.INSTANCE) .withTimestampGenerator(new AtomicMonotonicTimestampGenerator()) .withPoolingOptions(poolingOptions).withCredentials(userName, password).build(); }