@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle handle, ConnectorSession session, ConnectorTableLayoutHandle layout) { log.info("INFORMATION: AmpoolSplitManager getSplits() called."); AmpoolTableLayoutHandle layoutHandle = (AmpoolTableLayoutHandle) layout; AmpoolTableHandle tableHandle = layoutHandle.getTable(); AmpoolTable table = new AmpoolTable(ampoolClient, tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table.getColumnsMetadata() != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); // TODO Pass here bucket id splits.add(new AmpoolSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(),"" ,HostAddress.fromParts("localhost",0))); Collections.shuffle(splits); return new FixedSplitSource(splits); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KuduTableLayoutHandle layoutHandle = checkType(layout, KuduTableLayoutHandle.class, "layout"); KuduTableHandle tableHandle = layoutHandle.getTable(); KuduClient kuduClient = kuduClientManager.getClient(); List<KuduScanToken> tokens = kuduClientManager.newScanTokenBuilder(kuduClient, tableHandle.getSchemaTableName().getTableName()).build(); TupleDomain<KuduColumnHandle> effectivePredicate = layoutHandle.getConstraint() .transform(handle -> checkType(handle, KuduColumnHandle.class, "columnHandle")); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (int i = 0; i < tokens.size(); i++) { // nodeManager.getWorkerNodes() List<HostAddress> hostAddresses = nodeManager.getWorkerNodes().stream() .map(node -> node.getHostAndPort()).collect(Collectors.toList()); ConnectorSplit split = new KuduSplit(hostAddresses, tableHandle.getSchemaTableName(), i, effectivePredicate); builder.add(split); } kuduClientManager.close(kuduClient); return new FixedSplitSource(builder.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout) { ExampleTableLayoutHandle layoutHandle = checkType(layout, ExampleTableLayoutHandle.class, "layout"); ExampleTableHandle tableHandle = layoutHandle.getTable(); ExampleTable table = exampleClient.getTable(tableHandle.getSchemaName(), tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); for (URI uri : table.getSources()) { splits.add(new ExampleSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(), uri)); } Collections.shuffle(splits); return new FixedSplitSource(connectorId, splits); }
/**
 * Checks that the table resolves to exactly one Raptor layout and that draining the
 * split source for that layout yields four splits in total.
 */
@Test
public void testSanity()
        throws InterruptedException
{
    List<ConnectorTableLayoutResult> layouts = metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty());
    assertEquals(layouts.size(), 1);

    ConnectorTableLayoutResult onlyLayout = getOnlyElement(layouts);
    assertInstanceOf(onlyLayout.getTableLayout().getHandle(), RaptorTableLayoutHandle.class);

    ConnectorSplitSource source = getSplits(raptorSplitManager, onlyLayout);

    // drain the source batch by batch, counting the splits seen
    int total = 0;
    while (!source.isFinished()) {
        int batchSize = getFutureValue(source.getNextBatch(1000)).size();
        total += batchSize;
    }
    assertEquals(total, 4);
}
/**
 * With the split manager's final constructor flag set to {@code true} (presumably
 * "backup available", going by the test name) and the shard nodes deleted, the first
 * split must be addressed to the single node registered with the node manager.
 */
