@Inject
public KafkaConnector(
        ConnectorMetadata metadata,
        ConnectorSplitManager splitManager,
        ConnectorRecordSetProvider recordSetProvider,
        ConnectorHandleResolver handleResolver)
{
    construct(metadata, splitManager, recordSetProvider, handleResolver);
}

private void construct(
        ConnectorMetadata metadata,
        ConnectorSplitManager splitManager,
        ConnectorRecordSetProvider recordSetProvider,
        ConnectorHandleResolver handleResolver)
{
    this.metadata = checkNotNull(metadata, "metadata is null");
    this.splitManager = checkNotNull(splitManager, "splitManager is null");
    this.recordSetProvider = checkNotNull(recordSetProvider, "recordSetProvider is null");
    this.handleResolver = checkNotNull(handleResolver, "handleResolver is null");
}

@Override
public Connector create(final String connectorId, Map<String, String> requiredConfig)
{
    checkNotNull(requiredConfig, "requiredConfig is null");

    try {
        // A plugin is not required to use Guice; it is just very convenient
        // Bootstrap app = new Bootstrap(new JsonModule(), new ExampleModule(connectorId));
        //
        // Injector injector = app.strictConfig().doNotInitializeLogging()
        //         .setRequiredConfigurationProperties(requiredConfig)
        //         .setOptionalConfigurationProperties(optionalConfig)
        //         .initialize();

        // Wire the connector services by hand instead of pulling them from an injector
        ClassToInstanceMap<Object> services = ImmutableClassToInstanceMap.builder()
                .put(ConnectorMetadata.class, new CloudataConnectorMetadata(connectorId, store))
                .put(ConnectorSplitManager.class, new CloudataSplitManager(nodeManager, connectorId))
                .put(ConnectorRecordSetProvider.class, new CloudataConnectorRecordSetProvider())
                .put(ConnectorHandleResolver.class, new CloudataConnectorHandleResolver())
                .build();

        CloudataConnector connector = new CloudataConnector(store, services);
        connectors.put(connectorId, connector);
        return connector;
    }
    catch (Exception e) {
        throw Throwables.propagate(e);
    }
}

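// A minimal sketch, not from the source, of how the CloudataConnector referenced above
// could serve those services back out of the ClassToInstanceMap. The class body, the
// CloudataStore field type, and the exact Connector SPI method set are assumptions.
public class CloudataConnector
        implements Connector
{
    private final CloudataStore store;                    // assumed type of the "store" field
    private final ClassToInstanceMap<Object> services;

    public CloudataConnector(CloudataStore store, ClassToInstanceMap<Object> services)
    {
        this.store = checkNotNull(store, "store is null");
        this.services = checkNotNull(services, "services is null");
    }

    @Override
    public ConnectorMetadata getMetadata()
    {
        return services.getInstance(ConnectorMetadata.class);
    }

    @Override
    public ConnectorSplitManager getSplitManager()
    {
        return services.getInstance(ConnectorSplitManager.class);
    }

    @Override
    public ConnectorRecordSetProvider getRecordSetProvider()
    {
        return services.getInstance(ConnectorRecordSetProvider.class);
    }

    @Override
    public ConnectorHandleResolver getHandleResolver()
    {
        return services.getInstance(ConnectorHandleResolver.class);
    }
}
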
@Override
public ConnectorRecordSetProvider getRecordSetProvider()
{
    return recordSetProvider;
}

@Override
public ConnectorRecordSetProvider getRecordSetProvider()
{
    return jdbcRecordSetProvider;
}

public ClassLoaderSafeConnectorRecordSetProvider(ConnectorRecordSetProvider delegate, ClassLoader classLoader)
{
    this.delegate = requireNonNull(delegate, "delegate is null");
    this.classLoader = requireNonNull(classLoader, "classLoader is null");
}

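// A minimal sketch of the delegating call that the constructor above sets up: each call
// swaps the thread context class loader to the plugin's loader, invokes the delegate, and
// restores the original loader afterwards. The getRecordSet parameter types vary by SPI
// version and are illustrative assumptions only.
@Override
public RecordSet getRecordSet(ConnectorSplit split, List<? extends ColumnHandle> columns)
{
    ClassLoader original = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(classLoader);
    try {
        return delegate.getRecordSet(split, columns);
    }
    finally {
        Thread.currentThread().setContextClassLoader(original);
    }
}
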
@Override
public Connector create(String connectorId, Map<String, String> config)
{
    checkNotNull(config, "config is null");

    try {
        KafkaClientModule kafkaClientModule = new KafkaClientModule(connectorId);
        Bootstrap app = new Bootstrap(
                new NodeModule(),
                new JsonModule(),
                kafkaClientModule);

        // Bootstrap binds and validates the configuration, even though the services
        // below are currently wired by hand rather than pulled from the injector.
        Injector injector = app.strictConfig()
                .doNotInitializeLogging()
                .setRequiredConfigurationProperties(config)
                .quiet()
                .requireExplicitBindings(false)
                .initialize();

        KafkaClientConfig clientConfig = KafkaClientConfig.INSTANCE;
        KafkaPluginConfig pluginConfig = KafkaPluginConfig.INSTANCE;
        KafkaConnectorId kafkaConnectorId = KafkaConnectorId.INSTANCE;

        KafkaHiveClient hiveClient = new KafkaHiveClient(kafkaConnectorId, clientConfig, pluginConfig);
        KafkaMetadata kafkaMetadata = new KafkaMetadata(hiveClient, kafkaConnectorId);
        KafkaSplitManager kafkaSplitManager = new KafkaSplitManager(hiveClient, kafkaConnectorId, clientConfig);
        KafkaRecordSetProvider kafkaRecordSetProvider = new KafkaRecordSetProvider(kafkaConnectorId);
        KafkaHandleResolver kafkaHandleResolver = new KafkaHandleResolver(kafkaConnectorId);

        // Wrap each service so its methods run with the plugin's class loader
        ConnectorMetadata connMetadata = new ClassLoaderSafeConnectorMetadata(kafkaMetadata, classLoader);
        ConnectorSplitManager connSplitManager = new ClassLoaderSafeConnectorSplitManager(kafkaSplitManager, classLoader);
        ConnectorRecordSetProvider connRecordSetProvider = new ClassLoaderSafeConnectorRecordSetProvider(kafkaRecordSetProvider, classLoader);
        ConnectorHandleResolver connHandleResolver = new ClassLoaderSafeConnectorHandleResolver(kafkaHandleResolver, classLoader);

        return new KafkaConnector(connMetadata, connSplitManager, connRecordSetProvider, connHandleResolver);

        // Alternative: let the injector build the connector and its services
        // return injector.getInstance(KafkaConnector.class);
        //
        // KafkaMetadata kafkaMetadata = injector.getInstance(KafkaMetadata.class);
        // KafkaSplitManager kafkaSplitManager = injector.getInstance(KafkaSplitManager.class);
        // KafkaRecordSetProvider kafkaRecordSetProvider = injector.getInstance(KafkaRecordSetProvider.class);
        // KafkaHandleResolver kafkaHandleResolver = injector.getInstance(KafkaHandleResolver.class);
        //
        // return new KafkaConnector(kafkaMetadata, kafkaSplitManager,
        //         kafkaRecordSetProvider, kafkaHandleResolver);
        //
        // return new KafkaConnector(
        //         new ClassLoaderSafeConnectorMetadata(kafkaMetadata, classLoader),
        //         new ClassLoaderSafeConnectorSplitManager(kafkaSplitManager, classLoader),
        //         new ClassLoaderSafeConnectorRecordSetProvider(kafkaRecordSetProvider, classLoader),
        //         new ClassLoaderSafeConnectorHandleResolver(kafkaHandleResolver, classLoader));
    }
    catch (Exception e) {
        e.printStackTrace();
        throw Throwables.propagate(e);
    }
}

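// Usage sketch, not from the source: how the create(...) method above might be exercised
// directly. The factory class name, its no-arg constructor, and the property keys are
// hypothetical; the engine normally supplies the configuration map when the catalog is
// registered.
KafkaConnectorFactory factory = new KafkaConnectorFactory();   // hypothetical factory class
Map<String, String> config = ImmutableMap.of(
        "kafka.nodes", "localhost:9092",                       // assumed property name
        "kafka.table-names", "orders,lineitem");               // assumed property name
Connector connector = factory.create("kafka", config);
ConnectorRecordSetProvider recordSets = connector.getRecordSetProvider();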