/**
 * Repeatedly builds and executes a batch produced by {@code transaction} until the
 * (conditional) write is applied, using a linear backoff between attempts.
 *
 * @param session     Cassandra session used for execution.
 * @param type        batch type (LOGGED / UNLOGGED / COUNTER).
 * @param transaction callback that populates a fresh batch on every attempt.
 * @return {@code true} once applied; {@code false} if the callback added no statements.
 * @throws AttemptsFailedException if interrupted while sleeping or all attempts fail.
 */
public static boolean untilApplied(Session session, BatchStatement.Type type, Consumer<BatchStatement> transaction) {
    for (int i = 1; i <= MAX_RETRY; i++) {
        BatchStatement batchStatement = new BatchStatement(type);
        transaction.accept(batchStatement);
        if (batchStatement.size() == 0)
            return false;
        boolean applied;
        if (batchStatement.size() > 1) {
            applied = session.execute(batchStatement).wasApplied();
        } else {
            // A single statement needs no batch semantics; execute it directly.
            Statement statement = Iterables.getOnlyElement(batchStatement.getStatements());
            applied = session.execute(statement).wasApplied();
        }
        if (applied)
            return true;
        log.warn("Attempt {}/{} failed executing {}", i, MAX_RETRY, batchStatement);
        try {
            Thread.sleep(100 * i); // linear backoff: 100ms, 200ms, ...
        } catch (InterruptedException e) {
            // FIX: restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new AttemptsFailedException(e);
        }
    }
    throw new AttemptsFailedException();
}
/**
 * Tunes CQL statement execution options (consistency level, fetch option and etc.).
 *
 * @param statement Statement.
 * @return Modified statement.
 */
private Statement tuneStatementExecutionOptions(Statement statement) {
    String qry = "";

    // Only bound/prepared statements expose their query text here; other Statement
    // subtypes leave qry empty and are classified purely by their Java type below.
    if (statement instanceof BoundStatement) {
        qry = ((BoundStatement)statement).preparedStatement().getQueryString().trim().toLowerCase();
    } else if (statement instanceof PreparedStatement) {
        qry = ((PreparedStatement)statement).getQueryString().trim().toLowerCase();
    }

    boolean readStatement = qry.startsWith("select");
    // Batches are always treated as writes, regardless of query text.
    boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement ||
        qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update");

    // Consistency levels are applied only when configured (non-null).
    if (readStatement && readConsistency != null) {
        statement.setConsistencyLevel(readConsistency);
    }

    if (writeStatement && writeConsistency != null) {
        statement.setConsistencyLevel(writeConsistency);
    }

    if (fetchSize != null) {
        statement.setFetchSize(fetchSize);
    }

    return statement;
}
/**
 * Demo entry point: batches ten prepared inserts into the user table,
 * executes them, and prints the result set.
 */
public static void main(String[] args) {
    Session session = Connection.connect();
    BatchStatement batchStatement = new BatchStatement();
    PreparedStatement preparedStatement =
        session.prepare("insert into user (id, name) values (?, ?)");
    for (int n = 0; n < 10; n++) {
        batchStatement.add(preparedStatement.bind(UUIDs.timeBased(), "user-" + n));
    }
    try {
        ResultSet rs = session.execute(batchStatement);
        System.out.println(rs);
    } catch (Exception ex) {
        ex.printStackTrace();
    }
    Connection.close();
}
/**
 * Creates the publisher: reads connection, batch-size, flush-frequency and
 * insert-only settings from application properties, initializes the logged
 * batch and timer, then starts publishing.
 */
public CassandraPublisher() {
    super();
    PropertyManagement properties = PropertyManagement.getProperties();
    cassandraNode = properties.getProperty(CassandraPublisherPropertyValues.CASSANDRA_CONNECT_NODE,
        CassandraPublisherPropertyValues.CASSANDRA_CONNECT_NODE_DEFAULT);
    batchSize = properties.asInt(CassandraPublisherPropertyValues.CASSANDRA_BATCH_SIZE,
        CassandraPublisherPropertyValues.CASSANDRA_BATCH_SIZE_DEFAULT);
    flushFreq = properties.asInt(CassandraPublisherPropertyValues.CASSANDRA_FLUSH_FREQ,
        CassandraPublisherPropertyValues.CASSANDRA_FLUSH_FREQ_DEFAULT);
    insertOnly = properties.asBoolean(CassandraPublisherPropertyValues.CASSANDRA_INSERT_ONLY,
        CassandraPublisherPropertyValues.CASSANDRA_INSERT_ONLY_DEFAULT);
    batch = new BatchStatement(BatchStatement.Type.LOGGED);
    map = new HashMap<>();
    timer = new Timer();
    // reinitialize things
    // NOTE(review): calling an instance method from the constructor is unsafe if
    // publishEvents() is overridable in a subclass — confirm it is final/private.
    publishEvents();
}
/**
 * Stores {@code data} for {@code key} and, in the same batch, writes one row per
 * ancestor path linking parent -> child — presumably so the path hierarchy can be
 * listed later (TODO confirm against the read side).
 */
