@Bean public Session createSession(CassandraProperties properties, Cluster cluster) throws Exception { Session session = Retriable.wrap(cluster::connect) .withErrorMessage("Cannot connect to cassandra cluster") .retryOn(NoHostAvailableException.class) .withDelaySec(properties.getConnectDelaySec()) .call(); initDb(properties, session); if (!session.getCluster().getMetadata().checkSchemaAgreement()) { log.warn("SCHEMA IS NOT IN AGREEMENT!!!"); } return session; }
@Test public void testVnodeSupport() throws Exception { // Validate that peers as appropriately discovered when connecting to a node and vnodes are // assigned. try (BoundCluster boundCluster = server.register(ClusterSpec.builder().withNumberOfTokens(256).withNodes(3, 3, 3)); Cluster driverCluster = defaultBuilder(boundCluster).build()) { driverCluster.init(); // Should be 9 hosts assertThat(driverCluster.getMetadata().getAllHosts()).hasSize(9); Set<Token> allTokens = new HashSet<>(); for (Host host : driverCluster.getMetadata().getAllHosts()) { assertThat(host.getTokens()).hasSize(256); allTokens.addAll(host.getTokens()); } // Should be 256*9 unique tokens. assertThat(allTokens).hasSize(256 * 9); } }
@Test public void testShouldFailToConnectWithOlderProtocolVersion() { try (BoundNode node = server.register(NodeSpec.builder().build()); Cluster cluster = defaultBuilder(node).withProtocolVersion(ProtocolVersion.V2).build()) { // Since simulacron does not support < V3, an exception should be thrown if we try to force // an older version. try { cluster.connect(); } catch (UnsupportedProtocolVersionException e) { // expected } // Should get a query log indicating invalid protocol version was used. assertThat(node.getLogs().getQueryLogs()).hasSize(1); QueryLog log = node.getLogs().getQueryLogs().get(0); Frame frame = log.getFrame(); assertThat(frame.protocolVersion).isEqualTo(2); assertThat(frame.warnings).hasSize(1); assertThat(frame.warnings.get(0)) .isEqualTo( "This message contains a non-supported protocol version by this node. STARTUP is inferred, but may not reflect the actual message sent."); assertThat(frame.message).isInstanceOf(Startup.class); } }
public static Session getClientSession(String hostAddr) { if(REGISTRY.containsKey(hostAddr)) { return REGISTRY.get(hostAddr); } else { Cluster.Builder clientClusterBuilder = new Cluster.Builder() .addContactPoint(hostAddr) .withQueryOptions(new QueryOptions() .setConsistencyLevel(ConsistencyLevel.ONE) .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)) .withoutJMXReporting() .withoutMetrics() .withReconnectionPolicy(new ConstantReconnectionPolicy(RECONNECT_DELAY_IN_MS)); long startTimeInMillis = System.currentTimeMillis(); Cluster clientCluster = clientClusterBuilder.build(); Session clientSession = clientCluster.connect(); LOG.info("Client session established after {} ms.", System.currentTimeMillis() - startTimeInMillis); REGISTRY.putIfAbsent(hostAddr, clientSession); return clientSession; } }
public static Session createSession(String ip, int port) { Cluster cluster; cluster = Cluster.builder() .addContactPoint(ip) .withPort(port) .build(); Session session = cluster.connect(); session.execute("CREATE KEYSPACE IF NOT EXISTS cassandrait WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); session.execute("DROP TABLE IF EXISTS cassandrait.counter"); session.execute("CREATE TABLE cassandrait.counter (key text, value counter, PRIMARY key(key));"); return session; }
@Override public void startComponent() { if (cluster == null) { // Configure and build up the Cassandra cluster. cluster = Cluster.builder() .withClusterName(clusterName) .withPort(port) .withRetryPolicy(DefaultRetryPolicy.INSTANCE) // TokenAware requires query has routing info (e.g. BoundStatement with all PK value bound). .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())) .addContactPoints(contactPoints.toArray(new String[contactPoints.size()])) .build(); // Register any codecs. cluster.getConfiguration().getCodecRegistry() .register(new CassandraEnumCodec<>(AccessMode.class, AccessMode.getValueMap())) .register(new CassandraEnumCodec<>(Direction.class, Direction.getValueMap())) .register(new CassandraEnumCodec<>(SourceEntity.Type.class, SourceEntity.Type.getValueMap())); // Create a session. manager = new MappingManager(cluster.connect()); } }
@Override public Status selectByDevice(TsPoint point, Date startTime, Date endTime) { long costTime = 0L; Cluster cluster = null; try { // cluster = Cluster.builder().addContactPoint(CASSADRA_URL).build(); // Session session = cluster.connect(KEY_SPACE_NAME); Session session = SessionManager.getSession(); String selectCql = "SELECT * FROM point WHERE device_code='" + point.getDeviceCode() + "' and timestamp>=" + startTime.getTime() + " and timestamp<=" + endTime.getTime() + " ALLOW FILTERING"; // System.out.println(selectCql); long startTime1 = System.nanoTime(); ResultSet rs = session.execute(selectCql); long endTime1 = System.nanoTime(); costTime = endTime1 - startTime1; } finally { if (cluster != null) cluster.close(); } // System.out.println("此次查询消耗时间[" + costTime / 1000 + "]s"); return Status.OK(costTime); }
@Override public Status selectByDeviceAndSensor(TsPoint point, Date startTime, Date endTime) { long costTime = 0L; Cluster cluster = null; try { // cluster = Cluster.builder().addContactPoint(CASSADRA_URL).build(); // Session session = cluster.connect(KEY_SPACE_NAME); Session session = SessionManager.getSession(); String selectCql = "SELECT * FROM point WHERE device_code='" + point.getDeviceCode() + "' and sensor_code='" + point.getSensorCode() + "' and timestamp>=" + startTime.getTime() + " and timestamp<=" + endTime.getTime() + " ALLOW FILTERING"; // System.out.println(selectCql); long startTime1 = System.nanoTime(); ResultSet rs = session.execute(selectCql); long endTime1 = System.nanoTime(); costTime = endTime1 - startTime1; } finally { if (cluster != null) cluster.close(); } // System.out.println("此次查询消耗时间[" + costTime / 1000 + "]s"); return Status.OK(costTime); }
@Override public Status selectByDeviceAndSensor(TsPoint point, Double max, Double min, Date startTime, Date endTime) { long costTime = 0L; Cluster cluster = null; try { cluster = Cluster.builder().addContactPoint(CASSADRA_URL).build(); Session session = cluster.connect(KEY_SPACE_NAME); String createIndexCql = "CREATE INDEX IF NOT EXISTS value_index ON " + TABLE_NAME + "(value)"; // System.out.println(createIndexCql); long startTime1 = System.nanoTime(); session.execute(createIndexCql); String selectCql = "SELECT * FROM point WHERE device_code='" + point.getDeviceCode() + "' and sensor_code='" + point.getSensorCode() + "' and value<" + max + " and value>" + min + " and timestamp>=" + startTime.getTime() + " and timestamp<=" + endTime.getTime() + " ALLOW FILTERING"; // System.out.println(selectCql); ResultSet rs = session.execute(selectCql); long endTime1 = System.nanoTime(); costTime = endTime1 - startTime1; } finally { if (cluster != null) cluster.close(); } // System.out.println("此次查询消耗时间[" + costTime / 1000 + "]s"); return Status.OK(costTime); }
@Override public Status selectMaxByDeviceAndSensor(String deviceCode, String sensorCode, Date startTime, Date endTime) { long costTime = 0L; Cluster cluster = null; try { // cluster = Cluster.builder().addContactPoint(CASSADRA_URL).build(); // Session session = cluster.connect(KEY_SPACE_NAME); Session session = SessionManager.getSession(); String selectCql = "SELECT MAX(value) FROM point WHERE device_code='" + deviceCode + "' and sensor_code='" + sensorCode + "' and timestamp>=" + startTime.getTime() + " and timestamp<=" + endTime.getTime() + " ALLOW FILTERING"; long startTime1 = System.nanoTime(); // System.out.println("aaa"); ResultSet rs = session.execute(selectCql); // System.out.println("bbb"); long endTime1 = System.nanoTime(); costTime = endTime1 - startTime1; } finally { if (cluster != null) cluster.close(); } // System.out.println("此次查询消耗时间[" + costTime / 1000 + "]s"); return Status.OK(costTime); }
@Override public Status selectMinByDeviceAndSensor(String deviceCode, String sensorCode, Date startTime, Date endTime) { long costTime = 0L; Cluster cluster = null; try { cluster = Cluster.builder().addContactPoint(CASSADRA_URL).build(); Session session = cluster.connect(KEY_SPACE_NAME); String selectCql = "SELECT MIN(value) FROM point WHERE device_code='" + deviceCode + "' and sensor_code='" + sensorCode + "' and timestamp>=" + startTime.getTime() + " and timestamp<=" + endTime.getTime() + " ALLOW FILTERING"; // System.out.println(selectCql); long startTime1 = System.nanoTime(); ResultSet rs = session.execute(selectCql); long endTime1 = System.nanoTime(); costTime = endTime1 - startTime1; } finally { if (cluster != null) cluster.close(); } // System.out.println("此次查询消耗时间[" + costTime / 1000 + "]s"); return Status.OK(costTime); }
@Override public Status selectAvgByDeviceAndSensor(String deviceCode, String sensorCode, Date startTime, Date endTime) { long costTime = 0L; Cluster cluster = null; try { cluster = Cluster.builder().addContactPoint(CASSADRA_URL).build(); Session session = cluster.connect(KEY_SPACE_NAME); String selectCql = "SELECT AVG(value) FROM point WHERE device_code='" + deviceCode + "' and sensor_code='" + sensorCode + "' and timestamp>=" + startTime.getTime() + " and timestamp<=" + endTime.getTime() + " ALLOW FILTERING"; // System.out.println(selectCql); long startTime1 = System.nanoTime(); ResultSet rs = session.execute(selectCql); long endTime1 = System.nanoTime(); costTime = endTime1 - startTime1; } finally { if (cluster != null) cluster.close(); } // System.out.println("此次查询消耗时间[" + costTime / 1000 + "]s"); return Status.OK(costTime); }
@Override public Status selectCountByDeviceAndSensor(String deviceCode, String sensorCode, Date startTime, Date endTime) { long costTime = 0L; Cluster cluster = null; try { cluster = Cluster.builder().addContactPoint(CASSADRA_URL).build(); Session session = cluster.connect(KEY_SPACE_NAME); String selectCql = "SELECT COUNT(*) FROM point WHERE device_code='" + deviceCode + "' and sensor_code='" + sensorCode + "' and timestamp>=" + startTime.getTime() + " and timestamp<=" + endTime.getTime() + " ALLOW FILTERING"; // System.out.println(selectCql); long startTime1 = System.nanoTime(); ResultSet rs = session.execute(selectCql); long endTime1 = System.nanoTime(); costTime = endTime1 - startTime1; } finally { if (cluster != null) cluster.close(); } // System.out.println("此次查询消耗时间[" + costTime / 1000 + "]s"); return Status.OK(costTime); }
@Override public long uploadPackage(DataPackage dataPack) { long time = System.currentTimeMillis(); try { Session session; Cluster cluster; cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); session = cluster.connect(); ByteBuffer buffer = ByteBuffer.wrap(dataPack.getData()); Statement statement = QueryBuilder.insertInto(DATABASE, MAIN_TABLE) .value(COL_ID, time) .value(COL_DATA, buffer) .value(COL_DESC, dataPack.getDescription()); session.execute(statement); } catch (Exception ex) { System.out.println(ex.getMessage()); } return time; }
@Override public DataPackage downloadPackage(long packageID) { DataPackage dataPack = new DataPackage(); try { Session session; Cluster cluster; cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); session = cluster.connect(); Statement statement = QueryBuilder.select() .all() .from(DATABASE, MAIN_TABLE) .where(eq(COL_ID, packageID)); ResultSet results = session.execute(statement); for(Row row : results) { dataPack.setId(row.getLong(COL_ID)); dataPack.setDescription(row.getString(COL_DESC)); dataPack.setData(row.getBytes(COL_DATA).array()); } } catch (Exception ex) { System.out.println(ex.getMessage()); } return dataPack; }
@Override public List<DataPackage> listPackages() { List<DataPackage> dataPacks = new ArrayList<>(); try { Session session; Cluster cluster; cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); session = cluster.connect(); Statement statement = QueryBuilder.select() .all() .from(DATABASE, MAIN_TABLE); ResultSet results = session.execute(statement); for(Row row : results) { DataPackage dataPack = new DataPackage(); dataPack.setId(row.getLong(COL_ID)); dataPack.setDescription(row.getString(COL_DESC)); dataPacks.add(dataPack); } } catch (Exception ex) { System.out.println(ex.getMessage()); } return dataPacks; }
/** * Create cassandra keyspace. * @param tenant the keyspace name */ public void createCassandraKeyspace(String tenant) { StopWatch stopWatch = StopWatch.createStarted(); try { log.info("START - SETUP:CreateTenant:cassandra keyspace tenantKey: {}", tenant); Cluster.builder().addContactPoints(cassandraProperties.getContactPoints()) .build().connect().execute(String.format(properties.getCassandra().getKeyspaceCreateCql(), tenant)); log.info("STOP - SETUP:CreateTenant:cassandra keyspace tenantKey: {}, result: OK, time = {} ms", tenant, stopWatch.getTime()); } catch (Exception e) { log.error("STOP - SETUP:CreateTenant:cassandra keyspace tenantKey: {}, result: FAIL," + " error: {}, time = {} ms", tenant, e.getMessage(), stopWatch.getTime()); throw e; } }
private void migrateCassandra() { ClusterConfiguration clusterConfiguration = new ClusterConfiguration(); clusterConfiguration.setContactpoints(new String[]{cassandraProperties.getContactPoints()}); CassandraMigration cm = new CassandraMigration(); tenantListRepository.getTenants().forEach(tenantName -> { log.info("Start cassandra migration for tenant {}", tenantName); Cluster.builder().addContactPoints(cassandraProperties.getContactPoints()) .build().connect().execute(String.format(properties.getCassandra().getKeyspaceCreateCql(), tenantName)); KeyspaceConfiguration keyspaceConfiguration = new KeyspaceConfiguration(); keyspaceConfiguration.setName(tenantName.toLowerCase()); keyspaceConfiguration.setClusterConfig(clusterConfiguration); cm.setLocations(new String[]{properties.getCassandra().getMigrationFolder()}); cm.setKeyspaceConfig(keyspaceConfiguration); cm.migrate(); log.info("Stop cassandra migration for tenant {}", tenantName); }); }
/** */ private static synchronized Session adminSession() { if (adminSes != null) { return adminSes; } try { Cluster.Builder builder = Cluster.builder(); builder = builder.withCredentials(getAdminUser(), getAdminPassword()); builder.addContactPoints(getContactPoints()); builder.addContactPointsWithPorts(getContactPointsWithPorts()); adminCluster = builder.build(); return adminSes = adminCluster.connect(); } catch (Throwable e) { throw new RuntimeException("Failed to create admin session to Cassandra database", e); } }
/** */ private static synchronized Session regularSession() { if (regularSes != null) { return regularSes; } try { Cluster.Builder builder = Cluster.builder(); builder = builder.withCredentials(getRegularUser(), getRegularPassword()); builder.addContactPoints(getContactPoints()); builder.addContactPointsWithPorts(getContactPointsWithPorts()); regularCluster = builder.build(); return regularSes = regularCluster.connect(); } catch (Throwable e) { throw new RuntimeException("Failed to create regular session to Cassandra database", e); } }
@Override public ResultSet<CassandraDBContext> execute(Query<CassandraDBContext> query) throws QueryExecutionException { try (Cluster cassandraConnection = buildConnection()) { final Metadata metadata = cassandraConnection.getMetadata(); System.out.printf("Connected to cluster: %s", metadata.getClusterName()); for (final Host host : metadata.getAllHosts()) { System.out.printf("Datacenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack()); } try (Session session = cassandraConnection.connect()) { String queryToExecute = query.getQuery(); System.out.println(queryToExecute); com.datastax.driver.core.ResultSet resultSet = session.execute(queryToExecute); printResultSet(resultSet); ExecutionInfo executionInfo = resultSet.getExecutionInfo(); System.out.println(executionInfo); } } // There isn't any resultset for these use-case return new CassandraResultSet(); }
static Session connect() { String contactPoint = "localhost"; String keySpace = "ks1"; if(session == null) { PoolingOptions poolingOptions = new PoolingOptions().setConnectionsPerHost(HostDistance.REMOTE, 1, 4); cluster = Cluster.builder().addContactPoint(contactPoint).withPoolingOptions(poolingOptions) .withCompression(Compression.SNAPPY).build(); cluster.init(); for (Host host : cluster.getMetadata().getAllHosts()) { System.out.printf("Address: %s, Rack: %s, Datacenter: %s, Tokens: %s\n", host.getAddress(), host.getDatacenter(), host.getRack(), host.getTokens()); } session = cluster.connect(keySpace); } return session; }
static Session connect() { String contactPoint = "localhost"; String keySpace = "ks1"; if(session == null) { RetryPolicy retryPolicy = new CustomRetryPolicy(3, 3, 2); cluster = Cluster.builder().addContactPoint(contactPoint) .withRetryPolicy(retryPolicy).build(); cluster.init(); for (Host host : cluster.getMetadata().getAllHosts()) { System.out.printf("Address: %s, Rack: %s, Datacenter: %s, Tokens: %s\n", host.getAddress(), host.getDatacenter(), host.getRack(), host.getTokens()); } } return session; }
static Session connect() { String contactPoint = "localhost"; String keySpace = "ks1"; if(session == null) { DCAwareRoundRobinPolicy dcAwarePolicy = new DCAwareRoundRobinPolicy.Builder().build(); LoadBalancingPolicy policy = new TokenAwarePolicy(dcAwarePolicy); cluster = Cluster.builder().addContactPoint(contactPoint) .withLoadBalancingPolicy(policy).build(); cluster.init(); for (Host host : cluster.getMetadata().getAllHosts()) { System.out.printf("Address: %s, Rack: %s, Datacenter: %s, Tokens: %s\n", host.getAddress(), host.getDatacenter(), host.getRack(), host.getTokens()); } } return session; }
protected Cluster.Builder createClusterBuilder() throws Exception { CassandraLoadBalancingPolicies cassLoadBalancingPolicies = new CassandraLoadBalancingPolicies(); Cluster.Builder clusterBuilder = Cluster.builder(); for (String host : hosts.split(",")) { clusterBuilder = clusterBuilder.addContactPoint(host); } if (port != null) { clusterBuilder = clusterBuilder.withPort(port); } if (clusterName != null) { clusterBuilder = clusterBuilder.withClusterName(clusterName); } if (username != null && !username.isEmpty() && password != null) { clusterBuilder.withCredentials(username, password); } if (loadBalancingPolicy != null && !loadBalancingPolicy.isEmpty()) { clusterBuilder.withLoadBalancingPolicy(cassLoadBalancingPolicies.getLoadBalancingPolicy(loadBalancingPolicy)); } return clusterBuilder; }
public void init(Class<T> tClass) { try { Cluster.Builder builder = Cluster.builder(); final String[] nodesList = nodes.split(","); for (String node : nodesList) { builder.addContactPoint(node).withPort(Integer.parseInt(port)); LOGGER.info(String.format("Added cassandra node : %s", node + ":" + port)); } cluster = builder.build(); session = null; if (keyspace != null) { session = cluster.connect(keyspace); } else { session = cluster.connect(); } MappingManager mappingManager = new MappingManager(session); mapper = mappingManager.mapper(tClass); } catch (Exception e) { LOGGER.error("Error initializing CassandraDao"); throw e; } }
public CassandraConfigDb(List<String> contactPoints, int port) { this.contactPoints = new ArrayList<InetAddress> (contactPoints.size()); for (String contactPoint : contactPoints) { try { this.contactPoints.add(InetAddress.getByName(contactPoint)); } catch (UnknownHostException e) { throw new IllegalArgumentException(e.getMessage()); } } this.port = port; cluster = (new Cluster.Builder()).withPort (this.port) .addContactPoints(this.contactPoints) .withSocketOptions(new SocketOptions().setReadTimeoutMillis(60000).setKeepAlive(true).setReuseAddress(true)) .withLoadBalancingPolicy(new RoundRobinPolicy()) .withReconnectionPolicy(new ConstantReconnectionPolicy(500L)) .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE)) .build (); session = cluster.newSession(); preparedStatements = new ConcurrentHashMap<StatementName, PreparedStatement> (); prepareStatementCreateLock = new Object(); }
@Override protected Result check() throws Exception { Cluster cassandraClient = null; Session cassandraSession = null; try { cassandraClient = createCassandraClient(); cassandraSession = cassandraClient.connect(keySpace); cassandraSession.execute(query); } catch (Exception e) { Exception wrappedException = wrapException(e); logger.error("Cassandra Healthcheck Failure", wrappedException); return Result.unhealthy(wrappedException); } finally { closeSessionQuietly(cassandraSession); closeClusterQuietly(cassandraClient); } return Result.healthy(); }
/** * Returns a Cassandra session object. * @return a connection session to Cassandra */ public static Session getClient() { if (session != null) { return session; } try { Builder builder = Cluster.builder().addContactPoints(DBHOSTS.split(",")). withPort(DBPORT).withCredentials(DBUSER, DBPASS); if (SSL) { builder.withSSL(); } cluster = builder.build(); session = cluster.connect(); if (!existsTable(Config.getRootAppIdentifier())) { createTable(session, Config.getRootAppIdentifier()); } else { session.execute("USE " + DBNAME + ";"); } logger.debug("Cassandra host: " + DBHOSTS + ":" + DBPORT + ", keyspace: " + DBNAME); } catch (Exception e) { logger.error("Failed to connect ot Cassandra: {}.", e.getMessage()); } Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { shutdownClient(); } }); return session; }
/** * Get a Cassandra cluster using hosts and port. */ private Cluster getCluster(List<String> hosts, int port, String username, String password, String localDc, String consistencyLevel) { Cluster.Builder builder = Cluster.builder() .addContactPoints(hosts.toArray(new String[0])) .withPort(port); if (username != null) { builder.withAuthProvider(new PlainTextAuthProvider(username, password)); } if (localDc != null) { builder.withLoadBalancingPolicy( new TokenAwarePolicy(new DCAwareRoundRobinPolicy.Builder().withLocalDc(localDc).build())); } else { builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())); } if (consistencyLevel != null) { builder.withQueryOptions( new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel))); } return builder.build(); }
/** * Creates a session and ensures schema if configured. Closes the cluster and session if any * exception occurred. */ @Override public Session create(Cassandra3Storage cassandra) { Closer closer = Closer.create(); try { Cluster cluster = closer.register(buildCluster(cassandra)); cluster.register(new QueryLogger.Builder().build()); Session session; if (cassandra.ensureSchema) { session = closer.register(cluster.connect()); Schema.ensureExists(cassandra.keyspace, session); session.execute("USE " + cassandra.keyspace); } else { session = cluster.connect(cassandra.keyspace); } initializeUDTs(session); return session; } catch (RuntimeException e) { try { closer.close(); } catch (IOException ignored) { } throw e; } }
/** * Ensures that the Mock Cassandra instance is up and running. Will reinit * the database every time it is called. * * @param cassandraKeyspace Cassandra keyspace to setup. * @return A cluster object. * @throws ConfigurationException * @throws IOException * @throws InterruptedException * @throws TTransportException */ public static Cluster ensureMockCassandraRunningAndEstablished(String cassandraKeyspace) throws ConfigurationException, IOException, InterruptedException, TTransportException { Cluster cluster; long timeout = 60000; EmbeddedCassandraServerHelper.startEmbeddedCassandra(timeout); cluster = Cluster.builder().addContactPoints("127.0.0.1").withPort(9142).build(); //Thread.sleep(20000);//time to let cassandra startup final Metadata metadata = cluster.getMetadata(); Session session = cluster.connect(); Utils.initDatabase(DB_CQL, session); session = cluster.connect(cassandraKeyspace); logger.info("Connected to cluster: " + metadata.getClusterName() + '\n'); return cluster; }
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(1000); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000)); env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend")); CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource())) .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") .enableWriteAheadLog() .setClusterBuilder(new ClusterBuilder() { @Override public Cluster buildCluster(Cluster.Builder builder) { return builder.addContactPoint("127.0.0.1").build(); } }) .build(); sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello"); env.execute(); }
private void connectToMultipleAddresses(String address) { PoolingOptions poolingOptions = new PoolingOptions() .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); String[] music_hosts = address.split(","); if (cluster == null) { logger.debug("Initializing MUSIC Client with endpoints "+address); cluster = Cluster.builder() .withPort(9042) .withPoolingOptions(poolingOptions) .withoutMetrics() .addContactPoints(music_hosts) .build(); Metadata metadata = cluster.getMetadata(); logger.debug("Connected to cluster:"+metadata.getClusterName()+" at address:"+address); } session = cluster.connect(); }
@SuppressWarnings("unused") private void connectToCassaCluster(String address) { PoolingOptions poolingOptions = new PoolingOptions() .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); Iterator<String> it = getAllPossibleLocalIps().iterator(); logger.debug("Iterating through possible ips:"+getAllPossibleLocalIps()); while (it.hasNext()) { try { cluster = Cluster.builder() .withPort(9042) .withPoolingOptions(poolingOptions) .withoutMetrics() .addContactPoint(address) .build(); //cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(Integer.MAX_VALUE); Metadata metadata = cluster.getMetadata(); logger.debug("Connected to cluster:"+metadata.getClusterName()+" at address:"+address); session = cluster.connect(); break; } catch (NoHostAvailableException e) { address = it.next(); } } }
private void connectToCassaCluster(){ Iterator<String> it = getAllPossibleLocalIps().iterator(); String address= "localhost"; logger.debug("Connecting to cassa cluster: Iterating through possible ips:"+getAllPossibleLocalIps()); while(it.hasNext()){ try { cluster = Cluster.builder().withPort(9042).addContactPoint(address).build(); //cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(Integer.MAX_VALUE); Metadata metadata = cluster.getMetadata(); logger.debug("Connected to cassa cluster "+metadata.getClusterName()+" at "+address); /* for ( Host host : metadata.getAllHosts() ) { .out.printf("Datacenter: %s; Host broadcast: %s; Rack: %s\n", host.getDatacenter(), host.getBroadcastAddress(), host.getRack()); }*/ session = cluster.connect(); break; } catch (NoHostAvailableException e) { address= it.next(); } } }
private void connectToCassaCluster(){ Iterator<String> it = getAllPossibleLocalIps().iterator(); String address= "localhost"; // logger.debug("Connecting to cassa cluster: Iterating through possible ips:"+getAllPossibleLocalIps()); while(it.hasNext()){ try { cluster = Cluster.builder().withPort(9042).addContactPoint(address).build(); //cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(Integer.MAX_VALUE); Metadata metadata = cluster.getMetadata(); // logger.debug("Connected to cassa cluster "+metadata.getClusterName()+" at "+address); /* for ( Host host : metadata.getAllHosts() ) { System.out.printf("Datacenter: %s; Host broadcast: %s; Rack: %s\n", host.getDatacenter(), host.getBroadcastAddress(), host.getRack()); }*/ session = cluster.connect(); break; } catch (NoHostAvailableException e) { address= it.next(); } } }
@BeforeClass public static void startServer() throws InterruptedException, TTransportException, ConfigurationException, IOException { EmbeddedCassandraServerHelper.startEmbeddedCassandra(); Cluster cluster = new Cluster.Builder().addContactPoints("127.0.0.1").withPort(9142).build(); Session session = cluster.connect(); CQLDataLoader dataLoader = new CQLDataLoader(session); dataLoader.load(new ClassPathCQLDataSet("config/cql/create-tables.cql", true, "cassandra_unit_keyspace")); }
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection); CassandraSink.addSink(source) .setQuery(INSERT) .setClusterBuilder(new ClusterBuilder() { @Override protected Cluster buildCluster(Builder builder) { return builder.addContactPoint("127.0.0.1").build(); } }) .build(); env.execute("WriteTupleIntoCassandra"); }
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(1000); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000)); env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend")); CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource())) .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") .enableWriteAheadLog() .setClusterBuilder(new ClusterBuilder() { private static final long serialVersionUID = 2793938419775311824L; @Override public Cluster buildCluster(Cluster.Builder builder) { return builder.addContactPoint("127.0.0.1").build(); } }) .build(); sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello"); env.execute(); }