private static ListenableFuture<ResultSet> executeAdaptiveQueryAsync(Session session, Statement statement,
                                                                     int fetchSize, int remainingAdaptations) {
    statement.setFetchSize(fetchSize);

    ResultSetFuture rawFuture = session.executeAsync(statement);

    // Lazily wrap the result set from the async result with an AdaptiveResultSet
    ListenableFuture<ResultSet> adaptiveFuture = Futures.transform(rawFuture, new Function<ResultSet, ResultSet>() {
        @Override
        public ResultSet apply(ResultSet resultSet) {
            return new AdaptiveResultSet(session, resultSet, remainingAdaptations);
        }
    });

    return Futures.withFallback(adaptiveFuture, t -> {
        if (isAdaptiveException(t) && remainingAdaptations > 0 && fetchSize > MIN_FETCH_SIZE) {
            // Try again with half the fetch size
            int reducedFetchSize = Math.max(fetchSize / 2, MIN_FETCH_SIZE);
            _log.debug("Repeating previous query with fetch size {} due to {}", reducedFetchSize, t.getMessage());
            return executeAdaptiveQueryAsync(session, statement, reducedFetchSize, remainingAdaptations - 1);
        }
        throw Throwables.propagate(t);
    });
}
public static boolean untilApplied(Session session, BatchStatement.Type type, Consumer<BatchStatement> transaction) {
    for (int i = 1; i <= MAX_RETRY; i++) {
        BatchStatement batchStatement = new BatchStatement(type);
        transaction.accept(batchStatement);
        if (batchStatement.size() == 0) return false;

        boolean applied;
        if (batchStatement.size() > 1) {
            applied = session.execute(batchStatement).wasApplied();
        } else {
            // A single-statement batch adds needless overhead; unwrap and execute the statement directly.
            Statement statement = Iterables.getOnlyElement(batchStatement.getStatements());
            applied = session.execute(statement).wasApplied();
        }
        if (applied) return true;

        log.warn("Attempt {}/{} failed executing {}", i, MAX_RETRY, batchStatement);
        try {
            // Back off linearly before retrying the conditional batch.
            Thread.sleep(100 * i);
        } catch (InterruptedException e) {
            throw new AttemptsFailedException(e);
        }
    }
    throw new AttemptsFailedException();
}
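// Usage sketch (an assumption, not part of the original source): driving untilApplied with a
// conditional insert. The keyspace/table/column names, the registerUser wrapper, and the session
// variable are hypothetical; only untilApplied itself is defined above. Because the lambda may run
// once per retry, it should only stage idempotent, conditional (LWT) statements.
static boolean registerUser(Session session, String userId, String email) {
    return untilApplied(session, BatchStatement.Type.LOGGED, batch ->
            batch.add(QueryBuilder.insertInto("app", "users")
                    .value("user_id", userId)
                    .value("email", email)
                    .ifNotExists()));
}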
@Override
public long uploadPackage(DataPackage dataPack) {
    long time = System.currentTimeMillis();
    // Try-with-resources ensures the per-call cluster and session are closed instead of leaking.
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect()) {
        ByteBuffer buffer = ByteBuffer.wrap(dataPack.getData());
        Statement statement = QueryBuilder.insertInto(DATABASE, MAIN_TABLE)
                .value(COL_ID, time)
                .value(COL_DATA, buffer)
                .value(COL_DESC, dataPack.getDescription());
        session.execute(statement);
    } catch (Exception ex) {
        System.out.println(ex.getMessage());
    }
    return time;
}
@Override
public DataPackage downloadPackage(long packageID) {
    DataPackage dataPack = new DataPackage();
    // Try-with-resources ensures the per-call cluster and session are closed instead of leaking.
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect()) {
        Statement statement = QueryBuilder.select()
                .all()
                .from(DATABASE, MAIN_TABLE)
                .where(eq(COL_ID, packageID));
        ResultSet results = session.execute(statement);
        for (Row row : results) {
            dataPack.setId(row.getLong(COL_ID));
            dataPack.setDescription(row.getString(COL_DESC));
            dataPack.setData(row.getBytes(COL_DATA).array());
        }
    } catch (Exception ex) {
        System.out.println(ex.getMessage());
    }
    return dataPack;
}
@Override
public List<DataPackage> listPackages() {
    List<DataPackage> dataPacks = new ArrayList<>();
    // Try-with-resources ensures the per-call cluster and session are closed instead of leaking.
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect()) {
        Statement statement = QueryBuilder.select()
                .all()
                .from(DATABASE, MAIN_TABLE);
        ResultSet results = session.execute(statement);
        for (Row row : results) {
            DataPackage dataPack = new DataPackage();
            dataPack.setId(row.getLong(COL_ID));
            dataPack.setDescription(row.getString(COL_DESC));
            dataPacks.add(dataPack);
        }
    } catch (Exception ex) {
        System.out.println(ex.getMessage());
    }
    return dataPacks;
}
/**
 * Tunes CQL statement execution options (consistency level, fetch size, etc.).
 *
 * @param statement Statement.
 * @return Modified statement.
 */
private Statement tuneStatementExecutionOptions(Statement statement) {
    String qry = "";

    if (statement instanceof BoundStatement) {
        qry = ((BoundStatement)statement).preparedStatement().getQueryString().trim().toLowerCase();
    } else if (statement instanceof PreparedStatement) {
        qry = ((PreparedStatement)statement).getQueryString().trim().toLowerCase();
    }

    boolean readStatement = qry.startsWith("select");
    boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement ||
            qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update");

    if (readStatement && readConsistency != null) {
        statement.setConsistencyLevel(readConsistency);
    }

    if (writeStatement && writeConsistency != null) {
        statement.setConsistencyLevel(writeConsistency);
    }

    if (fetchSize != null) {
        statement.setFetchSize(fetchSize);
    }

    return statement;
}
protected ListenableFuture<List<D>> findListByStatementAsync(Statement statement) {
    if (statement != null) {
        statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
        ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
        return Futures.transform(resultSetFuture, new Function<ResultSet, List<D>>() {
            @Nullable
            @Override
            public List<D> apply(@Nullable ResultSet resultSet) {
                Result<E> result = getMapper().map(resultSet);
                if (result != null) {
                    List<E> entities = result.all();
                    return DaoUtil.convertDataList(entities);
                } else {
                    return Collections.emptyList();
                }
            }
        });
    }
    return Futures.immediateFuture(Collections.emptyList());
}
protected ListenableFuture<D> findOneByStatementAsync(Statement statement) {
    if (statement != null) {
        statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
        ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
        return Futures.transform(resultSetFuture, new Function<ResultSet, D>() {
            @Nullable
            @Override
            public D apply(@Nullable ResultSet resultSet) {
                Result<E> result = getMapper().map(resultSet);
                if (result != null) {
                    E entity = result.one();
                    return DaoUtil.getData(entity);
                } else {
                    return null;
                }
            }
        });
    }
    return Futures.immediateFuture(null);
}
@Override
public void update(Host host, Statement statement, Exception e, long nanos) {
    if (!(statement instanceof NamedBoundStatement)) return;
    Span span = cache.remove(statement);
    if (span == null) {
        if (statement.isTracing()) {
            LOG.warn("{} not in the cache even though tracing is on", statement);
        }
        return;
    }
    span.setDuration(nanos / 1000); // TODO: allow client tracer to end with duration
    Endpoint local = span.getAnnotations().get(0).host; // TODO: expose in brave
    long endTs = span.getTimestamp() + span.getDuration();
    if (e != null) {
        span.addToBinary_annotations(BinaryAnnotation.create("cql.error", e.getMessage(), local));
    } else {
        span.addToAnnotations(Annotation.create(endTs, "cr", local));
    }
    int ipv4 = ByteBuffer.wrap(host.getAddress().getAddress()).getInt();
    Endpoint endpoint = Endpoint.create("cassandra", ipv4, host.getSocketAddress().getPort());
    span.addToBinary_annotations(BinaryAnnotation.address("sa", endpoint));
    collector.collect(span);
}
@Override
public void update(Host host, Statement statement, Exception e, long nanos) {
    if (!(statement instanceof BoundStatement)) return;
    Span span = cache.remove(statement);
    if (span == null) {
        if (statement.isTracing()) {
            LOG.warn("{} not in the cache even though tracing is on", statement);
        }
        return;
    }
    span.setDuration(nanos / 1000); // TODO: allow client tracer to end with duration
    Endpoint local = span.getAnnotations().get(0).host; // TODO: expose in brave
    long endTs = span.getTimestamp() + span.getDuration();
    span.addToAnnotations(Annotation.create(endTs, "cr", local));
    if (e != null) {
        span.addToBinary_annotations(BinaryAnnotation.create(Constants.ERROR, e.getMessage(), local));
    }
    int ipv4 = ByteBuffer.wrap(host.getAddress().getAddress()).getInt();
    Endpoint endpoint = Endpoint.create("cassandra3", ipv4, host.getSocketAddress().getPort());
    span.addToBinary_annotations(BinaryAnnotation.address("sa", endpoint));
    collector.collect(span);
}
public void executePut(String query, String consistency) {
    logger.debug("in data store handle, executing put:" + query);
    long start = System.currentTimeMillis();
    Statement statement = new SimpleStatement(query);
    if (consistency.equalsIgnoreCase("critical")) {
        logger.info("Executing critical put query:" + query);
        statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
    } else if (consistency.equalsIgnoreCase("eventual")) {
        logger.info("Executing normal put query:" + query);
        statement.setConsistencyLevel(ConsistencyLevel.ONE);
    }
    session.execute(statement);
    long end = System.currentTimeMillis();
    logger.debug("Time taken for actual put in cassandra:" + (end - start));
}
/**
 * Get the arguments of a statement in an ordered key-value array.
 *
 * @param arg0 The statement.
 * @return The key-value array.
 */
public static String[] getStatementArguments( Statement arg0 ) {
    String[] returnValue = EMTPY_STRING_ARRAY;
    if ( arg0 instanceof ProfiledBoundStatement ) {
        returnValue = ( (ProfiledBoundStatement) arg0 ).getArgumentList();
    } else if ( arg0 instanceof BatchStatement ) {
        // Recursively collect the arguments of every statement in the batch.
        List<String> argumentList = new ArrayList<String>();
        Collection<Statement> statements = ( (BatchStatement) arg0 ).getStatements();
        for ( Statement statement : statements ) {
            String[] statementArguments = getStatementArguments( statement );
            Collections.addAll( argumentList, statementArguments );
        }
        returnValue = argumentList.toArray( new String[argumentList.size()] );
    }
    return returnValue;
}
/**
 * Get the name of a statement.
 *
 * @param arg0 The statement.
 * @return The name used for logging.
 */
public static String getStatementName( Statement arg0 ) {
    String returnValue = "unknown";
    if ( arg0 instanceof RegularStatement ) {
        returnValue = ( (RegularStatement) arg0 ).getQueryString();
    } else if ( arg0 instanceof BoundStatement ) {
        PreparedStatement preparedStatement = ( (BoundStatement) arg0 ).preparedStatement();
        returnValue = preparedStatement.getQueryString();
    } else if ( arg0 instanceof BatchStatement ) {
        StringBuilder value = new StringBuilder( "Batch : " );
        Collection<Statement> statements = ( (BatchStatement) arg0 ).getStatements();
        boolean first = true;
        for ( Statement statement : statements ) {
            if ( first ) {
                first = false;
            } else {
                value.append( ", " );
            }
            String statementName = getStatementName( statement );
            value.append( statementName );
        }
        returnValue = value.toString();
    }
    return returnValue;
}
private Statement prepareStatement(final String query) {
    Statement stmt = null;

    // Short queries are cached; look up the pooled statement first.
    if (query.length() <= POOLABLE_LENGTH) {
        PoolableWrapper<Statement> wrapper = stmtPool.get(query);
        if (wrapper != null) {
            stmt = wrapper.value();
        }
    }

    if (stmt == null) {
        final NamedCQL namedCQL = getNamedCQL(query);
        final String cql = namedCQL.getPureCQL();
        stmt = bind(prepare(cql));

        if (query.length() <= POOLABLE_LENGTH) {
            stmtPool.put(query, PoolableWrapper.of(stmt));
        }
    }

    return stmt;
}
@Test
public void testClusterHintsPollerWhenNodeDown() throws UnknownHostException {
    ClusterHintsPoller clusterHintsPoller = new ClusterHintsPoller();
    Session mockSession = mock(Session.class);
    Cluster mockCluster = mock(Cluster.class);
    Metadata mockMetadata = mock(Metadata.class);
    when(mockCluster.getMetadata()).thenReturn(mockMetadata);
    when(mockCluster.getClusterName()).thenReturn("test-cluster");
    Host node1 = mock(Host.class);
    when(node1.getAddress()).thenReturn(InetAddress.getByName("127.0.0.1"));
    Host node2 = mock(Host.class);
    when(node2.getAddress()).thenReturn(InetAddress.getByName("127.0.0.2"));
    Host node3 = mock(Host.class);
    when(node3.getAddress()).thenReturn(InetAddress.getByName("127.0.0.3"));

    when(mockSession.getCluster()).thenReturn(mockCluster);

    // The first node queried is down
    when(mockSession.execute(any(Statement.class)))
            .thenThrow(new NoHostAvailableException(ImmutableMap.<InetSocketAddress, Throwable>of()));

    when(mockMetadata.getAllHosts()).thenReturn(ImmutableSet.of(node1, node2, node3));

    HintsPollerResult actualResult = clusterHintsPoller.getOldestHintsInfo(mockSession);

    // Make sure HintsPollerResult fails
    assertFalse(actualResult.areAllHostsPolling(), "Result should show hosts failing");
    assertEquals(actualResult.getHostFailure(), ImmutableSet.of(InetAddress.getByName("127.0.0.1")),
            "Node 1 should return with host failure");
}
private ArgumentMatcher<Statement> getHostStatementMatcher(final Host host, final String query) throws Exception {
    return new ArgumentMatcher<Statement>() {
        @Override
        public boolean matches(Object argument) {
            SelectedHostStatement statement = (SelectedHostStatement) argument;
            return ((SimpleStatement) statement.getStatement()).getQueryString().equals(query)
                    && Objects.equals(statement.getHostCordinator().getAddress(), host.getAddress());
        }

        @Override
        public void describeTo(Description description) {
            description.appendText(format("query:%s host:%s", query, host.getAddress().toString()));
        }
    };
}
private Record read(Key key, ByteBuffer rowKey, ReadConsistency consistency, DeltaPlacement placement) {
    checkNotNull(key, "key");
    checkNotNull(consistency, "consistency");

    TableDDL tableDDL = placement.getDeltaTableDDL();

    Statement statement = selectFrom(tableDDL)
            .where(eq(tableDDL.getRowKeyColumnName(), rowKey))
            .setConsistencyLevel(SorConsistencies.toCql(consistency));

    // Track metrics
    _randomReadMeter.mark();

    Iterator<Iterable<Row>> groupedRows = deltaQuery(placement, statement, true, "Failed to read record %s", key);

    Iterable<Row> rows;
    if (groupedRows.hasNext()) {
        rows = groupedRows.next();
    } else {
        rows = ImmutableList.of();
    }

    // Convert the results into a Record object, lazily fetching the rest of the columns as necessary.
    return newRecordFromCql(key, rows);
}
/**
 * Scans a range of keys and returns an iterator containing each row's columns as an iterable.
 */
private Iterator<Iterable<Row>> rowScan(DeltaPlacement placement, @Nullable AstyanaxTable table,
                                        ByteBufferRange keyRange, ReadConsistency consistency) {
    ByteBuffer startToken = keyRange.getStart();
    ByteBuffer endToken = keyRange.getEnd();

    // Note: if Cassandra is asked to perform a token range query where start >= end it will wrap
    // around which is absolutely *not* what we want.
    checkArgument(AstyanaxStorage.compareKeys(startToken, endToken) < 0,
            "Cannot scan rows which loop from maximum- to minimum-token");

    TableDDL tableDDL = placement.getDeltaTableDDL();

    Statement statement = selectFrom(tableDDL)
            .where(gt(token(tableDDL.getRowKeyColumnName()), startToken))
            .and(lte(token(tableDDL.getRowKeyColumnName()), endToken))
            .setConsistencyLevel(SorConsistencies.toCql(consistency));

    return deltaQueryAsync(placement, statement, false, "Failed to scan token range [%s, %s] for %s",
            ByteBufferUtil.bytesToHex(startToken), ByteBufferUtil.bytesToHex(endToken),
            table != null ? table : "multiple tables");
}
private Iterator<Iterable<Row>> migrationScan(DeltaPlacement placement, ByteBufferRange keyRange,
                                              ReadConsistency consistency) {
    ByteBuffer startToken = keyRange.getStart();
    ByteBuffer endToken = keyRange.getEnd();

    // Note: if Cassandra is asked to perform a token range query where start >= end it will wrap
    // around which is absolutely *not* what we want.
    checkArgument(AstyanaxStorage.compareKeys(startToken, endToken) < 0,
            "Cannot migrate rows which loop from maximum- to minimum-token");

    TableDDL tableDDL = placement.getDeltaTableDDL();

    // Our query needs to be inclusive on both sides so that we ensure that we get all records in the event of a re-split
    Statement statement = selectFrom(tableDDL)
            .where(gte(token(tableDDL.getRowKeyColumnName()), startToken))
            .and(lte(token(tableDDL.getRowKeyColumnName()), endToken))
            .setConsistencyLevel(SorConsistencies.toCql(consistency));

    return deltaQueryAsync(placement, statement, false, "Failed to scan (for migration) token range [%s, %s] for %s",
            ByteBufferUtil.bytesToHex(startToken), ByteBufferUtil.bytesToHex(endToken), "multiple tables");
}
/**
 * Executes a query synchronously, dynamically adjusting the fetch size down if necessary.
 */
public static ResultSet executeAdaptiveQuery(Session session, Statement statement, int fetchSize) {
    int remainingAdaptations = MAX_ADAPTATIONS;
    while (true) {
        try {
            statement.setFetchSize(fetchSize);
            ResultSet resultSet = session.execute(statement);
            return new AdaptiveResultSet(session, resultSet, remainingAdaptations);
        } catch (Throwable t) {
            if (isAdaptiveException(t) && --remainingAdaptations != 0 && fetchSize > MIN_FETCH_SIZE) {
                // Try again with half the fetch size
                fetchSize = Math.max(fetchSize / 2, MIN_FETCH_SIZE);
                _log.debug("Repeating previous query with fetch size {} due to {}", fetchSize, t.getMessage());
            } else {
                throw Throwables.propagate(t);
            }
        }
    }
}
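// Usage sketch (an assumption, not from the original source): running a full-table read through
// executeAdaptiveQuery so that oversized-frame or timeout failures transparently retry with
// smaller pages. The keyspace/table name and the initial fetch size of 5000 are illustrative.
Statement scan = QueryBuilder.select().all().from("app", "wide_rows");
ResultSet rows = executeAdaptiveQuery(session, scan, 5000);
for (Row row : rows) {
    // Paging happens behind the scenes; AdaptiveResultSet re-issues the query with a smaller
    // fetch size if a page fetch fails with an adaptive exception.
    System.out.println(row);
}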
@Test
public void should_generate_batch_statement() throws Exception {
    //Given
    Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
    Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
    Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");
    CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
            Option.<ConsistencyLevel>empty(),
            Option.empty(),
            Option.<RetryPolicy>empty(),
            Option.empty());

    //When
    BatchStatement actual = helper.generateBatchStatement(UNLOGGED, options, toScalaList(asList(st1, st2, st3)));

    //Then
    assertThat(actual).isNotNull();
    final List<Statement> statements = new ArrayList<>(actual.getStatements());
    assertThat(statements).hasSize(3);
    assertThat(statements.get(0)).isSameAs(st1);
    assertThat(statements.get(1)).isSameAs(st2);
    assertThat(statements.get(2)).isSameAs(st3);
    assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
}
@Override
public RetryDecision onReadTimeout(
        Statement stmt, ConsistencyLevel cl, int required, int received, boolean retrieved, int retry) {
    if (retry > 1) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException expected) {
        }
    }
    // Retry idempotent statements up to 10 times; delegate everything else to the default policy.
    if (null != stmt && stmt.isIdempotent()) {
        return retry < 10 ? RetryDecision.retry(cl) : RetryDecision.rethrow();
    }
    return DefaultRetryPolicy.INSTANCE.onReadTimeout(stmt, cl, required, received, retrieved, retry);
}
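// Usage sketch (an assumption): installing a custom RetryPolicy like the one above when building
// the Cluster. "IdempotentReadRetryPolicy" is a hypothetical name for the enclosing class; the
// contact point is illustrative.
Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        .withRetryPolicy(new IdempotentReadRetryPolicy())
        .build();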
/**
 * Returns the Ref to which the specified name is mapped
 *
 * @param name the name whose associated value is to be returned
 * @return the Ref to which the specified name is mapped, or null if
 *         the store contains no mapping for the name
 * @throws IOException if an exception occurs when communicating to the
 *         database
 */
public Ref get(String name) throws IOException {
    try {
        Statement stmt = QueryBuilder
                .select()
                .all()
                .from(keyspace, TABLE_NAME)
                .where(QueryBuilder.eq("name", name));
        ResultSet results = session.execute(stmt);
        Ref r = rowToRef(results.one());
        if (!results.isExhausted()) {
            throw new IllegalStateException("Multiple rows for a single ref: " + name);
        }
        return r;
    } catch (RuntimeException e) {
        e.printStackTrace();
        throw new IOException(e);
    }
}
/**
 * @return a Collection view of all refs in the store
 * @throws IOException if an exception occurs when communicating to the
 *         database
 */
public Collection<Ref> values() throws IOException {
    try {
        List<Ref> refs = new ArrayList<Ref>();
        Statement stmt = QueryBuilder
                .select()
                .all()
                .from(keyspace, TABLE_NAME);
        stmt.setFetchSize(FETCH_SIZE);
        ResultSet results = session.execute(stmt);
        for (Row row : results) {
            refs.add(rowToRef(row));
        }
        return refs;
    } catch (RuntimeException e) {
        e.printStackTrace();
        throw new IOException(e);
    }
}
/**
 * Inserts a row into the refs table. This works for both insertion of a
 * new row, and updating an existing row.
 *
 * @param name     the primary key
 * @param type     a type where the value is mapped to an integer through
 *                 the RefType enum
 * @param value    the value, either a commit id or in the case of a
 *                 symbolic reference, the target name
 * @param auxValue an additional value, either the peeled object id in the
 *                 case of a peeled tag ref, or an empty string for all
 *                 other types of commits
 * @throws IOException if an exception occurs when communicating to the
 *         database
 */
private void putRow(String name, RefType type, String value, String auxValue) throws IOException {
    try {
        Statement stmt = QueryBuilder.insertInto(keyspace, TABLE_NAME)
                .value("name", name)
                .value("type", type.getValue())
                .value("value", value)
                .value("aux_value", auxValue);
        session.execute(stmt);
    } catch (RuntimeException e) {
        e.printStackTrace();
        throw new IOException(e);
    }
}
/**
 * Inserts Pack descriptions into the store.
 * If a description for a given "name" already exists it will be overwritten.
 *
 * @param desc the pack descriptions to insert
 * @throws IOException if an exception occurs when communicating to the
 *         database
 */
public void insertDesc(Collection<DfsPackDescription> desc) throws IOException {
    try {
        for (DfsPackDescription pd : desc) {
            Statement stmt = QueryBuilder.insertInto(keyspace, DESC_TABLE_NAME)
                    .value("name", pd.toString())
                    .value("source", pd.getPackSource().ordinal())
                    .value("last_modified", pd.getLastModified())
                    .value("size_map", DescMapper.getFileSizeMap(pd))
                    .value("object_count", pd.getObjectCount())
                    .value("delta_count", pd.getDeltaCount())
                    .value("extensions", DescMapper.getExtBits(pd))
                    .value("index_version", pd.getIndexVersion());
            session.execute(stmt);
        }
    } catch (RuntimeException e) {
        e.printStackTrace();
        throw new IOException(e);
    }
}
/**
 * Returns a ByteBuffer with the contents of the file given by the pair
 * "desc" and "ext".
 *
 * @throws IOException if an exception occurs when communicating to the
 *         database
 */
public ByteBuffer readFile(DfsPackDescription desc, PackExt ext) throws IOException {
    try {
        Statement stmt = QueryBuilder
                .select()
                .all()
                .from(keyspace, DATA_TABLE_NAME)
                .where(QueryBuilder.eq("name", desc.getFileName(ext)));
        ResultSet results = session.execute(stmt);
        Row r = results.one();
        if (!results.isExhausted()) {
            throw new IllegalStateException("Multiple rows for a single file: " + desc.getFileName(ext));
        }
        return r.getBytes("data");
    } catch (RuntimeException e) {
        e.printStackTrace();
        throw new IOException(e);
    }
}
@Override
public Iterator<Host> newQueryPlan(String keyspace, Statement statement) {
    List<Host> local = new ArrayList<>(1);
    List<Host> remote = new ArrayList<>(liveReplicaHosts.size());
    for (Host liveReplicaHost : liveReplicaHosts) {
        if (isLocalHost(liveReplicaHost)) {
            local.add(liveReplicaHost);
        } else {
            remote.add(liveReplicaHost);
        }
    }
    Collections.shuffle(remote);
    logger.debug("Using the following hosts order for the new query plan: {} | {}", local, remote);
    return Iterators.concat(local.iterator(), remote.iterator());
}
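// Usage sketch (an assumption): registering a replica-aware LoadBalancingPolicy such as the one
// implementing newQueryPlan above. "LocalFirstReplicaPolicy" is a hypothetical name for the
// enclosing class; the driver calls newQueryPlan for every request to obtain the host order.
Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        .withLoadBalancingPolicy(new LocalFirstReplicaPolicy())
        .build();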
@Override
public Statement getDeleteStatement(CassandraPersistenceSession cassandraPersistenceSession, T entity) {
    String indexValue = getIndexValue(entity);
    if (indexValue == null) {
        return null;
    }
    if (isUnique()) {
        return deleteUniqueStatement.bind(getIndexName(), indexValue);
    } else {
        String value = getValue(entity);
        if (value == null) {
            return null;
        }
        return deleteStatement.bind(getIndexName(), indexValue, value);
    }
}
@Test
public void managed() throws Throwable {
    final CassandraRule rule = CassandraRule.newBuilder()
            .withManagedKeyspace()
            .withManagedTable(TABLE_SCHEMA)
            .build();
    rule.before();

    final Session session = rule.getSession();

    Insert insert = QueryBuilder.insertInto("mytable")
            .value("key", KEY)
            .value("value", VALUE);
    session.execute(insert);

    Statement select = QueryBuilder.select()
            .from("mytable")
            .where(QueryBuilder.eq("key", KEY))
            .limit(1);
    Row result = session.execute(select).all().get(0);

    assertEquals(KEY, result.getString("key"));
    assertEquals(VALUE, result.getBytes("value"));

    rule.after();
}
@Test
public void unmanaged() throws Throwable {
    final CassandraRule rule = CassandraRule.newBuilder().build();
    rule.before();

    final Session session = rule.getSession();
    session.execute("CREATE KEYSPACE unmanaged WITH replication = "
            + "{'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
    session.execute("CREATE TABLE unmanaged.unmanaged (foo TEXT PRIMARY KEY, bar INT );");
    session.execute("INSERT INTO unmanaged.unmanaged (foo, bar) VALUES ('baz', 42);");
    session.execute("USE unmanaged;");

    Statement select = QueryBuilder.select().from("unmanaged");
    List<Row> rows = session.execute(select).all();

    assertEquals(1, rows.size());
    assertEquals("baz", rows.get(0).getString("foo"));
    assertEquals(42, rows.get(0).getInt("bar"));

    session.execute("DROP KEYSPACE unmanaged;");
    rule.after();
}
ResultSetFuture getFlightInfo(String origin, String dest, LocalDate flightDate) {
    Statement statement;
    if (origin != null) {
        statement = QueryBuilder
                .select()
                .all()
                .from("capstone", "flightinfo_origin")
                .where(eq("origin", origin))
                .and(eq("flightdate", flightDate));
    } else {
        statement = QueryBuilder
                .select()
                .all()
                .from("capstone", "flightinfo_dest")
                .allowFiltering()
                .where(eq("dest", dest))
                .and(eq("flightdate", flightDate));
    }
    return connect.executeAsync(statement);
}
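// Usage sketch (an assumption): consuming the ResultSetFuture returned by getFlightInfo without
// blocking, via Guava's Futures.addCallback. The flightDate value and the printing are placeholders.
Futures.addCallback(getFlightInfo("JFK", null, flightDate), new FutureCallback<ResultSet>() {
    @Override
    public void onSuccess(ResultSet resultSet) {
        for (Row row : resultSet) {
            System.out.println(row);
        }
    }

    @Override
    public void onFailure(Throwable t) {
        t.printStackTrace();
    }
});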
@Test
public void should_generate_batch_statement() throws Exception {
    //Given
    Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
    Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
    Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");
    CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
            Option.<ConsistencyLevel>empty(),
            Option.empty(),
            Option.<RetryPolicy>empty(),
            Option.empty(),
            Option.empty());

    //When
    BatchStatement actual = helper.generateBatchStatement(UNLOGGED, options, toScalaList(asList(st1, st2, st3)));

    //Then
    assertThat(actual).isNotNull();
    final List<Statement> statements = new ArrayList<>(actual.getStatements());
    assertThat(statements).hasSize(3);
    assertThat(statements.get(0)).isSameAs(st1);
    assertThat(statements.get(1)).isSameAs(st2);
    assertThat(statements.get(2)).isSameAs(st3);
    assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
}
@Override
public DBResult queryForLastWriteTimes() {
    if (cluster == null) return null;

    try (Session session = cluster.newSession()) {
        Statement statement = new SimpleStatement(
                String.format("SELECT WRITETIME (%s) FROM %s.%s;", columnName, keyspace, tableName));
        statement.setConsistencyLevel(ConsistencyLevel.ALL);
        ResultSet results = session.execute(statement);
        List<Row> allRows = results.all();

        // sort all of the rows accordingly
        allRows.sort(new RowComparator());

        // gather the information we need
        long startTime = allRows.get(0).getLong(0) / 1000;
        long endTime = allRows.get(allRows.size() - 1).getLong(0) / 1000;
        int totalCount = allRows.size();

        return new DBResult(startTime, endTime, totalCount);
    }
}
@Override
public RetryDecision onUnavailable(Statement statement, ConsistencyLevel consistencyLevel,
                                   int requiredReplica, int aliveReplica, int retries) {
    if (retries >= 10) {
        return RetryDecision.rethrow();
    }
    try {
        // Linear backoff with a little jitter to avoid retry storms.
        int jitter = ThreadLocalRandom.current().nextInt(100);
        int delay = (100 * (retries + 1)) + jitter;
        Thread.sleep(delay);
        return RetryDecision.retry(consistencyLevel);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return RetryDecision.rethrow();
    }
}
/**
 * Tunes CQL statement execution options (consistency level, fetch size, etc.).
 *
 * @param statement Statement.
 * @return Modified statement.
 */
private Statement tuneStatementExecutionOptions(Statement statement) {
    String qry = "";

    if (statement instanceof BoundStatement)
        qry = ((BoundStatement)statement).preparedStatement().getQueryString().trim().toLowerCase();
    else if (statement instanceof PreparedStatement)
        qry = ((PreparedStatement)statement).getQueryString().trim().toLowerCase();

    boolean readStatement = qry.startsWith("select");
    boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement ||
            qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update");

    if (readStatement && readConsistency != null)
        statement.setConsistencyLevel(readConsistency);

    if (writeStatement && writeConsistency != null)
        statement.setConsistencyLevel(writeConsistency);

    if (fetchSize != null)
        statement.setFetchSize(fetchSize);

    return statement;
}
@Override
public Iterator<Host> newQueryPlan(String keyspace, Statement statement) {
    List<Host> local = new ArrayList<>(1);
    List<Host> remote = new ArrayList<>(liveReplicaHosts.size());
    for (Host liveReplicaHost : liveReplicaHosts) {
        if (isLocalHost(liveReplicaHost)) {
            local.add(liveReplicaHost);
        } else {
            remote.add(liveReplicaHost);
        }
    }
    Collections.shuffle(remote);
    logger.trace("Using the following hosts order for the new query plan: {} | {}", local, remote);
    return Iterators.concat(local.iterator(), remote.iterator());
}
public static RequestPrime createParameterizedBatch(
        List<String> queries, List<Map<String, String>> paramTypes, List<Map<String, Object>> params) {
    Iterator<String> queryIterator = queries.iterator();
    Iterator<Map<String, String>> paramTypesIterator = paramTypes.iterator();
    Iterator<Map<String, Object>> paramsIterator = params.iterator();

    List<com.datastax.oss.simulacron.common.request.Statement> statements = new ArrayList<>();
    while (queryIterator.hasNext()) {
        statements.add(
                new com.datastax.oss.simulacron.common.request.Statement(
                        queryIterator.next(), paramTypesIterator.next(), paramsIterator.next()));
    }

    Batch when = new Batch(statements, Collections.emptyList());

    List<Map<String, Object>> rows = new ArrayList<>();
    Map<String, Object> row1 = new HashMap<>();
    row1.put("applied", "true");
    rows.add(row1);

    Map<String, String> column_types_result = new HashMap<>();
    column_types_result.put("applied", "boolean");

    Result then = new SuccessResult(rows, column_types_result);
    return new RequestPrime(when, then);
}