/**
 * Builds a connector for tests: a {@code LegacyTransactionConnector} wrapping an anonymous
 * legacy SPI connector that serves one {@code TestingMetadata} instance.
 *
 * @param connectorId identifier handed to the {@code LegacyTransactionConnector}
 * @return the wrapping connector
 */
private static Connector createTestingConnector(String connectorId)
{
    com.facebook.presto.spi.Connector legacyConnector = new com.facebook.presto.spi.Connector()
    {
        // One metadata instance is created per connector and returned on every call.
        private final ConnectorMetadata testingMetadata = new TestingMetadata();

        @Override
        public ConnectorMetadata getMetadata()
        {
            return testingMetadata;
        }

        @Override
        public ConnectorSplitManager getSplitManager()
        {
            // This testing connector provides no split manager.
            throw new UnsupportedOperationException();
        }
    };

    return new LegacyTransactionConnector(connectorId, legacyConnector);
}
/**
 * Creates the connector from its already-constructed services.
 *
 * <p>Each argument is null-checked with {@code requireNonNull} using a message that names the
 * argument. The three collection arguments are stored as immutable copies.
 *
 * @param lifeCycleManager life cycle manager stored on the connector
 * @param metadata connector metadata
 * @param splitManager connector split manager
 * @param pageSourceProvider page source provider
 * @param pageSinkProvider page sink provider
 * @param systemTables system tables, copied into an {@code ImmutableSet}
 * @param sessionProperties session property metadata, copied into an {@code ImmutableList}
 * @param tableProperties table property metadata, copied into an {@code ImmutableList}
 * @param accessControl connector access control
 */
public HiveConnector(
        LifeCycleManager lifeCycleManager,
        ConnectorMetadata metadata,
        ConnectorSplitManager splitManager,
        ConnectorPageSourceProvider pageSourceProvider,
        ConnectorPageSinkProvider pageSinkProvider,
        Set<SystemTable> systemTables,
        List<PropertyMetadata<?>> sessionProperties,
        List<PropertyMetadata<?>> tableProperties,
        ConnectorAccessControl accessControl)
{
    this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
    this.metadata = requireNonNull(metadata, "metadata is null");
    this.splitManager = requireNonNull(splitManager, "splitManager is null");
    this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
    this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");

    // Collections: check for null first, then take an immutable copy.
    requireNonNull(systemTables, "systemTables is null");
    this.systemTables = ImmutableSet.copyOf(systemTables);

    requireNonNull(sessionProperties, "sessionProperties is null");
    this.sessionProperties = ImmutableList.copyOf(sessionProperties);

    requireNonNull(tableProperties, "tableProperties is null");
    this.tableProperties = ImmutableList.copyOf(tableProperties);

    this.accessControl = requireNonNull(accessControl, "accessControl is null");
}
/**
 * Injection constructor. All four services are forwarded unchanged to
 * {@code construct}, which performs the actual initialization.
 *
 * @param metadata connector metadata
 * @param splitManager connector split manager
 * @param recordSetProvider connector record set provider
 * @param handleResolver connector handle resolver
 */
@Inject
public KafkaConnector(
        ConnectorMetadata metadata,
        ConnectorSplitManager splitManager,
        ConnectorRecordSetProvider recordSetProvider,
        ConnectorHandleResolver handleResolver)
{
    construct(metadata, splitManager, recordSetProvider, handleResolver);
}
/**
 * Validates the four connector services with {@code checkNotNull} and stores them in the
 * corresponding fields. Checks run in parameter order, each with a message naming the argument.
 *
 * @param metadata connector metadata
 * @param splitManager connector split manager
 * @param recordSetProvider connector record set provider
 * @param handleResolver connector handle resolver
 */
