@Override public Observable<ResultSet> createTempTablesIfNotExists(final Set<Long> timestamps) { return Observable.fromCallable(() -> { Set<String> tables = timestamps.stream() .map(this::getTempTableName) .collect(Collectors.toSet()); // TODO This is an IO operation.. metadata.getKeyspace(session.getLoggedKeyspace()).getTables().stream() .map(AbstractTableMetadata::getName) .filter(t -> t.startsWith(TEMP_TABLE_NAME_PROTOTYPE)) .forEach(tables::remove); return tables; }) .flatMapIterable(s -> s) .zipWith(Observable.interval(300, TimeUnit.MILLISECONDS), (st, l) -> st) .concatMap(this::createTemporaryTable); }
/**
 * Creates a loader for a table or materialized view and prepares every statement it uses.
 *
 * <p>The target is looked up by {@code tableName} in the keyspace of {@code mapper}'s table;
 * a materialized view with that name takes precedence over a table. The last clustering
 * column is used as the index column for the range ({@code <}, {@code >}) and equality
 * predicates, while all primary key columns except the last one are bound by equality.
 *
 * @param mapper    mapper supplying the mapping manager, the session and the keyspace metadata
 * @param dataClass entity class for which a mapper is obtained from the manager
 * @param ckeyClass class passed to {@code CassandraUtil.findProperty} together with the index
 *                  column name (presumably the type of the index column value; verify
 *                  against that helper)
 * @param tableName name of the table or materialized view to query
 * @throws IllegalArgumentException if the keyspace has neither a table nor a materialized view
 *                                  called {@code tableName}, or if it has no clustering columns
 */
public ClusteredLoader(Mapper<Data> mapper, Class<Data> dataClass, Class<CKey> ckeyClass, String tableName) {
    MappingManager manager = mapper.getManager();
    session = manager.getSession();
    this.mapper = manager.mapper(dataClass);

    String keyspace = mapper.getTableMetadata().getKeyspace().getName();
    MaterializedViewMetadata mv = mapper.getTableMetadata().getKeyspace().getMaterializedView(tableName);
    AbstractTableMetadata tableMetadata =
            mv == null ? mapper.getTableMetadata().getKeyspace().getTable(tableName) : mv;
    if (tableMetadata == null) {
        throw new IllegalArgumentException(
                "No table or materialized view " + keyspace + "." + tableName + " found");
    }

    List<ColumnMetadata> clusteringColumns = tableMetadata.getClusteringColumns();
    if (clusteringColumns.isEmpty()) {
        // Without a clustering column there is no index column to page over.
        throw new IllegalArgumentException(
                "Table or materialized view " + keyspace + "." + tableName + " has no clustering columns");
    }

    List<ColumnMetadata> primaryKey = tableMetadata.getPrimaryKey();
    String pkEq = exceptLast(primaryKey).stream()
            .map(c -> c.getName() + "=?")
            .collect(Collectors.joining(" and "));

    String orderByDesc = orderBy(clusteringColumns, "DESC");
    String orderByAsc = orderBy(clusteringColumns, "ASC");

    String indexColumn = clusteringColumns.get(clusteringColumns.size() - 1).getName();
    indexAccessor = CassandraUtil.findProperty(dataClass, ckeyClass, indexColumn);

    // Query fragments are joined by plain concatenation so that keyspace, table and column
    // names are never interpreted as String.format specifiers.
    String qualifiedName = keyspace + "." + tableName;
    String selectWhere = "select * from " + qualifiedName + " where " + pkEq;
    String deleteWhere = "delete from " + qualifiedName + " where " + pkEq;
    String before = " and " + indexColumn + " < ?";
    String after = " and " + indexColumn + " > ?";
    String indexEq = " and " + indexColumn + "=?";
    String descLimited = " order by " + orderByDesc + " limit ?";
    String ascLimited = " order by " + orderByAsc + " limit ?";

    selectUnbounded = prepare(selectWhere + descLimited);
    selectBefore = prepare(selectWhere + before + descLimited);
    selectAfter = prepare(selectWhere + after + descLimited);
    selectBeforeAfter = prepare(selectWhere + before + after + descLimited);

    selectUnboundedAsc = prepare(selectWhere + ascLimited);
    selectBeforeAsc = prepare(selectWhere + before + ascLimited);
    selectAfterAsc = prepare(selectWhere + after + ascLimited);
    selectBeforeAfterAsc = prepare(selectWhere + before + after + ascLimited);

    selectByIdKey = prepare(selectWhere + indexEq);
    deleteByIdKey = prepare(deleteWhere + indexEq);
    selectAllById = prepare(selectWhere);
    deleteAllById = prepare(deleteWhere);
}
/**
 * Builds the collation of all clustering key columns of a table or materialized view.
 *
 * @param columnFamily name of the table or materialized view looked up in the keyspace
 * @param view         {@code true} to look {@code columnFamily} up as a materialized view,
 *                     {@code false} to look it up as a table
 * @return one {@link RelFieldCollation} per clustering column, whose field index is the
 *         column's position in the clustering order and whose direction is descending for
 *         {@code DESC} columns and ascending otherwise
 */
public List<RelFieldCollation> getClusteringOrder(String columnFamily, boolean view) {
    AbstractTableMetadata table = view
            ? getKeyspace().getMaterializedView(columnFamily)
            : getKeyspace().getTable(columnFamily);

    List<ClusteringOrder> orders = table.getClusteringOrder();
    List<RelFieldCollation> collations = new ArrayList<RelFieldCollation>();
    for (ClusteringOrder order : orders) {
        RelFieldCollation.Direction direction;
        switch (order) {
        case DESC:
            direction = RelFieldCollation.Direction.DESCENDING;
            break;
        default:
            // ASC and any other value are treated as ascending.
            direction = RelFieldCollation.Direction.ASCENDING;
            break;
        }
        // The list grows by one per column, so its size is the current column's position.
        collations.add(new RelFieldCollation(collations.size(), direction));
    }
    return collations;
}