/**
 * Builds the record set for a single Ethereum split.
 *
 * @param transaction transaction handle (not read by this method)
 * @param session connector session (not read by this method)
 * @param split split to read, converted with {@code convertSplit}
 * @param columns requested columns, each converted with {@code convertColumnHandle}
 * @return an {@code EthereumRecordSet} over the converted split and columns
 */
@Override
public RecordSet getRecordSet(
        ConnectorTransactionHandle transaction,
        ConnectorSession session,
        ConnectorSplit split,
        List<? extends ColumnHandle> columns)
{
    EthereumSplit targetSplit = convertSplit(split);

    // Convert the handles in the order they were requested.
    ImmutableList.Builder<EthereumColumnHandle> converted = ImmutableList.builder();
    for (ColumnHandle requested : columns) {
        converted.add(convertColumnHandle(requested));
    }

    return new EthereumRecordSet(web3j, converted.build(), targetSplit);
}
/**
 * Creates a page source for the file range described by an HDFS split.
 *
 * @param transactionHandle transaction handle (not read by this method)
 * @param session connector session (not read by this method)
 * @param split split to read; must be an {@code HDFSSplit}
 * @param columns requested columns; every element is cast to {@code HDFSColumnHandle}
 * @return the page source produced by {@code createHDFSPageSource}
 * @throws RuntimeException if {@code createHDFSPageSource} yields no page source
 */
@Override
public ConnectorPageSource createPageSource(
        ConnectorTransactionHandle transactionHandle,
        ConnectorSession session,
        ConnectorSplit split,
        List<ColumnHandle> columns)
{
    List<HDFSColumnHandle> hdfsColumns = columns.stream()
            .map(HDFSColumnHandle.class::cast)
            .collect(Collectors.toList());

    HDFSSplit hdfsSplit = checkType(split, HDFSSplit.class, "hdfs split");

    return createHDFSPageSource(new Path(hdfsSplit.getPath()), hdfsSplit.getStart(), hdfsSplit.getLen(), hdfsColumns)
            .orElseThrow(() -> new RuntimeException("Could not find a file reader for split " + hdfsSplit));
}
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle handle, ConnectorSession session, ConnectorTableLayoutHandle layout) { log.info("INFORMATION: AmpoolSplitManager getSplits() called."); AmpoolTableLayoutHandle layoutHandle = (AmpoolTableLayoutHandle) layout; AmpoolTableHandle tableHandle = layoutHandle.getTable(); AmpoolTable table = new AmpoolTable(ampoolClient, tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table.getColumnsMetadata() != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); // TODO Pass here bucket id splits.add(new AmpoolSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(),"" ,HostAddress.fromParts("localhost",0))); Collections.shuffle(splits); return new FixedSplitSource(splits); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KuduTableLayoutHandle layoutHandle = checkType(layout, KuduTableLayoutHandle.class, "layout"); KuduTableHandle tableHandle = layoutHandle.getTable(); KuduClient kuduClient = kuduClientManager.getClient(); List<KuduScanToken> tokens = kuduClientManager.newScanTokenBuilder(kuduClient, tableHandle.getSchemaTableName().getTableName()).build(); TupleDomain<KuduColumnHandle> effectivePredicate = layoutHandle.getConstraint() .transform(handle -> checkType(handle, KuduColumnHandle.class, "columnHandle")); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (int i = 0; i < tokens.size(); i++) { // nodeManager.getWorkerNodes() List<HostAddress> hostAddresses = nodeManager.getWorkerNodes().stream() .map(node -> node.getHostAndPort()).collect(Collectors.toList()); ConnectorSplit split = new KuduSplit(hostAddresses, tableHandle.getSchemaTableName(), i, effectivePredicate); builder.add(split); } kuduClientManager.close(kuduClient); return new FixedSplitSource(builder.build()); }
/**
 * Builds the record set for a single Kudu split.
 *
 * @param transactionHandle transaction handle (not read by this method)
 * @param session connector session (not read by this method)
 * @param split split to read; must be a non-null {@code KuduSplit}
 * @param columns requested columns; each must be a {@code KuduColumnHandle}
 * @return a {@code KuduRecordSet} over the split and the checked column handles
 */
@Override
public RecordSet getRecordSet(
        ConnectorTransactionHandle transactionHandle,
        ConnectorSession session,
        ConnectorSplit split,
        List<? extends ColumnHandle> columns)
{
    requireNonNull(split, "split is null");
    KuduSplit kuduSplit = checkType(split, KuduSplit.class, "split");

    ImmutableList.Builder<KuduColumnHandle> kuduColumns = ImmutableList.builder();
    columns.forEach(column -> kuduColumns.add(checkType(column, KuduColumnHandle.class, "handle")));

    return new KuduRecordSet(kuduTable, kuduClientManager, kuduSplit, kuduColumns.build());
}
@Override public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout) { ExampleTableLayoutHandle layoutHandle = checkType(layout, ExampleTableLayoutHandle.class, "layout"); ExampleTableHandle tableHandle = layoutHandle.getTable(); ExampleTable table = exampleClient.getTable(tableHandle.getSchemaName(), tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); for (URI uri : table.getSources()) { splits.add(new ExampleSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(), uri)); } Collections.shuffle(splits); return new FixedSplitSource(connectorId, splits); }
/**
 * Returns a supplier that, when invoked, pulls up to {@code maxSize} items from
 * {@code iterator} and turns each into a split with {@code createSplit}.
 *
 * The supplier stops early when the iterator is exhausted and throws a
 * {@link RuntimeException} if the running thread is found interrupted before
 * fetching an item.
 *
 * @param maxSize upper bound on the number of splits in the batch
 * @return supplier producing an immutable batch of splits
 */
