/**
 * Creates a split from its JSON properties.
 *
 * @param connectorId connector id; must not be null
 * @param table schema-qualified table name; must not be null
 * @param path path string; must not be null
 * @param start value stored as this split's {@code start}
 * @param len value stored as this split's {@code len}
 * @param addresses host addresses; must not be null, copied into an immutable list
 * @throws NullPointerException if any reference argument is null
 */
@JsonCreator
public HDFSSplit(
        @JsonProperty("connectorId") HDFSConnectorId connectorId,
        @JsonProperty("table") SchemaTableName table,
        @JsonProperty("path") String path,
        @JsonProperty("start") long start,
        @JsonProperty("len") long len,
        @JsonProperty("addresses") List<HostAddress> addresses)
{
    this.connectorId = requireNonNull(connectorId, "connectorId is null");
    this.table = requireNonNull(table, "table is null");
    this.path = requireNonNull(path, "path is null");
    // start and len are primitives: they can never be null, so a null check would only box them
    this.start = start;
    this.len = len;
    this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
}
/**
 * Fetches the history for the given table from the remote service.
 * <p>
 * The schema name (compared case-insensitively) selects the channel or the IM endpoint.
 * Any other schema yields a {@code History} built with the text "no such schema" and no messages.
 *
 * @param schemaTableName table whose history is requested
 * @return the body of the service response, or the "no such schema" placeholder
 */
private History getHistory(SchemaTableName schemaTableName)
{
    String schema = schemaTableName.getSchemaName();
    boolean channelSchema = CHANNEL_SCHEMA.equalsIgnoreCase(schema);

    if (!channelSchema && !IM_SCHEMA.equalsIgnoreCase(schema)) {
        return new History(true, "no such schema", ImmutableList.of());
    }

    try {
        if (channelSchema) {
            return service.channelHistory(token, getChannelId(schemaTableName))
                    .execute()
                    .body();
        }
        return service.imHistory(token, getChannelId(schemaTableName))
                .execute()
                .body();
    }
    catch (IOException e) {
        throw Throwables.propagate(e);
    }
}
/**
 * Lists the issues returned by the service as rows of
 * (number, state, user login, title).
 *
 * @param schemaTableName requested table (not consulted by this implementation)
 * @return one row per issue
 * @throws IllegalStateException if the service response is not successful
 */