@Test
public void testAssignRandomNodeWhenBackupAvailable()
        throws InterruptedException, URISyntaxException
{
    RaptorConnectorId raptorId = new RaptorConnectorId("raptor");
    InMemoryNodeManager inMemoryNodes = new InMemoryNodeManager();
    NodeSupplier supplier = new RaptorNodeSupplier(inMemoryNodes, raptorId);

    // register the only node available to the connector
    PrestoNode onlyNode = new PrestoNode(UUID.randomUUID().toString(), new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN);
    inMemoryNodes.addNode(raptorId.toString(), onlyNode);

    RaptorSplitManager managerWithBackup = new RaptorSplitManager(raptorId, supplier, shardManager, true);

    deleteShardNodes();

    ConnectorTableLayoutResult tableLayout = getOnlyElement(
            metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty()));
    ConnectorSplitSource source = getSplits(managerWithBackup, tableLayout);
    List<ConnectorSplit> firstBatch = getFutureValue(source.getNextBatch(1), PrestoException.class);

    assertEquals(getOnlyElement(getOnlyElement(firstBatch).getAddresses()), onlyNode.getHostAndPort());
}
/**
 * Generates {@code layout.getSplitCount()} identical {@link BlackHoleSplit}s, each
 * configured from the layout's pages-per-split, rows-per-page and fields-length values.
 *
 * @param transactionHandle transaction handle (not used here)
 * @param session connector session (not used here)
 * @param layoutHandle layout handle; must be a {@link BlackHoleTableLayoutHandle}
 * @return a {@link FixedSplitSource} over the generated splits
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layoutHandle)
{
    BlackHoleTableLayoutHandle blackHoleLayout = checkType(layoutHandle, BlackHoleTableLayoutHandle.class, "BlackHoleTableLayoutHandle");

    ImmutableList.Builder<BlackHoleSplit> generated = ImmutableList.builder();
    for (int index = 0; index < blackHoleLayout.getSplitCount(); index++) {
        BlackHoleSplit split = new BlackHoleSplit(
                blackHoleLayout.getPagesPerSplit(),
                blackHoleLayout.getRowsPerPage(),
                blackHoleLayout.getFieldsLength());
        generated.add(split);
    }

    return new FixedSplitSource("blackhole", generated.build());
}
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { JmxTableLayoutHandle jmxLayout = checkType(layout, JmxTableLayoutHandle.class, "layout"); JmxTableHandle tableHandle = jmxLayout.getTable(); TupleDomain<ColumnHandle> predicate = jmxLayout.getConstraint(); //TODO is there a better way to get the node column? JmxColumnHandle nodeColumnHandle = tableHandle.getColumns().get(0); List<ConnectorSplit> splits = nodeManager.getNodes(ACTIVE) .stream() .filter(node -> { NullableValue value = NullableValue.of(VARCHAR, utf8Slice(node.getNodeIdentifier())); return predicate.overlaps(fromFixedValues(ImmutableMap.of(nodeColumnHandle, value))); }) .map(node -> new JmxSplit(tableHandle, ImmutableList.of(node.getHostAndPort()))) .collect(toList()); return new FixedSplitSource(connectorId, splits); }
/**
 * For every known node, constrains the node column to that node's identifier and
 * verifies that exactly one split comes back, with a single address whose host text
 * equals the identifier.
 */
@Test
public void testPredicatePushdown()
        throws Exception
{
    for (Node current : nodes) {
        String identifier = current.getNodeIdentifier();

        // layout whose constraint pins the node column to this node
        TupleDomain<ColumnHandle> pinnedToNode = TupleDomain.fromFixedValues(
                ImmutableMap.of(columnHandle, NullableValue.of(VARCHAR, utf8Slice(identifier))));
        ConnectorTableLayoutHandle pinnedLayout = new JmxTableLayoutHandle(tableHandle, pinnedToNode);

        List<ConnectorSplit> produced = getAllSplits(
                splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, pinnedLayout));

        assertEquals(produced.size(), 1);
        ConnectorSplit onlySplit = produced.get(0);
        assertEquals(onlySplit.getAddresses().size(), 1);
        assertEquals(onlySplit.getAddresses().get(0).getHostText(), identifier);
    }
}
/**
 * With an unconstrained layout, there must be one split per known node, each split
 * must carry exactly one address, and the set of address host texts must equal the set
 * of node identifiers.
 * <p>
 * The node identifiers taken from {@code nodes} are the expected values and the host
 * texts collected from the splits are the actual values. The assertions elsewhere in
 * this test use (actual, expected) argument order, so the final comparison does too,
 * which keeps a failure message the right way round.
 */
@Test
public void testNoPredicate()
        throws Exception
{
    ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, TupleDomain.all());
    ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout);
    List<ConnectorSplit> allSplits = getAllSplits(splitSource);
    assertEquals(allSplits.size(), nodes.size());

    Set<String> expectedNodes = nodes.stream().map(Node::getNodeIdentifier).collect(toSet());

    Set<String> actualNodes = new HashSet<>();
    for (ConnectorSplit split : allSplits) {
        List<HostAddress> addresses = split.getAddresses();
        assertEquals(addresses.size(), 1);
        actualNodes.add(addresses.get(0).getHostText());
    }

    assertEquals(actualNodes, expectedNodes);
}
/**
 * For every table listed under the {@code jmx} schema, obtains the splits for an
 * unconstrained layout, opens a record set on the first split and walks every row,
 * calling {@code isNull} on each column. The walk itself carries no assertions.
 */