private Supplier<List<ConnectorSplit>> batchSupplier(int maxSize)
{
    return () -> {
        ImmutableList.Builder<ConnectorSplit> batch = ImmutableList.builder();
        int taken = 0;
        while (taken < maxSize) {
            if (Thread.currentThread().isInterrupted()) {
                throw new RuntimeException("Split batch fetch was interrupted");
            }
            if (!iterator.hasNext()) {
                break;
            }
            batch.add(createSplit(iterator.next()));
            taken++;
        }
        return batch.build();
    };
}
private ConnectorSplit createSplit(ShardNodes shard) { UUID shardId = shard.getShardUuid(); Collection<String> nodeIds = shard.getNodeIdentifiers(); List<HostAddress> addresses = getAddressesForNodes(nodesById, nodeIds); if (addresses.isEmpty()) { if (!backupAvailable) { throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardId, nodeIds)); } // Pick a random node and optimistically assign the shard to it. // That node will restore the shard from the backup location. Set<Node> availableNodes = nodeSupplier.getWorkerNodes(); if (availableNodes.isEmpty()) { throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query"); } Node node = selectRandom(availableNodes); shardManager.assignShard(tableId, shardId, node.getNodeIdentifier()); addresses = ImmutableList.of(node.getHostAndPort()); } return new RaptorSplit(connectorId, shardId, addresses, effectivePredicate, transactionId); }
/**
 * Creates a page source for the shard referenced by a Raptor split.
 *
 * @param transactionHandle transaction handle (not read by this method)
 * @param session connector session; used to derive the reader attributes
 * @param split split to read; must be a {@code RaptorSplit}
 * @param columns requested columns, mapped through {@code toRaptorColumnHandle()}
 * @return the page source returned by {@code storageManager.getPageSource}
 */
@Override
public ConnectorPageSource createPageSource(
        ConnectorTransactionHandle transactionHandle,
        ConnectorSession session,
        ConnectorSplit split,
        List<ColumnHandle> columns)
{
    RaptorSplit raptorSplit = checkType(split, RaptorSplit.class, "split");

    List<RaptorColumnHandle> handles = columns.stream()
            .map(toRaptorColumnHandle())
            .collect(toList());

    // Column ids and types are parallel lists in the requested column order.
    List<Long> ids = handles.stream()
            .map(RaptorColumnHandle::getColumnId)
            .collect(toList());
    List<Type> types = handles.stream()
            .map(RaptorColumnHandle::getColumnType)
            .collect(toList());

    return storageManager.getPageSource(
            raptorSplit.getShardUuid(),
            ids,
            types,
            raptorSplit.getEffectivePredicate(),
            ReaderAttributes.from(session),
            raptorSplit.getTransactionId());
}
/**
 * With a backup available and all shard-node assignments deleted, the single
 * registered node must be the address of the returned split.
 */
@Test
public void testAssignRandomNodeWhenBackupAvailable()
        throws InterruptedException, URISyntaxException
{
    InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    RaptorConnectorId connectorId = new RaptorConnectorId("raptor");
    NodeSupplier nodeSupplier = new RaptorNodeSupplier(nodeManager, connectorId);

    // Register exactly one node so the "random" choice is deterministic.
    PrestoNode onlyNode = new PrestoNode(UUID.randomUUID().toString(), new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN);
    nodeManager.addNode(connectorId.toString(), onlyNode);

    RaptorSplitManager splitManagerWithBackup = new RaptorSplitManager(connectorId, nodeSupplier, shardManager, true);

    deleteShardNodes();

    ConnectorTableLayoutResult layout = getOnlyElement(
            metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty()));
    ConnectorSplitSource splitSource = getSplits(splitManagerWithBackup, layout);

    List<ConnectorSplit> batch = getFutureValue(splitSource.getNextBatch(1), PrestoException.class);
    ConnectorSplit onlySplit = getOnlyElement(batch);
    assertEquals(getOnlyElement(onlySplit.getAddresses()), onlyNode.getHostAndPort());
}
/**
 * Creates a page source that repeats a generated zero page a fixed number of times.
 *
 * @param transactionHandle transaction handle (not read by this method)
 * @param session connector session (not read by this method)
 * @param split split to read; must be a {@code BlackHoleSplit}
 * @param columns requested columns; each must be a {@code BlackHoleColumnHandle}
 * @return a {@code FixedPageSource} limited to the split's page count
 */