@Override
public Collection<? extends List<?>> getRows(SchemaTableName schemaTableName)
{
    Response<List<Issue>> response;
    try {
        response = service.listPrestoIssues().execute();
    }
    catch (IOException e) {
        throw Throwables.propagate(e);
    }

    if (!response.isSuccessful()) {
        throw new IllegalStateException("Unable to read: " + response.message());
    }

    return response.body().stream()
            .map(issue -> ImmutableList.of(issue.getNumber(), issue.getState(), issue.getUser().getLogin(), issue.getTitle()))
            .collect(toList());
}
@Override public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { requireNonNull(prefix, "prefix is null"); ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder(); List<SchemaTableName> tableNames = prefix.getSchemaName() == null ? listTables(session, null) : ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); for (SchemaTableName tableName : tableNames) { ConnectorTableMetadata tableMetadata = getTableMetadata(tableName); // table can disappear during listing operation if (tableMetadata != null) { columns.put(tableName, tableMetadata.getColumns()); } } return columns.build(); }
public Map<SchemaTableName, KuduTableHandle> getTables(KuduClient kuduClient) { Map<SchemaTableName, KuduTableHandle> tables = null; ImmutableMap.Builder<SchemaTableName, KuduTableHandle> tablesBuilder = ImmutableMap.builder(); // ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> tableColumnsBuilder = ImmutableMap.builder(); List<String> listTable = null; try { listTable = kuduClient.getTablesList().getTablesList(); } catch (KuduException e) { e.printStackTrace(); } for (String table : listTable) { SchemaTableName schemaTableName = new SchemaTableName(PRESTO_KUDU_SCHEMA, table); tablesBuilder.put(schemaTableName, new KuduTableHandle(schemaTableName)); } tables = tablesBuilder.build(); return tables; }
/**
 * Returns the column metadata of every table matching the prefix.
 *
 * @param session connector session
 * @param prefix schema/table prefix; must not be null
 * @return columns keyed by table name; tables without a known handle are omitted
 */
@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
    requireNonNull(prefix, "prefix is null");

    KuduClient kuduClient = kuduClientManager.getClient();
    try {
        ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
        // Filled on first use so the table handles are fetched once, not once per listed table
        Map<SchemaTableName, KuduTableHandle> tableHandles = null;
        for (SchemaTableName tableName : listTables(session, prefix)) {
            if (tableHandles == null) {
                tableHandles = kuduTables.getTables(kuduClient);
            }
            KuduTableHandle tableHandle = tableHandles.get(tableName);
            if (tableHandle != null) {
                columns.put(tableName, kuduTables.getColumns(kuduClient, tableHandle));
            }
        }
        return columns.build();
    }
    finally {
        // Hand the client back even when listing fails part-way through
        kuduClientManager.close(kuduClient);
    }
}
public KuduRecordCursor(KuduClientManager kuduClientManager, int kuduTokenId, List<KuduColumnHandle> columns, SchemaTableName tableName, TupleDomain<KuduColumnHandle> predicate) { this.kuduClientManager = requireNonNull(kuduClientManager, "kuduClientManager is null"); this.columns = requireNonNull(columns, "columns is null"); fieldToColumnIndex = new int[columns.size()]; for (int i = 0; i < columns.size(); i++) { KuduColumnHandle columnHandle = columns.get(i); fieldToColumnIndex[i] = columnHandle.getOrdinalPosition(); } this.kuduClient = requireNonNull(kuduClientManager.getClient(), "kuduClient is null"); List<KuduScanToken> tokends = kuduClientManager .newScanTokenBuilder(this.kuduClient, tableName.getTableName()) .setProjectedColumnNames(columns.stream().map(column->column.getColumnName()).collect(Collectors.toList())) .build(); try { this.kuduScanner = tokends.get(kuduTokenId).intoScanner(this.kuduClient); } catch (Exception e) { logger.error(e, e.getMessage()); } }
/**
 * Per-test setup: creates a standalone query runner with the Kinesis plugin installed
 * for the dummy and JSON stream descriptions, then creates both streams on the mock client.
 */
@BeforeMethod
public void spinUp()
        throws Exception
{
    SchemaTableName dummyTable = new SchemaTableName("default", dummyStreamName);
    SchemaTableName jsonTable = new SchemaTableName("default", jsonStreamName);

    ImmutableMap.Builder<SchemaTableName, KinesisStreamDescription> descriptions = ImmutableMap.builder();
    descriptions.put(TestUtils.createEmptyStreamDescription(dummyStreamName, dummyTable));
    descriptions.put(TestUtils.createSimpleJsonStreamDescription(jsonStreamName, jsonTable));
    ImmutableMap<SchemaTableName, KinesisStreamDescription> streamMap = descriptions.build();

    this.queryRunner = new StandaloneQueryRunner(SESSION);
    KinesisPlugin plugin = TestUtils.installKinesisPlugin(queryRunner, streamMap);

    // The mock client comes from the plugin's injector, so it is the one the connector uses
    clientManager = TestUtils.getTestClientManager(plugin.getInjector());
    mockClient = (MockKinesisClient) clientManager.getClient();
    mockClient.createStream(dummyStreamName, 2);
    mockClient.createStream(jsonStreamName, 2);

    log.info("Completed spinUp steps.  *** READY FOR QUERIES ***");
}
/**
 * Reads {@code presto_test_types_orc} through a page source provider configured with only
 * the ORC record cursor provider, and checks the records it produces.
 * The test returns without asserting anything when the table handle lookup yields null.
 */
@Test
public void testTypesOrcRecordCursor()
        throws Exception
{
    SchemaTableName orcTable = new SchemaTableName(database, "presto_test_types_orc");
    ConnectorSession session = newSession();

    if (metadata.getTableHandle(session, orcTable) == null) {
        return;
    }

    ConnectorTableHandle tableHandle = getTableHandle(orcTable);
    ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle);
    HiveSplit hiveSplit = getHiveSplit(tableHandle);
    List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, tableHandle).values());

    // Only the ORC cursor provider is registered and no page source factories
    ConnectorPageSourceProvider pageSourceProvider = new HivePageSourceProvider(
            new HiveClientConfig().setTimeZone(timeZone.getID()),
            hdfsEnvironment,
            ImmutableSet.<HiveRecordCursorProvider>of(new OrcRecordCursorProvider()),
            ImmutableSet.<HivePageSourceFactory>of(),
            TYPE_MANAGER);
    ConnectorPageSource pageSource = pageSourceProvider.createPageSource(session, hiveSplit, columnHandles);

    assertGetRecords(ORC, tableMetadata, hiveSplit, pageSource, columnHandles);
}
@Override public TpchTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { requireNonNull(tableName, "tableName is null"); if (!tableNames.contains(tableName.getTableName())) { return null; } // parse the scale factor double scaleFactor = schemaNameToScaleFactor(tableName.getSchemaName()); if (scaleFactor < 0) { return null; } return new TpchTableHandle(connectorId, tableName.getTableName(), scaleFactor); }
/**
 * For every table listed in {@code SCHEMA_NAME}, checks that the table handle is a
 * {@code SpreadsheetTableHandle} carrying the expected spreadsheet path, table name and user.
 */
@Test
public void testGetTableHandle()
        throws IOException
{
    Path basePath = setupTest(conf, SESSION.getUser(), SpreadsheetMetadataTest.class);
    SpreadsheetMetadata spreadsheetMetadata = new SpreadsheetMetadata(ugi, conf, basePath, SPREADSHEETS, useFileCache, true);

    for (SchemaTableName name : spreadsheetMetadata.listTables(SESSION, SCHEMA_NAME)) {
        ConnectorTableHandle tableHandle = spreadsheetMetadata.getTableHandle(SESSION, name);
        assertTrue(tableHandle instanceof SpreadsheetTableHandle);
        SpreadsheetTableHandle spreadsheetTableHandle = (SpreadsheetTableHandle) tableHandle;

        // expected location: <basePath>/<user>/<SPREADSHEETS>/<PRESTO_EXAMPLE_XLSX>
        Path userDirectory = new Path(basePath, SESSION.getUser());
        Path spreadsheetDirectory = new Path(userDirectory, SPREADSHEETS);
        String filePath = new Path(spreadsheetDirectory, PRESTO_EXAMPLE_XLSX).toString();
        assertEquals(filePath, spreadsheetTableHandle.getSpreadsheetPath());

        SchemaTableName tableName = spreadsheetTableHandle.getTableName();
        assertEquals(name, tableName);
        assertEquals(SESSION.getUser(), spreadsheetTableHandle.getUser());
    }
}
/**
 * Maps the current result-set row to a {@code TableColumn}.
 * <p>
 * Reads {@code schema_name}, {@code table_name}, {@code data_type}, {@code column_name}
 * and {@code column_id}; the data type text is parsed as a type signature and resolved
 * through the type manager.
 *
 * @throws IllegalArgumentException if the type manager returns null for the data type
 * @throws SQLException if a column cannot be read
 */
@Override
public TableColumn map(int index, ResultSet r, StatementContext ctx)
        throws SQLException
{
    String schemaName = r.getString("schema_name");
    String tableName = r.getString("table_name");
    SchemaTableName table = new SchemaTableName(schemaName, tableName);

    String typeName = r.getString("data_type");
    Type type = typeManager.getType(parseTypeSignature(typeName));
    checkArgument(type != null, "Unknown type %s", typeName);

    String columnName = r.getString("column_name");
    long columnId = r.getLong("column_id");
    return new TableColumn(table, columnName, type, columnId);
}
@Test public void testNoTableFilter() throws Exception { // Create "orders" table in a different schema metadata.createTable(SESSION, tableMetadataBuilder(new SchemaTableName("test", "orders2")) .column("orderkey", BIGINT) .build()); // Create another table that should not be selected metadata.createTable(SESSION, tableMetadataBuilder(new SchemaTableName("schema1", "foo")) .column("orderkey", BIGINT) .build()); TupleDomain<Integer> tupleDomain = TupleDomain.withColumnDomains( ImmutableMap.<Integer, Domain>builder() .put(0, Domain.singleValue(VARCHAR, utf8Slice("test"))) .build()); MetadataDao metadataDao = dummyHandle.attach(MetadataDao.class); Set<Long> actual = ImmutableSet.copyOf(ShardMetadataRecordCursor.getTableIds(dbi, tupleDomain)); Set<Long> expected = ImmutableSet.of( metadataDao.getTableInformation("test", "orders").getTableId(), metadataDao.getTableInformation("test", "orders2").getTableId()); assertEquals(actual, expected); }
/**
 * Starts an insert into an existing table.
 * <p>
 * Verifies the JVM time zone, looks the table up in the metastore, checks that it is
 * writable, and packages its storage format, column handles and target location
 * (obtained from the location service for the current query id) into an insert handle.
 *
 * @param session connector session, supplies the query id
 * @param tableHandle handle of the table to insert into
 * @return the insert handle
 * @throws TableNotFoundException if the metastore has no such table
 */
@Override
public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
    verifyJvmTimeZone();

    SchemaTableName tableName = schemaTableName(tableHandle);
    String schemaName = tableName.getSchemaName();
    String name = tableName.getTableName();

    Optional<Table> lookup = metastore.getTable(schemaName, name);
    if (!lookup.isPresent()) {
        throw new TableNotFoundException(tableName);
    }
    Table table = lookup.get();

    checkTableIsWritable(table);

    HiveStorageFormat storageFormat = extractHiveStorageFormat(table);
    List<HiveColumnHandle> columnHandles = hiveColumnHandles(connectorId, table);
    String queryId = session.getQueryId();

    return new HiveInsertTableHandle(
            connectorId,
            schemaName,
            name,
            columnHandles,
            queryId,
            locationService.forExistingTable(queryId, table),
            storageFormat);
}
/**
 * Loads a table from the metastore and rejects tables that are marked offline.
 *
 * @param metastore metastore to query
 * @param tableName schema-qualified table name
 * @return the table
 * @throws TableNotFoundException if the metastore has no such table
 * @throws TableOfflineException if the table's protect mode is offline, or its
 *         {@code PRESTO_OFFLINE} parameter is non-empty
 */
private Table getTable(HiveMetastore metastore, SchemaTableName tableName)
{
    Optional<Table> lookup = metastore.getTable(tableName.getSchemaName(), tableName.getTableName());
    if (!lookup.isPresent()) {
        throw new TableNotFoundException(tableName);
    }
    Table table = lookup.get();

    // Hive-level protect mode
    String protectMode = table.getParameters().get(ProtectMode.PARAMETER_NAME);
    if (protectMode != null) {
        if (getProtectModeFromString(protectMode).offline) {
            throw new TableOfflineException(tableName);
        }
    }

    // Presto-specific offline marker; its value is reported in the error message
    String prestoOffline = table.getParameters().get(PRESTO_OFFLINE);
    if (isNullOrEmpty(prestoOffline)) {
        return table;
    }
    throw new TableOfflineException(tableName, format("Table '%s' is offline for Presto: %s", tableName, prestoOffline));
}
@Test public void getTableMetadata() { // known table ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(SESSION, tableHandle); assertEquals(tableMetadata.getTable(), new SchemaTableName("example", "numbers")); assertEquals(tableMetadata.getColumns(), ImmutableList.of( new ColumnMetadata("text", VARCHAR, false), new ColumnMetadata("value", BIGINT, false))); // escaping name patterns JdbcTableHandle specialTableHandle = metadata.getTableHandle(SESSION, new SchemaTableName("exa_ple", "num_ers")); ConnectorTableMetadata specialTableMetadata = metadata.getTableMetadata(SESSION, specialTableHandle); assertEquals(specialTableMetadata.getTable(), new SchemaTableName("exa_ple", "num_ers")); assertEquals(specialTableMetadata.getColumns(), ImmutableList.of( new ColumnMetadata("te_t", VARCHAR, false), new ColumnMetadata("va%ue", BIGINT, false))); // unknown tables should produce null unknownTableMetadata(new JdbcTableHandle(CONNECTOR_ID, new SchemaTableName("u", "numbers"), null, "unknown", "unknown")); unknownTableMetadata(new JdbcTableHandle(CONNECTOR_ID, new SchemaTableName("example", "numbers"), null, "example", "unknown")); unknownTableMetadata(new JdbcTableHandle(CONNECTOR_ID, new SchemaTableName("example", "numbers"), null, "unknown", "numbers")); }
/**
 * Creates the connector factory.
 *
 * @param classLoader class loader stored for later use (not null-checked here)
 * @param tableDescriptionSupplier optional supplier of table descriptions; must not be null
 * @param optionalConfig extra configuration entries; must not be null
 * @param altProviderClass optional alternative client provider class; must not be null
 */
KinesisConnectorFactory(
        ClassLoader classLoader,
        Optional<Supplier<Map<SchemaTableName, KinesisStreamDescription>>> tableDescriptionSupplier,
        Map<String, String> optionalConfig,
        Optional<Class<? extends KinesisClientProvider>> altProviderClass)
{
    this.classLoader = classLoader;
    this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
    this.optionalConfig = requireNonNull(optionalConfig, "optionalConfig is null");
    this.altProviderClass = requireNonNull(altProviderClass, "altProviderClass is null");

    this.handleResolver = new KinesisHandleResolver(connectorName);

    // The AWS SDK is built against a newer jackson (2.6.6) than airlift (2.4.4). To be able
    // to use the latest AWS API, CBOR support in the SDK has to be switched off. The same
    // could be done in jvm.properties; setting it here as well makes it harder to get wrong.
    System.setProperty("com.amazonaws.sdk.disableCbor", "true");
}
/**
 * Creates the metadata service from its injected collaborators.
 *
 * @param connectorId connector id; must not be null
 * @param kinesisConnectorConfig connector configuration; must not be null
 * @param handleResolver handle resolver; must not be null
 * @param kinesisTableDescriptionSupplier supplier of the table descriptions; must not be null
 * @param internalFieldDescriptions internal field descriptions; must not be null
 * @throws NullPointerException if any argument is null
 */
@Inject
KinesisMetadata(
        @Named("connectorId") String connectorId,
        KinesisConnectorConfig kinesisConnectorConfig,
        KinesisHandleResolver handleResolver,
        Supplier<Map<SchemaTableName, KinesisStreamDescription>> kinesisTableDescriptionSupplier,
        Set<KinesisInternalFieldDescription> internalFieldDescriptions)
{
    this.connectorId = checkNotNull(connectorId, "connectorId is null");
    this.kinesisConnectorConfig = checkNotNull(kinesisConnectorConfig, "kinesisConfig is null");
    this.handleResolver = checkNotNull(handleResolver, "handleResolver is null");

    log.debug("Loading kinesis table definitions from %s", kinesisConnectorConfig.getTableDescriptionDir());

    // Checked like every other dependency, so a missing supplier fails here and not on first use
    this.kinesisTableDescriptionSupplier = checkNotNull(kinesisTableDescriptionSupplier, "kinesisTableDescriptionSupplier is null");
    this.internalFieldDescriptions = checkNotNull(internalFieldDescriptions, "internalFieldDescriptions is null");
}
@Override public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { requireNonNull(prefix, "prefix is null"); ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder(); List<SchemaTableName> tableNames; if (prefix.getSchemaName() == null) { tableNames = listTables(session, null); } else { tableNames = ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); } for (SchemaTableName tableName : tableNames) { ConnectorTableMetadata tableMetadata = getTableMetadata(tableName); // table can disappear during listing operation if (tableMetadata != null) { columns.put(tableName, tableMetadata.getColumns()); } } return columns.build(); }
/**
 * Builds the metadata of a defined table: the fields of its key, the fields of its
 * value, then one column per internal field description.
 *
 * @param schemaTableName table to describe
 * @return the table metadata
 * @throws TableNotFoundException if the table is not among the defined tables
 */
@SuppressWarnings("ValueOfIncrementOrDecrementUsed")
private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)
{
    RedisTableDescription description = getDefinedTables().get(schemaTableName);
    if (description != null) {
        ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();

        appendFields(columns, description.getKey());
        appendFields(columns, description.getValue());

        for (RedisInternalFieldDescription internalField : internalFieldDescriptions) {
            columns.add(internalField.getColumnMetadata(hideInternalColumns));
        }

        return new ConnectorTableMetadata(schemaTableName, columns.build());
    }
    throw new TableNotFoundException(schemaTableName);
}
/**
 * Loads the table description template from the classpath resource
 * {@code /tpch/<dataFormat>/<tableName>.json} and re-targets it at the given
 * schema-qualified table name.
 *
 * @param tableDescriptionJsonCodec codec used to parse the template
 * @param schemaTableName name the resulting description is registered under
 * @param dataFormat directory name of the data format under {@code /tpch}
 * @return map entry from the table name to its description
 * @throws IOException if the resource does not exist or cannot be read
 */
public static Map.Entry<SchemaTableName, RedisTableDescription> loadTpchTableDescription(
        JsonCodec<RedisTableDescription> tableDescriptionJsonCodec,
        SchemaTableName schemaTableName,
        String dataFormat)
        throws IOException
{
    String resourcePath = format("/tpch/%s/%s.json", dataFormat, schemaTableName.getTableName());

    RedisTableDescription tpchTemplate;
    try (InputStream data = RedisTestUtils.class.getResourceAsStream(resourcePath)) {
        // getResourceAsStream returns null for a missing resource; report which one
        if (data == null) {
            throw new IOException("Table description resource not found: " + resourcePath);
        }
        tpchTemplate = tableDescriptionJsonCodec.fromJson(ByteStreams.toByteArray(data));
    }

    RedisTableDescription tableDescription = new RedisTableDescription(
            schemaTableName.getTableName(),
            schemaTableName.getSchemaName(),
            tpchTemplate.getKey(),
            tpchTemplate.getValue());

    return new AbstractMap.SimpleImmutableEntry<>(schemaTableName, tableDescription);
}
/**
 * Finds the metadata of a table inside its keyspace.
 * <p>
 * The keyspace's direct lookup is tried first; if it yields nothing, the keyspace's
 * tables are scanned for a name that matches ignoring case.
 *
 * @param schemaTableName table to look up
 * @return the table metadata
 * @throws TableNotFoundException if neither lookup finds the table
 */
private TableMetadata getTableMetadata(SchemaTableName schemaTableName)
{
    String tableName = schemaTableName.getTableName();
    KeyspaceMetadata keyspace = getCheckedKeyspaceMetadata(schemaTableName.getSchemaName());

    TableMetadata directMatch = keyspace.getTable(tableName);
    if (directMatch == null) {
        for (TableMetadata candidate : keyspace.getTables()) {
            if (candidate.getName().equalsIgnoreCase(tableName)) {
                return candidate;
            }
        }
        throw new TableNotFoundException(schemaTableName);
    }
    return directMatch;
}
/** * Install the plugin into the given query runner, using the mock client and the given table descriptions. * * The plug in is returned so that the injector can be accessed and other setup items tested. * * @param queryRunner * @param streamDescriptions * @return */ public static KinesisPlugin installKinesisPlugin(QueryRunner queryRunner, Map<SchemaTableName, KinesisStreamDescription> streamDescriptions) { KinesisPlugin kinesisPlugin = createPluginInstance(); // Note: function literal with provided descriptions instead of KinesisTableDescriptionSupplier: kinesisPlugin.setTableDescriptionSupplier(() -> streamDescriptions); kinesisPlugin.setAltProviderClass(KinesisTestClientManager.class); queryRunner.installPlugin(kinesisPlugin); Map<String, String> kinesisConfig = ImmutableMap.of( "kinesis.default-schema", "default", "kinesis.access-key", "", "kinesis.secret-key", ""); queryRunner.createCatalog("kinesis", "kinesis", kinesisConfig); return kinesisPlugin; }
public List<SchemaTableName> listTables(SchemaTablePrefix prefix) { log.info("List all tables with prefix " + prefix.toString()); List<SchemaTableName> tables = new ArrayList<>(); String dbPrefix = prefix.getSchemaName(); log.debug("listTables dbPrefix: " + dbPrefix); String tblPrefix = prefix.getTableName(); log.debug("listTables tblPrefix: " + tblPrefix); // if dbPrefix not mean to match all String tblName; String dbName; if (dbPrefix != null) { if (tblPrefix != null) { tblName = tblPrefix; dbName = dbPrefix; } else { MetaProto.StringListType stringListType = metaClient.listTables(dbPrefix); log.info("record size: " + stringListType.getStrCount()); if (stringListType.getStrCount() == 0) { return tables; } for (int i = 0; i < stringListType.getStrCount(); i++) { tblName = stringListType.getStr(0); dbName = dbPrefix; log.debug("listTables tableName: " + formName(dbName, tblName)); tables.add(new SchemaTableName(dbName, tblName)); } } } return tables; }
/**
 * Looks up the handle of the named table.
 *
 * @param session session
 * @param tableName table name
 * @return the table handle, or null if the metadata query finds no such table
 */
@Override
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
    String schema = tableName.getSchemaName();
    String table = tableName.getTableName();
    return metaDataQuery.getTableHandle(connectorId, schema, table).orElse(null);
}
/** * Return a list of table layouts that satisfy the given constraint. * <p> * For each layout, connectors must return an "unenforced constraint" representing the part of the constraint summary that isn't guaranteed by the layout. * * @param session session * @param table table * @param constraint constraint * @param desiredColumns desired columns */ @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { // get table name from ConnectorTableHandle HDFSTableHandle hdfsTable = checkType(table, HDFSTableHandle.class, "table"); SchemaTableName tableName = hdfsTable.getSchemaTableName(); // create HDFSTableLayoutHandle HDFSTableLayoutHandle tableLayout = metaDataQuery.getTableLayout(connectorId, tableName.getSchemaName(), tableName.getTableName()).orElse(null); tableLayout.setPredicates(constraint.getSummary() != null ? Optional.of(constraint.getSummary()) : Optional.empty()); // ConnectorTableLayout layout = new ConnectorTableLayout(HDFSTableLayoutHandle) ConnectorTableLayout layout = getTableLayout(session, tableLayout); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
/**
 * Returns the metadata of the table behind the given handle by delegating to the
 * name-based {@code getTableMetadata} overload.
 *
 * @param session session
 * @param table table handle; must be an {@code HDFSTableHandle}
 * @throws RuntimeException if table handle is no longer valid
 */
@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
    return getTableMetadata(checkType(table, HDFSTableHandle.class, "table").getSchemaTableName());
}
/**
 * Lists the table names of one schema. An empty list is returned if none match.
 * <p>
 * When no schema name is given, an empty list is returned without querying the metadata.
 *
 * @param session session
 * @param schemaNameOrNull schema name, or null
 */
@Override
public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull)
{
    if (schemaNameOrNull != null) {
        SchemaTablePrefix schemaPrefix = new SchemaTablePrefix(schemaNameOrNull);
        return metaDataQuery.listTables(schemaPrefix);
    }
    return new ArrayList<>();
}
/**
 * Collects the column metadata of every table matching the prefix.
 * <p>
 * A table whose column lookup comes back empty is mapped to an empty list. If the same
 * table name is listed more than once, the first entry is kept.
 *
 * @param session session
 * @param prefix prefix
 * @return mutable map from table name to its columns
 */
@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
    Map<SchemaTableName, List<ColumnMetadata>> columnsByTable = new HashMap<>();
    for (SchemaTableName tableName : metaDataQuery.listTables(prefix)) {
        List<ColumnMetadata> columns = metaDataQuery
                .getTableColMetadata(connectorId, tableName.getSchemaName(), tableName.getTableName())
                .orElseGet(ArrayList::new);
        columnsByTable.putIfAbsent(tableName, columns);
    }
    return columnsByTable;
}
/**
 * Describes a table as three unbounded-varchar columns: {@code type}, {@code user}
 * and {@code text}. The same columns are returned for every table name.
 *
 * @param schemaTableName table to describe
 * @return the table metadata
 */
@Override
public ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)
{
    ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
    columns.add(new ColumnMetadata("type", createUnboundedVarcharType()));
    columns.add(new ColumnMetadata("user", createUnboundedVarcharType()));
    columns.add(new ColumnMetadata("text", createUnboundedVarcharType()));
    return new ConnectorTableMetadata(schemaTableName, columns.build());
}
/**
 * Lists the tables of a schema: one table per key of {@code channels} for the channel
 * schema, one per key of {@code users} for the IM schema (schema names are compared
 * ignoring case), and nothing for any other schema.
 *
 * @param schema schema name
 * @return the tables of the schema, possibly empty
 */