private void construct(
        ConnectorMetadata metadata,
        ConnectorSplitManager splitManager,
        ConnectorRecordSetProvider recordSetProvider,
        ConnectorHandleResolver handleResolver)
{
    checkNotNull(metadata, "metadata is null");
    this.metadata = metadata;

    checkNotNull(splitManager, "splitManager is null");
    this.splitManager = splitManager;

    checkNotNull(recordSetProvider, "recordSetProvider is null");
    this.recordSetProvider = recordSetProvider;

    checkNotNull(handleResolver, "handleResolver is null");
    this.handleResolver = handleResolver;
}
@Override public Connector create(final String connectorId, Map<String, String> requiredConfig) { checkNotNull(requiredConfig, "requiredConfig is null"); checkNotNull(optionalConfig, "optionalConfig is null"); try { // // A plugin is not required to use Guice; it is just very convenient // Bootstrap app = new Bootstrap(new JsonModule(), new ExampleModule(connectorId)); // // Injector injector = app.strictConfig().doNotInitializeLogging() // .setRequiredConfigurationProperties(requiredConfig) // .setOptionalConfigurationProperties(optionalConfig).initialize(); ClassToInstanceMap<Object> services = ImmutableClassToInstanceMap.builder() .put(ConnectorMetadata.class, new CloudataConnectorMetadata(connectorId, store)) .put(ConnectorSplitManager.class, new CloudataSplitManager(nodeManager, connectorId)) .put(ConnectorRecordSetProvider.class, new CloudataConnectorRecordSetProvider()) .put(ConnectorHandleResolver.class, new CloudataConnectorHandleResolver()).build(); CloudataConnector connector = new CloudataConnector(store, services); connectors.put(connectorId, connector); return connector; } catch (Exception e) { throw Throwables.propagate(e); } }
/**
 * Returns the connector metadata held in the {@code metadata} field.
 *
 * @return the stored {@code ConnectorMetadata}
 */
@Override
public ConnectorMetadata getMetadata()
{
    return metadata;
}
/**
 * Returns the connector metadata held in the {@code jdbcMetadata} field.
 *
 * @return the stored {@code ConnectorMetadata}
 */
@Override
public ConnectorMetadata getMetadata()
{
    return jdbcMetadata;
}
/**
 * Stores the metadata delegate together with a class loader.
 * Both arguments are null-checked with {@code requireNonNull}.
 *
 * @param delegate the {@code ConnectorMetadata} to wrap
 * @param classLoader the class loader stored alongside the delegate
 */
public ClassLoaderSafeConnectorMetadata(ConnectorMetadata delegate, ClassLoader classLoader)
{
    requireNonNull(delegate, "delegate is null");
    this.delegate = delegate;

    requireNonNull(classLoader, "classLoader is null");
    this.classLoader = classLoader;
}
/**
 * Registers the Hive connector's bindings on the given binder: HDFS and configuration
 * classes, the metastore, exported stats objects, the record cursor providers, the
 * connector services, and the page source factories.
 *
 * @param binder the Guice binder to register bindings on
 */
@Override
public void configure(Binder binder)
{
    // Connector identity and HDFS access.
    binder.bind(HiveConnectorId.class).toInstance(new HiveConnectorId(connectorId));
    binder.bind(HdfsConfigurationUpdater.class).in(Scopes.SINGLETON);
    binder.bind(HdfsConfiguration.class).to(HiveHdfsConfiguration.class).in(Scopes.SINGLETON);
    binder.bind(HdfsEnvironment.class).in(Scopes.SINGLETON);
    binder.bind(DirectoryLister.class).to(HadoopDirectoryLister.class).in(Scopes.SINGLETON);
    configBinder(binder).bindConfig(HiveClientConfig.class);

    // Session and table property holders.
    binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
    binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);

    // Metastore: without a supplied instance, bind and export the caching implementation;
    // otherwise bind the supplied instance directly.
    if (metastore == null) {
        binder.bind(HiveMetastore.class).to(CachingHiveMetastore.class).in(Scopes.SINGLETON);
        newExporter(binder).export(HiveMetastore.class)
                .as(generatedNameOf(CachingHiveMetastore.class, connectorId));
    }
    else {
        binder.bind(HiveMetastore.class).toInstance(metastore);
    }

    binder.bind(NamenodeStats.class).in(Scopes.SINGLETON);
    newExporter(binder).export(NamenodeStats.class).as(generatedNameOf(NamenodeStats.class));

    // Metastore client and cluster.
    binder.bind(HiveMetastoreClientFactory.class).in(Scopes.SINGLETON);
    binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON);
    configBinder(binder).bindConfig(StaticMetastoreConfig.class);

    // Instances handed to this module.
    binder.bind(TypeManager.class).toInstance(typeManager);
    binder.bind(PageIndexerFactory.class).toInstance(pageIndexerFactory);

    // Record cursor providers; registration order is kept as in the original.
    Multibinder<HiveRecordCursorProvider> cursorProviders = Multibinder.newSetBinder(binder, HiveRecordCursorProvider.class);
    cursorProviders.addBinding().to(OrcRecordCursorProvider.class).in(Scopes.SINGLETON);
    cursorProviders.addBinding().to(ParquetRecordCursorProvider.class).in(Scopes.SINGLETON);
    cursorProviders.addBinding().to(DwrfRecordCursorProvider.class).in(Scopes.SINGLETON);
    cursorProviders.addBinding().to(ColumnarTextHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
    cursorProviders.addBinding().to(ColumnarBinaryHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
    cursorProviders.addBinding().to(GenericHiveRecordCursorProvider.class).in(Scopes.SINGLETON);

    // Connector services.
    binder.bind(HivePartitionManager.class).in(Scopes.SINGLETON);
    binder.bind(LocationService.class).to(HiveLocationService.class).in(Scopes.SINGLETON);
    binder.bind(ConnectorMetadata.class).to(HiveMetadata.class).in(Scopes.SINGLETON);
    binder.bind(ConnectorSplitManager.class).to(HiveSplitManager.class).in(Scopes.SINGLETON);
    binder.bind(ConnectorPageSourceProvider.class).to(HivePageSourceProvider.class).in(Scopes.SINGLETON);
    binder.bind(ConnectorPageSinkProvider.class).to(HivePageSinkProvider.class).in(Scopes.SINGLETON);

    jsonCodecBinder(binder).bindJsonCodec(PartitionUpdate.class);

    // Page source factories; registration order is kept as in the original.
    Multibinder<HivePageSourceFactory> pageSourceFactories = Multibinder.newSetBinder(binder, HivePageSourceFactory.class);
    pageSourceFactories.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON);
    pageSourceFactories.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON);
    pageSourceFactories.addBinding().to(DwrfPageSourceFactory.class).in(Scopes.SINGLETON);
    pageSourceFactories.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON);

    // S3 file system stats, exported under a name derived from the connector id.
    binder.bind(PrestoS3FileSystemStats.class).toInstance(PrestoS3FileSystem.getFileSystemStats());
    newExporter(binder).export(PrestoS3FileSystemStats.class).as(generatedNameOf(PrestoS3FileSystem.class, connectorId));
}
/**
 * Bootstraps a Hive connector. With the thread context class loader set to
 * {@code classLoader}, this builds a {@code Bootstrap} from the node, MBean, JSON and Hive
 * client modules plus the security module selected by {@code SecurityConfig}, initializes
 * it, and assembles a {@code HiveConnector} from the resulting injector. The metadata, split
 * manager, page source and page sink services are wrapped in their {@code ClassLoaderSafe*}
 * counterparts.
 *
 * @param connectorId id passed to the {@code HiveClientModule}
 * @param config required configuration properties for the bootstrap
 * @return the assembled {@code HiveConnector}
 */