@Override
public void store(ConfigPath key, String data) throws ConfigDbException {
    final BatchStatement batchStat = new BatchStatement();
    // The setting itself, keyed by its full path with an empty child column.
    batchStat.add(getStatement(StatementName.PUT_SETTING).bind(key.toString(), "", data));
    ConfigPath parent;
    ConfigPath child = key;
    // Walk up the hierarchy, recording each parent -> child edge.
    while ((parent = child.getParent()) != null) {
        batchStat.add(getStatement(StatementName.PUT_SETTING).bind(parent.toString(), child.toString(), data));
        child = parent;
    }
    session.execute(batchStat);
}
/**
 * Get the arguments of a statement in an ordered key-value array.
 *
 * @param arg0
 *            The statement.
 * @return The key-value array.
 */
public static String[] getStatementArguments( Statement arg0 ) {
    if ( arg0 instanceof ProfiledBoundStatement ) {
        return ( (ProfiledBoundStatement) arg0 ).getArgumentList();
    }
    if ( arg0 instanceof BatchStatement ) {
        // Flatten the arguments of every nested statement, preserving order.
        List<String> collected = new ArrayList<String>();
        for ( Statement nested : ( (BatchStatement) arg0 ).getStatements() ) {
            Collections.addAll( collected, getStatementArguments( nested ) );
        }
        return collected.toArray( new String[collected.size()] );
    }
    return EMTPY_STRING_ARRAY;
}
/**
 * Get the name of a statement.
 *
 * @param arg0 The statement.
 * @return The name used for logging.
 */
public static String getStatementName( Statement arg0 ) {
    if ( arg0 instanceof RegularStatement ) {
        return ( (RegularStatement) arg0 ).getQueryString();
    }
    if ( arg0 instanceof BoundStatement ) {
        return ( (BoundStatement) arg0 ).preparedStatement().getQueryString();
    }
    if ( arg0 instanceof BatchStatement ) {
        // Join the names of all nested statements: "Batch : a, b, c".
        StringBuilder builder = new StringBuilder( "Batch : " );
        String separator = "";
        for ( Statement nested : ( (BatchStatement) arg0 ).getStatements() ) {
            builder.append( separator ).append( getStatementName( nested ) );
            separator = ", ";
        }
        return builder.toString();
    }
    return "unknown";
}
/**
 * Inserts one row per props map for {@code targetClass} in a single batch.
 *
 * @param targetClass entity class the rows map to.
 * @param propsList   column-name to value maps, one per row; must be non-empty.
 * @param type        batch type; defaults to LOGGED when null.
 * @return result set of the batch execution.
 */
public ResultSet batchInsert(final Class<?> targetClass, final Collection<? extends Map<String, Object>> propsList,
        final BatchStatement.Type type) {
    N.checkArgument(N.notNullOrEmpty(propsList), "'propsList' can't be null or empty.");

    final BatchStatement batchStatement = new BatchStatement(type == null ? BatchStatement.Type.LOGGED : type);

    // Copy the configured execution settings onto the batch, if present.
    if (settings != null) {
        batchStatement.setConsistencyLevel(settings.getConsistency());
        batchStatement.setSerialConsistencyLevel(settings.getSerialConsistency());
        batchStatement.setRetryPolicy(settings.getRetryPolicy());

        if (settings.traceQuery) {
            batchStatement.enableTracing();
        } else {
            batchStatement.disableTracing();
        }
    }

    CP pair = null;

    // Build one prepared insert per row and add it to the batch.
    for (Map<String, Object> props : propsList) {
        pair = prepareAdd(targetClass, props);
        batchStatement.add(prepareStatement(pair.cql, pair.parameters.toArray()));
    }

    return session.execute(batchStatement);
}
/**
 * Updates a document: re-reads the stored copy to preserve its creation date,
 * rewrites the row, and refreshes all index entries in one logged batch.
 */