@Override
public ConnectorPageSource createPageSource(
        ConnectorTransactionHandle transactionHandle,
        ConnectorSession session,
        ConnectorSplit split,
        List<ColumnHandle> columns)
{
    BlackHoleSplit blackHoleSplit = checkType(split, BlackHoleSplit.class, "BlackHoleSplit");

    ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
    for (ColumnHandle handle : columns) {
        BlackHoleColumnHandle blackHoleColumn = checkType(handle, BlackHoleColumnHandle.class, "BlackHoleColumnHandle");
        columnTypes.add(blackHoleColumn.getColumnType());
    }
    List<Type> types = columnTypes.build();

    // Cycle the generated page indefinitely, then cap at the split's page count.
    return new FixedPageSource(
            Iterables.limit(
                    Iterables.cycle(generateZeroPage(types, blackHoleSplit.getRowsPerPage(), blackHoleSplit.getFieldsLength())),
                    blackHoleSplit.getPagesCount()));
}
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { JmxTableLayoutHandle jmxLayout = checkType(layout, JmxTableLayoutHandle.class, "layout"); JmxTableHandle tableHandle = jmxLayout.getTable(); TupleDomain<ColumnHandle> predicate = jmxLayout.getConstraint(); //TODO is there a better way to get the node column? JmxColumnHandle nodeColumnHandle = tableHandle.getColumns().get(0); List<ConnectorSplit> splits = nodeManager.getNodes(ACTIVE) .stream() .filter(node -> { NullableValue value = NullableValue.of(VARCHAR, utf8Slice(node.getNodeIdentifier())); return predicate.overlaps(fromFixedValues(ImmutableMap.of(nodeColumnHandle, value))); }) .map(node -> new JmxSplit(tableHandle, ImmutableList.of(node.getHostAndPort()))) .collect(toList()); return new FixedSplitSource(connectorId, splits); }
/**
 * Fixing the node column to one node's identifier must yield exactly one split,
 * addressed to that node.
 */
@Test
public void testPredicatePushdown()
        throws Exception
{
    for (Node node : nodes) {
        String nodeIdentifier = node.getNodeIdentifier();
        NullableValue nodeValue = NullableValue.of(VARCHAR, utf8Slice(nodeIdentifier));
        TupleDomain<ColumnHandle> nodeTupleDomain = TupleDomain.fromFixedValues(ImmutableMap.of(columnHandle, nodeValue));

        ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, nodeTupleDomain);
        List<ConnectorSplit> allSplits = getAllSplits(splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout));

        assertEquals(allSplits.size(), 1);
        ConnectorSplit onlySplit = allSplits.get(0);
        assertEquals(onlySplit.getAddresses().size(), 1);
        assertEquals(onlySplit.getAddresses().get(0).getHostText(), nodeIdentifier);
    }
}
/**
 * Without any predicate there must be exactly one split per node, each with a
 * single address, and the set of split hosts must equal the set of node identifiers.
 */
@Test
public void testNoPredicate()
        throws Exception
{
    ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, TupleDomain.all());
    ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout);
    List<ConnectorSplit> allSplits = getAllSplits(splitSource);
    assertEquals(allSplits.size(), nodes.size());

    // Expected values come from the configured nodes; actual values come from the splits.
    // The sibling tests call assertEquals(actual, expected), so keep that order here.
    Set<String> expectedNodes = nodes.stream()
            .map(Node::getNodeIdentifier)
            .collect(toSet());

    Set<String> actualNodes = new HashSet<>();
    for (ConnectorSplit split : allSplits) {
        List<HostAddress> addresses = split.getAddresses();
        assertEquals(addresses.size(), 1);
        actualNodes.add(addresses.get(0).getHostText());
    }

    assertEquals(actualNodes, expectedNodes);
}
/**
 * For every table listed in the "jmx" schema, reads the first split completely
 * and touches every column of every row via {@code isNull}.
 */
