@Override
public Map<InetAddress, Collection<Range<Token>>> getEndpointRanges() {
    // For each driver token range, record it (converted to a Cassandra Range<Token>)
    // against every replica host that owns it.
    HashMap<InetAddress, Collection<Range<Token>>> map = new HashMap<>();
    for (TokenRange range : metadata.getTokenRanges()) {
        Range<Token> tr = new Range<>(getToken(range.getStart()), getToken(range.getEnd()));
        for (Host host : metadata.getReplicas(getKeyspace(), range)) {
            Collection<Range<Token>> c = map.get(host.getAddress());
            if (c == null) {
                c = new ArrayList<>();
                map.put(host.getAddress(), c);
            }
            c.add(tr);
        }
    }
    return map;
}
private Map<String, String> getMappingTable() {
    // You only want to prepare the query once. Best to do it when you initialize the session.
    PreparedStatement findMappings = session.prepare(
            "SELECT project_id, project_name " +
            "FROM openshift_metrics.metrics_mappings " +
            "WHERE token(project_id) > ? AND token(project_id) <= ?");

    Map<String, String> mappings = new HashMap<>();
    if (hasMappingTable()) {
        for (TokenRange tokenRange : getTokenRanges()) {
            BoundStatement boundStatement = findMappings.bind()
                    .setToken(0, tokenRange.getStart())
                    .setToken(1, tokenRange.getEnd());
            ResultSet resultSet = session.execute(boundStatement);
            resultSet.forEach(row -> mappings.put(row.getString(0), row.getString(1)));
        }
    }
    mappings.remove("%succeeded");
    return mappings;
}
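// As the comment above notes, the statement is better prepared once, at session setup, rather
// than on every call. A minimal sketch of that refactoring, assuming the same "session" field;
// the "initMappingQuery" method and the "findMappings" field are illustrative names, not part
// of the original code.
private PreparedStatement findMappings;

private void initMappingQuery() {
    findMappings = session.prepare(
            "SELECT project_id, project_name " +
            "FROM openshift_metrics.metrics_mappings " +
            "WHERE token(project_id) > ? AND token(project_id) <= ?");
}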
private Observable.Transformer<BoundStatement, Integer> applyMicroBatching() {
    return tObservable -> tObservable
            // Group statements by the token range that owns their routing key, so each
            // micro-batch targets a single replica set.
            .groupBy(b -> {
                ByteBuffer routingKey = b.getRoutingKey(ProtocolVersion.NEWEST_SUPPORTED, codecRegistry);
                Token token = metadata.newToken(routingKey);
                for (TokenRange tokenRange : session.getCluster().getMetadata().getTokenRanges()) {
                    if (tokenRange.contains(token)) {
                        return tokenRange;
                    }
                }
                log.warn("Unable to find any Cassandra node to insert token " + token.toString());
                return session.getCluster().getMetadata().getTokenRanges().iterator().next();
            })
            .flatMap(g -> g.compose(new BoundBatchStatementTransformer()))
            .flatMap(batch -> rxSession
                    .execute(batch)
                    .compose(applyInsertRetryPolicy())
                    .map(resultSet -> batch.size())
            );
}
public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception {
    ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
    Map<TokenRange, Long> subSplits;
    subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);

    // turn the sub-ranges into InputSplits
    String[] endpoints = new String[hosts.size()];

    // hadoop needs hostname, not ip
    int endpointIndex = 0;
    for (Host endpoint : hosts)
        endpoints[endpointIndex++] = endpoint.getAddress().getHostName();

    boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner
                            || partitioner instanceof ByteOrderedPartitioner;

    for (TokenRange subSplit : subSplits.keySet()) {
        List<TokenRange> ranges = subSplit.unwrap();
        for (TokenRange subrange : ranges) {
            ColumnFamilySplit split = new ColumnFamilySplit(
                    partitionerIsOpp ? subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
                    partitionerIsOpp ? subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
                    subSplits.get(subSplit),
                    endpoints);
            logger.trace("adding {}", split);
            splits.add(split);
        }
    }
    return splits;
}
private String buildQuery(TokenRange tokenRange) {
    Token start = tokenRange.getStart();
    Token end = tokenRange.getEnd();

    List<String> pkColumns = tableMetadata.getPartitionKey().stream()
            .map(ColumnMetadata::getName)
            .collect(Collectors.toList());
    String tokenStatement = String.format("token(%s)", String.join(", ", pkColumns));

    StringBuilder ret = new StringBuilder();
    ret.append("SELECT ");
    ret.append(tokenStatement); // add the token(pk) statement so that we can count partitions
    ret.append(", ");
    ret.append(columns);
    ret.append(" FROM ");
    ret.append(tableMetadata.getName());
    if (start != null || end != null)
        ret.append(" WHERE ");
    if (start != null) {
        ret.append(tokenStatement);
        ret.append(" > ");
        ret.append(start.toString());
    }
    if (start != null && end != null)
        ret.append(" AND ");
    if (end != null) {
        ret.append(tokenStatement);
        ret.append(" <= ");
        ret.append(end.toString());
    }
    return ret.toString();
}
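// For illustration only: the standalone sketch below shows the shape of the CQL that buildQuery()
// assembles for one (start, end] range. The table name ("events"), the columns value ("id, payload"),
// and the token values are hypothetical, not taken from the original code.
public class BuildQueryShapeExample {
    public static void main(String[] args) {
        String tokenStatement = "token(id)";   // token() over the partition key columns
        String columns = "id, payload";
        String query = "SELECT " + tokenStatement + ", " + columns + " FROM events"
                + " WHERE " + tokenStatement + " > -9223372036854775808"
                + " AND " + tokenStatement + " <= -3074457345618258603";
        System.out.println(query);
        // SELECT token(id), id, payload FROM events
        //   WHERE token(id) > -9223372036854775808 AND token(id) <= -3074457345618258603
    }
}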
private static Set<TokenRange> maybeSplitRanges(Set<TokenRange> tokenRanges, int splitFactor) {
    if (splitFactor <= 1)
        return tokenRanges;

    Set<TokenRange> ret = new TreeSet<>();
    for (TokenRange range : tokenRanges)
        ret.addAll(range.splitEvenly(splitFactor));
    return ret;
}
private List<BoundStatement> rangeQuery(PreparedStatement rangeStmt, TokenRange range) {
    List<BoundStatement> res = Lists.newArrayList();
    for (TokenRange subRange : range.unwrap()) {
        res.add(rangeStmt.bind(subRange.getStart(), subRange.getEnd()));
    }
    return res;
}
private Set<TokenRange> getTokenRanges() {
    Set<TokenRange> tokenRanges = new HashSet<>();
    for (TokenRange tokenRange : session.getCluster().getMetadata().getTokenRanges()) {
        tokenRanges.addAll(tokenRange.unwrap());
    }
    return tokenRanges;
}
private Set<TokenRange> getTokenRanges() {
    Set<TokenRange> tokenRanges = new HashSet<>();
    for (TokenRange tokenRange : metadata.getTokenRanges()) {
        tokenRanges.addAll(tokenRange.unwrap());
    }
    return tokenRanges;
}
private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range) {
    return metadata.newTokenRange(
            metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
            metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
}
private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range,
                                           Configuration conf, Session session) throws IOException {
    int splitSize = ConfigHelper.getInputSplitSize(conf);
    int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
    try {
        return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
private Map<TokenRange, Set<Host>> getRangeMap(String keyspace, Metadata metadata) {
    return metadata.getTokenRanges()
            .stream()
            .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
}
private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange,
                                             int splitSize, int splitSizeMb, Session session) {
    String query = String.format("SELECT mean_partition_size, partitions_count " +
                                 "FROM %s.%s " +
                                 "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
                                 SystemKeyspace.NAME,
                                 SystemKeyspace.SIZE_ESTIMATES);

    ResultSet resultSet = session.execute(query, keyspace, table,
                                          tokenRange.getStart().toString(), tokenRange.getEnd().toString());

    Row row = resultSet.one();

    long meanPartitionSize = 0;
    long partitionCount = 0;
    int splitCount = 0;

    if (row != null) {
        meanPartitionSize = row.getLong("mean_partition_size");
        partitionCount = row.getLong("partitions_count");

        splitCount = splitSizeMb > 0
                   ? (int) (meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024)
                   : (int) (partitionCount / splitSize);
    }

    // If we have no data on this split or the size estimate is 0,
    // return the full split i.e., do not sub-split
    // Assume smallest granularity of partition count available from CASSANDRA-7688
    if (splitCount == 0) {
        Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
        wrappedTokenRange.put(tokenRange, (long) 128);
        return wrappedTokenRange;
    }

    List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
    Map<TokenRange, Long> rangesWithLength = new HashMap<>();
    for (TokenRange range : splitRanges)
        rangesWithLength.put(range, partitionCount / splitCount);
    return rangesWithLength;
}
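// To make the splitCount arithmetic above concrete, here is a tiny standalone example using
// made-up size estimates (the numbers are illustrative, not from any real system.size_estimates row):
// 10,000,000 partitions averaging 4 KB each with a 64 MB target split size.
public class SplitCountExample {
    public static void main(String[] args) {
        long meanPartitionSize = 4096L;      // bytes
        long partitionCount = 10_000_000L;
        int splitSizeMb = 64;
        int splitCount = (int) (meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024);
        System.out.println(splitCount);      // roughly 40 GB of data / 64 MB per split -> 610
    }
}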
public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session) {
    this.tokenRange = tr;
    this.hosts = hosts;
    this.conf = conf;
    this.session = session;
}
public State(TokenRange tokenRange, String query) {
    this.tokenRange = tokenRange;
    this.query = query;
}
public boolean run() throws Exception {
    State state = currentState.get();
    if (state == null) {
        // start processing a new token range
        TokenRange range = tokenRangeIterator.next();
        if (range == null)
            return true; // no more token ranges to process

        state = new State(range, buildQuery(range));
        currentState.set(state);
    }

    ResultSet results;
    Statement statement = new SimpleStatement(state.query);
    statement.setFetchSize(pageSize);
    if (state.pagingState != null)
        statement.setPagingState(state.pagingState);

    results = client.getSession().execute(statement);
    state.pagingState = results.getExecutionInfo().getPagingState();

    int remaining = results.getAvailableWithoutFetching();
    rowCount += remaining;

    for (Row row : results) {
        // this call will only succeed if we've added token(partition keys) to the query
        Token partition = row.getPartitionKeyToken();
        if (!state.partitions.contains(partition)) {
            partitionCount += 1;
            state.partitions.add(partition);
        }

        if (--remaining == 0)
            break;
    }

    if (results.isExhausted() || isWarmup) {
        // no more pages to fetch or just warming up, ready to move on to another token range
        currentState.set(null);
    }

    return true;
}
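// The method above fetches exactly one page per invocation and stores the driver's PagingState so the
// next call resumes the same token range. A minimal sketch of that paging pattern in isolation,
// assuming a DataStax 3.x Session and an arbitrary query string; the class and field names here are
// illustrative, not part of the original code.
import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

class PageFetcher {
    private PagingState resumeFrom; // null until the first page has been fetched

    ResultSet fetchNextPage(Session session, String query, int pageSize) {
        Statement statement = new SimpleStatement(query);
        statement.setFetchSize(pageSize);
        if (resumeFrom != null)
            statement.setPagingState(resumeFrom);
        ResultSet rs = session.execute(statement);
        resumeFrom = rs.getExecutionInfo().getPagingState(); // null once all pages are consumed
        return rs;
    }
}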
public TokenRangeIterator(StressSettings settings, Set<TokenRange> tokenRanges) {
    this.tokenRanges = maybeSplitRanges(tokenRanges, settings.tokenRange.splitFactor);
    this.pendingRanges = new ConcurrentLinkedQueue<>(this.tokenRanges);
    this.wrap = settings.tokenRange.wrap;
}
public TokenRange next() {
    return pendingRanges.poll();
}
private void loadMetricIdCache(ExecutorService executor) {
    final AtomicInteger tasks = new AtomicInteger(0);

    logger.info("Found token ranges: " + cluster.getMetadata().getTokenRanges().size());
    for (TokenRange range : cluster.getMetadata().getTokenRanges()) {
        List<BoundStatement> queries = rangeQuery(retrieveMetricIdStmt, range);
        for (BoundStatement query : queries) {
            tasks.incrementAndGet();
            logger.info("adding a metric id reading task, total: " + tasks.get());

            ResultSetFuture future = metricsSession.executeAsync(query);
            Futures.addCallback(future, new FutureCallback<ResultSet>() {
                @Override
                public void onSuccess(ResultSet result) {
                    for (Row row : result) {
                        String id = Bytes.toHexString(row.getBytes(METRIC_ID));
                        if (id != null) {
                            // remove '0x'
                            metricIdCache.put(id.substring(2), Boolean.TRUE);
                        }
                    }
                    tasks.decrementAndGet();
                    logger.info("completed a metric id read task. Remaining tasks: " + tasks.get());
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.error("Failed to execute query to load metric id cache.", t);
                    tasks.decrementAndGet();
                    logger.info("Failed a metric id read task. Remaining tasks: " + tasks.get());
                }
            }, executor);
        }
    }

    while (tasks.get() > 0) {
        logger.debug("waiting for more metric id load tasks: " + tasks.get());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            logger.warn("load metric cache was interrupted", e);
        }
    }

    logger.info("loaded metric id cache from database: " + metricIdCache.size());
}
private void loadMetricDimensionCache(ExecutorService executor) {
    final AtomicInteger tasks = new AtomicInteger(0);

    for (TokenRange range : cluster.getMetadata().getTokenRanges()) {
        List<BoundStatement> queries = rangeQuery(retrieveMetricDimensionStmt, range);
        for (BoundStatement query : queries) {
            tasks.incrementAndGet();
            logger.info("Adding a metric dimension read task, total: " + tasks.get());

            ResultSetFuture future = metricsSession.executeAsync(query);
            Futures.addCallback(future, new FutureCallback<ResultSet>() {
                @Override
                public void onSuccess(ResultSet result) {
                    for (Row row : result) {
                        String key = getMetricDimnesionEntryKey(row.getString(REGION),
                                row.getString(TENANT_ID_COLUMN), row.getString(METRIC_NAME),
                                row.getString(DIMENSION_NAME), row.getString(DIMENSION_VALUE));
                        metricDimensionCache.put(key, Boolean.TRUE);
                    }
                    tasks.decrementAndGet();
                    logger.info("Completed a metric dimension read task. Remaining tasks: " + tasks.get());
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.error("Failed to execute query to load metric dimension cache.", t);
                    tasks.decrementAndGet();
                    logger.info("Failed a metric dimension read task. Remaining tasks: " + tasks.get());
                }
            }, executor);
        }
    }

    while (tasks.get() > 0) {
        logger.debug("waiting for metric dimension cache to load ...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            logger.warn("load metric dimension cache was interrupted", e);
        }
    }

    logger.info("loaded metric dimension cache from database: " + metricDimensionCache.size());
}