@Override
public Document update(Document entity) {
    Document old = read(entity.getId()); //will throw exception of doc is not found
    entity.setCreatedAt(old.getCreatedAt());//copy over the original create date
    Table table = entity.getTable();
    // NOTE(review): the update path reuses CREATE_CQL and bindCreate — presumably
    // an upsert-style full-row rewrite; confirm this is intentional.
    PreparedStatement updateStmt = PreparedStatementFactory.getPreparedStatement(String.format(CREATE_CQL, table.toDbTable(), Columns.ID), getSession());
    BoundStatement bs = new BoundStatement(updateStmt);
    bindCreate(bs, entity);
    BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);
    batch.add(bs);//the actual update
    try {
        // Index rows must change atomically with the document row, hence the batch.
        List<BoundStatement> indexStatements = IndexMaintainerHelper.generateDocumentUpdateIndexEntriesStatements(getSession(), entity, bucketLocator);
        for (BoundStatement boundIndexStatement : indexStatements) {
            batch.add(boundIndexStatement);//the index updates
        }
        getSession().execute(batch);
        return entity;
    } catch (IndexParseException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Verifies that generateBatchStatement preserves statement order/identity and
 * applies the QUORUM consistency level from the query options.
 */
@Test
public void should_generate_batch_statement() throws Exception {
    //Given
    Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
    Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
    Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");

    // Only the consistency option is set; all other options are empty.
    CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
        Option.<ConsistencyLevel>empty(),
        Option.empty(),
        Option.<RetryPolicy>empty(),
        Option.empty());

    //When
    BatchStatement actual = helper.generateBatchStatement(UNLOGGED, options, toScalaList(asList(st1, st2, st3)));

    //Then
    assertThat(actual).isNotNull();
    final List<Statement> statements = new ArrayList<>(actual.getStatements());
    assertThat(statements).hasSize(3);
    // Same instances, same order — the helper must not copy or reorder.
    assertThat(statements.get(0)).isSameAs(st1);
    assertThat(statements.get(1)).isSameAs(st2);
    assertThat(statements.get(2)).isSameAs(st3);
    assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
}
/**
 * Builds and executes a batch of the given type containing up to ten counter
 * and/or non-counter bound inserts. At least one statement kind must be enabled.
 */
public void sendBatch(BatchStatement.Type type, boolean addCounter, boolean addNonCounter) {
    assert addCounter || addNonCounter;
    BatchStatement b = new BatchStatement(type);
    for (int idx = 0; idx < 10; idx++) {
        if (addNonCounter) {
            b.add(noncounter.bind(idx, "foo"));
        }
        if (addCounter) {
            b.add(counter.bind((long) idx, idx));
        }
    }
    session.execute(b);
}
/**
 * Appends deletes for every process definition of the given deployment to the
 * shared flush batch: one delete per version-index row, plus a bulk delete of
 * the main rows by id.
 */
public void perform(CassandraPersistenceSession session, Object parameter, BatchStatement flush) {
    String deploymentId = (String) parameter;
    Session s = session.getSession();
    List<Row> processDefinitionsToDelete = s.execute(QueryBuilder.select("id", "key", "version").from(ProcessDefinitionTableHandler.TABLE_NAME).where(eq("deployment_id", deploymentId))).all();
    List<String> ids = new ArrayList<String>();
    for (Row processDefinitionToDelete : processDefinitionsToDelete) {
        ids.add(processDefinitionToDelete.getString("id"));
        // Remove the (key, version) index row pointing at this definition.
        flush.add(QueryBuilder.delete().all().from(ProcessDefinitionTableHandler.TABLE_NAME_IDX_VERSION)
            .where(eq("key", processDefinitionToDelete.getString("key")))
            .and(eq("version", processDefinitionToDelete.getInt("version"))));
    }
    // NOTE(review): when no definitions match, this still appends DELETE ... IN ()
    // with an empty id list — confirm the driver/server tolerates that.
    flush.add(QueryBuilder.delete().all().from(ProcessDefinitionTableHandler.TABLE_NAME).where(in("id", ids)));
}
/**
 * Verifies that generateBatchStatement preserves statement order/identity and
 * applies the QUORUM consistency level from the query options (six-option variant).
 */
@Test
public void should_generate_batch_statement() throws Exception {
    //Given
    Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
    Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
    Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");

    // Only the consistency option is set; the remaining five options are empty.
    CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
        Option.<ConsistencyLevel>empty(),
        Option.empty(),
        Option.<RetryPolicy>empty(),
        Option.empty(),
        Option.empty());

    //When
    BatchStatement actual = helper.generateBatchStatement(UNLOGGED, options, toScalaList(asList(st1, st2, st3)));

    //Then
    assertThat(actual).isNotNull();
    final List<Statement> statements = new ArrayList<>(actual.getStatements());
    assertThat(statements).hasSize(3);
    // Same instances, same order — the helper must not copy or reorder.
    assertThat(statements.get(0)).isSameAs(st1);
    assertThat(statements.get(1)).isSameAs(st2);
    assertThat(statements.get(2)).isSameAs(st3);
    assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
}
/**
 * Streams rows into Cassandra, executing the prepared insert in batches of at
 * most {@code batchRowsCount} statements.
 *
 * @param rows iterator over rows; each row must contain exactly {@code columnsCount} values.
 */
