/**
 * Creates the HDFS connector by bootstrapping an injector from the supplied configuration.
 *
 * @param connectorId identifier passed to {@code HDFSModule}
 * @param config required configuration properties; must not be null
 * @param context supplies the type manager passed to {@code HDFSModule}
 * @return the {@code HDFSConnector} instance obtained from the injector (never null)
 * @throws NullPointerException if {@code config} is null
 * @throws RuntimeException if bootstrapping or connector instantiation fails
 */
@Override
public Connector create(String connectorId, Map<String, String> config, ConnectorContext context)
{
    requireNonNull(config, "config is null");

    try {
        Bootstrap app = new Bootstrap(
                new JsonModule(),
                new HDFSModule(connectorId, context.getTypeManager()));

        Injector injector = app
                .strictConfig()
                .doNotInitializeLogging()
                .setRequiredConfigurationProperties(config)
                .initialize();

        return injector.getInstance(HDFSConnector.class);
    }
    catch (RuntimeException e) {
        // Already unchecked: let it propagate unchanged
        throw e;
    }
    catch (Exception e) {
        // Fail loudly with the cause attached instead of printing the stack trace
        // and handing a null connector back to the caller
        throw new RuntimeException("Failed to create connector " + connectorId, e);
    }
}
/**
 * Builds the Kudu connector: binds the context's node manager, installs
 * {@code KuduModule} and resolves {@code KuduConnector} from the resulting injector.
 *
 * @param connectorId identifier passed to {@code KuduModule}
 * @param requiredConfig required configuration properties; must not be null
 * @param context source of the {@code NodeManager} binding
 * @return the {@code KuduConnector} instance from the injector
 */
@Override
public Connector create(final String connectorId, Map<String, String> requiredConfig, ConnectorContext context)
{
    requireNonNull(requiredConfig, "config is null");

    try {
        Bootstrap bootstrap = new Bootstrap(
                binder -> binder.bind(NodeManager.class).toInstance(context.getNodeManager()),
                new KuduModule(connectorId));

        bootstrap.strictConfig();
        bootstrap.doNotInitializeLogging();
        bootstrap.setRequiredConfigurationProperties(requiredConfig);

        return bootstrap.initialize().getInstance(KuduConnector.class);
    }
    catch (Exception failure) {
        throw Throwables.propagate(failure);
    }
}
/**
 * Shuts down every registered connector exactly once.
 *
 * The {@code stopped} flag is flipped atomically, so repeated calls are no-ops.
 * Each connector is shut down with its own class loader installed as the thread
 * context class loader; a failure in one connector is logged and does not stop
 * the remaining connectors from being shut down.
 */
