@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle handle, ConnectorSession session, ConnectorTableLayoutHandle layout) { log.info("INFORMATION: AmpoolSplitManager getSplits() called."); AmpoolTableLayoutHandle layoutHandle = (AmpoolTableLayoutHandle) layout; AmpoolTableHandle tableHandle = layoutHandle.getTable(); AmpoolTable table = new AmpoolTable(ampoolClient, tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table.getColumnsMetadata() != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); // TODO Pass here bucket id splits.add(new AmpoolSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(),"" ,HostAddress.fromParts("localhost",0))); Collections.shuffle(splits); return new FixedSplitSource(splits); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KuduTableLayoutHandle layoutHandle = checkType(layout, KuduTableLayoutHandle.class, "layout"); KuduTableHandle tableHandle = layoutHandle.getTable(); KuduClient kuduClient = kuduClientManager.getClient(); List<KuduScanToken> tokens = kuduClientManager.newScanTokenBuilder(kuduClient, tableHandle.getSchemaTableName().getTableName()).build(); TupleDomain<KuduColumnHandle> effectivePredicate = layoutHandle.getConstraint() .transform(handle -> checkType(handle, KuduColumnHandle.class, "columnHandle")); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (int i = 0; i < tokens.size(); i++) { // nodeManager.getWorkerNodes() List<HostAddress> hostAddresses = nodeManager.getWorkerNodes().stream() .map(node -> node.getHostAndPort()).collect(Collectors.toList()); ConnectorSplit split = new KuduSplit(hostAddresses, tableHandle.getSchemaTableName(), i, effectivePredicate); builder.add(split); } kuduClientManager.close(kuduClient); return new FixedSplitSource(builder.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout) { ExampleTableLayoutHandle layoutHandle = checkType(layout, ExampleTableLayoutHandle.class, "layout"); ExampleTableHandle tableHandle = layoutHandle.getTable(); ExampleTable table = exampleClient.getTable(tableHandle.getSchemaName(), tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); for (URI uri : table.getSources()) { splits.add(new ExampleSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(), uri)); } Collections.shuffle(splits); return new FixedSplitSource(connectorId, splits); }
/**
 * Creates {@code getSplitCount()} black-hole splits. Each one is configured with the layout's
 * pages-per-split, rows-per-page and fields-length values.
 *
 * @param transactionHandle transaction handle (unused here)
 * @param session connector session (unused here)
 * @param layoutHandle layout handle; must be a {@code BlackHoleTableLayoutHandle}
 * @return a fixed split source named {@code "blackhole"}
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layoutHandle)
{
    BlackHoleTableLayoutHandle blackHoleLayout = checkType(layoutHandle, BlackHoleTableLayoutHandle.class, "BlackHoleTableLayoutHandle");

    ImmutableList.Builder<BlackHoleSplit> splits = ImmutableList.builder();
    for (int created = 0; created < blackHoleLayout.getSplitCount(); created++) {
        // A fresh split object per iteration; all of them share the same settings.
        BlackHoleSplit split = new BlackHoleSplit(
                blackHoleLayout.getPagesPerSplit(),
                blackHoleLayout.getRowsPerPage(),
                blackHoleLayout.getFieldsLength());
        splits.add(split);
    }
    return new FixedSplitSource("blackhole", splits.build());
}
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { JmxTableLayoutHandle jmxLayout = checkType(layout, JmxTableLayoutHandle.class, "layout"); JmxTableHandle tableHandle = jmxLayout.getTable(); TupleDomain<ColumnHandle> predicate = jmxLayout.getConstraint(); //TODO is there a better way to get the node column? JmxColumnHandle nodeColumnHandle = tableHandle.getColumns().get(0); List<ConnectorSplit> splits = nodeManager.getNodes(ACTIVE) .stream() .filter(node -> { NullableValue value = NullableValue.of(VARCHAR, utf8Slice(node.getNodeIdentifier())); return predicate.overlaps(fromFixedValues(ImmutableMap.of(nodeColumnHandle, value))); }) .map(node -> new JmxSplit(tableHandle, ImmutableList.of(node.getHostAndPort()))) .collect(toList()); return new FixedSplitSource(connectorId, splits); }
/**
 * Creates a single information-schema split that runs on the current node.
 * <p>
 * The split carries the fixed-value bindings extracted from the layout's constraint,
 * re-keyed by column name.
 *
 * @param transaction transaction handle (unused here)
 * @param session connector session (unused here)
 * @param layout layout handle; must be an {@code InformationSchemaTableLayoutHandle}
 * @return a fixed split source with one split and no data-source name ({@code null})
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    InformationSchemaTableLayoutHandle infoLayout = checkType(layout, InformationSchemaTableLayoutHandle.class, "layout");

    // No fixed values extracted from the constraint means no filters.
    Map<ColumnHandle, NullableValue> fixedValues = extractFixedValues(infoLayout.getConstraint()).orElse(ImmutableMap.of());
    Map<String, NullableValue> filtersByColumnName = fixedValues.entrySet()
            .stream()
            .collect(toMap(
                    binding -> checkType(binding.getKey(), InformationSchemaColumnHandle.class, "column").getColumnName(),
                    Entry::getValue));

    List<HostAddress> currentNodeAddress = ImmutableList.of(nodeManager.getCurrentNode().getHostAndPort());
    ConnectorSplit onlySplit = new InformationSchemaSplit(infoLayout.getTable(), filtersByColumnName, currentNodeAddress);
    return new FixedSplitSource(null, ImmutableList.of(onlySplit));
}
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { TpchTableHandle tableHandle = checkType(layout, TpchTableLayoutHandle.class, "layout").getTable(); Set<Node> nodes = nodeManager.getActiveDatasourceNodes(connectorId); checkState(!nodes.isEmpty(), "No TPCH nodes available"); int totalParts = nodes.size() * splitsPerNode; int partNumber = 0; // Split the data using split and skew by the number of nodes available. ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); for (Node node : nodes) { for (int i = 0; i < splitsPerNode; i++) { splits.add(new TpchSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()))); partNumber++; } } return new FixedSplitSource(connectorId, splits.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { CassandraTableLayoutHandle layoutHandle = checkType(layout, CassandraTableLayoutHandle.class, "layout"); CassandraTableHandle cassandraTableHandle = layoutHandle.getTable(); List<CassandraPartition> partitions = layoutHandle.getPartitions().get(); requireNonNull(partitions, "partitions is null"); if (partitions.isEmpty()) { return new FixedSplitSource(connectorId, ImmutableList.<ConnectorSplit>of()); } // if this is an unpartitioned table, split into equal ranges if (partitions.size() == 1) { CassandraPartition cassandraPartition = partitions.get(0); if (cassandraPartition.isUnpartitioned() || cassandraPartition.isIndexedColumnPredicatePushdown()) { CassandraTable table = schemaProvider.getTable(cassandraTableHandle); List<ConnectorSplit> splits = getSplitsByTokenRange(table, cassandraPartition.getPartitionId()); return new FixedSplitSource(connectorId, splits); } } return new FixedSplitSource(connectorId, getSplitsForPartitions(cassandraTableHandle, partitions)); }
/**
 * Creates one Kinesis split per shard of the table's stream. Each split covers the
 * shard's starting-to-ending sequence-number range.
 *
 * @param transactionHandle transaction handle (unused here)
 * @param session connector session (unused here)
 * @param layout layout handle, converted via {@code handleResolver.convertLayout}
 * @return a fixed split source with one split per shard
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    KinesisTableHandle table = handleResolver.convertLayout(layout).getTable();
    InternalStreamDescription streamDescription = getStreamDescription(table.getStreamName());

    ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
    for (Shard shard : streamDescription.getShards()) {
        splits.add(new KinesisSplit(
                connectorId,
                table.getStreamName(),
                table.getMessageDataFormat(),
                shard.getShardId(),
                shard.getSequenceNumberRange().getStartingSequenceNumber(),
                shard.getSequenceNumberRange().getEndingSequenceNumber()));
    }
    return new FixedSplitSource(splits.build());
}
/**
 * Returns a single split for the REST table, addressed to every node returned by
 * {@code nodeManager.getRequiredWorkerNodes()}.
 *
 * @param transactionHandle transaction handle (unused here)
 * @param session connector session (unused here)
 * @param layout layout handle; must be a {@code RestConnectorTableLayoutHandle}
 * @return a fixed split source with one split
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    RestConnectorTableLayoutHandle restLayout = Types.checkType(layout, RestConnectorTableLayoutHandle.class, "layout");

    List<HostAddress> workerAddresses = nodeManager.getRequiredWorkerNodes()
            .stream()
            .map(Node::getHostAndPort)
            .collect(toList());

    RestConnectorSplit split = new RestConnectorSplit(restLayout.getTableHandle(), workerAddresses);
    return new FixedSplitSource(ImmutableList.of(split));
}
/**
 * Returns a single JDBC split for the whole table.
 * <p>
 * The split carries the table's catalog/schema/table names, the connection URL, the
 * connection properties and the layout's tuple domain.
 *
 * @param layoutHandle the JDBC table layout
 * @return a fixed split source tagged with {@code connectorId}
 */
@Override
public ConnectorSplitSource getSplits(JdbcTableLayoutHandle layoutHandle)
{
    JdbcTableHandle table = layoutHandle.getTable();
    return new FixedSplitSource(connectorId, ImmutableList.of(new JdbcSplit(
            connectorId,
            table.getCatalogName(),
            table.getSchemaName(),
            table.getTableName(),
            connectionUrl,
            fromProperties(connectionProperties),
            layoutHandle.getTupleDomain())));
}
/**
 * Creates system-table splits according to the table's distribution mode.
 * <ul>
 * <li>{@code SINGLE_COORDINATOR}: one split on the current node.</li>
 * <li>{@code ALL_COORDINATORS}: one split per coordinator.</li>
 * <li>{@code ALL_NODES}: one split per active node.</li>
 * <li>Any other value: no splits.</li>
 * </ul>
 * Every split carries the layout's constraint.
 *
 * @param transactionHandle transaction handle (unused here)
 * @param session connector session (unused here)
 * @param layout layout handle; must be a {@code SystemTableLayoutHandle}
 * @return a fixed split source named {@code GlobalSystemConnector.NAME}
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    SystemTableLayoutHandle layoutHandle = checkType(layout, SystemTableLayoutHandle.class, "layout");
    SystemTableHandle tableHandle = layoutHandle.getTable();
    TupleDomain<ColumnHandle> constraint = layoutHandle.getConstraint();
    Distribution distribution = tables.get(tableHandle.getSchemaTableName()).getDistribution();

    if (distribution == SINGLE_COORDINATOR) {
        HostAddress self = nodeManager.getCurrentNode().getHostAndPort();
        ConnectorSplit coordinatorSplit = new SystemSplit(tableHandle.getConnectorId(), tableHandle, self, constraint);
        return new FixedSplitSource(GlobalSystemConnector.NAME, ImmutableList.of(coordinatorSplit));
    }

    // Plain == comparisons keep the original behaviour for an unexpected or null mode: no splits.
    Set<Node> targetNodes;
    if (distribution == ALL_COORDINATORS) {
        targetNodes = ImmutableSet.copyOf(nodeManager.getCoordinators());
    }
    else if (distribution == ALL_NODES) {
        targetNodes = ImmutableSet.copyOf(nodeManager.getNodes(ACTIVE));
    }
    else {
        targetNodes = ImmutableSet.of();
    }

    ImmutableList.Builder<ConnectorSplit> perNodeSplits = ImmutableList.builder();
    for (Node target : targetNodes) {
        perNodeSplits.add(new SystemSplit(tableHandle.getConnectorId(), tableHandle, target.getHostAndPort(), constraint));
    }
    return new FixedSplitSource(GlobalSystemConnector.NAME, perNodeSplits.build());
}
/**
 * Builds a fixed split source for {@code CONNECTOR_ID} containing {@code splitCount} splits.
 * Each split comes from a separate call to {@code splitFactory.get()}. A zero or negative
 * count yields no splits.
 *
 * @param splitCount number of splits to create
 * @param splitFactory supplier invoked once per split
 * @return the resulting fixed split source
 */
private static ConnectorSplitSource createFixedSplitSource(int splitCount, Supplier<ConnectorSplit> splitFactory)
{
    ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder();
    for (int remaining = splitCount; remaining > 0; remaining--) {
        builder.add(splitFactory.get());
    }
    return new FixedSplitSource(CONNECTOR_ID, builder.build());
}
/**
 * Returns a single split covering the whole spreadsheet table.
 *
 * @param transactionHandle transaction handle (unused here)
 * @param session connector session (unused here)
 * @param layout layout handle; cast to {@code SpreadsheetTableLayoutHandle}
 * @return a fixed split source with one split
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    SpreadsheetTableHandle tableHandle = ((SpreadsheetTableLayoutHandle) layout).getTable();
    return new FixedSplitSource(ImmutableList.of(new SpreadsheetSplit(tableHandle)));
}
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { KafkaTableHandle kafkaTableHandle = convertLayout(layout).getTable(); SimpleConsumer simpleConsumer = consumerManager.getConsumer(selectRandom(nodes)); TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(ImmutableList.of(kafkaTableHandle.getTopicName())); TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest); ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) { for (PartitionMetadata part : metadata.partitionsMetadata()) { log.debug("Adding Partition %s/%s", metadata.topic(), part.partitionId()); Broker leader = part.leader(); if (leader == null) { // Leader election going on... log.warn("No leader for partition %s/%s found!", metadata.topic(), part.partitionId()); continue; } HostAddress partitionLeader = HostAddress.fromParts(leader.host(), leader.port()); SimpleConsumer leaderConsumer = consumerManager.getConsumer(partitionLeader); // Kafka contains a reverse list of "end - start" pairs for the splits List<HostAddress> partitionNodes = ImmutableList.copyOf(Lists.transform(part.isr(), KafkaSplitManager::brokerToHostAddress)); long[] offsets = findAllOffsets(leaderConsumer, metadata.topic(), part.partitionId()); for (int i = offsets.length - 1; i > 0; i--) { KafkaSplit split = new KafkaSplit( connectorId, metadata.topic(), kafkaTableHandle.getKeyDataFormat(), kafkaTableHandle.getMessageDataFormat(), part.partitionId(), offsets[i], offsets[i - 1], partitionNodes); splits.add(split); } } } return new FixedSplitSource(connectorId, splits.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout) { RedisTableHandle redisTableHandle = convertLayout(layout).getTable(); List<HostAddress> nodes = new ArrayList<>(redisConnectorConfig.getNodes()); Collections.shuffle(nodes); checkState(!nodes.isEmpty(), "No Redis nodes available"); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); long numberOfKeys = 1; // when Redis keys are provides in a zset, create multiple // splits by splitting zset in chunks if (redisTableHandle.getKeyDataFormat().equals("zset")) { try (Jedis jedis = jedisManager.getJedisPool(nodes.get(0)).getResource()) { numberOfKeys = jedis.zcount(redisTableHandle.getKeyName(), "-inf", "+inf"); } catch (Exception e) { throw Throwables.propagate(e); } } long stride = REDIS_STRIDE_SPLITS; if (numberOfKeys / stride > REDIS_MAX_SPLITS) { stride = numberOfKeys / REDIS_MAX_SPLITS; } for (long startIndex = 0; startIndex < numberOfKeys; startIndex += stride) { long endIndex = startIndex + stride - 1; if (endIndex >= numberOfKeys) { endIndex = -1; } RedisSplit split = new RedisSplit(connectorId, redisTableHandle.getSchemaName(), redisTableHandle.getTableName(), redisTableHandle.getKeyDataFormat(), redisTableHandle.getValueDataFormat(), redisTableHandle.getKeyName(), startIndex, endIndex, nodes); builder.add(split); } return new FixedSplitSource(connectorId, builder.build()); }