@Test
public void testRecordSetProvider()
        throws Exception
{
    for (SchemaTableName tableName : metadata.listTables(SESSION, "jmx")) {
        JmxTableHandle jmxTable = metadata.getTableHandle(SESSION, tableName);
        List<ColumnHandle> columns = ImmutableList.copyOf(metadata.getColumnHandles(SESSION, jmxTable).values());

        ConnectorTableLayoutHandle unconstrained = new JmxTableLayoutHandle(jmxTable, TupleDomain.all());
        List<ConnectorSplit> produced = getAllSplits(
                splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, unconstrained));
        assertEquals(produced.size(), nodes.size());

        ConnectorSplit firstSplit = produced.get(0);
        RecordSet records = recordSetProvider.getRecordSet(JmxTransactionHandle.INSTANCE, SESSION, firstSplit, columns);

        try (RecordCursor rows = records.cursor()) {
            while (rows.advanceNextPosition()) {
                // touch every column of the current row
                for (int column = 0; column < records.getColumnTypes().size(); column++) {
                    rows.isNull(column);
                }
            }
        }
    }
}
/**
 * Produces a single {@link InformationSchemaSplit} addressed to the current node.
 * <p>
 * Fixed values extracted from the layout's constraint are re-keyed by column name and
 * passed to the split as filters. When no fixed values can be extracted the filter map
 * is empty.
 *
 * @param transaction transaction handle (not used here)
 * @param session connector session (not used here)
 * @param layout layout handle; must be an {@link InformationSchemaTableLayoutHandle}
 * @return a {@link FixedSplitSource} (created with a {@code null} first argument)
 *         holding the single split
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    InformationSchemaTableLayoutHandle schemaLayout = checkType(layout, InformationSchemaTableLayoutHandle.class, "layout");

    Map<ColumnHandle, NullableValue> fixedValues = extractFixedValues(schemaLayout.getConstraint()).orElse(ImmutableMap.of());

    List<HostAddress> currentNodeAddress = ImmutableList.of(nodeManager.getCurrentNode().getHostAndPort());

    // re-key the fixed values by column name
    Map<String, NullableValue> filtersByColumnName = fixedValues.entrySet().stream()
            .collect(toMap(
                    binding -> checkType(binding.getKey(), InformationSchemaColumnHandle.class, "column").getColumnName(),
                    Entry::getValue));

    ConnectorSplit onlySplit = new InformationSchemaSplit(schemaLayout.getTable(), filtersByColumnName, currentNodeAddress);
    return new FixedSplitSource(null, ImmutableList.of(onlySplit));
}
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { TpchTableHandle tableHandle = checkType(layout, TpchTableLayoutHandle.class, "layout").getTable(); Set<Node> nodes = nodeManager.getActiveDatasourceNodes(connectorId); checkState(!nodes.isEmpty(), "No TPCH nodes available"); int totalParts = nodes.size() * splitsPerNode; int partNumber = 0; // Split the data using split and skew by the number of nodes available. ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); for (Node node : nodes) { for (int i = 0; i < splitsPerNode; i++) { splits.add(new TpchSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()))); partNumber++; } } return new FixedSplitSource(connectorId, splits.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { CassandraTableLayoutHandle layoutHandle = checkType(layout, CassandraTableLayoutHandle.class, "layout"); CassandraTableHandle cassandraTableHandle = layoutHandle.getTable(); List<CassandraPartition> partitions = layoutHandle.getPartitions().get(); requireNonNull(partitions, "partitions is null"); if (partitions.isEmpty()) { return new FixedSplitSource(connectorId, ImmutableList.<ConnectorSplit>of()); } // if this is an unpartitioned table, split into equal ranges if (partitions.size() == 1) { CassandraPartition cassandraPartition = partitions.get(0); if (cassandraPartition.isUnpartitioned() || cassandraPartition.isIndexedColumnPredicatePushdown()) { CassandraTable table = schemaProvider.getTable(cassandraTableHandle); List<ConnectorSplit> splits = getSplitsByTokenRange(table, cassandraPartition.getPartitionId()); return new FixedSplitSource(connectorId, splits); } } return new FixedSplitSource(connectorId, getSplitsForPartitions(cassandraTableHandle, partitions)); }
/**
 * Reads every split of the S3 table and checks that the {@code t_bigint} column sums
 * to 78300. Also asserts up front that the table layout has exactly one partition.
 */