@Test
public void testRecordSetProvider()
        throws Exception
{
    for (SchemaTableName tableName : metadata.listTables(SESSION, "jmx")) {
        JmxTableHandle tableHandle = metadata.getTableHandle(SESSION, tableName);
        List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(SESSION, tableHandle).values());

        ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, TupleDomain.all());
        List<ConnectorSplit> allSplits = getAllSplits(splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout));
        assertEquals(allSplits.size(), nodes.size());

        ConnectorSplit firstSplit = allSplits.get(0);
        RecordSet recordSet = recordSetProvider.getRecordSet(JmxTransactionHandle.INSTANCE, SESSION, firstSplit, columnHandles);

        try (RecordCursor cursor = recordSet.cursor()) {
            while (cursor.advanceNextPosition()) {
                // only checks that each field can be accessed; the value is not inspected
                for (int field = 0; field < recordSet.getColumnTypes().size(); field++) {
                    cursor.isNull(field);
                }
            }
        }
    }
}
/**
 * Produces the single split for an information-schema table, addressed to the
 * current node and carrying the fixed column values extracted from the layout's
 * constraint, keyed by column name.
 *
 * @param transaction transaction handle (not read by this method)
 * @param session connector session (not read by this method)
 * @param layout table layout; must be an {@code InformationSchemaTableLayoutHandle}
 * @return a fixed split source containing exactly one split
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    InformationSchemaTableLayoutHandle handle = checkType(layout, InformationSchemaTableLayoutHandle.class, "layout");

    // Fall back to "no fixed values" when none can be extracted from the constraint.
    Map<ColumnHandle, NullableValue> bindings = extractFixedValues(handle.getConstraint()).orElse(ImmutableMap.of());

    List<HostAddress> localAddress = ImmutableList.of(nodeManager.getCurrentNode().getHostAndPort());

    Map<String, NullableValue> filters = bindings.entrySet().stream()
            .collect(toMap(
                    entry -> checkType(entry.getKey(), InformationSchemaColumnHandle.class, "column").getColumnName(),
                    Entry::getValue));

    return new FixedSplitSource(
            null,
            ImmutableList.of(new InformationSchemaSplit(handle.getTable(), filters, localAddress)));
}
/**
 * Creates a page source over an internal information-schema table, projecting
 * each page to the requested columns in the requested order.
 *
 * @param transactionHandle transaction handle, forwarded to {@code getInternalTable}
 * @param session connector session, forwarded to {@code getInternalTable}
 * @param split split to read, forwarded to {@code getInternalTable}
 * @param columns requested columns; each must be an {@code InformationSchemaColumnHandle}
 * @return a {@code FixedPageSource} over the projected pages
 */