public void load(Iterator<List<Object>> rows) {
    PreparedStatement statement = session.prepare(insertQuery);
    BatchStatement batch = createBatchStatement();
    while (rows.hasNext()) {
        // Flush before adding so a batch never exceeds batchRowsCount statements.
        if (batch.size() >= batchRowsCount) {
            session.execute(batch);
            batch = createBatchStatement();
        }
        List<Object> row = rows.next();
        // FIX: Guava's checkState only substitutes %s placeholders; the previous
        // %d specifiers were emitted literally instead of the actual counts.
        checkState(row.size() == columnsCount,
            "values count in a row is expected to be %s, but found: %s", columnsCount, row.size());
        batch.add(statement.bind(row.toArray()));
    }
    // Flush the trailing partial batch, if any.
    if (batch.size() > 0) {
        session.execute(batch);
    }
}
/**
 * Tunes CQL statement execution options (consistency level, fetch option and etc.).
 *
 * @param statement Statement.
 * @return Modified statement.
 */
private Statement tuneStatementExecutionOptions(Statement statement) {
    String qry = "";

    // Extract the query text when the statement type exposes it.
    if (statement instanceof BoundStatement) {
        qry = ((BoundStatement) statement).preparedStatement().getQueryString().trim().toLowerCase();
    } else if (statement instanceof PreparedStatement) {
        qry = ((PreparedStatement) statement).getQueryString().trim().toLowerCase();
    }

    boolean readStatement = qry.startsWith("select");
    boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement
        || qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update");

    if (readStatement && readConsistency != null) {
        statement.setConsistencyLevel(readConsistency);
    }
    if (writeStatement && writeConsistency != null) {
        statement.setConsistencyLevel(writeConsistency);
    }
    if (fetchSize != null) {
        statement.setFetchSize(fetchSize);
    }

    return statement;
}
/**
 * Deletes the rows for the given keys in a single batch. Null keys are skipped;
 * a null or empty key array is a no-op.
 */
@SuppressWarnings("unchecked")
@Override
public void delete(final K... keys) {
    if (keys == null || keys.length == 0) {
        return;
    }
    final BatchStatement batch = new BatchStatement();
    for (final K key : keys) {
        if (key == null) {
            continue;
        }
        final ByteBuffer serializedKey = _keySerializer.serialize(key);
        batch.add(_deleteStatement.bind(serializedKey));
    }
    _session.execute(batch);
}
/**
 * Inserts every entry of {@code pairs} in one batch. Execution failures are
 * logged and swallowed — deliberate best-effort semantics, unchanged.
 */
@Override
public void setAll(final Map<K, V> pairs) {
    if (pairs.size() == 0) {
        return;
    }
    final BatchStatement batch = new BatchStatement();
    for (final Map.Entry<K, V> pair : pairs.entrySet()) {
        batch.add(getInsertStatement(pair.getKey(), pair.getValue()));
    }
    try {
        _session.execute(batch);
    } catch (Exception e) {
        _log.error("failed to insert batch of " + pairs.size() + " dictionary entries", e);
    }
}
/**
 * Retries a failed batch write with exponential backoff (1s, 2s, 4s, ...).
 * Once {@code maxWriteRetries} is reached the failure is recorded and rethrown.
 *
 * @param retryCount number of retries already performed.
 * @throws DriverException the original failure, once retries are exhausted.
 */
private void retryQuery(String id, Statement query, final long startTime, int retryCount, DriverException e) throws DriverException {
    if (retryCount >= maxWriteRetries) {
        // FIX: the message now has a placeholder for e.getMessage(); previously there
        // were three arguments for two '{}' slots and the message was dropped.
        logger.error("[{}]: Query aborted after {} retries: {}", id, retryCount, e.getMessage());
        metricFailed.inc(((BatchStatement) query).size());
        commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
        throw e;
    } else {
        logger.warn("[{}]: Query failed, retrying {} of {}: {} ", id, retryCount, maxWriteRetries, e.getMessage());
        try {
            Thread.sleep(1000 * (1 << retryCount)); // exponential backoff
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            logger.debug("[{}]: Interrupted: {}", id, ie);
        }
        // FIX: was 'retryCount++' — the post-increment passed the OLD value, so the
        // retry counter never advanced and failures retried without bound.
        _executeQuery(id, query, startTime, retryCount + 1);
    }
}
/**
 * Drains the queue into a single unlogged batch and executes it.
 *
 * @param id correlation id used for logging/metrics downstream.
 * @return number of statements flushed (possibly zero).
 */