@Test
public void testGetRecordsS3()
        throws Exception
{
    ConnectorTableHandle s3Table = getTableHandle(tableS3);
    List<ColumnHandle> columns = ImmutableList.copyOf(metadata.getColumnHandles(SESSION, s3Table).values());
    Map<String, Integer> indexByName = indexColumns(columns);

    List<ConnectorTableLayoutResult> layoutResults = metadata.getTableLayouts(SESSION, s3Table, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty());
    HiveTableLayoutHandle hiveLayout = (HiveTableLayoutHandle) getOnlyElement(layoutResults).getTableLayout().getHandle();
    assertEquals(hiveLayout.getPartitions().get().size(), 1);

    ConnectorSplitSource source = splitManager.getSplits(SESSION, hiveLayout);

    long total = 0;
    for (ConnectorSplit current : getAllSplits(source)) {
        try (ConnectorPageSource pages = pageSourceProvider.createPageSource(SESSION, current, columns)) {
            MaterializedResult materialized = materializeSourceDataStream(SESSION, pages, getTypes(columns));
            for (MaterializedRow materializedRow : materialized) {
                total += (Long) materializedRow.getField(indexByName.get("t_bigint"));
            }
        }
    }

    assertEquals(total, 78300);
}
@Test public void testPartitionSchemaNonCanonical() throws Exception { ConnectorSession session = newSession(); ConnectorTableHandle table = getTableHandle(tablePartitionSchemaChangeNonCanonical); ColumnHandle column = metadata.getColumnHandles(session, table).get("t_boolean"); assertNotNull(column); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, new Constraint<>(TupleDomain.fromFixedValues(ImmutableMap.of(column, NullableValue.of(BOOLEAN, false))), bindings -> true), Optional.empty()); ConnectorTableLayoutHandle layoutHandle = getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); assertEquals(getAllPartitions(layoutHandle).size(), 1); assertEquals(getPartitionId(getAllPartitions(layoutHandle).get(0)), "t_boolean=0"); ConnectorSplitSource splitSource = splitManager.getSplits(session, layoutHandle); ConnectorSplit split = getOnlyElement(getAllSplits(splitSource)); ImmutableList<ColumnHandle> columnHandles = ImmutableList.of(column); try (ConnectorPageSource ignored = pageSourceProvider.createPageSource(session, split, columnHandles)) { // TODO coercion of non-canonical values should be supported fail("expected exception"); } catch (PrestoException e) { assertEquals(e.getErrorCode(), HIVE_INVALID_PARTITION_VALUE.toErrorCode()); } }
/**
 * Creates one {@link KinesisSplit} per shard of the table's stream.
 * <p>
 * Each split records the stream name, the message data format, the shard id and the
 * starting and ending sequence numbers of the shard's sequence number range.
 *
 * @param transactionHandle transaction handle (not used here)
 * @param session connector session (not used here)
 * @param layout layout handle, converted through {@code handleResolver}
 * @return a {@link FixedSplitSource} over the per-shard splits
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    KinesisTableLayoutHandle kinesisLayoutHandle = handleResolver.convertLayout(layout);
    KinesisTableHandle streamTable = kinesisLayoutHandle.getTable();

    InternalStreamDescription description = this.getStreamDescription(streamTable.getStreamName());

    ImmutableList.Builder<ConnectorSplit> shardSplits = ImmutableList.builder();
    for (Shard current : description.getShards()) {
        shardSplits.add(new KinesisSplit(
                connectorId,
                streamTable.getStreamName(),
                streamTable.getMessageDataFormat(),
                current.getShardId(),
                current.getSequenceNumberRange().getStartingSequenceNumber(),
                current.getSequenceNumberRange().getEndingSequenceNumber()));
    }

    return new FixedSplitSource(shardSplits.build());
}
/**
 * Produces a single {@link RestConnectorSplit} for the layout's table, carrying the
 * host addresses of all nodes returned by {@code getRequiredWorkerNodes()}.
 *
 * @param transactionHandle transaction handle (not used here)
 * @param session connector session (not used here)
 * @param layout layout handle; must be a {@link RestConnectorTableLayoutHandle}
 * @return a {@link FixedSplitSource} holding the single split
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    RestConnectorTableLayoutHandle restLayout = Types.checkType(layout, RestConnectorTableLayoutHandle.class, "layout");

    List<HostAddress> workerAddresses = nodeManager.getRequiredWorkerNodes()
            .stream()
            .map(Node::getHostAndPort)
            .collect(toList());

    RestConnectorSplit onlySplit = new RestConnectorSplit(restLayout.getTableHandle(), workerAddresses);
    return new FixedSplitSource(ImmutableList.of(onlySplit));
}
/**
 * Returns a {@code RaptorSplitSource} for the layout's table, built from the table id,
 * the layout constraint converted with {@code toRaptorTupleDomain}, and the table's
 * transaction id.
 *
 * @param transaction transaction handle (not used here)
 * @param session connector session (not used here)
 * @param layout layout handle; must be a {@link RaptorTableLayoutHandle}
 * @return the split source for the table
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    RaptorTableLayoutHandle raptorLayout = checkType(layout, RaptorTableLayoutHandle.class, "layout");
    RaptorTableHandle raptorTable = raptorLayout.getTable();
    TupleDomain<RaptorColumnHandle> predicate = toRaptorTupleDomain(raptorLayout.getConstraint());

    return new RaptorSplitSource(
            raptorTable.getTableId(),
            predicate,
            raptorTable.getTransactionId());
}
/**
 * After the shard nodes are deleted, fetching a batch of splits must fail with a
 * {@code PrestoException} whose message matches the "No host for shard" pattern.
 */
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "No host for shard .* found: \\[\\]")
public void testNoHostForShard()
        throws InterruptedException
{
    deleteShardNodes();

    ConnectorTableLayoutResult tableLayout = getOnlyElement(
            metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty()));

    // requesting the batch is what is expected to throw
    getFutureValue(getSplits(raptorSplitManager, tableLayout).getNextBatch(1000));
}
/**
 * With the shard nodes deleted and a node supplier that always yields an empty set,
 * fetching a batch of splits must fail with "No nodes available to run query".
 */
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "No nodes available to run query")
public void testNoNodes()
        throws InterruptedException, URISyntaxException
{
    deleteShardNodes();

    // node supplier that never returns any node
    RaptorSplitManager managerWithoutNodes = new RaptorSplitManager(
            new RaptorConnectorId("fbraptor"),
            ImmutableSet::of,
            shardManager,
            true);

    ConnectorTableLayoutResult tableLayout = getOnlyElement(
            metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty()));
    ConnectorSplitSource source = getSplits(managerWithoutNodes, tableLayout);

    getFutureValue(source.getNextBatch(1000), PrestoException.class);
}
/**
 * Drains a split source, blocking on each batch future, and returns every split in the
 * order received.
 *
 * @param splitSource source to drain
 * @return immutable list of all splits
 * @throws InterruptedException if interrupted while waiting for a batch
 * @throws ExecutionException if computing a batch failed
 */