@Override
public ConnectorPageSource createPageSource(
        ConnectorTransactionHandle transactionHandle,
        ConnectorSession session,
        ConnectorSplit split,
        List<ColumnHandle> columns)
{
    InternalTable table = getInternalTable(transactionHandle, session, split, columns);

    // channels[i] is the table column index backing the i-th requested column
    int[] channels = new int[columns.size()];
    int position = 0;
    for (ColumnHandle column : columns) {
        String columnName = checkType(column, InformationSchemaColumnHandle.class, "column").getColumnName();
        channels[position++] = table.getColumnIndex(columnName);
    }

    ImmutableList.Builder<Page> projectedPages = ImmutableList.builder();
    for (Page sourcePage : table.getPages()) {
        Block[] blocks = new Block[channels.length];
        for (int i = 0; i < channels.length; i++) {
            blocks[i] = sourcePage.getBlock(channels[i]);
        }
        projectedPages.add(new Page(sourcePage.getPositionCount(), blocks));
    }

    return new FixedPageSource(projectedPages.build());
}
private InternalTable getInternalTable(ConnectorTransactionHandle transactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<ColumnHandle> columns) { InformationSchemaTransactionHandle transaction = checkType(transactionHandle, InformationSchemaTransactionHandle.class, "transaction"); InformationSchemaSplit split = checkType(connectorSplit, InformationSchemaSplit.class, "split"); requireNonNull(columns, "columns is null"); InformationSchemaTableHandle handle = split.getTableHandle(); Map<String, NullableValue> filters = split.getFilters(); Session session = Session.builder(metadata.getSessionPropertyManager()) .setTransactionId(transaction.getTransactionId()) .setQueryId(new QueryId(connectorSession.getQueryId())) .setIdentity(connectorSession.getIdentity()) .setSource("information_schema") .setCatalog("") // default catalog is not be used .setSchema("") // default schema is not be used .setTimeZoneKey(connectorSession.getTimeZoneKey()) .setLocale(connectorSession.getLocale()) .setStartTime(connectorSession.getStartTime()) .build(); return getInformationSchemaTable(session, handle.getCatalogName(), handle.getSchemaTableName(), filters); }
private synchronized List<ConnectorSplit> getBatch(int maxSize) { // take up to maxSize elements from the queue List<ConnectorSplit> elements = new ArrayList<>(maxSize); queue.drainTo(elements, maxSize); // if the queue is empty and the current future is finished, create a new one so // a new readers can be notified when the queue has elements to read if (queue.isEmpty() && !closed) { if (notEmptyFuture.isDone()) { notEmptyFuture = new CompletableFuture<>(); } } return ImmutableList.copyOf(elements); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { TpchTableHandle tableHandle = checkType(layout, TpchTableLayoutHandle.class, "layout").getTable(); Set<Node> nodes = nodeManager.getActiveDatasourceNodes(connectorId); checkState(!nodes.isEmpty(), "No TPCH nodes available"); int totalParts = nodes.size() * splitsPerNode; int partNumber = 0; // Split the data using split and skew by the number of nodes available. ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); for (Node node : nodes) { for (int i = 0; i < splitsPerNode; i++) { splits.add(new TpchSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()))); partNumber++; } } return new FixedSplitSource(connectorId, splits.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { CassandraTableLayoutHandle layoutHandle = checkType(layout, CassandraTableLayoutHandle.class, "layout"); CassandraTableHandle cassandraTableHandle = layoutHandle.getTable(); List<CassandraPartition> partitions = layoutHandle.getPartitions().get(); requireNonNull(partitions, "partitions is null"); if (partitions.isEmpty()) { return new FixedSplitSource(connectorId, ImmutableList.<ConnectorSplit>of()); } // if this is an unpartitioned table, split into equal ranges if (partitions.size() == 1) { CassandraPartition cassandraPartition = partitions.get(0); if (cassandraPartition.isUnpartitioned() || cassandraPartition.isIndexedColumnPredicatePushdown()) { CassandraTable table = schemaProvider.getTable(cassandraTableHandle); List<ConnectorSplit> splits = getSplitsByTokenRange(table, cassandraPartition.getPartitionId()); return new FixedSplitSource(connectorId, splits); } } return new FixedSplitSource(connectorId, getSplitsForPartitions(cassandraTableHandle, partitions)); }
/**
 * Creates one split per token range reported by {@code tokenSplitMgr} for the table.
 *
 * @param table table to split
 * @param partitionId partition id stored in every generated split
 * @return immutable list of {@code CassandraSplit}s, one per token range
 * @throws RuntimeException wrapping the {@link IOException} raised while fetching token ranges
 */
private List<ConnectorSplit> getSplitsByTokenRange(CassandraTable table, String partitionId)
{
    String schema = table.getTableHandle().getSchemaName();
    String tableName = table.getTableHandle().getTableName();
    String tokenExpression = table.getTokenExpression();

    ImmutableList.Builder<ConnectorSplit> result = ImmutableList.builder();

    List<CassandraTokenSplitManager.TokenSplit> tokenRanges;
    try {
        tokenRanges = tokenSplitMgr.getSplits(schema, tableName);
    }
    catch (IOException e) {
        throw new RuntimeException(e);
    }

    for (CassandraTokenSplitManager.TokenSplit range : tokenRanges) {
        // restrict the split to this token range
        String condition = buildTokenCondition(tokenExpression, range.getStartToken(), range.getEndToken());
        List<HostAddress> addresses = new HostAddressFactory().AddressNamesToHostAddressList(range.getHosts());
        result.add(new CassandraSplit(connectorId, schema, tableName, partitionId, condition, addresses));
    }

    return result.build();
}
/**
 * Builds the record set for a Cassandra split.
 *
 * The CQL is the select statement for the requested columns, with a trailing
 * ';' removed if present, followed by the split's where clause.
 *
 * @param transaction transaction handle (not read by this method)
 * @param session connector session (not read by this method)
 * @param split split to read; must be a {@code CassandraSplit}
 * @param columns requested columns; each must be a {@code CassandraColumnHandle}
 * @return a {@code CassandraRecordSet} executing the assembled CQL
 */
@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns)
{
    CassandraSplit cassandraSplit = checkType(split, CassandraSplit.class, "split");

    List<CassandraColumnHandle> cassandraColumns = columns.stream()
            .map(column -> checkType(column, CassandraColumnHandle.class, "columnHandle"))
            .collect(toList());

    StringBuilder cqlBuilder = new StringBuilder(
            CassandraCqlUtils.selectFrom(cassandraSplit.getCassandraTableHandle(), cassandraColumns).getQueryString());

    // drop the statement terminator so the where clause can be appended
    int lastIndex = cqlBuilder.length() - 1;
    if (cqlBuilder.charAt(lastIndex) == ';') {
        cqlBuilder.setLength(lastIndex);
    }
    cqlBuilder.append(cassandraSplit.getWhereClause());

    String cql = cqlBuilder.toString();
    log.debug("Creating record set: %s", cql);

    return new CassandraRecordSet(cassandraSession, cassandraSplit.getSchema(), cql, cassandraColumns);
}
@Override public CompletableFuture<List<ConnectorSplit>> getNextBatch(int maxSize) { checkState(!closed, "Provider is already closed"); CompletableFuture<List<ConnectorSplit>> future = queue.getBatchAsync(maxSize); // Before returning, check if there is a registered failure. // If so, we want to throw the error, instead of returning because the scheduler can block // while scheduling splits and wait for work to finish before continuing. In this case, // we want to end the query as soon as possible and abort the work if (throwable.get() != null) { return failedFuture(throwable.get()); } return future; }
/**
 * Reads every split of the S3 table and checks that the {@code t_bigint} values
 * add up to 78300; the layout must expose exactly one partition.
 */
@Test
public void testGetRecordsS3()
        throws Exception
{
    ConnectorTableHandle table = getTableHandle(tableS3);
    List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(SESSION, table).values());
    Map<String, Integer> columnIndex = indexColumns(columnHandles);

    List<ConnectorTableLayoutResult> layouts = metadata.getTableLayouts(
            SESSION,
            table,
            new Constraint<>(TupleDomain.all(), bindings -> true),
            Optional.empty());
    HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(layouts).getTableLayout().getHandle();
    assertEquals(layoutHandle.getPartitions().get().size(), 1);

    ConnectorSplitSource splitSource = splitManager.getSplits(SESSION, layoutHandle);

    long total = 0;
    for (ConnectorSplit split : getAllSplits(splitSource)) {
        try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(SESSION, split, columnHandles)) {
            MaterializedResult rows = materializeSourceDataStream(SESSION, pageSource, getTypes(columnHandles));
            for (MaterializedRow row : rows) {
                total += (Long) row.getField(columnIndex.get("t_bigint"));
            }
        }
    }

    assertEquals(total, 78300);
}
@Test public void testPartitionSchemaNonCanonical() throws Exception { ConnectorSession session = newSession(); ConnectorTableHandle table = getTableHandle(tablePartitionSchemaChangeNonCanonical); ColumnHandle column = metadata.getColumnHandles(session, table).get("t_boolean"); assertNotNull(column); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, new Constraint<>(TupleDomain.fromFixedValues(ImmutableMap.of(column, NullableValue.of(BOOLEAN, false))), bindings -> true), Optional.empty()); ConnectorTableLayoutHandle layoutHandle = getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); assertEquals(getAllPartitions(layoutHandle).size(), 1); assertEquals(getPartitionId(getAllPartitions(layoutHandle).get(0)), "t_boolean=0"); ConnectorSplitSource splitSource = splitManager.getSplits(session, layoutHandle); ConnectorSplit split = getOnlyElement(getAllSplits(splitSource)); ImmutableList<ColumnHandle> columnHandles = ImmutableList.of(column); try (ConnectorPageSource ignored = pageSourceProvider.createPageSource(session, split, columnHandles)) { // TODO coercion of non-canonical values should be supported fail("expected exception"); } catch (PrestoException e) { assertEquals(e.getErrorCode(), HIVE_INVALID_PARTITION_VALUE.toErrorCode()); } }
/**
 * Produces one split per shard of the Kinesis stream behind the given layout.
 *
 * @param transactionHandle transaction handle (not read by this method)
 * @param session connector session (not read by this method)
 * @param layout table layout, converted with {@code handleResolver.convertLayout}
 * @return a fixed split source with one {@code KinesisSplit} per shard
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    KinesisTableHandle kinesisTableHandle = handleResolver.convertLayout(layout).getTable();
    InternalStreamDescription description = getStreamDescription(kinesisTableHandle.getStreamName());

    ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
    for (Shard shard : description.getShards()) {
        // each split covers the shard's full sequence-number range
        splits.add(new KinesisSplit(
                connectorId,
                kinesisTableHandle.getStreamName(),
                kinesisTableHandle.getMessageDataFormat(),
                shard.getShardId(),
                shard.getSequenceNumberRange().getStartingSequenceNumber(),
                shard.getSequenceNumberRange().getEndingSequenceNumber()));
    }

    return new FixedSplitSource(splits.build());
}
@Override public RecordSet getRecordSet( ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<? extends ColumnHandle> list) { RestConnectorSplit split = Types.checkType(connectorSplit, RestConnectorSplit.class, "split"); // TODO fix below cast List<RestColumnHandle> restColumnHandles = (List<RestColumnHandle>) list; SchemaTableName schemaTableName = split.getTableHandle().getSchemaTableName(); Collection<? extends List<?>> rows = rest.getRows(schemaTableName); ConnectorTableMetadata tableMetadata = rest.getTableMetadata(schemaTableName); List<Integer> columnIndexes = restColumnHandles.stream() .map(column -> { int index = 0; for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) { if (columnMetadata.getName().equalsIgnoreCase(column.getName())) { return index; } index++; } throw new IllegalStateException("Unknown column: " + column.getName()); }) .collect(toList()); Collection<? extends List<?>> mappedRows = rows.stream() .map(row -> columnIndexes.stream() .map(index -> row.get(index)) .collect(toList())) .collect(toList()); List<Type> mappedTypes = restColumnHandles.stream() .map(RestColumnHandle::getType) .collect(toList()); return new InMemoryRecordSet(mappedTypes, mappedRows); }
/**
 * Returns a handle resolver that maps every handle kind to its REST connector class.
 *
 * @return a new anonymous {@code ConnectorHandleResolver}
 */
@Override
public ConnectorHandleResolver getHandleResolver()
{
    return new ConnectorHandleResolver()
    {
        @Override
        public Class<? extends ConnectorTableHandle> getTableHandleClass()
        {
            return RestTableHandle.class;
        }

        @Override
        public Class<? extends ColumnHandle> getColumnHandleClass()
        {
            return RestColumnHandle.class;
        }

        @Override
        public Class<? extends ConnectorSplit> getSplitClass()
        {
            return RestConnectorSplit.class;
        }

        @Override
        public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
        {
            return RestConnectorTableLayoutHandle.class;
        }

        @Override
        public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
        {
            return RestTransactionHandle.class;
        }

        @Override
        public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
        {
            return RestInsertTableHandle.class;
        }
    };
}
@Override public RecordSet getRecordSet(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<? extends ColumnHandle> list) { log.info("INFORMATION: AmpoolRecordSetProvider getRecordSet() called."); requireNonNull(connectorSplit, "split is null"); AmpoolSplit ampoolSplit = (AmpoolSplit) connectorSplit; checkArgument(ampoolSplit.getConnectorId().equals(connectorId), "split is not for this connector"); ImmutableList.Builder<AmpoolColumnHandle> handles = ImmutableList.builder(); for (ColumnHandle handle : list) { handles.add((AmpoolColumnHandle) handle); } // TODO: Projections and filters on Ampool side Iterator<Row> iterator; if (ampoolClient.existsFTable(ampoolSplit.getTableName())) iterator = ampoolClient.getFTable(ampoolSplit.getTableName()).getScanner(new Scan()).iterator(); else if (ampoolClient.existsMTable(ampoolSplit.getTableName())) iterator = ampoolClient.getMTable(ampoolSplit.getTableName()).getScanner(new Scan()).iterator(); else iterator = null; return new AmpoolRecordSet(ampoolSplit, handles.build(), iterator); }
/**
 * Builds the record set for a single example split.
 *
 * @param session connector session (not read by this method)
 * @param split split to read; must be a non-null {@code ExampleSplit} of this connector
 * @param columns requested columns; each must be an {@code ExampleColumnHandle}
 * @return an {@code ExampleRecordSet} over the split and the checked column handles
 * @throws NullPointerException if {@code split} is null
 * @throws IllegalArgumentException via {@code checkArgument} if the split belongs to another connector
 */
@Override
public RecordSet getRecordSet(ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns)
{
    // The message names the actual parameter; it previously referred to "partitionChunk".
    requireNonNull(split, "split is null");
    ExampleSplit exampleSplit = checkType(split, ExampleSplit.class, "split");
    checkArgument(exampleSplit.getConnectorId().equals(connectorId), "split is not for this connector");

    ImmutableList.Builder<ExampleColumnHandle> handles = ImmutableList.builder();
    for (ColumnHandle handle : columns) {
        handles.add(checkType(handle, ExampleColumnHandle.class, "handle"));
    }

    return new ExampleRecordSet(exampleSplit, handles.build());
}
/**
 * Starts fetching the next batch of splits asynchronously on {@code executor}.
 *
 * @param maxSize maximum number of splits in the batch
 * @return future completing with the batch produced by {@code batchSupplier}
 * @throws IllegalStateException via {@code checkState} when the previous batch future is not done
 */
@Override
public synchronized CompletableFuture<List<ConnectorSplit>> getNextBatch(int maxSize)
{
    // only one batch may be in flight at a time
    boolean previousBatchCompleted = (future == null) || future.isDone();
    checkState(previousBatchCompleted, "previous batch not completed");

    future = supplyAsync(batchSupplier(maxSize), executor);
    return future;
}
/**
 * Drains a split source, requesting batches of up to 1000 splits and waiting
 * for each batch, until the source reports it is finished.
 *
 * @param splitSource source to drain
 * @return immutable list of every split returned, in batch order
 * @throws InterruptedException if interrupted while waiting for a batch
 * @throws ExecutionException if a batch future completes exceptionally
 */
private static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSource)
        throws InterruptedException, ExecutionException
{
    ImmutableList.Builder<ConnectorSplit> collected = ImmutableList.builder();
    while (!splitSource.isFinished()) {
        collected.addAll(splitSource.getNextBatch(1000).get());
    }
    return collected.build();
}
/**
 * Builds the record set for a single JDBC split.
 *
 * @param session connector session (not read by this method)
 * @param split split to read; must be a {@code JdbcSplit}
 * @param columns requested columns; each must be a {@code JdbcColumnHandle}
 * @return a {@code JdbcRecordSet} over the split and the checked column handles
 */
@Override
public RecordSet getRecordSet(ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns)
{
    JdbcSplit jdbcSplit = checkType(split, JdbcSplit.class, "split");

    ImmutableList.Builder<JdbcColumnHandle> jdbcColumns = ImmutableList.builder();
    columns.forEach(column -> jdbcColumns.add(checkType(column, JdbcColumnHandle.class, "columnHandle")));

    return new JdbcRecordSet(jdbcClient, jdbcSplit, jdbcColumns.build());
}
/**
 * Produces the splits for a system table according to its distribution mode.
 *
 * {@code SINGLE_COORDINATOR} yields one split on the current node;
 * {@code ALL_COORDINATORS} yields one split per coordinator; {@code ALL_NODES}
 * yields one split per active node. Any other mode yields no splits.
 *
 * @param transactionHandle transaction handle (not read by this method)
 * @param session connector session (not read by this method)
 * @param layout table layout; must be a {@code SystemTableLayoutHandle}
 * @return a fixed split source over the generated splits
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    SystemTableLayoutHandle layoutHandle = checkType(layout, SystemTableLayoutHandle.class, "layout");
    SystemTableHandle tableHandle = layoutHandle.getTable();
    TupleDomain<ColumnHandle> constraint = layoutHandle.getConstraint();

    Distribution mode = tables.get(tableHandle.getSchemaTableName()).getDistribution();

    if (mode == SINGLE_COORDINATOR) {
        HostAddress currentAddress = nodeManager.getCurrentNode().getHostAndPort();
        ConnectorSplit onlySplit = new SystemSplit(tableHandle.getConnectorId(), tableHandle, currentAddress, constraint);
        return new FixedSplitSource(GlobalSystemConnector.NAME, ImmutableList.of(onlySplit));
    }

    // Collect the target nodes for the distributed modes.
    ImmutableSet.Builder<Node> targets = ImmutableSet.builder();
    if (mode == ALL_COORDINATORS) {
        targets.addAll(nodeManager.getCoordinators());
    }
    else if (mode == ALL_NODES) {
        targets.addAll(nodeManager.getNodes(ACTIVE));
    }

    ImmutableList.Builder<ConnectorSplit> result = ImmutableList.builder();
    for (Node target : targets.build()) {
        result.add(new SystemSplit(tableHandle.getConnectorId(), tableHandle, target.getHostAndPort(), constraint));
    }
    return new FixedSplitSource(GlobalSystemConnector.NAME, result.build());
}
/**
 * Creates the Jackson module for {@code ConnectorSplit}, passing the handle
 * resolver's {@code getId} and {@code getSplitClass} methods to the superclass.
 *
 * @param handleResolver resolver supplying the two method references
 */
@Inject
public SplitJacksonModule(HandleResolver handleResolver)
{
    super(
            ConnectorSplit.class,
            handleResolver::getId,
            handleResolver::getSplitClass);
}
/**
 * Creates a split; also used for JSON deserialization.
 *
 * @param connectorId id of the owning connector; must not be null
 * @param transactionHandle transaction the split belongs to; must not be null
 * @param connectorSplit connector-specific split; must not be null
 * @throws NullPointerException if any argument is null
 */
@JsonCreator
public Split(
        @JsonProperty("connectorId") String connectorId,
        @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
        @JsonProperty("connectorSplit") ConnectorSplit connectorSplit)
{
    // validate first, in declaration order, then assign
    requireNonNull(connectorId, "connectorId is null");
    requireNonNull(transactionHandle, "transactionHandle is null");
    requireNonNull(connectorSplit, "connectorSplit is null");

    this.connectorId = connectorId;
    this.transactionHandle = transactionHandle;
    this.connectorSplit = connectorSplit;
}
@Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) { requireNonNull(columns, "columns is null"); checkType(split, TestingSplit.class, "split"); // TODO: check for !columns.isEmpty() -- currently, it breaks TestSqlTaskManager // and fixing it requires allowing TableScan nodes with no assignments return new FixedPageSource(ImmutableList.of(new Page(1))); }
/**
 * Creates a fixed split source containing {@code splitCount} splits obtained
 * from the factory, in the order the factory returned them.
 *
 * @param splitCount number of splits to create
 * @param splitFactory called once per split
 * @return a {@code FixedSplitSource} for {@code CONNECTOR_ID}
 */
private static ConnectorSplitSource createFixedSplitSource(int splitCount, Supplier<ConnectorSplit> splitFactory)
{
    ImmutableList.Builder<ConnectorSplit> created = ImmutableList.builder();
    for (int remaining = splitCount; remaining > 0; remaining--) {
        created.add(splitFactory.get());
    }
    return new FixedSplitSource(CONNECTOR_ID, created.build());
}
/**
 * Builds the record set for a TPCH split by resolving the table named in the
 * split's table handle and delegating to the table-based overload.
 *
 * @param transaction transaction handle (not read by this method)
 * @param session connector session (not read by this method)
 * @param split split to read; must be a {@code TpchSplit}
 * @param columns requested columns, forwarded unchanged
 * @return the record set for the split's part of the table
 */
@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns)
{
    TpchSplit tpchSplit = checkType(split, TpchSplit.class, "split");

    TpchTable<?> tpchTable = TpchTable.getTable(tpchSplit.getTableHandle().getTableName());

    return getRecordSet(
            tpchTable,
            columns,
            tpchSplit.getTableHandle().getScaleFactor(),
            tpchSplit.getPartNumber(),
            tpchSplit.getTotalParts());
}