@Override
public List<SchemaTableName> listTables(String schema)
{
    if (CHANNEL_SCHEMA.equalsIgnoreCase(schema)) {
        return channels.keySet().stream()
                .map(channelName -> new SchemaTableName(CHANNEL_SCHEMA, channelName))
                .collect(toList());
    }
    else if (IM_SCHEMA.equalsIgnoreCase(schema)) {
        return users.keySet().stream()
                .map(userName -> new SchemaTableName(IM_SCHEMA, userName))
                .collect(toList());
    }
    else {
        return ImmutableList.of();
    }
}
/**
 * Returns the messages of a table's history as rows of (type, user, text).
 *
 * @param schemaTableName table to read
 * @return one row per message
 * @throws IllegalStateException if no history was returned, or the history carries an error
 */
@Override
public Collection<? extends List<?>> getRows(SchemaTableName schemaTableName)
{
    History history = getHistory(schemaTableName);
    // The history is a raw service response body and may be absent; fail with context
    // instead of a NullPointerException on the next line.
    if (history == null) {
        throw new IllegalStateException("Unable to read from '" + schemaTableName + "': empty response");
    }
    if (history.getError() != null) {
        throw new IllegalStateException("Unable to read from '" + schemaTableName + "' dues: " + history.getError());
    }
    return history
            .getMessages().stream()
            .map(message -> asList(message.getType(), message.getUser(), message.getText()))
            .collect(toList());
}
/**
 * Resolves the id used to address a table on the service.
 * <p>
 * For the channel schema this is the id of the entry of {@code channels} named like the
 * table. For the IM schema the table name is looked up in {@code users}, and that user's
 * id is then looked up in {@code ims}.
 *
 * @param schemaTableName table to resolve
 * @return the id
 * @throws IllegalArgumentException if the schema is neither the channel nor the IM schema
 */