public int handleFlush_batch(String id) {
    Statement query;
    int flushedCount = 0;
    BatchStatement batch = new BatchStatement(Type.UNLOGGED);
    while ((query = queue.poll()) != null) {
        flushedCount++;
        batch.add(query);
    }
    // FIX: skip the round trip entirely when the queue was already empty;
    // previously an empty batch was sent to the cluster.
    if (flushedCount > 0) {
        executeQuery(id, batch, System.nanoTime());
    }
    metricCompleted.inc(flushedCount);
    return flushedCount;
}
/**
 * Debug-logs, for each token, the sizes of all batches queued behind it.
 * No-op unless debug logging is enabled.
 */
private void logTokenBatchMap(String name, Map<Token, Deque<BatchStatement>> map) {
    if (!logger.isDebugEnabled()) {
        return;
    }
    StringBuilder out = new StringBuilder(name);
    out.append(": Size: ").append(map.size());
    out.append("; Tokens: |");
    for (Entry<Token, Deque<BatchStatement>> entry : map.entrySet()) {
        out.append(entry.getKey().toString()).append(":");
        for (BatchStatement queued : entry.getValue()) {
            out.append(queued.size()).append(",");
        }
        out.append("|.");
    }
    logger.debug(out.toString());
}
/**
 * Debug-logs, for each replica set, the member host addresses and the sizes of
 * all batches queued behind it. No-op unless debug logging is enabled.
 */
private void logReplicaBatchMap(String name, Map<Set<Host>, Deque<BatchStatement>> map) {
    if (!logger.isDebugEnabled()) {
        return;
    }
    StringBuilder out = new StringBuilder(name);
    out.append(": Size: ").append(map.size());
    out.append(". Replicas: |");
    for (Entry<Set<Host>, Deque<BatchStatement>> entry : map.entrySet()) {
        for (Host host : entry.getKey()) {
            out.append(host.getAddress().toString()).append(",");
        }
        out.append(":");
        for (BatchStatement queued : entry.getValue()) {
            out.append(queued.size()).append(",");
        }
        out.append("|");
    }
    logger.debug(out.toString());
}
/**
 * Deletes the rows for the given keys in one batch, skipping null keys.
 * A null or empty key array is a no-op.
 */
@SuppressWarnings("unchecked")
@Override
public void delete(final K... keys) {
    if (keys == null || keys.length == 0) {
        return;
    }
    final BatchStatement batchStatement = new BatchStatement();
    for (final K key : keys) {
        if (key == null) {
            continue;
        }
        final ByteBuffer serialized = keySerializer.serialize(key);
        batchStatement.add(deleteStatement.bind(serialized));
    }
    session.execute(batchStatement);
}
/**
 * Writes the metrics to Cassandra, one batch per retention table, creating any
 * missing tables first, then fires a drain event with the metric count.
 */
@Override
public void output( Collection<Metric> metrics ) {
    if( metrics.size() == 0 ) {
        return;
    }
    // Lazily creates an empty batch the first time a retention table is touched.
    Map<RetentionTable, BatchStatement> stms = LazyMap.<RetentionTable, BatchStatement>lazyMap( new HashMap<>(), () -> new BatchStatement() );
    for ( Metric metric : metrics ) {
        insertMetricIntoBatch( metric, stms );
    }
    KeyspaceMetadata metadata = cluster.getMetadata().getKeyspace( keyspace );
    // Ensure every target table exists before executing the batches.
    for (RetentionTable table : stms.keySet()) {
        createTableIfNecessary( table, metadata );
    }
    for ( BatchStatement batch : stms.values() ) {
        try {
            session.execute( batch );
        } catch ( WriteTimeoutException e ) {
            // Write timeouts are tolerated by design; see the linked article.
            log.info( "WriteTimeoutException while sending Metrics to cassandra." );
            log.info( e.getMessage() );
            log.info( "According to http://www.datastax.com/dev/blog/how-cassandra-deals-with-replica-failure, this is harmless" );
        }
    }
    EventBusManager.fire( new DrainMetricOutputEvent( ( new PersistentCassandraDrainFactory<>().handledType() ), metrics.size() ) );
}
/**
 * Inserts the given metrics into the retention table as one batch.
 * In dry-run mode only logs what would be written; opens a session lazily.
 */