@Override
public Connector create(String connectorId, Map<String, String> config)
{
    requireNonNull(config, "config is null");

    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
        Bootstrap bootstrap = new Bootstrap(
                new NodeModule(),
                new MBeanModule(),
                new JsonModule(),
                new HiveClientModule(connectorId, metastore, typeManager, pageIndexerFactory),
                // Exactly which security module is installed depends on the configured security system.
                installModuleIf(
                        SecurityConfig.class,
                        security -> ALLOW_ALL_ACCESS_CONTROL.equalsIgnoreCase(security.getSecuritySystem()),
                        new NoSecurityModule()),
                installModuleIf(
                        SecurityConfig.class,
                        security -> "read-only".equalsIgnoreCase(security.getSecuritySystem()),
                        new ReadOnlySecurityModule()),
                installModuleIf(
                        SecurityConfig.class,
                        security -> "sql-standard".equalsIgnoreCase(security.getSecuritySystem()),
                        new SqlStandardSecurityModule()),
                binder -> {
                    // Bind the platform MBean server wrapped in RebindSafeMBeanServer.
                    MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
                    binder.bind(MBeanServer.class).toInstance(new RebindSafeMBeanServer(platformServer));
                });

        Injector injector = bootstrap
                .strictConfig()
                .doNotInitializeLogging()
                .setRequiredConfigurationProperties(config)
                .setOptionalConfigurationProperties(optionalConfig)
                .initialize();

        LifeCycleManager lifeCycle = injector.getInstance(LifeCycleManager.class);
        ConnectorMetadata hiveMetadata = injector.getInstance(ConnectorMetadata.class);
        ConnectorSplitManager hiveSplitManager = injector.getInstance(ConnectorSplitManager.class);
        ConnectorPageSourceProvider hivePageSource = injector.getInstance(ConnectorPageSourceProvider.class);
        ConnectorPageSinkProvider hivePageSink = injector.getInstance(ConnectorPageSinkProvider.class);
        HiveSessionProperties sessionProps = injector.getInstance(HiveSessionProperties.class);
        HiveTableProperties tableProps = injector.getInstance(HiveTableProperties.class);
        ConnectorAccessControl hiveAccessControl = injector.getInstance(ConnectorAccessControl.class);

        return new HiveConnector(
                lifeCycle,
                new ClassLoaderSafeConnectorMetadata(hiveMetadata, classLoader),
                new ClassLoaderSafeConnectorSplitManager(hiveSplitManager, classLoader),
                new ClassLoaderSafeConnectorPageSourceProvider(hivePageSource, classLoader),
                new ClassLoaderSafeConnectorPageSinkProvider(hivePageSink, classLoader),
                ImmutableSet.of(),
                sessionProps.getSessionProperties(),
                tableProps.getTableProperties(),
                hiveAccessControl);
    }
    catch (Exception e) {
        throw Throwables.propagate(e);
    }
}
@Override public Connector create(String connectorId, Map<String, String> config) { checkNotNull(config, "config is null"); try { KafkaClientModule kafkaClientModule = new KafkaClientModule(connectorId); Bootstrap app = new Bootstrap( new NodeModule(), new JsonModule(), kafkaClientModule ); Injector injector = app.strictConfig().doNotInitializeLogging() .setRequiredConfigurationProperties(config) .quiet() .requireExplicitBindings(false) .setOptionalConfigurationProperties(optionalConfig).initialize(); KafkaClientConfig clientConfig = KafkaClientConfig.INSTANCE; KafkaPluginConfig pluginConfig = KafkaPluginConfig.INSTANCE; KafkaConnectorId kafkaConnectorId = KafkaConnectorId.INSTANCE; KafkaHiveClient hiveClient = new KafkaHiveClient(kafkaConnectorId, clientConfig, pluginConfig); KafkaMetadata kafkaMetadata = new KafkaMetadata(hiveClient, kafkaConnectorId); KafkaSplitManager kafkaSplitManager = new KafkaSplitManager(hiveClient, kafkaConnectorId, clientConfig); KafkaRecordSetProvider kafkaRecordSetProvider = new KafkaRecordSetProvider(kafkaConnectorId); KafkaHandleResolver kafkaHandleResolver = new KafkaHandleResolver(kafkaConnectorId); ConnectorMetadata connMetadata = new ClassLoaderSafeConnectorMetadata(kafkaMetadata, classLoader); ConnectorSplitManager connSplitManager = new ClassLoaderSafeConnectorSplitManager(kafkaSplitManager, classLoader); ConnectorRecordSetProvider connRecordSetProvider = new ClassLoaderSafeConnectorRecordSetProvider(kafkaRecordSetProvider, classLoader); ConnectorHandleResolver connHandleResolver = new ClassLoaderSafeConnectorHandleResolver(kafkaHandleResolver, classLoader); return new KafkaConnector(connMetadata, connSplitManager, connRecordSetProvider, connHandleResolver); // return injector.getInstance(KafkaConnector.class); // KafkaMetadata kafkaMetadata = injector.getInstance(KafkaMetadata.class); // KafkaSplitManager kafkaSplitManager = injector.getInstance(KafkaSplitManager.class); // KafkaRecordSetProvider kafkaRecordSetProvider = 
injector.getInstance(KafkaRecordSetProvider.class); // KafkaHandleResolver kafkaHandleResolver = injector.getInstance(KafkaHandleResolver.class); // return new KafkaConnector(kafkaMetadata, kafkaSplitManager, // kafkaRecordSetProvider, kafkaHandleResolver); // return new KafkaConnector( // new ClassLoaderSafeConnectorMetadata(kafkaMetadata, classLoader), // new ClassLoaderSafeConnectorSplitManager(kafkaSplitManager, classLoader), // new ClassLoaderSafeConnectorRecordSetProvider(kafkaRecordSetProvider, classLoader), // new ClassLoaderSafeConnectorHandleResolver(kafkaHandleResolver, classLoader)); } catch (Exception e) { e.printStackTrace(); throw Throwables.propagate(e); } }