@Override public void configure() throws Exception { // lets shutdown quicker getContext().getShutdownStrategy().setTimeout(10); // configure the kafka component to use the broker KafkaComponent kafka = new KafkaComponent(); // you can specify more brokers separated by comma kafka.setBrokers("localhost:9092"); // add component to CamelContext getContext().addComponent("kafka", kafka); // use a timer to trigger every 100 milli seconds and generate a random word // which is sent to kafka from("timer:foo?period=100") .bean(new WordBean()) .to("kafka:words") .to("log:words?groupInterval=1000"); }
/**
 * Verifies that a Camel context starts when its Kafka producer endpoint is
 * configured with {@link SimpleKafkaSerializer} through the
 * {@code serializerClass} URI option.
 *
 * @throws Exception if the context cannot be set up, started or stopped
 */
@Test
public void testCustomKafkaSerializer() throws Exception {
    // Endpoint URI: target topic, acks=-1, plus the custom serializer class name.
    String serializerOption = "&serializerClass=" + SimpleKafkaSerializer.class.getName();
    String baseUri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1";
    final String endpointUri = baseUri + serializerOption;

    CamelContext context = new DefaultCamelContext();
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").to(endpointUri);
        }
    });

    // Register a Kafka component that talks to the test broker.
    KafkaComponent kafkaComponent = new KafkaComponent();
    kafkaComponent.setBrokers("localhost:" + KAFKA_PORT);
    context.addComponent("kafka", kafkaComponent);

    context.start();
    try {
        // The only expectation is that startup succeeded with the custom serializer.
        Assert.assertEquals(ServiceStatus.Started, context.getStatus());
    } finally {
        context.stop();
    }
}
/**
 * Verifies that a Camel context starts when its Kafka producer endpoint is
 * configured with {@link SimpleKafkaPartitioner} through the
 * {@code partitioner} URI option.
 *
 * @throws Exception if the context cannot be set up, started or stopped
 */
@Test
public void testCustomKafkaPartitioner() throws Exception {
    // Endpoint URI: target topic, acks=-1, plus the custom partitioner class name.
    String partitionerClassName = SimpleKafkaPartitioner.class.getName();
    final String endpointUri = "kafka:" + TOPIC_STRINGS
        + "?requestRequiredAcks=-1"
        + "&partitioner=" + partitionerClassName;

    CamelContext context = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").to(endpointUri);
        }
    };
    context.addRoutes(routes);

    // Register a Kafka component that talks to the test broker.
    KafkaComponent kafkaComponent = new KafkaComponent();
    kafkaComponent.setBrokers("localhost:" + KAFKA_PORT);
    context.addComponent("kafka", kafkaComponent);

    context.start();
    try {
        // The only expectation is that startup succeeded with the custom partitioner.
        Assert.assertEquals(ServiceStatus.Started, context.getStatus());
    } finally {
        context.stop();
    }
}
/**
 * Creates the {@link KafkaComponent} bean, copying every property read from
 * the supplied configuration object onto the new component.
 *
 * <p>The annotations make this bean conditional on {@link CamelContext} being
 * on the classpath and on no other {@link KafkaComponent} bean being defined.
 *
 * @param camelContext  the Camel context the component is attached to; its
 *                      type converter is used when applying the properties
 * @param configuration the source of the property values
 * @return the configured component
 * @throws Exception if reading or applying the properties fails
 */
@Bean
@ConditionalOnClass(CamelContext.class)
@ConditionalOnMissingBean(KafkaComponent.class)
public KafkaComponent configureKafkaComponent(CamelContext camelContext,
                                              KafkaComponentConfiguration configuration) throws Exception {
    // Collect the configuration's properties into a plain map.
    Map<String, Object> settings = new HashMap<>();
    IntrospectionSupport.getProperties(configuration, settings, null, false);

    KafkaComponent kafkaComponent = new KafkaComponent();
    kafkaComponent.setCamelContext(camelContext);

    // Apply the collected values to the component, converting types as needed.
    IntrospectionSupport.setProperties(
        camelContext, camelContext.getTypeConverter(), kafkaComponent, settings);
    return kafkaComponent;
}
/**
 * Sends string messages through a Camel route to Kafka and checks that all of
 * them are received by a Kafka consumer within two seconds.
 *
 * <p>Ten messages are sent with only a partition key header; five more also
 * carry a {@link KafkaConstants#TOPIC} header naming
 * {@code TOPIC_STRINGS_IN_HEADER}. The consumer is subscribed to both topics
 * and the latch expects all fifteen.
 *
 * @throws Exception if the context, producer or consumer fails
 */
@Test
public void producedStringMessageIsReceivedByKafka() throws Exception {
    final String endpointUri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1";

    CamelContext context = new DefaultCamelContext();
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").to(endpointUri);
        }
    });

    // Register a Kafka component that talks to the test broker.
    KafkaComponent kafkaComponent = new KafkaComponent();
    kafkaComponent.setBrokers("localhost:" + KAFKA_PORT);
    context.addComponent("kafka", kafkaComponent);

    context.start();
    try {
        ProducerTemplate producer = context.createProducerTemplate();

        // 10 messages with a partition key, then 5 that also set the topic header.
        sendMessagesInRoute(10, producer, "IT test message",
            KafkaConstants.PARTITION_KEY, "1");
        sendMessagesInRoute(5, producer, "IT test message in other topic",
            KafkaConstants.PARTITION_KEY, "1",
            KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);

        // One count per expected message (10 + 5).
        CountDownLatch pending = new CountDownLatch(15);
        boolean everythingArrived;
        try (KafkaConsumer<String, String> kafkaConsumer = createKafkaConsumer()) {
            consumeKafkaMessages(kafkaConsumer, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, pending);
            everythingArrived = pending.await(2, TimeUnit.SECONDS);
        }

        // The remaining latch count is appended so a failure shows how many were missed.
        Assert.assertTrue(
            "Messages published to the kafka topics were received: " + pending.getCount(),
            everythingArrived);
    } finally {
        context.stop();
    }
}