public void insertMetrics( RetentionTable table, Collection<Metric> metrics ) {
    if ( dryRun ) {
        log.debug( "Inserting " + metrics.toString() + " into " + table );
        return;
    }
    if ( session == null ) {
        open();
    }
    BatchStatement batch = new BatchStatement();
    for ( Metric metric : metrics ) {
        String[] columns = { COL_NAME, COL_TIME, COL_VALUE };
        Object[] values = { metric.name(), metric.timestamp(), metric.value() };
        batch.add( QueryBuilder.insertInto( table.tableName() ).values( columns, values ) );
    }
    session.execute( batch );
}
/**
 * Asynchronously folds all remaining batchables into the batch: fetches the next
 * statement future, and on completion adds it and recurses; when the iterator is
 * exhausted, completes this future with the assembled batch. Any failure completes
 * the future exceptionally with the unwrapped cause.
 */
private void handle(final BatchStatement batchStmt, final UnmodifiableIterator<T> batchablesIt, final Function<T, ListenableFuture<Statement>> statementFetcher) {
    if (batchablesIt.hasNext()) {
        final ListenableFuture<Statement> statementFuture = statementFetcher.apply(batchablesIt.next());

        Runnable resultHandler = new Runnable() {
            @Override
            public void run() {
                try {
                    // Future is complete here (we are its listener), so get() does not block.
                    batchStmt.add(statementFuture.get());
                    // Recurse to process the next batchable; depth equals batch size.
                    handle(batchStmt, batchablesIt, statementFetcher);
                } catch (InterruptedException | ExecutionException | RuntimeException e) {
                    setException(ListenableFutures.unwrapIfNecessary(e));
                }
            }
        };
        statementFuture.addListener(resultHandler, MoreExecutors.directExecutor());
    } else {
        // All statements collected; resolve the enclosing future with the batch.
        set(batchStmt);
    }
}
/**
 * Builds a mixed batch (simple statements plus ten bound inserts), then executes
 * it asynchronously and waits for completion.
 */
@Override
public void transactionMarker() throws Exception {
    BatchStatement batch = new BatchStatement();
    batch.add(new SimpleStatement(
        "INSERT INTO test.users (id, fname, lname) VALUES (100, 'f100', 'l100')"));
    batch.add(new SimpleStatement(
        "INSERT INTO test.users (id, fname, lname) VALUES (101, 'f101', 'l101')"));
    PreparedStatement prepared =
        session.prepare("INSERT INTO test.users (id, fname, lname) VALUES (?, ?, ?)");
    for (int id = 200; id < 210; id++) {
        BoundStatement bound = new BoundStatement(prepared);
        batch.add(bound.bind(id, "f" + id, "l" + id));
    }
    batch.add(new SimpleStatement(
        "INSERT INTO test.users (id, fname, lname) VALUES (300, 'f300', 'l300')"));
    session.executeAsync(batch).get();
}
/**
 * Builds a mixed batch (simple statements plus ten bound inserts) and executes
 * it synchronously.
 */
@Override
public void transactionMarker() throws Exception {
    BatchStatement batch = new BatchStatement();
    batch.add(new SimpleStatement(
        "INSERT INTO test.users (id, fname, lname) VALUES (100, 'f100', 'l100')"));
    batch.add(new SimpleStatement(
        "INSERT INTO test.users (id, fname, lname) VALUES (101, 'f101', 'l101')"));
    PreparedStatement prepared =
        session.prepare("INSERT INTO test.users (id, fname, lname) VALUES (?, ?, ?)");
    for (int id = 200; id < 210; id++) {
        BoundStatement bound = new BoundStatement(prepared);
        batch.add(bound.bind(id, "f" + id, "l" + id));
    }
    batch.add(new SimpleStatement(
        "INSERT INTO test.users (id, fname, lname) VALUES (300, 'f300', 'l300')"));
    session.execute(batch);
}
/**
 * Test helper: inserts availability data as if it had been written {@code duration}
 * ago, by temporarily swapping in a DataAccess delegate that rewrites the TTL and
 * the cell write time, then restoring the original data access.
 */
private void addAvailabilityDataInThePast(Metric<AvailabilityType> metric, final Duration duration) throws Exception {
    try {
        metricsService.setDataAccess(new DelegatingDataAccess(dataAccess) {
            // @Override
            public Observable<Integer> insertAvailabilityData(Metric<AvailabilityType> m, int ttl) {
                // Shrink the TTL by the back-dated duration so expiry lines up.
                int actualTTL = ttl - duration.toStandardSeconds().getSeconds();
                // Write time is in microseconds, hence the * 1000 on millis.
                long writeTime = now().minus(duration).getMillis() * 1000;
                BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
                for (DataPoint<AvailabilityType> a : m.getDataPoints()) {
                    batchStatement.add(insertAvailabilityDateWithTimestamp.bind(m.getMetricId().getTenantId(),
                        AVAILABILITY.getCode(), m.getMetricId().getName(), DPART, getTimeUUID(a.getTimestamp()),
                        getBytes(a), actualTTL, writeTime));
                }
                return rxSession.execute(batchStatement).map(resultSet -> batchStatement.size());
            }
        });
        metricsService.addDataPoints(AVAILABILITY, Observable.just(metric));
    } finally {
        // Always restore the real data access, even if the insert fails.
        metricsService.setDataAccess(dataAccess);
    }
}
/**
 * Executes cql batch statements in Cassandra
 */