private static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSource)
        throws InterruptedException, ExecutionException
{
    ImmutableList.Builder<ConnectorSplit> collected = ImmutableList.builder();
    while (!splitSource.isFinished()) {
        collected.addAll(splitSource.getNextBatch(1000).get());
    }
    return collected.build();
}
/**
 * Produces a single {@link JdbcSplit} covering the whole table of the given layout.
 * <p>
 * The split carries the table's catalog, schema and table names, the connection URL,
 * the connection properties converted with {@code fromProperties}, and the layout's
 * tuple domain.
 *
 * @param layoutHandle layout describing the table and its tuple domain
 * @return a {@link FixedSplitSource} holding the single split
 */
@Override
public ConnectorSplitSource getSplits(JdbcTableLayoutHandle layoutHandle)
{
    JdbcTableHandle jdbcTable = layoutHandle.getTable();

    JdbcSplit wholeTable = new JdbcSplit(
            connectorId,
            jdbcTable.getCatalogName(),
            jdbcTable.getSchemaName(),
            jdbcTable.getTableName(),
            connectionUrl,
            fromProperties(connectionProperties),
            layoutHandle.getTupleDomain());

    return new FixedSplitSource(connectorId, ImmutableList.of(wholeTable));
}
/**
 * Looks up the given table through the JDBC client and returns its only split, using
 * an unconstrained tuple domain.
 *
 * @param schemaName schema of the table
 * @param tableName name of the table
 * @return the single split of the first batch
 * @throws InterruptedException if interrupted while waiting for the batch
 */
public JdbcSplit getSplit(String schemaName, String tableName)
        throws InterruptedException
{
    SchemaTableName qualifiedName = new SchemaTableName(schemaName, tableName);
    JdbcTableHandle handle = jdbcClient.getTableHandle(qualifiedName);
    JdbcTableLayoutHandle unconstrained = new JdbcTableLayoutHandle(handle, TupleDomain.<ColumnHandle>all());

    ConnectorSplitSource source = jdbcClient.getSplits(unconstrained);
    return (JdbcSplit) getOnlyElement(getFutureValue(source.getNextBatch(1000)));
}
/**
 * Opens a record cursor over the given columns of a table, restricted by the given
 * tuple domain. The table is expected to produce exactly one split.
 *
 * @param jdbcTableHandle table to read
 * @param columns columns to project
 * @param domain tuple domain placed in the layout handle
 * @return cursor over the record set of the single split
 * @throws InterruptedException if interrupted while waiting for the split batch
 */