private String getChannelId(SchemaTableName schemaTableName)
{
    String schemaName = schemaTableName.getSchemaName();
    String tableName = schemaTableName.getTableName();

    if (CHANNEL_SCHEMA.equalsIgnoreCase(schemaName)) {
        return channels.get(tableName).getId();
    }
    if (!IM_SCHEMA.equalsIgnoreCase(schemaName)) {
        throw new IllegalArgumentException("Unknown schema: " + schemaName);
    }
    return ims.get(users.get(tableName).getId()).getId();
}
/**
 * Creates a sink that posts the third element (index 2) of each row, cast to
 * {@code String}, as a message to the table's channel.
 *
 * @param schemaTableName table to write to
 * @return a consumer that throws {@code IllegalStateException} when the service returns
 *         no response body or a response carrying an error
 */
@Override
public Consumer<List> createRowSink(SchemaTableName schemaTableName)
{
    return row -> {
        SlackResponse response;
        try {
            response = service.postMessage(token, getChannelId(schemaTableName), (String) row.get(2)).execute().body();
        }
        catch (IOException e) {
            throw Throwables.propagate(e);
        }
        // The response body may be absent; report that instead of failing with a NullPointerException
        if (response == null) {
            throw new IllegalStateException("Unable to write to '" + schemaTableName + "': empty response");
        }
        if (response.getError() != null) {
            throw new IllegalStateException("Unable to write to '" + schemaTableName + "' dues: " + response.getError());
        }
    };
}
/**
 * Describes a table as a bigint {@code number} column followed by the unbounded-varchar
 * columns {@code state}, {@code user} and {@code title}. The same columns are returned
 * for every table name.
 *
 * @param schemaTableName table to describe
 * @return the table metadata
 */