@PreDestroy
public void stop()
{
    boolean alreadyStopped = stopped.getAndSet(true);
    if (!alreadyStopped) {
        for (Map.Entry<String, Connector> registration : connectors.entrySet()) {
            Connector target = registration.getValue();
            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(target.getClass().getClassLoader())) {
                target.shutdown();
            }
            catch (Throwable failure) {
                log.error(failure, "Error shutting down connector: %s", registration.getKey());
            }
        }
    }
}
private synchronized void addCatalogConnector(String catalogName, String connectorId, ConnectorFactory factory, Map<String, String> properties) { Connector connector = createConnector(connectorId, factory, properties); addConnectorInternal(ConnectorType.STANDARD, catalogName, connectorId, connector); String informationSchemaId = makeInformationSchemaConnectorId(connectorId); addConnectorInternal(ConnectorType.INFORMATION_SCHEMA, catalogName, informationSchemaId, new InformationSchemaConnector(catalogName, nodeManager, metadataManager)); String systemId = makeSystemTablesConnectorId(connectorId); addConnectorInternal(ConnectorType.SYSTEM, catalogName, systemId, new SystemConnector( systemId, nodeManager, connector.getSystemTables(), transactionId -> transactionManager.getConnectorTransaction(transactionId, connectorId))); // Register session and table properties once per catalog metadataManager.getSessionPropertyManager().addConnectorSessionProperties(catalogName, connector.getSessionProperties()); metadataManager.getTablePropertyManager().addTableProperties(catalogName, connector.getTableProperties()); }
private static void registerBogusConnector(TransactionManager transactionManager, String connectorId) { transactionManager.addConnector(connectorId, new Connector() { @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) { // Just return something return new ConnectorTransactionHandle() {}; } @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) { throw new UnsupportedOperationException(); } @Override public ConnectorSplitManager getSplitManager() { throw new UnsupportedOperationException(); } }); }
/**
 * Creates a connector for tests: a legacy SPI connector backed by a single
 * {@code TestingMetadata} instance, wrapped in a {@code LegacyTransactionConnector}.
 * Split manager access is not supported.
 *
 * @param connectorId id passed to the wrapping {@code LegacyTransactionConnector}
 * @return the wrapped testing connector
 */
private static Connector createTestingConnector(String connectorId)
{
    com.facebook.presto.spi.Connector legacy = new com.facebook.presto.spi.Connector()
    {
        // One metadata instance shared by all getMetadata() calls
        private final ConnectorMetadata metadata = new TestingMetadata();

        @Override
        public ConnectorMetadata getMetadata()
        {
            return metadata;
        }

        @Override
        public ConnectorSplitManager getSplitManager()
        {
            throw new UnsupportedOperationException();
        }
    };

    return new LegacyTransactionConnector(connectorId, legacy);
}
/**
 * Creates a {@code SpreadsheetConnector} for the current user from the catalog configuration.
 *
 * Reads the {@code BASEPATH}, {@code SUBDIR}, {@code USE_FILE_CACHE} and {@code PROXY_USER}
 * keys. The file cache flag defaults to {@code true} when absent; the proxy user flag
 * defaults to {@code false} when absent.
 *
 * @param connectorId not used by this factory
 * @param config catalog configuration
 * @param context not used by this factory
 * @return the new connector
 * @throws RuntimeException wrapping the {@link IOException} raised while creating the connector
 */
@Override
public Connector create(String connectorId, Map<String, String> config, ConnectorContext context)
{
    Path basePath = new Path(config.get(BASEPATH));
    String spreadsheetSubDir = config.get(SUBDIR);
    String fileCacheSetting = config.get(USE_FILE_CACHE);
    String proxyUserSetting = config.get(PROXY_USER);

    // Boolean.parseBoolean(null) is false, which matches the "absent => false" default
    boolean proxyUser = Boolean.parseBoolean(proxyUserSetting);
    // Absent => true; otherwise whatever the setting parses to
    boolean useFileCache = fileCacheSetting == null || Boolean.parseBoolean(fileCacheSetting);

    try {
        return new SpreadsheetConnector(
                UserGroupInformation.getCurrentUser(),
                _configuration,
                basePath,
                spreadsheetSubDir,
                useFileCache,
                proxyUser);
    }
    catch (IOException ioFailure) {
        throw new RuntimeException(ioFailure);
    }
}
/**
 * Builds a {@code KinesisConnector} from an already-created plugin instance.
 *
 * When {@code withMockClient} is set, the plugin is switched to
 * {@code KinesisTestClientManager} before the connector is created. The first
 * connector factory exposed by the plugin is used, with the connector id "kinesis".
 *
 * @param plugin plugin instance to take the connector factory from; must not be null
 * @param properties configuration passed to the factory; must not be null (may be empty)
 * @param withMockClient whether to install the test client manager on the plugin first
 * @return the connector created by the factory, cast to {@code KinesisConnector}
 */
public static KinesisConnector createConnector(KinesisPlugin plugin, Map<String, String> properties, boolean withMockClient)
{
    requireNonNull(plugin, "Plugin instance should not be null");
    requireNonNull(properties, "Properties map should not be null (can be empty)");

    if (withMockClient) {
        plugin.setAltProviderClass(KinesisTestClientManager.class);
    }

    ConnectorFactory firstFactory = plugin.getConnectorFactories().iterator().next();
    assertNotNull(firstFactory);

    Connector created = firstFactory.create("kinesis", properties, new TestingConnectorContext() {});
    boolean isKinesisConnector = created instanceof KinesisConnector;
    assertTrue(isKinesisConnector);
    return (KinesisConnector) created;
}
/**
 * Creates a {@code RestConnector} from the context's node manager and a REST
 * instance built by {@code restFactory} from the configuration.
 *
 * @param s connector id; not used by this factory
 * @param config configuration handed to {@code restFactory}
 * @param context source of the node manager
 * @return the new connector
 */
@Override
public Connector create(String s, Map<String, String> config, ConnectorContext context)
{
    return new RestConnector(context.getNodeManager(), restFactory.create(config));
}
@Override public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) { requireNonNull(connectorId, "connectorId is null"); requireNonNull(config, "config is null"); try { Bootstrap app = new Bootstrap( // new JsonModule(), new EthereumConnectorModule(), binder -> { binder.bind(EthereumConnectorId.class).toInstance(new EthereumConnectorId(connectorId)); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); binder.bind(NodeManager.class).toInstance(context.getNodeManager()); } ); Injector injector = app.strictConfig() .doNotInitializeLogging() .setRequiredConfigurationProperties(config) .initialize(); return injector.getInstance(EthereumConnector.class); } catch (Exception e) { throw Throwables.propagate(e); } }
public Connector create(final String connectorId, Map<String, String> requiredConfig, ConnectorContext context) { requireNonNull(requiredConfig, "requiredConfig is null"); final String locator_host = requiredConfig .getOrDefault(MonarchProperties.LOCATOR_HOST, MonarchProperties.LOCATOR_HOST_DEFAULT); final int locator_port = Integer.parseInt(requiredConfig .getOrDefault(MonarchProperties.LOCATOR_PORT, MonarchProperties.LOCATOR_PORT_DEFAULT)); // Create a client that connects to the Ampool cluster via a locator (that is already running!). final Properties props = new Properties(); props.setProperty(Constants.MClientCacheconfig.MONARCH_CLIENT_LOG, requiredConfig .getOrDefault(MonarchProperties.MONARCH_CLIENT_LOG, MonarchProperties.MONARCH_CLIENT_LOG_DEFAULT_LOCATION)); final AmpoolClient aClient = new AmpoolClient(locator_host, locator_port, props); log.info("INFORMATION: AmpoolClient created successfully."); try { Bootstrap app = new Bootstrap(new AmpoolModule(connectorId, aClient, context.getTypeManager())); Injector injector = app .doNotInitializeLogging() .setRequiredConfigurationProperties(requiredConfig) .initialize(); log.info("INFORMATION: Injector initialized successfully."); return injector.getInstance(AmpoolConnector.class); } catch (Exception e) { throw Throwables.propagate(e); } }
/**
 * Builds the Kafka connector from an injector that installs the JSON and Kafka
 * modules and binds the connector id, type manager, node manager and the table
 * description supplier.
 *
 * If this factory was given a table description supplier, that instance is bound;
 * otherwise {@code KafkaTableDescriptionSupplier} is bound as a singleton.
 *
 * @param connectorId id wrapped in a {@code KafkaConnectorId}; must not be null
 * @param config required configuration properties; must not be null
 * @return the {@code KafkaConnector} instance from the injector
 */
@Override
public Connector create(String connectorId, Map<String, String> config)
{
    requireNonNull(connectorId, "connectorId is null");
    requireNonNull(config, "config is null");

    try {
        Bootstrap bootstrap = new Bootstrap(
                new JsonModule(),
                new KafkaConnectorModule(),
                binder -> {
                    binder.bind(KafkaConnectorId.class).toInstance(new KafkaConnectorId(connectorId));
                    binder.bind(TypeManager.class).toInstance(typeManager);
                    binder.bind(NodeManager.class).toInstance(nodeManager);

                    TypeLiteral<Supplier<Map<SchemaTableName, KafkaTopicDescription>>> supplierType =
                            new TypeLiteral<Supplier<Map<SchemaTableName, KafkaTopicDescription>>>() {};
                    if (!tableDescriptionSupplier.isPresent()) {
                        binder.bind(supplierType).to(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON);
                    }
                    else {
                        binder.bind(supplierType).toInstance(tableDescriptionSupplier.get());
                    }
                });

        bootstrap.strictConfig();
        bootstrap.doNotInitializeLogging();
        bootstrap.setRequiredConfigurationProperties(config);
        bootstrap.setOptionalConfigurationProperties(optionalConfig);

        return bootstrap.initialize().getInstance(KafkaConnector.class);
    }
    catch (Exception failure) {
        throw Throwables.propagate(failure);
    }
}
/**
 * Verifies that the connector factory can create a connector from a minimal
 * Kafka configuration (one table name and one node address).
 */
@Test
public void testSpinup()
{
    ConnectorFactory factory = testConnectorExists();

    ImmutableMap<String, String> minimalConfig = ImmutableMap.<String, String>builder()
            .put("kafka.table-names", "test")
            .put("kafka.nodes", "localhost:9092")
            .build();

    Connector connector = factory.create("test-connector", minimalConfig);
    assertNotNull(connector);
}
/**
 * Builds the Raptor connector from an injector that installs the JSON, MBean,
 * metadata, backup, storage and Raptor modules and binds the node-level services
 * held by this factory.
 *
 * @param connectorId id passed to {@code StorageModule} and {@code RaptorModule}
 * @param config required configuration properties
 * @return the {@code RaptorConnector} instance from the injector
 */
@Override
public Connector create(String connectorId, Map<String, String> config)
{
    try {
        Bootstrap bootstrap = new Bootstrap(
                new JsonModule(),
                new MBeanModule(),
                binder -> {
                    // Resolved at configure time from the current node and the platform MBean server
                    binder.bind(MBeanServer.class).toInstance(new RebindSafeMBeanServer(getPlatformMBeanServer()));
                    binder.bind(CurrentNodeId.class).toInstance(new CurrentNodeId(nodeManager.getCurrentNode().getNodeIdentifier()));

                    binder.bind(NodeManager.class).toInstance(nodeManager);
                    binder.bind(PageSorter.class).toInstance(pageSorter);
                    binder.bind(BlockEncodingSerde.class).toInstance(blockEncodingSerde);
                    binder.bind(TypeManager.class).toInstance(typeManager);
                },
                metadataModule,
                new BackupModule(backupProviders),
                new StorageModule(connectorId),
                new RaptorModule(connectorId));

        bootstrap.strictConfig();
        bootstrap.doNotInitializeLogging();
        bootstrap.setRequiredConfigurationProperties(config);
        bootstrap.setOptionalConfigurationProperties(optionalConfig);

        return bootstrap.initialize().getInstance(RaptorConnector.class);
    }
    catch (Exception failure) {
        throw Throwables.propagate(failure);
    }
}
/**
 * Creates a {@code BlackHoleConnector} wired with fresh metadata, split manager,
 * page source provider and page sink provider instances.
 *
 * @param connectorId not used by this factory
 * @param requiredConfig not used by this factory
 * @return the new connector
 */
@Override
public Connector create(String connectorId, Map<String, String> requiredConfig)
{
    BlackHoleMetadata metadata = new BlackHoleMetadata();
    BlackHoleSplitManager splitManager = new BlackHoleSplitManager();
    BlackHolePageSourceProvider pageSourceProvider = new BlackHolePageSourceProvider();
    BlackHolePageSinkProvider pageSinkProvider = new BlackHolePageSinkProvider();

    return new BlackHoleConnector(metadata, splitManager, pageSourceProvider, pageSinkProvider);
}
/**
 * Creates the JMX connector as an anonymous {@code Connector} whose components
 * are instantiated on each accessor call.
 *
 * @param connectorId id passed to the metadata and split manager
 * @param properties not used by this factory
 * @return the new connector
 */
@Override
public Connector create(String connectorId, Map<String, String> properties)
{
    Connector jmxConnector = new Connector()
    {
        @Override
        public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
        {
            // Validate the requested level against READ_COMMITTED before handing out the shared handle
            checkConnectorSupports(READ_COMMITTED, isolationLevel);
            return JmxTransactionHandle.INSTANCE;
        }

        @Override
        public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
        {
            return new JmxMetadata(connectorId, mbeanServer);
        }

        @Override
        public ConnectorSplitManager getSplitManager()
        {
            return new JmxSplitManager(connectorId, nodeManager);
        }

        @Override
        public ConnectorRecordSetProvider getRecordSetProvider()
        {
            String currentNodeIdentifier = nodeManager.getCurrentNode().getNodeIdentifier();
            return new JmxRecordSetProvider(mbeanServer, currentNodeIdentifier);
        }
    };
    return jmxConnector;
}
public synchronized ConnectorTransactionMetadata getConnectorTransactionMetadata(String connectorId, Connector connector) { checkOpenTransaction(); ConnectorTransactionMetadata transactionMetadata = connectorIdToMetadata.get(connectorId); if (transactionMetadata == null) { transactionMetadata = new ConnectorTransactionMetadata(connector, beginTransaction(connector)); // Don't use computeIfAbsent b/c the beginTransaction call might be recursive checkState(connectorIdToMetadata.put(connectorId, transactionMetadata) == null); } return transactionMetadata; }
/**
 * Begins a connector-level transaction.
 *
 * {@code InternalConnector}s additionally receive this transaction's id; all other
 * connectors only get the isolation level and read-only flag.
 *
 * @param connector connector to begin the transaction on
 * @return the handle returned by the connector
 */
private ConnectorTransactionHandle beginTransaction(Connector connector)
{
    if (!(connector instanceof InternalConnector)) {
        return connector.beginTransaction(isolationLevel, readOnly);
    }

    InternalConnector internal = (InternalConnector) connector;
    return internal.beginTransaction(transactionId, isolationLevel, readOnly);
}
/**
 * Creates a connector with the factory's class loader installed as the thread
 * context class loader for the duration of the call.
 *
 * For a {@code LegacyTransactionConnectorFactory} the class loader of the wrapped
 * factory is used instead of the wrapper's.
 *
 * @param connectorId id passed to the factory
 * @param factory factory that creates the connector
 * @param properties configuration passed to the factory
 * @return the connector created by the factory
 */
private static Connector createConnector(String connectorId, ConnectorFactory factory, Map<String, String> properties)
{
    Class<?> classLoaderOwner = (factory instanceof LegacyTransactionConnectorFactory)
            ? ((LegacyTransactionConnectorFactory) factory).getConnectorFactory().getClass()
            : factory.getClass();

    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoaderOwner.getClassLoader())) {
        return factory.create(connectorId, properties);
    }
}
/**
 * Walks a transaction through begin, connector metadata access and commit, checking
 * the reported transaction info at each stage.
 */
@Test
public void testTransactionWorkflow()
        throws Exception
{
    try (IdleCheckExecutor idleCheck = new IdleCheckExecutor()) {
        TransactionManager manager = TransactionManager.create(new TransactionManagerConfig(), idleCheck.getExecutor(), finishingExecutor);

        // Register a TPCH connector under the id "c1"
        Connector tpch = new TpchConnectorFactory(new InMemoryNodeManager()).create("c1", ImmutableMap.of());
        manager.addConnector("c1", tpch);

        TransactionId id = manager.beginTransaction(false);
        Assert.assertEquals(manager.getAllTransactionInfos().size(), 1);

        // Freshly begun: no connectors touched, nothing written
        TransactionInfo initialInfo = manager.getTransactionInfo(id);
        Assert.assertFalse(initialInfo.isAutoCommitContext());
        Assert.assertTrue(initialInfo.getConnectorIds().isEmpty());
        Assert.assertFalse(initialInfo.getWrittenConnectorId().isPresent());

        ConnectorMetadata c1Metadata = manager.getMetadata(id, "c1");
        c1Metadata.listSchemaNames(TEST_SESSION.toConnectorSession("c1"));

        // After the metadata access, "c1" is listed but still nothing is written
        TransactionInfo infoAfterRead = manager.getTransactionInfo(id);
        Assert.assertEquals(infoAfterRead.getConnectorIds(), ImmutableList.of("c1"));
        Assert.assertFalse(infoAfterRead.getWrittenConnectorId().isPresent());

        manager.asyncCommit(id).join();

        Assert.assertTrue(manager.getAllTransactionInfos().isEmpty());
    }
}
/**
 * Walks a transaction through begin, connector metadata access and abort, checking
 * the reported transaction info at each stage.
 */
@Test
public void testAbortedTransactionWorkflow()
        throws Exception
{
    try (IdleCheckExecutor idleCheck = new IdleCheckExecutor()) {
        TransactionManager manager = TransactionManager.create(new TransactionManagerConfig(), idleCheck.getExecutor(), finishingExecutor);

        // Register a TPCH connector under the id "c1"
        Connector tpch = new TpchConnectorFactory(new InMemoryNodeManager()).create("c1", ImmutableMap.of());
        manager.addConnector("c1", tpch);

        TransactionId id = manager.beginTransaction(false);
        Assert.assertEquals(manager.getAllTransactionInfos().size(), 1);

        // Freshly begun: no connectors touched, nothing written
        TransactionInfo initialInfo = manager.getTransactionInfo(id);
        Assert.assertFalse(initialInfo.isAutoCommitContext());
        Assert.assertTrue(initialInfo.getConnectorIds().isEmpty());
        Assert.assertFalse(initialInfo.getWrittenConnectorId().isPresent());

        ConnectorMetadata c1Metadata = manager.getMetadata(id, "c1");
        c1Metadata.listSchemaNames(TEST_SESSION.toConnectorSession("c1"));

        // After the metadata access, "c1" is listed but still nothing is written
        TransactionInfo infoAfterRead = manager.getTransactionInfo(id);
        Assert.assertEquals(infoAfterRead.getConnectorIds(), ImmutableList.of("c1"));
        Assert.assertFalse(infoAfterRead.getWrittenConnectorId().isPresent());

        manager.asyncAbort(id).join();

        Assert.assertTrue(manager.getAllTransactionInfos().isEmpty());
    }
}
/**
 * Creates the TPCH connector as an anonymous {@code Connector}; its components are
 * instantiated on each accessor call.
 *
 * @param connectorId id passed to the metadata and split manager
 * @param properties configuration from which the splits-per-node setting is read
 * @return the new connector
 */
@Override
public Connector create(String connectorId, Map<String, String> properties)
{
    // Resolved once, up front, and captured by the connector below
    int splitsPerNode = getSplitsPerNode(properties);

    Connector tpchConnector = new Connector()
    {
        @Override
        public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
        {
            return TpchTransactionHandle.INSTANCE;
        }

        @Override
        public ConnectorMetadata getMetadata(ConnectorTransactionHandle transaction)
        {
            return new TpchMetadata(connectorId);
        }

        @Override
        public ConnectorSplitManager getSplitManager()
        {
            return new TpchSplitManager(connectorId, nodeManager, splitsPerNode);
        }

        @Override
        public ConnectorRecordSetProvider getRecordSetProvider()
        {
            return new TpchRecordSetProvider();
        }
    };
    return tpchConnector;
}
/**
 * Creates the sampled TPCH connector as an anonymous {@code Connector}; its
 * components are instantiated on each accessor call.
 *
 * @param connectorId id passed to the metadata, split manager and record set provider
 * @param properties configuration from which the splits-per-node setting is read; must not be null
 * @return the new connector
 */
@Override
public Connector create(String connectorId, Map<String, String> properties)
{
    requireNonNull(properties, "properties is null");

    // Resolved once, up front, and captured by the connector below
    int splitsPerNode = getSplitsPerNode(properties);

    Connector sampledConnector = new Connector()
    {
        @Override
        public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
        {
            return TpchTransactionHandle.INSTANCE;
        }

        @Override
        public ConnectorMetadata getMetadata(ConnectorTransactionHandle transaction)
        {
            return new SampledTpchMetadata(connectorId);
        }

        @Override
        public ConnectorSplitManager getSplitManager()
        {
            return new TpchSplitManager(connectorId, nodeManager, splitsPerNode);
        }

        @Override
        public ConnectorRecordSetProvider getRecordSetProvider()
        {
            return new SampledTpchRecordSetProvider(connectorId, sampleWeight);
        }
    };
    return sampledConnector;
}
/**
 * Builds the Cassandra connector from an injector that installs the MBean, JSON
 * and Cassandra client modules plus a binding for a rebind-safe wrapper around
 * the platform MBean server.
 *
 * @param connectorId id passed to {@code CassandraClientModule}
 * @param config required configuration properties; must not be null
 * @return the {@code CassandraConnector} instance from the injector
 */
@Override
public Connector create(String connectorId, Map<String, String> config)
{
    requireNonNull(config, "config is null");

    try {
        Module mbeanServerModule = new Module()
        {
            @Override
            public void configure(Binder binder)
            {
                // Looked up when the module is configured, not when it is constructed
                MBeanServer platform = ManagementFactory.getPlatformMBeanServer();
                binder.bind(MBeanServer.class).toInstance(new RebindSafeMBeanServer(platform));
            }
        };

        Bootstrap bootstrap = new Bootstrap(
                new MBeanModule(),
                new JsonModule(),
                new CassandraClientModule(connectorId),
                mbeanServerModule);

        bootstrap.strictConfig();
        bootstrap.doNotInitializeLogging();
        bootstrap.setRequiredConfigurationProperties(config);
        bootstrap.setOptionalConfigurationProperties(optionalConfig);

        return bootstrap.initialize().getInstance(CassandraConnector.class);
    }
    catch (Exception failure) {
        throw Throwables.propagate(failure);
    }
}
/**
 * Starts the embedded Cassandra server, loads the test data, creates the connector
 * under test and caches its metadata, split manager and record set provider along
 * with the table names used by the tests.
 */
@BeforeClass
public void setup()
        throws Exception
{
    EmbeddedCassandraServerHelper.startEmbeddedCassandra();
    initializeTestData(DATE);

    String connectorId = "cassandra-test";
    CassandraConnectorFactory factory = new CassandraConnectorFactory(connectorId, ImmutableMap.<String, String>of());

    // Point the connector at the embedded server
    ImmutableMap<String, String> connectorConfig = ImmutableMap.of(
            "cassandra.contact-points", HOSTNAME,
            "cassandra.native-protocol-port", Integer.toString(PORT));
    Connector connector = factory.create(connectorId, connectorConfig);

    // Each component is checked for its expected Cassandra implementation as it is cached
    metadata = connector.getMetadata(CassandraTransactionHandle.INSTANCE);
    assertInstanceOf(metadata, CassandraMetadata.class);

    splitManager = connector.getSplitManager();
    assertInstanceOf(splitManager, CassandraSplitManager.class);

    recordSetProvider = connector.getRecordSetProvider();
    assertInstanceOf(recordSetProvider, CassandraRecordSetProvider.class);

    database = KEYSPACE_NAME.toLowerCase();
    table = new SchemaTableName(database, TABLE_NAME.toLowerCase());
    tableUnpartitioned = new SchemaTableName(database, "presto_test_unpartitioned");
    invalidTable = new SchemaTableName(database, "totally_invalid_table_name");
}
@Parameters({ "kinesis.awsAccessKey", "kinesis.awsSecretKey" }) @Test public void testSpinUp(String awsAccessKey, String awsSecretKey) { ConnectorFactory factory = testConnectorExists(); // Important: this has to be created before we setup the injector in the factory: assertNotNull(factory.getHandleResolver()); Connector c = factory.create("kinesis.test-connector", ImmutableMap.<String, String>builder() .put("kinesis.hide-internal-columns", "false") .put("kinesis.access-key", TestUtils.noneToBlank(awsAccessKey)) .put("kinesis.secret-key", TestUtils.noneToBlank(awsSecretKey)) .build(), new TestingConnectorContext() {}); assertNotNull(c); // Verify that the key objects have been created on the connector assertNotNull(c.getRecordSetProvider()); assertNotNull(c.getSplitManager()); ConnectorMetadata md = c.getMetadata(KinesisTransactionHandle.INSTANCE); assertNotNull(md); ConnectorTransactionHandle handle = c.beginTransaction(READ_COMMITTED, true); assertTrue(handle != null && handle instanceof KinesisTransactionHandle); }
/**
 * Creates the indexed TPCH connector as an anonymous {@code Connector}.
 *
 * One {@code TpchIndexedData} instance is built here and shared by the metadata
 * and index provider; the remaining components are instantiated on each accessor call.
 *
 * @param connectorId id passed to the indexed data, metadata and split manager
 * @param properties configuration from which the splits-per-node setting is read
 * @return the new connector
 */
@Override
public Connector create(String connectorId, Map<String, String> properties)
{
    int splitsPerNode = getSplitsPerNode(properties);
    TpchIndexedData sharedIndexedData = new TpchIndexedData(connectorId, indexSpec);

    Connector indexedConnector = new Connector()
    {
        @Override
        public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
        {
            return TpchTransactionHandle.INSTANCE;
        }

        @Override
        public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
        {
            return new TpchIndexMetadata(connectorId, sharedIndexedData);
        }

        @Override
        public ConnectorSplitManager getSplitManager()
        {
            return new TpchSplitManager(connectorId, nodeManager, splitsPerNode);
        }

        @Override
        public ConnectorRecordSetProvider getRecordSetProvider()
        {
            return new TpchRecordSetProvider();
        }

        @Override
        public ConnectorIndexProvider getIndexProvider()
        {
            return new TpchIndexProvider(sharedIndexedData);
        }

        @Override
        public Set<SystemTable> getSystemTables()
        {
            return ImmutableSet.of(new ExampleSystemTable());
        }
    };
    return indexedConnector;
}
/**
 * Creates a connector with the wrapped legacy factory and adapts it with a
 * {@code LegacyTransactionConnector}.
 *
 * @param connectorId id passed to both the legacy factory and the adapter
 * @param config configuration passed to the legacy factory
 * @return the adapted connector
 */
@Override
public Connector create(String connectorId, Map<String, String> config)
{
    com.facebook.presto.spi.Connector legacy = connectorFactory.create(connectorId, config);
    return new LegacyTransactionConnector(connectorId, legacy);
}
/**
 * Wraps a legacy SPI connector.
 *
 * @param connectorId id of the connector; must not be null
 * @param connector legacy connector to delegate to; must not be null
 */
public LegacyTransactionConnector(String connectorId, com.facebook.presto.spi.Connector connector)
{
    // Validate both arguments (in declaration order) before storing them
    requireNonNull(connectorId, "connectorId is null");
    requireNonNull(connector, "connector is null");

    this.connectorId = connectorId;
    this.connector = connector;
}
/**
 * Registers a connector under the given id.
 *
 * @param connectorId id to register the connector under; must not be null
 * @param connector connector to register; must not be null
 * @throws NullPointerException if either argument is null
 * @throws IllegalArgumentException if a connector is already registered under
 *         {@code connectorId}; the existing registration is left untouched
 */
public void addConnector(String connectorId, Connector connector)
{
    requireNonNull(connectorId, "connectorId is null");
    requireNonNull(connector, "connector is null");

    // putIfAbsent (rather than put) keeps the first registration in place when a
    // duplicate is rejected, instead of replacing it and then throwing
    Connector existing = connectorsById.putIfAbsent(connectorId, connector);
    checkArgument(existing == null, "Connector '%s' is already registered", connectorId);
}
/**
 * Returns the connector metadata bound to the given transaction.
 *
 * @param transactionId transaction to look up
 * @param connectorId id of a registered connector
 * @return the metadata from the transaction's per-connector state
 */
public ConnectorMetadata getMetadata(TransactionId transactionId, String connectorId)
{
    // The transaction is resolved before the connector, so an unknown transaction is reported first
    TransactionMetadata transaction = getTransactionMetadata(transactionId);
    Connector registered = getConnector(connectorId);

    ConnectorTransactionMetadata connectorState = transaction.getConnectorTransactionMetadata(connectorId, registered);
    return connectorState.getConnectorMetadata();
}
/**
 * Returns the connector-level transaction handle bound to the given transaction.
 *
 * @param transactionId transaction to look up
 * @param connectorId id of a registered connector
 * @return the handle from the transaction's per-connector state
 */
public ConnectorTransactionHandle getConnectorTransaction(TransactionId transactionId, String connectorId)
{
    // The transaction is resolved before the connector, so an unknown transaction is reported first
    TransactionMetadata transaction = getTransactionMetadata(transactionId);
    Connector registered = getConnector(connectorId);

    ConnectorTransactionMetadata connectorState = transaction.getConnectorTransactionMetadata(connectorId, registered);
    return connectorState.getTransactionHandle();
}
/**
 * Looks up a registered connector.
 *
 * @param connectorId id to look up
 * @return the connector registered under {@code connectorId}
 * @throws IllegalArgumentException if no connector is registered under that id
 */
private Connector getConnector(String connectorId)
{
    Connector registered = connectorsById.get(connectorId);
    boolean known = registered != null;
    checkArgument(known, "Unknown connector ID: %s", connectorId);
    return registered;
}
/**
 * Captures a connector together with the transaction handle it issued.
 *
 * The connector metadata is fetched on first request and memoized.
 *
 * @param connector connector that owns the transaction; must not be null
 * @param transactionHandle handle returned by the connector; must not be null
 */
public ConnectorTransactionMetadata(Connector connector, ConnectorTransactionHandle transactionHandle)
{
    // Validate both arguments (in declaration order) before storing them
    requireNonNull(connector, "connector is null");
    requireNonNull(transactionHandle, "transactionHandle is null");

    this.connector = connector;
    this.transactionHandle = transactionHandle;
    this.connectorMetadataSupplier = Suppliers.memoize(() -> connector.getMetadata(transactionHandle));
}
/**
 * Creates the global system connector from this factory's tables and procedures.
 *
 * @param connectorId id passed to the connector
 * @param config not used by this factory
 * @return the new {@code GlobalSystemConnector}
 */
@Override
public Connector create(String connectorId, Map<String, String> config)
{
    GlobalSystemConnector globalSystem = new GlobalSystemConnector(connectorId, tables, procedures);
    return globalSystem;
}
/**
 * Walks a transaction through begin, connector metadata access, explicit failure
 * and abort. After {@code fail}, the transaction must still be listed, metadata
 * access must be rejected with {@code TRANSACTION_ALREADY_ABORTED}, and only the
 * abort removes it.
 */
@Test
public void testFailedTransactionWorkflow()
        throws Exception
{
    try (IdleCheckExecutor idleCheck = new IdleCheckExecutor()) {
        TransactionManager manager = TransactionManager.create(new TransactionManagerConfig(), idleCheck.getExecutor(), finishingExecutor);

        // Register a TPCH connector under the id "c1"
        Connector tpch = new TpchConnectorFactory(new InMemoryNodeManager()).create("c1", ImmutableMap.of());
        manager.addConnector("c1", tpch);

        TransactionId id = manager.beginTransaction(false);
        Assert.assertEquals(manager.getAllTransactionInfos().size(), 1);

        // Freshly begun: no connectors touched, nothing written
        TransactionInfo initialInfo = manager.getTransactionInfo(id);
        Assert.assertFalse(initialInfo.isAutoCommitContext());
        Assert.assertTrue(initialInfo.getConnectorIds().isEmpty());
        Assert.assertFalse(initialInfo.getWrittenConnectorId().isPresent());

        ConnectorMetadata c1Metadata = manager.getMetadata(id, "c1");
        c1Metadata.listSchemaNames(TEST_SESSION.toConnectorSession("c1"));

        // After the metadata access, "c1" is listed but still nothing is written
        TransactionInfo infoAfterRead = manager.getTransactionInfo(id);
        Assert.assertEquals(infoAfterRead.getConnectorIds(), ImmutableList.of("c1"));
        Assert.assertFalse(infoAfterRead.getWrittenConnectorId().isPresent());

        manager.fail(id);
        Assert.assertEquals(manager.getAllTransactionInfos().size(), 1);

        // A failed transaction must refuse further metadata access
        try {
            manager.getMetadata(id, "c1");
            Assert.fail();
        }
        catch (PrestoException expected) {
            Assert.assertEquals(expected.getErrorCode(), TRANSACTION_ALREADY_ABORTED.toErrorCode());
        }
        Assert.assertEquals(manager.getAllTransactionInfos().size(), 1);

        manager.asyncAbort(id).join();

        Assert.assertTrue(manager.getAllTransactionInfos().isEmpty());
    }
}
@Override public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) { log.info("In connector factory create method. Connector id: " + connectorId); requireNonNull(connectorId, "connectorId is null"); requireNonNull(config, "config is null"); try { Bootstrap app = new Bootstrap( new JsonModule(), new KinesisConnectorModule(), binder -> { binder.bindConstant().annotatedWith(Names.named("connectorId")).to(connectorId); binder.bind(ConnectorId.class).toInstance(new ConnectorId(connectorId)); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); binder.bind(NodeManager.class).toInstance(context.getNodeManager()); // Note: moved creation from KinesisConnectorModule because connector manager accesses it earlier! binder.bind(KinesisHandleResolver.class).toInstance(new KinesisHandleResolver(connectorName)); // Moved creation here from KinesisConnectorModule to make it easier to parameterize if (altProviderClass.isPresent()) { binder.bind(KinesisClientProvider.class).to(altProviderClass.get()).in(Scopes.SINGLETON); } else { binder.bind(KinesisClientProvider.class).to(KinesisClientManager.class).in(Scopes.SINGLETON); } if (tableDescriptionSupplier.isPresent()) { binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KinesisStreamDescription>>>() {}).toInstance(tableDescriptionSupplier.get()); } else { binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KinesisStreamDescription>>>() {}).to(KinesisTableDescriptionSupplier.class).in(Scopes.SINGLETON); } } ); this.injector = app.strictConfig() .doNotInitializeLogging() .setRequiredConfigurationProperties(config) .setOptionalConfigurationProperties(optionalConfig) .initialize(); KinesisConnector connector = this.injector.getInstance(KinesisConnector.class); // Register objects for shutdown, at the moment only KinesisTableDescriptionSupplier if (!tableDescriptionSupplier.isPresent()) { // This will shutdown related dependent objects as well: 
KinesisTableDescriptionSupplier supp = getTableDescSupplier(this.injector); connector.registerShutdownObject(supp); } log.info("Done with injector. Returning the connector itself."); return connector; } catch (Exception e) { throw Throwables.propagate(e); } }