private RecordCursor getCursor(JdbcTableHandle jdbcTableHandle, List<JdbcColumnHandle> columns, TupleDomain<ColumnHandle> domain)
        throws InterruptedException
{
    ConnectorSplitSource source = jdbcClient.getSplits(new JdbcTableLayoutHandle(jdbcTableHandle, domain));
    JdbcSplit onlySplit = (JdbcSplit) getOnlyElement(getFutureValue(source.getNextBatch(1000)));

    JdbcRecordSetProvider provider = new JdbcRecordSetProvider(jdbcClient);
    RecordSet records = provider.getRecordSet(SESSION, onlySplit, columns);

    return records.cursor();
}
/**
 * Creates splits for a system table according to its distribution mode.
 * <ul>
 * <li>{@code SINGLE_COORDINATOR}: one split addressed to the current node.</li>
 * <li>{@code ALL_COORDINATORS}: one split per coordinator.</li>
 * <li>{@code ALL_NODES}: one split per active node.</li>
 * </ul>
 * Any other mode results in an empty split source.
 *
 * @param transactionHandle transaction handle (not used here)
 * @param session connector session (not used here)
 * @param layout layout handle; must be a {@link SystemTableLayoutHandle}
 * @return a {@link FixedSplitSource} over the generated splits
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    SystemTableLayoutHandle systemLayout = checkType(layout, SystemTableLayoutHandle.class, "layout");
    SystemTableHandle systemHandle = systemLayout.getTable();
    TupleDomain<ColumnHandle> layoutConstraint = systemLayout.getConstraint();

    SystemTable target = tables.get(systemHandle.getSchemaTableName());
    Distribution mode = target.getDistribution();

    if (mode == SINGLE_COORDINATOR) {
        HostAddress currentNode = nodeManager.getCurrentNode().getHostAndPort();
        ConnectorSplit onlySplit = new SystemSplit(systemHandle.getConnectorId(), systemHandle, currentNode, layoutConstraint);
        return new FixedSplitSource(GlobalSystemConnector.NAME, ImmutableList.of(onlySplit));
    }

    // choose the nodes that each receive one split
    ImmutableSet.Builder<Node> targets = ImmutableSet.builder();
    if (mode == ALL_COORDINATORS) {
        targets.addAll(nodeManager.getCoordinators());
    }
    else if (mode == ALL_NODES) {
        targets.addAll(nodeManager.getNodes(ACTIVE));
    }
    Set<Node> targetNodes = targets.build();

    ImmutableList.Builder<ConnectorSplit> perNodeSplits = ImmutableList.builder();
    for (Node targetNode : targetNodes) {
        perNodeSplits.add(new SystemSplit(systemHandle.getConnectorId(), systemHandle, targetNode.getHostAndPort(), layoutConstraint));
    }
    return new FixedSplitSource(GlobalSystemConnector.NAME, perNodeSplits.build());
}
public SplitSource getSplits(Session session, TableLayoutHandle layout) { String connectorId = layout.getConnectorId(); ConnectorSplitManager splitManager = getConnectorSplitManager(connectorId); // assumes connectorId and catalog are the same ConnectorSession connectorSession = session.toConnectorSession(connectorId); ConnectorSplitSource source = splitManager.getSplits(layout.getTransactionHandle(), connectorSession, layout.getConnectorHandle()); return new ConnectorAwareSplitSource(connectorId, layout.getTransactionHandle(), source); }
private static StageExecutionPlan createPlan(ConnectorSplitSource splitSource) { Symbol symbol = new Symbol("column"); // table scan with splitCount splits PlanNodeId tableScanNodeId = new PlanNodeId("plan_id"); PlanFragment testFragment = new PlanFragment( new PlanFragmentId("plan_id"), new JoinNode(new PlanNodeId("join_id"), INNER, new TableScanNode( tableScanNodeId, new TableHandle(CONNECTOR_ID, new TestingTableHandle()), ImmutableList.of(symbol), ImmutableMap.of(symbol, new TestingColumnHandle("column")), Optional.empty(), TupleDomain.all(), null), new RemoteSourceNode(new PlanNodeId("remote_id"), new PlanFragmentId("plan_fragment_id"), ImmutableList.of()), ImmutableList.of(), Optional.<Symbol>empty(), Optional.<Symbol>empty()), ImmutableMap.<Symbol, Type>of(symbol, VARCHAR), ImmutableList.of(symbol), SOURCE, tableScanNodeId, Optional.empty()); return new StageExecutionPlan(testFragment, Optional.of(new ConnectorAwareSplitSource(CONNECTOR_ID, TestingTransactionHandle.create(CONNECTOR_ID), splitSource)), ImmutableList.<StageExecutionPlan>of()); }
/**
 * Creates a fixed split source containing {@code splitCount} splits, each obtained by
 * one call to {@code splitFactory}.
 *
 * @param splitCount number of splits to create
 * @param splitFactory supplier invoked once per split
 * @return a {@link FixedSplitSource} over the created splits
 */