@Override
public ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)
{
    ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
    columns.add(new ColumnMetadata("number", BigintType.BIGINT));
    columns.add(new ColumnMetadata("state", createUnboundedVarcharType()));
    columns.add(new ColumnMetadata("user", createUnboundedVarcharType()));
    columns.add(new ColumnMetadata("title", createUnboundedVarcharType()));
    return new ConnectorTableMetadata(schemaTableName, columns.build());
}
/**
 * Creates an in-memory record sink that forwards rows to the row sink of the table
 * behind the insert handle, using that table's column types.
 *
 * @param transactionHandle transaction handle (not consulted)
 * @param session session (not consulted)
 * @param connectorTableHandle insert handle; must be a {@code RestInsertTableHandle}
 * @return the record sink
 */
@Override
public RecordSink getRecordSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle connectorTableHandle)
{
    SchemaTableName target = Types.checkType(connectorTableHandle, RestInsertTableHandle.class, "tableHandle")
            .getTableHandle()
            .getSchemaTableName();

    Consumer<List> rowSink = rest.createRowSink(target);
    List<Type> columnTypes = rest.getTypes(target);
    return new InMemoryObjectRecordSink(columnTypes, rowSink);
}
@Override public RecordSet getRecordSet( ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<? extends ColumnHandle> list) { RestConnectorSplit split = Types.checkType(connectorSplit, RestConnectorSplit.class, "split"); // TODO fix below cast List<RestColumnHandle> restColumnHandles = (List<RestColumnHandle>) list; SchemaTableName schemaTableName = split.getTableHandle().getSchemaTableName(); Collection<? extends List<?>> rows = rest.getRows(schemaTableName); ConnectorTableMetadata tableMetadata = rest.getTableMetadata(schemaTableName); List<Integer> columnIndexes = restColumnHandles.stream() .map(column -> { int index = 0; for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) { if (columnMetadata.getName().equalsIgnoreCase(column.getName())) { return index; } index++; } throw new IllegalStateException("Unknown column: " + column.getName()); }) .collect(toList()); Collection<? extends List<?>> mappedRows = rows.stream() .map(row -> columnIndexes.stream() .map(index -> row.get(index)) .collect(toList())) .collect(toList()); List<Type> mappedTypes = restColumnHandles.stream() .map(RestColumnHandle::getType) .collect(toList()); return new InMemoryRecordSet(mappedTypes, mappedRows); }
/**
 * Returns a handle for the table if it is among the tables listed by the REST
 * backend, otherwise null.
 *
 * @param connectorSession session (not consulted)
 * @param schemaTableName table to look up
 * @return the table handle, or null if the table is not listed
 */
@Override
public ConnectorTableHandle getTableHandle(ConnectorSession connectorSession, SchemaTableName schemaTableName)
{
    boolean known = rest.listTables().contains(schemaTableName);
    return known ? new RestTableHandle(schemaTableName) : null;
}
/**
 * Describes a table with the columns {@code id}, {@code text}, {@code retweet_count}
 * (bigint), {@code user_name} and {@code user_screen_name}; all but the count are
 * unbounded varchars. The same columns are returned for every table name.
 *
 * @param schemaTableName table to describe
 * @return the table metadata
 */
@Override
public ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)
{
    ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
    columns.add(new ColumnMetadata("id", createUnboundedVarcharType()));
    columns.add(new ColumnMetadata("text", createUnboundedVarcharType()));
    columns.add(new ColumnMetadata("retweet_count", BigintType.BIGINT));
    columns.add(new ColumnMetadata("user_name", createUnboundedVarcharType()));
    columns.add(new ColumnMetadata("user_screen_name", createUnboundedVarcharType()));
    return new ConnectorTableMetadata(schemaTableName, columns.build());
}
/**
 * Lists the fixed set of tables exposed under {@code SCHEMA}.
 *
 * @param schema schema name, compared ignoring case; may be null
 * @return the fixed tables when the schema matches, otherwise an empty list
 */
@Override
public List<SchemaTableName> listTables(String schema)
{
    // Constant-first comparison: a null schema simply does not match instead of throwing
    if (SCHEMA.equalsIgnoreCase(schema)) {
        return ImmutableList.of(
                new SchemaTableName(SCHEMA, "whug"),
                new SchemaTableName(SCHEMA, "prestodb"),
                new SchemaTableName(SCHEMA, "teradata"),
                new SchemaTableName(SCHEMA, "hive"),
                new SchemaTableName(SCHEMA, "dupa"));
    }
    return ImmutableList.of();
}