@Test public void connectViaConnectionString() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra("cassandra://localhost/beers") .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void connectViaConnectionStringSupplier() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilderProvider) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra("cassandra://localhost/beers", () -> unit.get(Cluster.Builder.class)) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void testCassandraProtocolVersion() { TestOutputOperator outputOperator = setupForOutputOperatorTest(); outputOperator.getStore().setProtocolVersion("v2"); outputOperator.setup(context); Configuration config = outputOperator.getStore().getCluster().getConfiguration(); Assert.assertEquals("Procotol version was not set to V2.", ProtocolVersion.V2, config.getProtocolOptions().getProtocolVersion()); }
private Gauge<Integer> createMaxLoad(String hostname) { return () -> { Session.State state = session.getState(); return getHost(state, hostname).map((host) -> { Configuration configuration = session.getCluster().getConfiguration(); PoolingOptions poolingOptions = configuration.getPoolingOptions(); HostDistance distance = configuration.getPolicies().getLoadBalancingPolicy().distance(host); int connections = state.getOpenConnections(host); return connections * poolingOptions.getMaxRequestsPerConnection(distance); }).orElse(0); }; }
@Test public void connectViaProperty() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(unit -> { Config conf = unit.get(Config.class); expect(conf.getString("db")).andReturn("cassandra://localhost/beers"); }) .expect(serviceKey(new Env.ServiceKey())) .expect(clusterBuilder) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra() .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void connectViaPropertySupplier() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(unit -> { Config conf = unit.get(Config.class); expect(conf.getString("db")).andReturn("cassandra://localhost/beers"); }) .expect(serviceKey(new Env.ServiceKey())) .expect(clusterBuilderProvider) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra(() -> unit.get(Cluster.Builder.class)) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void onStop() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .expect(unit -> { Session session = unit.get(Session.class); session.close(); Cluster cluster = unit.get(Cluster.class); cluster.close(); }) .run(unit -> { new Cassandra("cassandra://localhost/beers") .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }, unit -> { unit.captured(Throwing.Runnable.class).iterator().next().run(); }); }
@Test public void onStopSessionerr() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .expect(unit -> { Session session = unit.get(Session.class); session.close(); expectLastCall().andThrow(new IllegalStateException("intentional err")); Cluster cluster = unit.get(Cluster.class); cluster.close(); }) .run(unit -> { new Cassandra("cassandra://localhost/beers") .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }, unit -> { unit.captured(Throwing.Runnable.class).iterator().next().run(); }); }
@SuppressWarnings("unchecked") @Test public void withAccessor() throws Exception { Object value = new Object(); new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .expect(unit -> { MappingManager manager = unit.get(MappingManager.class); expect(manager.createAccessor(Object.class)).andReturn(value); AnnotatedBindingBuilder<Object> abb = unit.mock(AnnotatedBindingBuilder.class); abb.toInstance(value); Binder binder = unit.get(Binder.class); expect(binder.bind(Object.class)).andReturn(abb); }) .run(unit -> { new Cassandra("cassandra://localhost/beers") .accesor(Object.class) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void doWithCluster() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class, StateListener.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .expect(unit -> { Cluster cluster = unit.get(Cluster.class); expect(cluster.register(unit.get(StateListener.class))).andReturn(cluster); }) .run(unit -> { new Cassandra("cassandra://localhost/beers") .doWithCluster(c -> c.register(unit.get(StateListener.class))) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void doWithClusterBuilder() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(unit -> { Builder builder = unit.get(Cluster.Builder.class); expect(builder.withClusterName("mycluster")).andReturn(builder); }) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra("cassandra://localhost/beers") .doWithClusterBuilder(b -> { b.withClusterName("mycluster"); }) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void shouldCreateClusterWithConfig() throws Exception { CassandraServiceInfo info = new CassandraServiceInfo("local", Collections.singletonList("127.0.0.1"), 9142); CassandraClusterConfig config = new CassandraClusterConfig(); config.setCompression(ProtocolOptions.Compression.NONE); config.setPoolingOptions(new PoolingOptions().setPoolTimeoutMillis(1234)); config.setQueryOptions(new QueryOptions()); config.setProtocolVersion(ProtocolVersion.NEWEST_SUPPORTED); config.setLoadBalancingPolicy(new RoundRobinPolicy()); config.setReconnectionPolicy(new ConstantReconnectionPolicy(1)); config.setRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE); config.setSocketOptions(new SocketOptions()); Cluster cluster = creator.create(info, config); Configuration configuration = cluster.getConfiguration(); assertThat(configuration.getProtocolOptions().getCompression(), is(config.getCompression())); assertThat(configuration.getQueryOptions(), is(config.getQueryOptions())); assertThat(configuration.getSocketOptions(), is(config.getSocketOptions())); Policies policies = configuration.getPolicies(); assertThat(policies.getLoadBalancingPolicy(), is(config.getLoadBalancingPolicy())); assertThat(policies.getReconnectionPolicy(), is(config.getReconnectionPolicy())); assertThat(policies.getRetryPolicy(), is(config.getRetryPolicy())); }
public TestCluster(String name, List<InetSocketAddress> contactPoints, Configuration configuration) { super(name, contactPoints, configuration); }
public Configuration getConfiguration() { return configuration; }
public void setConfiguration(Configuration configuration) { this.configuration = configuration; }
@SuppressWarnings({"rawtypes", "unchecked"}) @Override public void configure(final Env env, final Config conf, final Binder binder) { ConnectionString cstr = Try.apply(() -> ConnectionString.parse(db)) .orElseGet(() -> ConnectionString.parse(conf.getString(db))); ServiceKey serviceKey = env.serviceKey(); Throwing.Function3<Class, String, Object, Void> bind = (type, name, value) -> { serviceKey.generate(type, name, k -> { binder.bind(k).toInstance(value); }); return null; }; Cluster.Builder builder = this.builder.get() .addContactPoints(cstr.contactPoints()) .withPort(cstr.port()); // allow user configure cluster builder if (ccbuilder != null) { ccbuilder.accept(builder, conf); } log.debug("Starting {}", cstr); Cluster cluster = builder.build(); // allow user configure cluster if (cc != null) { cc.accept(cluster, conf); } /** codecs */ Configuration configuration = cluster.getConfiguration(); CodecRegistry codecRegistry = configuration.getCodecRegistry(); // java 8 codecs codecRegistry.register( InstantCodec.instance, LocalDateCodec.instance, LocalTimeCodec.instance); hierarchy(cluster.getClass(), type -> bind.apply(type, cstr.keyspace(), cluster)); /** Session + Mapper */ Session session = cluster.connect(cstr.keyspace()); hierarchy(session.getClass(), type -> bind.apply(type, cstr.keyspace(), session)); MappingManager manager = new MappingManager(session); bind.apply(MappingManager.class, cstr.keyspace(), manager); bind.apply(Datastore.class, cstr.keyspace(), new Datastore(manager)); /** accessors */ accesors.forEach(c -> { Object accessor = manager.createAccessor(c); binder.bind(c).toInstance(accessor); }); env.router() .map(new CassandraMapper()); env.onStop(() -> { log.debug("Stopping {}", cstr); Try.run(session::close) .onFailure(x -> log.error("session.close() resulted in exception", x)); cluster.close(); log.info("Stopped {}", cstr); }); }
@Test public void shouldCreateCluster() throws Exception { CassandraServiceInfo info = new CassandraServiceInfo("local", Collections.singletonList("127.0.0.1"), 9142); Cluster cluster = creator.create(info, null); Configuration configuration = cluster.getConfiguration(); assertThat(configuration.getProtocolOptions().getAuthProvider(), is(AuthProvider.NONE)); }
@Test public void shouldCreateClusterWithAuthentication() throws Exception { CassandraServiceInfo info = new CassandraServiceInfo("local", Collections.singletonList("127.0.0.1"), 9142, "walter", "white"); Cluster cluster = creator.create(info, null); Configuration configuration = cluster.getConfiguration(); assertThat(configuration.getProtocolOptions().getAuthProvider(), is(instanceOf(PlainTextAuthProvider.class))); }