private static ConnectorSplitSource createFixedSplitSource(int splitCount, Supplier<ConnectorSplit> splitFactory)
{
    ImmutableList.Builder<ConnectorSplit> created = ImmutableList.builder();
    int remaining = splitCount;
    while (remaining > 0) {
        created.add(splitFactory.get());
        remaining--;
    }
    return new FixedSplitSource(CONNECTOR_ID, created.build());
}
/**
 * Delegates to the wrapped split manager while a {@code ThreadContextClassLoader}
 * created from {@code classLoader} is open. The try-with-resources closes it when the
 * call completes.
 *
 * @param session connector session, passed through
 * @param layout layout handle, passed through
 * @return the delegate's split source
 */
@Override
public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
        ConnectorSplitSource delegateSource = delegate.getSplits(session, layout);
        return delegateSource;
    }
}
/**
 * Drains a split source in batches of up to 1000 and returns every split in the order
 * received.
 *
 * @param splitSource source to drain
 * @return immutable list of all splits
 * @throws InterruptedException if interrupted while waiting for a batch
 */
private static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSource)
        throws InterruptedException
{
    ImmutableList.Builder<ConnectorSplit> collected = ImmutableList.builder();
    while (!splitSource.isFinished()) {
        List<ConnectorSplit> nextBatch = getFutureValue(splitSource.getNextBatch(1000));
        collected.addAll(nextBatch);
    }
    return collected.build();
}
/**
 * Collects every split from the given source, fetching up to 1000 per batch until the
 * source reports it is finished.
 *
 * @param source source to drain
 * @return immutable list of all splits, in the order received
 * @throws InterruptedException if interrupted while waiting for a batch
 */
private static List<ConnectorSplit> getAllSplits(ConnectorSplitSource source)
        throws InterruptedException
{
    ImmutableList.Builder<ConnectorSplit> accumulated = ImmutableList.builder();
    while (!source.isFinished()) {
        List<ConnectorSplit> fetched = getFutureValue(source.getNextBatch(1000));
        accumulated.addAll(fetched);
    }
    return accumulated.build();
}
/**
 * The partitioned table, read with an unconstrained layout, must yield
 * {@code partitionCount} splits.
 */
@Test
public void testGetPartitionSplitsBatch()
        throws Exception
{
    ConnectorSession session = newSession();
    ConnectorTableHandle partitionedTable = getTableHandle(tablePartitionFormat);

    List<ConnectorTableLayoutResult> layoutResults = metadata.getTableLayouts(session, partitionedTable, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty());
    ConnectorTableLayoutHandle onlyLayout = getOnlyElement(layoutResults).getTableLayout().getHandle();

    ConnectorSplitSource source = splitManager.getSplits(session, onlyLayout);
    assertEquals(getSplitCount(source), partitionCount);
}
/**
 * The unpartitioned table, read with an unconstrained layout, must yield exactly one
 * split.
 */