@Override
public void run() {
    LOG.debug("[" + this + "] Executing batch write to cassandra");
    try {
        // NOTE(review): the statement is re-prepared on every run; consider caching
        // the PreparedStatement if this task executes repeatedly with the same cql.
        final PreparedStatement preparedStatement = sessionWithHost.prepare(cql);
        final BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
        for (final List<Object> record : records) {
            batchStatement.add(preparedStatement.bind(record.toArray(new Object[record.size()])));
        }
        sessionWithHost.execute(batchStatement);
    } catch (Exception e) {
        // Best-effort: failures are logged and not rethrown (Runnable contract).
        LOG.error("[" + this + "] Exception occurred while trying to execute batch in cassandra: " + e.getMessage());
    }
}
private String retrieveSql(Object args0) { String sql; if (args0 instanceof BoundStatement) { sql = ((BoundStatement) args0).preparedStatement().getQueryString(); } else if (args0 instanceof RegularStatement) { sql = ((RegularStatement) args0).getQueryString(); } else if (args0 instanceof StatementWrapper) { // method to get wrapped statement is package-private, skip. sql = null; } else if (args0 instanceof BatchStatement) { // we could unroll all the batched statements and append ; between them if need be but it could be too long. sql = null; } else if (args0 instanceof String) { sql = (String) args0; } else { sql = null; } return sql; }
/**
 * Integration test: processing a single message through the save-to-Cassandra
 * function (unlogged batches, batch size 50) must not throw.
 */
@Test
public void testSaveToCassandra() throws Exception {
    LOGGER.debug("Connecting to Cassandra Quorum: " + conf.getStringList("cassandra.hosts").toString());
    SaveToCassandraActionExecutionFunction func = new SaveToCassandraActionExecutionFunction(
        getHostsStringFromList(conf.getStringList("cassandra.hosts")), ProtocolOptions.DEFAULT_PORT, 50,
        BatchStatement.Type.UNLOGGED);
    List<StratioStreamingMessage> list = new ArrayList<StratioStreamingMessage>();
    message.setColumns(StreamsHelper.COLUMNS3);
    list.add(message);
    Exception ex = null;
    try {
        func.process(list);
    } catch (Exception e) {
        // Captured rather than rethrown so the assertion below reports the failure.
        ex = e;
        ex.printStackTrace();
    }
    assertNull("Expected null value", ex);
}
/**
 * Persists each equivalence graph plus one index row per adjacency entry,
 * all in a single batch at the configured write consistency level.
 */
@Override
protected void doStore(ImmutableSet<EquivalenceGraph> graphs) {
    BatchStatement updateBatch = new BatchStatement();
    updateBatch.setConsistencyLevel(write);
    for (EquivalenceGraph graph : graphs) {
        // Graph rows are keyed by the lowest id contained in the graph.
        Long graphId = lowestId(graph);
        ByteBuffer serializedGraph = serializer.serialize(graph);
        updateBatch.add(graphInsert(graphId, serializedGraph));
        // Index every member id back to the graph row for reverse lookup.
        for (Entry<Id, Adjacents> adjacency : graph.getAdjacencyList().entrySet()) {
            updateBatch.add(indexInsert(adjacency.getKey().longValue(), graphId));
        }
    }
    session.execute(updateBatch);
}
/**
 * Persists the organisation row and its URI row in a single batch, recording
 * call/failure meters. Failures are re-thrown after marking the failure meter.
 *
 * @return the organisation that was written, unchanged.
 */
@Override
public Organisation write(Organisation organisation) {
    metricRegistry.meter(writeMetricPrefix + METER_CALLED).mark();
    try {
        Id id = organisation.getId();
        ByteBuffer serializedOrganisation = ByteBuffer.wrap(serializer.serialize(organisation)
            .toByteArray());
        BatchStatement batchStatement = new BatchStatement();
        Statement writeOrganisation = rowUpdate.bind()
            .setLong(ORGANISATION_ID, id.longValue())
            .setBytes(DATA, serializedOrganisation);
        // The URI row is written atomically with the main row via the batch.
        Statement writeUri = organisationUriStore.prepareWritingStatement(organisation);
        batchStatement.add(writeOrganisation);
        batchStatement.add(writeUri);
        session.execute(batchStatement);
        return organisation;
    } catch (RuntimeException e) {
        metricRegistry.meter(writeMetricPrefix + METER_FAILURE).mark();
        throw Throwables.propagate(e);
    }
}
/**
 * Value holder for the Cassandra appender factory configuration.
 * Contact points are normalized to carry the default port when none is given.
 */
private FactoryData(final SocketAddress[] contactPoints, final ColumnMapping[] columns, final boolean useTls,
        final String clusterName, final String keyspace, final String table, final String username,
        final String password, final boolean useClockForTimestampGenerator, final int bufferSize,
        final boolean batched, final BatchStatement.Type batchType) {
    super(bufferSize, null);
    this.contactPoints = convertAndAddDefaultPorts(contactPoints);
    this.columns = columns;
    this.useTls = useTls;
    this.clusterName = clusterName;
    this.keyspace = keyspace;
    this.table = table;
    this.username = username;
    this.password = password;
    this.useClockForTimestampGenerator = useClockForTimestampGenerator;
    this.batched = batched;
    this.batchType = batchType;
}
/**
 * Primes the stub server with a parameterized batch, verifies the prime round-trips
 * through the HTTP API, then executes a matching native batch and checks its result.
 */
@Test
public void testBatchPrimeSimple() throws Exception {
    // NOTE(review): the CQL below has an unbalanced '(' — presumably irrelevant
    // because the stub matches the primed string verbatim; confirm.
    String query = "INSERT INTO a.b(c, d) VALUES( (?, ?)";
    Map<String, String> param_types = new HashMap<String, String>();
    param_types.put("column1", "ascii");
    param_types.put("column2", "int");
    Map<String, Object> params = new HashMap<String, Object>();
    params.put("column1", "column1");
    params.put("column2", "2");
    RequestPrime prime = HttpTestUtil.createSimpleParameterizedBatch(query, param_types, params);
    HttpTestResponse response = server.prime(prime);
    assertNotNull(response);
    // The server must echo back exactly the prime we posted.
    RequestPrime responseQuery = om.readValue(response.body, RequestPrime.class);
    assertThat(responseQuery).isEqualTo(prime);
    String contactPoint = HttpTestUtil.getContactPointString(server.getCluster(), 0);
    BatchStatement statement = HttpTestUtil.makeNativeBatchStatement(
        Arrays.asList(query),
        Arrays.asList(Arrays.asList("column1", 2)));
    ResultSet set = HttpTestUtil.executeQueryWithFreshSession(statement, contactPoint);
    assertResult(set);
}
/**
 * Builds a BatchStatement from parallel lists of query strings and their
 * positional values; the two lists are expected to be the same length.
 */
public static BatchStatement makeNativeBatchStatement(List<String> queries, List<List> values) {
    BatchStatement batch = new BatchStatement();
    Iterator<List> valueRows = values.iterator();
    for (String query : queries) {
        List row = valueRows.next();
        Object[] bound = row.toArray(new Object[row.size()]);
        batch.add(new SimpleStatement(query, bound));
    }
    return batch;
}
/**
 * Inserts all points in one batch and reports the elapsed execution time
 * (nanoseconds) in the returned status. Errors are printed and swallowed,
 * yielding a zero-cost OK status.
 */
@Override
public Status insertMulti(List<TsPoint> points) {
    long costTime = 0L;
    if (points != null) {
        // NOTE(review): 'cluster' is never assigned (the builder call is commented
        // out), so the finally-close below is dead code — confirm and remove.
        Cluster cluster = null;
        try {
            // cluster = Cluster.builder().addContactPoint(CASSADRA_URL).build();
            // Session session = cluster.connect(KEY_SPACE_NAME);
            Session session = SessionManager.getSession();
            BatchStatement batch = new BatchStatement();
            PreparedStatement ps = session.prepare(
                "INSERT INTO " + TABLE_NAME + "(timestamp,device_code,sensor_code,value) VALUES(?,?,?,?)");
            for (TsPoint point : points) {
                BoundStatement bs = ps.bind(new Date(point.getTimestamp()), point.getDeviceCode(),
                    point.getSensorCode(), Double.parseDouble(point.getValue().toString()));
                batch.add(bs);
            }
            // Only the execute call is timed; preparation/binding is excluded.
            long startTime = System.nanoTime();
            session.execute(batch);
            long endTime = System.nanoTime();
            costTime = endTime - startTime;
            batch.clear();
            // session.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (cluster != null)
                cluster.close();
        }
    }
    // System.out.println("costTime=" + costTime);
    return Status.OK(costTime);
}