@Override public RecordSet getRecordSet( ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns ) { EthereumSplit ethereumSplit = convertSplit(split); ImmutableList.Builder<EthereumColumnHandle> handleBuilder = ImmutableList.builder(); for (ColumnHandle handle : columns) { EthereumColumnHandle columnHandle = convertColumnHandle(handle); handleBuilder.add(columnHandle); } return new EthereumRecordSet(web3j, handleBuilder.build(), ethereumSplit); }
@Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) { List<HDFSColumnHandle> hdfsColumns = columns.stream() .map(col -> (HDFSColumnHandle) col) .collect(Collectors.toList()); HDFSSplit hdfsSplit = checkType(split, HDFSSplit.class, "hdfs split"); Path path = new Path(hdfsSplit.getPath()); Optional<ConnectorPageSource> pageSource = createHDFSPageSource( path, hdfsSplit.getStart(), hdfsSplit.getLen(), hdfsColumns); if (pageSource.isPresent()) { return pageSource.get(); } throw new RuntimeException("Could not find a file reader for split " + hdfsSplit); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle handle, ConnectorSession session, ConnectorTableLayoutHandle layout) { log.info("INFORMATION: AmpoolSplitManager getSplits() called."); AmpoolTableLayoutHandle layoutHandle = (AmpoolTableLayoutHandle) layout; AmpoolTableHandle tableHandle = layoutHandle.getTable(); AmpoolTable table = new AmpoolTable(ampoolClient, tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table.getColumnsMetadata() != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); // TODO Pass here bucket id splits.add(new AmpoolSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(),"" ,HostAddress.fromParts("localhost",0))); Collections.shuffle(splits); return new FixedSplitSource(splits); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KuduTableLayoutHandle layoutHandle = checkType(layout, KuduTableLayoutHandle.class, "layout"); KuduTableHandle tableHandle = layoutHandle.getTable(); KuduClient kuduClient = kuduClientManager.getClient(); List<KuduScanToken> tokens = kuduClientManager.newScanTokenBuilder(kuduClient, tableHandle.getSchemaTableName().getTableName()).build(); TupleDomain<KuduColumnHandle> effectivePredicate = layoutHandle.getConstraint() .transform(handle -> checkType(handle, KuduColumnHandle.class, "columnHandle")); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (int i = 0; i < tokens.size(); i++) { // nodeManager.getWorkerNodes() List<HostAddress> hostAddresses = nodeManager.getWorkerNodes().stream() .map(node -> node.getHostAndPort()).collect(Collectors.toList()); ConnectorSplit split = new KuduSplit(hostAddresses, tableHandle.getSchemaTableName(), i, effectivePredicate); builder.add(split); } kuduClientManager.close(kuduClient); return new FixedSplitSource(builder.build()); }
@Override /** * @ */ public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) { requireNonNull(split, "split is null"); KuduSplit kuduSplit = checkType(split, KuduSplit.class, "split"); ImmutableList.Builder<KuduColumnHandle> handles = ImmutableList.builder(); for (ColumnHandle handle : columns) { handles.add(checkType(handle, KuduColumnHandle.class, "handle")); } return new KuduRecordSet(kuduTable, kuduClientManager, kuduSplit, handles.build()); }
@Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) { RaptorSplit raptorSplit = checkType(split, RaptorSplit.class, "split"); UUID shardUuid = raptorSplit.getShardUuid(); List<RaptorColumnHandle> columnHandles = columns.stream().map(toRaptorColumnHandle()).collect(toList()); List<Long> columnIds = columnHandles.stream().map(RaptorColumnHandle::getColumnId).collect(toList()); List<Type> columnTypes = columnHandles.stream().map(RaptorColumnHandle::getColumnType).collect(toList()); return storageManager.getPageSource( shardUuid, columnIds, columnTypes, raptorSplit.getEffectivePredicate(), ReaderAttributes.from(session), raptorSplit.getTransactionId()); }
@Override public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle) { RaptorInsertTableHandle handle = checkType(tableHandle, RaptorInsertTableHandle.class, "tableHandle"); return new RaptorPageSink( pageSorter, storageManager, shardInfoCodec, handle.getTransactionId(), toColumnIds(handle.getColumnHandles()), handle.getColumnTypes(), Optional.empty(), toColumnIds(handle.getSortColumnHandles()), handle.getSortOrders(), maxBufferSize); }
@Override public List<TableLayoutResult> getLayouts(Session session, TableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { if (constraint.getSummary().isNone()) { return ImmutableList.of(); } TupleDomain<ColumnHandle> summary = constraint.getSummary(); String connectorId = table.getConnectorId(); ConnectorTableHandle connectorTable = table.getConnectorHandle(); Predicate<Map<ColumnHandle, NullableValue>> predicate = constraint.predicate(); ConnectorEntry entry = getConnectorMetadata(connectorId); ConnectorMetadata metadata = entry.getMetadata(session); ConnectorTransactionHandle transaction = entry.getTransactionHandle(session); ConnectorSession connectorSession = session.toConnectorSession(entry.getCatalog()); List<ConnectorTableLayoutResult> layouts = metadata.getTableLayouts(connectorSession, connectorTable, new Constraint<>(summary, predicate::test), desiredColumns); return layouts.stream() .map(layout -> new TableLayoutResult(fromConnectorLayout(connectorId, transaction, layout.getTableLayout()), layout.getUnenforcedConstraint())) .collect(toImmutableList()); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { JmxTableLayoutHandle jmxLayout = checkType(layout, JmxTableLayoutHandle.class, "layout"); JmxTableHandle tableHandle = jmxLayout.getTable(); TupleDomain<ColumnHandle> predicate = jmxLayout.getConstraint(); //TODO is there a better way to get the node column? JmxColumnHandle nodeColumnHandle = tableHandle.getColumns().get(0); List<ConnectorSplit> splits = nodeManager.getNodes(ACTIVE) .stream() .filter(node -> { NullableValue value = NullableValue.of(VARCHAR, utf8Slice(node.getNodeIdentifier())); return predicate.overlaps(fromFixedValues(ImmutableMap.of(nodeColumnHandle, value))); }) .map(node -> new JmxSplit(tableHandle, ImmutableList.of(node.getHostAndPort()))) .collect(toList()); return new FixedSplitSource(connectorId, splits); }
@Override public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) { CassandraSplit cassandraSplit = checkType(split, CassandraSplit.class, "split"); List<CassandraColumnHandle> cassandraColumns = columns.stream() .map(column -> checkType(column, CassandraColumnHandle.class, "columnHandle")) .collect(toList()); String selectCql = CassandraCqlUtils.selectFrom(cassandraSplit.getCassandraTableHandle(), cassandraColumns).getQueryString(); StringBuilder sb = new StringBuilder(selectCql); if (sb.charAt(sb.length() - 1) == ';') { sb.setLength(sb.length() - 1); } sb.append(cassandraSplit.getWhereClause()); String cql = sb.toString(); log.debug("Creating record set: %s", cql); return new CassandraRecordSet(cassandraSession, cassandraSplit.getSchema(), cql, cassandraColumns); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { CassandraTableLayoutHandle layoutHandle = checkType(layout, CassandraTableLayoutHandle.class, "layout"); CassandraTableHandle cassandraTableHandle = layoutHandle.getTable(); List<CassandraPartition> partitions = layoutHandle.getPartitions().get(); requireNonNull(partitions, "partitions is null"); if (partitions.isEmpty()) { return new FixedSplitSource(connectorId, ImmutableList.<ConnectorSplit>of()); } // if this is an unpartitioned table, split into equal ranges if (partitions.size() == 1) { CassandraPartition cassandraPartition = partitions.get(0); if (cassandraPartition.isUnpartitioned() || cassandraPartition.isIndexedColumnPredicatePushdown()) { CassandraTable table = schemaProvider.getTable(cassandraTableHandle); List<ConnectorSplit> splits = getSplitsByTokenRange(table, cassandraPartition.getPartitionId()); return new FixedSplitSource(connectorId, splits); } } return new FixedSplitSource(connectorId, getSplitsForPartitions(cassandraTableHandle, partitions)); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { TpchTableHandle tableHandle = checkType(layout, TpchTableLayoutHandle.class, "layout").getTable(); Set<Node> nodes = nodeManager.getActiveDatasourceNodes(connectorId); checkState(!nodes.isEmpty(), "No TPCH nodes available"); int totalParts = nodes.size() * splitsPerNode; int partNumber = 0; // Split the data using split and skew by the number of nodes available. ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); for (Node node : nodes) { for (int i = 0; i < splitsPerNode; i++) { splits.add(new TpchSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()))); partNumber++; } } return new FixedSplitSource(connectorId, splits.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KinesisTableLayoutHandle kinesislayout = handleResolver.convertLayout(layout); KinesisTableHandle kinesisTableHandle = kinesislayout.getTable(); InternalStreamDescription desc = this.getStreamDescription(kinesisTableHandle.getStreamName()); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (Shard shard : desc.getShards()) { KinesisSplit split = new KinesisSplit(connectorId, kinesisTableHandle.getStreamName(), kinesisTableHandle.getMessageDataFormat(), shard.getShardId(), shard.getSequenceNumberRange().getStartingSequenceNumber(), shard.getSequenceNumberRange().getEndingSequenceNumber()); builder.add(split); } return new FixedSplitSource(builder.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { InformationSchemaTableLayoutHandle handle = checkType(layout, InformationSchemaTableLayoutHandle.class, "layout"); Map<ColumnHandle, NullableValue> bindings = extractFixedValues(handle.getConstraint()).orElse(ImmutableMap.of()); List<HostAddress> localAddress = ImmutableList.of(nodeManager.getCurrentNode().getHostAndPort()); Map<String, NullableValue> filters = bindings.entrySet().stream().collect(toMap( entry -> checkType(entry.getKey(), InformationSchemaColumnHandle.class, "column").getColumnName(), Entry::getValue)); ConnectorSplit split = new InformationSchemaSplit(handle.getTable(), filters, localAddress); return new FixedSplitSource(null, ImmutableList.of(split)); }
@Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) { InternalTable table = getInternalTable(transactionHandle, session, split, columns); List<Integer> channels = new ArrayList<>(); for (ColumnHandle column : columns) { String columnName = checkType(column, InformationSchemaColumnHandle.class, "column").getColumnName(); int columnIndex = table.getColumnIndex(columnName); channels.add(columnIndex); } ImmutableList.Builder<Page> pages = ImmutableList.builder(); for (Page page : table.getPages()) { Block[] blocks = new Block[channels.size()]; for (int index = 0; index < blocks.length; index++) { blocks[index] = page.getBlock(channels.get(index)); } pages.add(new Page(page.getPositionCount(), blocks)); } return new FixedPageSource(pages.build()); }
private InternalTable getInternalTable(ConnectorTransactionHandle transactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<ColumnHandle> columns) { InformationSchemaTransactionHandle transaction = checkType(transactionHandle, InformationSchemaTransactionHandle.class, "transaction"); InformationSchemaSplit split = checkType(connectorSplit, InformationSchemaSplit.class, "split"); requireNonNull(columns, "columns is null"); InformationSchemaTableHandle handle = split.getTableHandle(); Map<String, NullableValue> filters = split.getFilters(); Session session = Session.builder(metadata.getSessionPropertyManager()) .setTransactionId(transaction.getTransactionId()) .setQueryId(new QueryId(connectorSession.getQueryId())) .setIdentity(connectorSession.getIdentity()) .setSource("information_schema") .setCatalog("") // default catalog is not be used .setSchema("") // default schema is not be used .setTimeZoneKey(connectorSession.getTimeZoneKey()) .setLocale(connectorSession.getLocale()) .setStartTime(connectorSession.getStartTime()) .build(); return getInformationSchemaTable(session, handle.getCatalogName(), handle.getSchemaTableName(), filters); }
@Override public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint) { Builder table = InMemoryRecordSet.builder(transactionsTable); for (TransactionInfo info : transactionManager.getAllTransactionInfos()) { table.addRow( info.getTransactionId().toString(), info.getIsolationLevel().toString(), info.isReadOnly(), info.isAutoCommitContext(), info.getCreateTime().getMillis(), (long) info.getIdleTime().getValue(TimeUnit.SECONDS), info.getWrittenConnectorId().orElse(null), createStringsBlock(info.getConnectorIds())); } return table.build().cursor(); }
private static RecordSet toRecordSet(ConnectorTransactionHandle sourceTransaction, SystemTable table, ConnectorSession session, TupleDomain<Integer> constraint) { return new RecordSet() { private final List<Type> types = table.getTableMetadata().getColumns().stream() .map(ColumnMetadata::getType) .collect(toImmutableList()); @Override public List<Type> getColumnTypes() { return types; } @Override public RecordCursor cursor() { return table.cursor(sourceTransaction, session, constraint); } }; }
@Override public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint) { Builder table = InMemoryRecordSet.builder(QUERY_TABLE); for (QueryInfo queryInfo : queryManager.getAllQueryInfo()) { QueryStats queryStats = queryInfo.getQueryStats(); table.addRow( nodeId, queryInfo.getQueryId().toString(), queryInfo.getState().toString(), queryInfo.getSession().getUser(), queryInfo.getSession().getSource().orElse(null), queryInfo.getQuery(), toMillis(queryStats.getQueuedTime()), toMillis(queryStats.getAnalysisTime()), toMillis(queryStats.getDistributedPlanningTime()), toTimeStamp(queryStats.getCreateTime()), toTimeStamp(queryStats.getExecutionStartTime()), toTimeStamp(queryStats.getLastHeartbeat()), toTimeStamp(queryStats.getEndTime())); } return table.build().cursor(); }
@Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) { checkConnectorSupports(READ_UNCOMMITTED, isolationLevel); HDFSTransactionHandle transaction = new HDFSTransactionHandle(); transactions.putIfAbsent(transaction, hdfsMetadataFactory.create()); return transaction; }
/** * Guaranteed to be called at most once per transaction. The returned metadata will only be accessed * in a single threaded context. * * @param transactionHandle transaction handle */ @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) { HDFSMetadata metadata = transactions.get(transactionHandle); checkArgument(metadata != null, "no such transaction: %s", transactionHandle); return hdfsMetadataFactory.create(); }
@Override public RecordSink getRecordSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle connectorTableHandle) { RestInsertTableHandle insertTableHandle = Types.checkType(connectorTableHandle, RestInsertTableHandle.class, "tableHandle"); RestTableHandle tableHandle = insertTableHandle.getTableHandle(); SchemaTableName schemaTableName = tableHandle.getSchemaTableName(); Consumer<List> rowSink = rest.createRowSink(schemaTableName); List<Type> types = rest.getTypes(schemaTableName); return new InMemoryObjectRecordSink(types, rowSink); }
@Override public RecordSet getRecordSet( ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<? extends ColumnHandle> list) { RestConnectorSplit split = Types.checkType(connectorSplit, RestConnectorSplit.class, "split"); // TODO fix below cast List<RestColumnHandle> restColumnHandles = (List<RestColumnHandle>) list; SchemaTableName schemaTableName = split.getTableHandle().getSchemaTableName(); Collection<? extends List<?>> rows = rest.getRows(schemaTableName); ConnectorTableMetadata tableMetadata = rest.getTableMetadata(schemaTableName); List<Integer> columnIndexes = restColumnHandles.stream() .map(column -> { int index = 0; for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) { if (columnMetadata.getName().equalsIgnoreCase(column.getName())) { return index; } index++; } throw new IllegalStateException("Unknown column: " + column.getName()); }) .collect(toList()); Collection<? extends List<?>> mappedRows = rows.stream() .map(row -> columnIndexes.stream() .map(index -> row.get(index)) .collect(toList())) .collect(toList()); List<Type> mappedTypes = restColumnHandles.stream() .map(RestColumnHandle::getType) .collect(toList()); return new InMemoryRecordSet(mappedTypes, mappedRows); }
@Override public ConnectorHandleResolver getHandleResolver() { return new ConnectorHandleResolver() { public Class<? extends ConnectorTableHandle> getTableHandleClass() { return RestTableHandle.class; } public Class<? extends ColumnHandle> getColumnHandleClass() { return RestColumnHandle.class; } public Class<? extends ConnectorSplit> getSplitClass() { return RestConnectorSplit.class; } public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() { return RestConnectorTableLayoutHandle.class; } @Override public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass() { return RestTransactionHandle.class; } @Override public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass() { return RestInsertTableHandle.class; } }; }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { RestConnectorTableLayoutHandle layoutHandle = Types.checkType(layout, RestConnectorTableLayoutHandle.class, "layout"); List<HostAddress> addresses = nodeManager.getRequiredWorkerNodes().stream() .map(Node::getHostAndPort) .collect(toList()); return new FixedSplitSource(ImmutableList.of( new RestConnectorSplit(layoutHandle.getTableHandle(), addresses))); }
@Override public RecordSet getRecordSet(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<? extends ColumnHandle> list) { log.info("INFORMATION: AmpoolRecordSetProvider getRecordSet() called."); requireNonNull(connectorSplit, "split is null"); AmpoolSplit ampoolSplit = (AmpoolSplit) connectorSplit; checkArgument(ampoolSplit.getConnectorId().equals(connectorId), "split is not for this connector"); ImmutableList.Builder<AmpoolColumnHandle> handles = ImmutableList.builder(); for (ColumnHandle handle : list) { handles.add((AmpoolColumnHandle) handle); } // TODO: Projections and filters on Ampool side Iterator<Row> iterator; if (ampoolClient.existsFTable(ampoolSplit.getTableName())) iterator = ampoolClient.getFTable(ampoolSplit.getTableName()).getScanner(new Scan()).iterator(); else if (ampoolClient.existsMTable(ampoolSplit.getTableName())) iterator = ampoolClient.getMTable(ampoolSplit.getTableName()).getScanner(new Scan()).iterator(); else iterator = null; return new AmpoolRecordSet(ampoolSplit, handles.build(), iterator); }
@Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) { requireNonNull(columns, "columns is null"); checkType(split, TestingSplit.class, "split"); // TODO: check for !columns.isEmpty() -- currently, it breaks TestSqlTaskManager // and fixing it requires allowing TableScan nodes with no assignments return new FixedPageSource(ImmutableList.of(new Page(1))); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { RaptorTableLayoutHandle handle = checkType(layout, RaptorTableLayoutHandle.class, "layout"); RaptorTableHandle table = handle.getTable(); TupleDomain<RaptorColumnHandle> effectivePredicate = toRaptorTupleDomain(handle.getConstraint()); return new RaptorSplitSource(table.getTableId(), effectivePredicate, table.getTransactionId()); }
@JsonCreator public Split( @JsonProperty("connectorId") String connectorId, @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle, @JsonProperty("connectorSplit") ConnectorSplit connectorSplit) { this.connectorId = requireNonNull(connectorId, "connectorId is null"); this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null"); this.connectorSplit = requireNonNull(connectorSplit, "connectorSplit is null"); }
@Override public Optional<ResolvedIndex> resolveIndex(Session session, TableHandle tableHandle, Set<ColumnHandle> indexableColumns, Set<ColumnHandle> outputColumns, TupleDomain<ColumnHandle> tupleDomain) { ConnectorEntry entry = lookupConnectorFor(tableHandle); ConnectorMetadata metadata = entry.getMetadata(session); ConnectorTransactionHandle transaction = entry.getTransactionHandle(session); ConnectorSession connectorSession = session.toConnectorSession(entry.getCatalog()); Optional<ConnectorResolvedIndex> resolvedIndex = metadata.resolveIndex(connectorSession, tableHandle.getConnectorHandle(), indexableColumns, outputColumns, tupleDomain); return resolvedIndex.map(resolved -> new ResolvedIndex(tableHandle.getConnectorId(), transaction, resolved)); }
@Override public void rollback(ConnectorTransactionHandle transaction) { RaptorMetadata metadata = transactions.remove(transaction); checkArgument(metadata != null, "no such transaction: %s", transaction); metadata.rollback(); }
@Override public OutputTableHandle beginCreateTable(Session session, String catalogName, TableMetadata tableMetadata) { ConnectorEntry entry = connectorsByCatalog.get(catalogName); checkArgument(entry != null, "Catalog %s does not exist", catalogName); ConnectorMetadata metadata = entry.getMetadataForWrite(session); ConnectorTransactionHandle transactionHandle = entry.getTransactionHandle(session); ConnectorSession connectorSession = session.toConnectorSession(entry.getCatalog()); ConnectorOutputTableHandle handle = metadata.beginCreateTable(connectorSession, tableMetadata.getMetadata()); return new OutputTableHandle(entry.getConnectorId(), transactionHandle, handle); }
@JsonCreator public InsertTableHandle( @JsonProperty("connectorId") String connectorId, @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle, @JsonProperty("connectorHandle") ConnectorInsertTableHandle connectorHandle) { this.connectorId = requireNonNull(connectorId, "connectorId is null"); this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null"); this.connectorHandle = requireNonNull(connectorHandle, "connectorHandle is null"); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { SpreadsheetTableLayoutHandle layoutHandle = (SpreadsheetTableLayoutHandle) layout; SpreadsheetTableHandle spreadsheetTableHandle = layoutHandle.getTable(); SpreadsheetSplit spreadsheetSplit = new SpreadsheetSplit(spreadsheetTableHandle); return new FixedSplitSource(ImmutableList.of(spreadsheetSplit)); }
@Override public Connector create(String connectorId, Map<String, String> properties) { int splitsPerNode = getSplitsPerNode(properties); return new Connector() { @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) { return TpchTransactionHandle.INSTANCE; } @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transaction) { return new TpchMetadata(connectorId); } @Override public ConnectorSplitManager getSplitManager() { return new TpchSplitManager(connectorId, nodeManager, splitsPerNode); } @Override public ConnectorRecordSetProvider getRecordSetProvider() { return new TpchRecordSetProvider(); } }; }
@Override public void rollback(ConnectorTransactionHandle transactionHandle) { LegacyConnectorMetadata metadata = metadatas.remove(transactionHandle); if (metadata != null) { metadata.tryRollback(); } }
private ConnectorTransactionHandle beginTransaction(Connector connector) { if (connector instanceof InternalConnector) { return ((InternalConnector) connector).beginTransaction(transactionId, isolationLevel, readOnly); } else { return connector.beginTransaction(isolationLevel, readOnly); } }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { SystemTableLayoutHandle layoutHandle = checkType(layout, SystemTableLayoutHandle.class, "layout"); SystemTableHandle tableHandle = layoutHandle.getTable(); TupleDomain<ColumnHandle> constraint = layoutHandle.getConstraint(); SystemTable systemTable = tables.get(tableHandle.getSchemaTableName()); Distribution tableDistributionMode = systemTable.getDistribution(); if (tableDistributionMode == SINGLE_COORDINATOR) { HostAddress address = nodeManager.getCurrentNode().getHostAndPort(); ConnectorSplit split = new SystemSplit(tableHandle.getConnectorId(), tableHandle, address, constraint); return new FixedSplitSource(GlobalSystemConnector.NAME, ImmutableList.of(split)); } ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); ImmutableSet.Builder<Node> nodes = ImmutableSet.builder(); if (tableDistributionMode == ALL_COORDINATORS) { nodes.addAll(nodeManager.getCoordinators()); } else if (tableDistributionMode == ALL_NODES) { nodes.addAll(nodeManager.getNodes(ACTIVE)); } Set<Node> nodeSet = nodes.build(); for (Node node : nodeSet) { splits.add(new SystemSplit(tableHandle.getConnectorId(), tableHandle, node.getHostAndPort(), constraint)); } return new FixedSplitSource(GlobalSystemConnector.NAME, splits.build()); }
@Override public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint) { Builder table = InMemoryRecordSet.builder(METADATA); for (String name : metadata.getCatalogNames().keySet()) { table.addRow(name); } return table.build().cursor(); }
@Override public InsertTableHandle beginInsert(Session session, TableHandle tableHandle) { ConnectorEntry entry = lookupConnectorFor(tableHandle); ConnectorMetadata metadata = entry.getMetadataForWrite(session); ConnectorTransactionHandle transactionHandle = entry.getTransactionHandle(session); ConnectorInsertTableHandle handle = metadata.beginInsert(session.toConnectorSession(entry.getCatalog()), tableHandle.getConnectorHandle()); return new InsertTableHandle(tableHandle.getConnectorId(), transactionHandle, handle); }