@Test
public void testGetPartitionSplitsBatchUnpartitioned()
        throws Exception
{
    ConnectorSession session = newSession();
    ConnectorTableHandle unpartitionedTable = getTableHandle(tableUnpartitioned);

    List<ConnectorTableLayoutResult> layoutResults = metadata.getTableLayouts(session, unpartitionedTable, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty());
    ConnectorTableLayoutHandle onlyLayout = getOnlyElement(layoutResults).getTableLayout().getHandle();

    ConnectorSplitSource source = splitManager.getSplits(session, onlyLayout);
    assertEquals(getSplitCount(source), 1);
}
@Test public void testGetPartitionSplitsEmpty() throws Exception { ConnectorSplitSource splitSource = splitManager.getSplits(newSession(), emptyTableLayoutHandle); // fetch full list getSplitCount(splitSource); }
/**
 * Drains a split source in batches of up to 1000 and counts the splits it produced.
 *
 * @param splitSource source to drain
 * @return total number of splits across all batches
 * @throws InterruptedException if interrupted while waiting for a batch
 */
protected static int getSplitCount(ConnectorSplitSource splitSource)
        throws InterruptedException
{
    int total = 0;
    while (!splitSource.isFinished()) {
        total += getFutureValue(splitSource.getNextBatch(1000)).size();
    }
    return total;
}
/**
 * Drains a split source in batches of up to 1000 and returns every split in the order
 * received.
 *
 * @param splitSource source to drain
 * @return immutable list of all splits
 * @throws InterruptedException if interrupted while waiting for a batch
 */
protected static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSource)
        throws InterruptedException
{
    ImmutableList.Builder<ConnectorSplit> collected = ImmutableList.builder();
    while (!splitSource.isFinished()) {
        collected.addAll(getFutureValue(splitSource.getNextBatch(1000)));
    }
    return collected.build();
}
/**
 * Produces a single {@link SpreadsheetSplit} for the layout's table.
 *
 * @param transactionHandle transaction handle (not used here)
 * @param session connector session (not used here)
 * @param layout layout handle, cast to {@link SpreadsheetTableLayoutHandle}
 * @return a {@link FixedSplitSource} holding the single split
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    SpreadsheetTableHandle table = ((SpreadsheetTableLayoutHandle) layout).getTable();
    SpreadsheetSplit onlySplit = new SpreadsheetSplit(table);
    return new FixedSplitSource(ImmutableList.of(onlySplit));
}
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { KafkaTableHandle kafkaTableHandle = convertLayout(layout).getTable(); SimpleConsumer simpleConsumer = consumerManager.getConsumer(selectRandom(nodes)); TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(ImmutableList.of(kafkaTableHandle.getTopicName())); TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest); ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) { for (PartitionMetadata part : metadata.partitionsMetadata()) { log.debug("Adding Partition %s/%s", metadata.topic(), part.partitionId()); Broker leader = part.leader(); if (leader == null) { // Leader election going on... log.warn("No leader for partition %s/%s found!", metadata.topic(), part.partitionId()); continue; } HostAddress partitionLeader = HostAddress.fromParts(leader.host(), leader.port()); SimpleConsumer leaderConsumer = consumerManager.getConsumer(partitionLeader); // Kafka contains a reverse list of "end - start" pairs for the splits List<HostAddress> partitionNodes = ImmutableList.copyOf(Lists.transform(part.isr(), KafkaSplitManager::brokerToHostAddress)); long[] offsets = findAllOffsets(leaderConsumer, metadata.topic(), part.partitionId()); for (int i = offsets.length - 1; i > 0; i--) { KafkaSplit split = new KafkaSplit( connectorId, metadata.topic(), kafkaTableHandle.getKeyDataFormat(), kafkaTableHandle.getMessageDataFormat(), part.partitionId(), offsets[i], offsets[i - 1], partitionNodes); splits.add(split); } } } return new FixedSplitSource(connectorId, splits.build()); }
/**
 * Test helper: asks the split manager for the splits of the given layout under a fresh
 * {@link RaptorTransactionHandle}.
 *
 * @param splitManager split manager under test
 * @param layout layout result whose handle is passed to the manager
 * @return the split source returned by the manager
 */
private static ConnectorSplitSource getSplits(RaptorSplitManager splitManager, ConnectorTableLayoutResult layout)
{
    ConnectorTransactionHandle freshTransaction = new RaptorTransactionHandle();
    ConnectorSplitSource source = splitManager.getSplits(freshTransaction, SESSION, layout.getTableLayout().getHandle());
    return source;
}
@Override public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout) { RedisTableHandle redisTableHandle = convertLayout(layout).getTable(); List<HostAddress> nodes = new ArrayList<>(redisConnectorConfig.getNodes()); Collections.shuffle(nodes); checkState(!nodes.isEmpty(), "No Redis nodes available"); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); long numberOfKeys = 1; // when Redis keys are provides in a zset, create multiple // splits by splitting zset in chunks if (redisTableHandle.getKeyDataFormat().equals("zset")) { try (Jedis jedis = jedisManager.getJedisPool(nodes.get(0)).getResource()) { numberOfKeys = jedis.zcount(redisTableHandle.getKeyName(), "-inf", "+inf"); } catch (Exception e) { throw Throwables.propagate(e); } } long stride = REDIS_STRIDE_SPLITS; if (numberOfKeys / stride > REDIS_MAX_SPLITS) { stride = numberOfKeys / REDIS_MAX_SPLITS; } for (long startIndex = 0; startIndex < numberOfKeys; startIndex += stride) { long endIndex = startIndex + stride - 1; if (endIndex >= numberOfKeys) { endIndex = -1; } RedisSplit split = new RedisSplit(connectorId, redisTableHandle.getSchemaName(), redisTableHandle.getTableName(), redisTableHandle.getKeyDataFormat(), redisTableHandle.getValueDataFormat(), redisTableHandle.getKeyName(), startIndex, endIndex, nodes); builder.add(split); } return new FixedSplitSource(connectorId, builder.build()); }