/**
 * Configures a {@link KafkaSink} through the {@code OLD_BATCH_SIZE},
 * {@code BROKER_LIST_FLUME_KEY} and {@code REQUIRED_ACKS_FLUME_KEY} context keys and checks
 * that the topic, batch size and resulting Kafka producer properties reflect those values.
 */
@Test
public void testOldProperties() {
    Context sinkContext = new Context();
    sinkContext.put("topic", "test-topic");
    sinkContext.put(OLD_BATCH_SIZE, "300");
    sinkContext.put(BROKER_LIST_FLUME_KEY, "localhost:9092,localhost:9092");
    sinkContext.put(REQUIRED_ACKS_FLUME_KEY, "all");

    KafkaSink sink = new KafkaSink();
    Configurables.configure(sink, sinkContext);
    Properties producerProps = sink.getKafkaProps();

    // NOTE(review): if assertEquals comes from JUnit the arguments below are in
    // (actual, expected) order, which only affects the failure message - confirm and swap.
    assertEquals(sink.getTopic(), "test-topic");
    assertEquals(sink.getBatchSize(), 300);
    assertEquals(
            producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
            "localhost:9092,localhost:9092");
    assertEquals(producerProps.getProperty(ProducerConfig.ACKS_CONFIG), "all");
}
/**
 * Builds the list of default parameters, pairing each producer, security and
 * placeholder key with its default value.
 *
 * @return a new {@link Arguments} instance holding every default key/value pair
 */
@Override
public Arguments getDefaultParameters() {
    final Arguments arguments = new Arguments();

    // Connection and topic
    arguments.addArgument(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ProducerKeys.BOOTSTRAP_SERVERS_CONFIG_DEFAULT);
    arguments.addArgument(ProducerKeys.ZOOKEEPER_SERVERS, ProducerKeys.ZOOKEEPER_SERVERS_DEFAULT);
    arguments.addArgument(ProducerKeys.KAFKA_TOPIC_CONFIG, ProducerKeys.KAFKA_TOPIC_CONFIG_DEFAULT);

    // Serialization and producer tuning
    arguments.addArgument(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeys.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
    arguments.addArgument(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProducerKeys.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT);
    arguments.addArgument(ProducerConfig.COMPRESSION_TYPE_CONFIG, ProducerKeys.COMPRESSION_TYPE_CONFIG_DEFAULT);
    arguments.addArgument(ProducerConfig.BATCH_SIZE_CONFIG, ProducerKeys.BATCH_SIZE_CONFIG_DEFAULT);
    arguments.addArgument(ProducerConfig.LINGER_MS_CONFIG, ProducerKeys.LINGER_MS_CONFIG_DEFAULT);
    arguments.addArgument(ProducerConfig.BUFFER_MEMORY_CONFIG, ProducerKeys.BUFFER_MEMORY_CONFIG_DEFAULT);
    arguments.addArgument(ProducerConfig.ACKS_CONFIG, ProducerKeys.ACKS_CONFIG_DEFAULT);
    arguments.addArgument(ProducerConfig.SEND_BUFFER_CONFIG, ProducerKeys.SEND_BUFFER_CONFIG_DEFAULT);
    arguments.addArgument(ProducerConfig.RECEIVE_BUFFER_CONFIG, ProducerKeys.RECEIVE_BUFFER_CONFIG_DEFAULT);

    // Security protocol and message placeholder
    arguments.addArgument(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name);
    arguments.addArgument(PropsKeys.MESSAGE_PLACEHOLDER_KEY, PropsKeys.MSG_PLACEHOLDER);

    // Kerberos / SASL
    arguments.addArgument(ProducerKeys.KERBEROS_ENABLED, ProducerKeys.KERBEROS_ENABLED_DEFULAT);
    arguments.addArgument(ProducerKeys.JAVA_SEC_AUTH_LOGIN_CONFIG, ProducerKeys.JAVA_SEC_AUTH_LOGIN_CONFIG_DEFAULT);
    arguments.addArgument(ProducerKeys.JAVA_SEC_KRB5_CONFIG, ProducerKeys.JAVA_SEC_KRB5_CONFIG_DEFAULT);
    arguments.addArgument(ProducerKeys.SASL_KERBEROS_SERVICE_NAME, ProducerKeys.SASL_KERBEROS_SERVICE_NAME_DEFAULT);
    arguments.addArgument(ProducerKeys.SASL_MECHANISM, ProducerKeys.SASL_MECHANISM_DEFAULT);

    return arguments;
}
/**
 * Sends five identical records to {@code hello_world_topic}, waiting for each send to
 * complete and printing the key/value pair after each attempt.
 *
 * <p>The producer is always closed, even if sending fails. If the thread is interrupted
 * while waiting for a send, the loop stops and the interrupt flag is restored once the
 * producer has been closed.
 *
 * @param bootstrapServer the {@code bootstrap.servers} value used to reach the cluster
 */
public void createProducer(String bootstrapServer) {
    long numberOfEvents = 5;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    boolean interrupted = false;
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        for (int i = 0; i < numberOfEvents; i++) {
            String key = "testContainers";
            String value = "AreAwesome";
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("hello_world_topic", key, value);
            try {
                producer.send(record).get();
            } catch (InterruptedException e) {
                // Stop sending; the flag is restored after close() so that closing the
                // producer is not itself disturbed by a pending interrupt.
                interrupted = true;
                e.printStackTrace();
                break;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.printf("key = %s, value = %s\n", key, value);
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
public KmqClient(KmqConfig config, KafkaClients clients, Class<? extends Deserializer<K>> keyDeserializer, Class<? extends Deserializer<V>> valueDeserializer, long msgPollTimeout) { this.config = config; this.msgPollTimeout = msgPollTimeout; this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer); // Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition. this.markerProducer = clients.createProducer( MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class, Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class)); LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId())); msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic())); }
@Override public Properties overridingProps() { Properties props = new Properties(); int port = findLocalPort(); // We need to convert all the properties to the Cruise Control properties. setSecurityConfigs(props, "producer"); for (String configName : ProducerConfig.configNames()) { Object value = props.get(configName); if (value != null) { props.remove(configName); props.put(appendPrefix(configName), value); } } props.setProperty("metric.reporters", CruiseControlMetricsReporter.class.getName()); props.setProperty("listeners", "SSL://127.0.0.1:" + port); props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + port); props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), "SSL"); props.setProperty(CRUISE_CONTROL_METRICS_REPORTING_INTERVAL_MS_CONFIG, "100"); props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC); return props; }
/**
 * Runs the parent set-up, then sends ten records ("0" to "9") to {@code TestTopic}
 * with acks set to {@code -1} and asserts that no send callback reported an exception.
 */
@Before
public void setUp() {
    super.setUp();

    final Properties producerProps = new Properties();
    producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "-1");

    final AtomicInteger failureCount = new AtomicInteger(0);
    try (Producer<String, String> producer = createProducer(producerProps)) {
        for (int recordIndex = 0; recordIndex < 10; recordIndex++) {
            producer.send(
                    new ProducerRecord<>("TestTopic", Integer.toString(recordIndex)),
                    (recordMetadata, exception) -> {
                        if (exception != null) {
                            failureCount.incrementAndGet();
                        }
                    });
        }
    }
    assertEquals(0, failureCount.get());
}
/**
 * Creates the underlying {@link KafkaProducer} unless this instance has already been
 * initialized, in which case a warning is logged and nothing else happens.
 *
 * <p>Entries of the {@code properties} field are applied last, so they override the
 * defaults set here.
 *
 * @param servers the {@code bootstrap.servers} value
 */
public void initialize(String servers) {
    if (isInitialized.get()) {
        logger.warn("Already initialized");
        return;
    }

    final String stringSerializer = StringSerializer.class.getName();

    final Properties producerConfig = new Properties();
    producerConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, servers);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
    producerConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SixtPartitioner.class.getName());
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, "3");
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");

    // Caller-supplied settings take precedence over the defaults above.
    properties.forEach(producerConfig::put);

    realProducer = new KafkaProducer<>(producerConfig);
    isInitialized.set(true);
}
/**
 * Sends a single record to {@code TOPIC} using a producer created for this call.
 *
 * <p>The bootstrap servers are read from the {@link ConfigurationService}
 * ({@code KAFKA_BOOTSTRAP_SERVERS}, context {@code "global"}). The producer is stored in
 * the {@code producer} field and is always closed before this method returns.
 *
 * @param key   record key
 * @param value record value; its runtime class is passed to the {@code DataSerializer}
 */
public void produce(UUID key, Object value) {
    ConfigurationService configService =
            ServiceLocator.findService(ConfigurationService.class);

    Properties kafkaProps = new Properties();
    kafkaProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            configService
                    .getVOConfigurationSetting(ConfigurationKey.KAFKA_BOOTSTRAP_SERVERS, "global")
                    .getValue());

    this.producer = new KafkaProducer<>(
            kafkaProps, new UUIDSerializer(), new DataSerializer(value.getClass()));
    try {
        producer.send(new ProducerRecord<>(TOPIC, key, value));
    } catch (Exception e) {
        // Previously logged "Producer closed" here, which hid the actual send failure.
        LOGGER.error("Failed to send record to topic " + TOPIC + ": " + e);
        e.printStackTrace();
    } finally {
        producer.close();
        LOGGER.debug("Producer closed");
    }
}
public void publishDummyData() { final String topic = "TestTopic"; // Create publisher final Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); final KafkaProducer<String, String> producer = new KafkaProducer<>(config); for (int charCode = 65; charCode < 91; charCode++) { final char[] key = new char[1]; key[0] = (char) charCode; producer.send(new ProducerRecord<>(topic, new String(key), new String(key))); } producer.flush(); producer.close(); }
public void publishDummyDataNumbers() { final String topic = "NumbersTopic"; // Create publisher final Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(config); for (int value = 0; value < 10000; value++) { producer.send(new ProducerRecord<>(topic, value, value)); } producer.flush(); producer.close(); }
public static void main(String[] args) throws ExecutionException, InterruptedException { Map props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections // to the Kakfa cluster props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-local:9092"); // props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, // JsonSerializer.class); // props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, // JsonSerializer.class); // value to block, after which it will throw a TimeoutException props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000); AdminClient adminClient = AdminClient.create(props); adminClient.describeCluster(); Collection<TopicListing> topicListings = adminClient.listTopics().listings().get(); System.out.println(topicListings); }
/**
 * Creates a String/String {@link KafkaProducer} with acks set to {@code "0"}.
 *
 * <p>The bootstrap servers come from the JVM system property named by
 * {@code KAFKA_CLUSTER_ENV_VAR_NAME}, falling back to {@code localhost:9092}.
 */
public Producer() {
    LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName());

    // NOTE(review): despite the constant's name this reads a system property,
    // not an environment variable - confirm that is intended.
    final String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, "localhost:9092");
    LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster);

    final Properties producerSettings = new Properties();
    producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
    producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    producerSettings.put(ProducerConfig.ACKS_CONFIG, "0");

    this.kafkaProducer = new KafkaProducer<>(producerSettings);
}
public KafkaLogbackAppender(final Properties producerConfig, final String topic) { this.topic = topic; // Build properties that can be used by the kafka producer this.producerConfig = new Properties(); this.producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); this.producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); this.producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); this.producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, 1); this.producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName()); this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName()); this.producerConfig.putAll(producerConfig); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Long, byte[]> producer = new KafkaProducer<>(properties); LongStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number, //key String.format("record-%s", number.toString()).getBytes())) //value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<String, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number.toString(), //key UserAvroSerdes.serialize(new User(String.format("user-%s", number.toString()))))) //value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, number, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream .rangeClosed(1, 100000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Integer, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key KafkaProducerUtil.createMessage(1000))) //Value .forEach(record -> { producer.send(record); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Integer, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key KafkaProducerUtil.createMessage(1000))) //Value .forEach(record -> { producer.send(record); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, number, //Key String.format("record-%s", number))) //Value .forEach(record -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } producer.send(record); }); producer.close(); }
/**
 * Derives producer, consumer and admin settings from the worker configuration and
 * creates the {@link KafkaBasedLog} for the given topic.
 *
 * <p>The topic is described as compacted, with a single partition and the replication
 * factor read from {@code CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG}.
 *
 * @param topic  name of the backing topic
 * @param config worker configuration whose original settings seed all three client configs
 * @return the log created by {@code createKafkaBasedLog}
 */
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    // Producer: String keys, byte[] values, retries set to Integer.MAX_VALUE.
    final Map<String, Object> producerProps = new HashMap<>(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    // Consumer: mirror of the producer's serialization.
    final Map<String, Object> consumerProps = new HashMap<>(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    final Map<String, Object> adminProps = new HashMap<>(config.originals());

    final short replicationFactor =
            config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG);
    final NewTopic topicDescription = TopicAdmin.defineTopic(topic)
            .compacted()
            .partitions(1)
            .replicationFactor(replicationFactor)
            .build();

    return createKafkaBasedLog(
            topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
/**
 * Configures the store, checks the captured topic name, the byte-array
 * serializer/deserializer settings and the captured topic description, then starts and
 * stops the store and verifies the recorded mock expectations.
 */
@Test
public void testStartStop() throws Exception {
    final String byteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
    final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    // Record expectations, then switch the mocks to replay mode.
    expectConfigure();
    expectStart(Collections.EMPTY_LIST);
    expectStop();
    PowerMock.replayAll();

    store.configure(DEFAULT_DISTRIBUTED_CONFIG);

    assertEquals(TOPIC, capturedTopic.getValue());
    assertEquals(byteArraySerializer,
            capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
    assertEquals(byteArraySerializer,
            capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
    assertEquals(byteArrayDeserializer,
            capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
    assertEquals(byteArrayDeserializer,
            capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));

    assertEquals(TOPIC, capturedNewTopic.getValue().name());
    assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
    assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());

    store.start();
    store.stop();

    PowerMock.verifyAll();
}
/**
 * Creates the config storage log, checks the captured topic name, the
 * String/byte-array serializer and deserializer settings and the single-partition topic
 * description, then starts and stops the storage and verifies the mock expectations.
 */
@Test
public void testStartStop() throws Exception {
    final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
    final String byteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
    final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    // Record expectations, then switch the mocks to replay mode.
    expectConfigure();
    expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
    expectStop();
    PowerMock.replayAll();

    configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);

    assertEquals(TOPIC, capturedTopic.getValue());
    assertEquals(stringSerializer,
            capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
    assertEquals(byteArraySerializer,
            capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
    assertEquals(stringDeserializer,
            capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
    assertEquals(byteArrayDeserializer,
            capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));

    assertEquals(TOPIC, capturedNewTopic.getValue().name());
    assertEquals(1, capturedNewTopic.getValue().numPartitions());
    assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());

    configStorage.start();
    configStorage.stop();

    PowerMock.verifyAll();
}
/** * Get the configs for the {@link KafkaProducer producer}. * Properties using the prefix {@link #PRODUCER_PREFIX} will be used in favor over their non-prefixed versions * except in the case of {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed * version as we only support reading/writing from/to the same Kafka Cluster. * * @param clientId clientId * @return Map of the producer configuration. */ public Map<String, Object> getProducerConfigs(final String clientId) { final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); if (eosEnabled) { if (clientProvidedProps.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) { throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have idempotency enabled."); } if (clientProvidedProps.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) { throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have only one in-flight request per connection."); } } // generate producer configs from original properties and overridden maps final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES); props.putAll(clientProvidedProps); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer"); return props; }
/**
 * Populates the shared producer, result-consumer and streams configurations used by the
 * tests in this class, all pointing at the bootstrap servers of {@code CLUSTER}.
 */
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    final String bootstrapServers = CLUSTER.bootstrapServers();

    // Producer: Long keys, String values.
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // Result consumer: reads from the earliest offset with matching deserializers.
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // Streams application settings.
    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
/**
 * Repeatedly sends every entry of {@code inputValues} to {@code topic} until
 * {@code numIterations} iterations have completed or {@code shutdown} is set.
 * The producer is closed when the loop ends.
 */
@Override
public void run() {
    final Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 10);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (final KafkaProducer<String, String> producer =
                 new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
        while (true) {
            // Same exit condition as before, expressed as a guard.
            if (getCurrIteration() >= numIterations || shutdown) {
                break;
            }
            for (final String value : inputValues) {
                producer.send(new ProducerRecord<>(topic, value));
            }
            incrementInteration();
        }
    }
}
private Properties setProduceConsumeProperties(final String clientId) { Properties props = new Properties(); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); return props; }
@Inject public JkesKafkaProducer(JkesProperties jkesProperties) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, jkesProperties.getKafkaBootstrapServers()); props.put("acks", "all"); props.put("retries", 1); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("documentBasePackage", jkesProperties.getDocumentBasePackage()); // Why use StringSerializer? Because in some cases key is String not Long, such as MongoDB props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JkesKafkaJsonSerializer.class); this.producer = new KafkaProducer<>(props); }
/**
 * Creates the request/response producer when Kafka is enabled; otherwise only logs a
 * warning and leaves the producer unset.
 *
 * <p>The broker address is read from the environment variable named by
 * {@code ENV_VAR_SELDON_KAFKA_SERVER}, defaulting to {@code localhost:9093} when unset.
 *
 * @param kafkaEnabled value of {@code seldon.kafka.enable}
 */
@Autowired
public KafkaRequestResponseProducer(@Value("${seldon.kafka.enable}") boolean kafkaEnabled) {
    if (!kafkaEnabled) {
        logger.warn("Kafka not enabled");
        return;
    }

    enabled = true;

    String kafkaHostPort = System.getenv(ENV_VAR_SELDON_KAFKA_SERVER);
    logger.info(String.format("using %s[%s]", ENV_VAR_SELDON_KAFKA_SERVER, kafkaHostPort));
    if (kafkaHostPort == null) {
        logger.warn("*WARNING* SELDON_KAFKA_SERVER environment variable not set!");
        kafkaHostPort = "localhost:9093";
    }
    logger.info("Starting kafka client with server "+kafkaHostPort);

    final Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", kafkaHostPort);
    producerProps.put("client.id", "RequestResponseProducer");
    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
    producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
    // NB need to investigate issues of Kafka not able to get metadata
    producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"20");

    producer = new KafkaProducer<>(producerProps, new StringSerializer(), new RequestResponseSerializer());
}
@Override public void configure(Map<String, ?> config) { _partitionMetricSampleStoreTopic = (String) config.get(PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG); _brokerMetricSampleStoreTopic = (String) config.get(BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG); if (_partitionMetricSampleStoreTopic == null || _brokerMetricSampleStoreTopic == null || _partitionMetricSampleStoreTopic.isEmpty() || _brokerMetricSampleStoreTopic.isEmpty()) { throw new IllegalArgumentException("The sample store topic names must be configured."); } String numProcessingThreadsString = (String) config.get(NUM_SAMPLE_LOADING_THREADS); int numProcessingThreads = numProcessingThreadsString == null || numProcessingThreadsString.isEmpty() ? 8 : Integer.parseInt(numProcessingThreadsString); _metricProcessorExecutor = Executors.newFixedThreadPool(numProcessingThreads); _consumers = new ArrayList<>(numProcessingThreads); for (int i = 0; i < numProcessingThreads; i++) { _consumers.add(createConsumers(config)); } Properties producerProps = new Properties(); producerProps.putAll(config); producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, (String) config.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG)); producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_CLIENT_ID); // Set batch.size and linger.ms to a big number to have better batching. producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000"); producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "800000"); producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "5"); producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); _producer = new KafkaProducer<>(producerProps); _loadingProgress = -1.0; ensureTopicCreated(config); }
/**
 * Configures the metrics reporter: builds the producer properties (defaulting bootstrap
 * servers, security protocol and several producer settings when they are absent),
 * creates the producer, and reads the broker id, metrics topic and reporting interval.
 *
 * @param configs the configuration handed to this reporter
 */
@Override
public void configure(Map<String, ?> configs) {
    final Properties producerProps = CruiseControlMetricsReporterConfig.parseProducerConfigs(configs);

    if (!producerProps.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
        // Fall back to localhost on the configured "port", or 9092 when none is given.
        String port = (String) configs.get("port");
        if (port == null) {
            port = "9092";
        }
        final String bootstrapServers = "localhost:" + port;
        producerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        LOG.info("Using default value of {} for {}", bootstrapServers,
                 CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
    }

    if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) {
        final String securityProtocol = "PLAINTEXT";
        producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        LOG.info("Using default value of {} for {}", securityProtocol,
                 CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
    }

    final CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);

    final String clientIdKey = CruiseControlMetricsReporterConfig.config(CommonClientConfigs.CLIENT_ID_CONFIG);
    setIfAbsent(producerProps, ProducerConfig.CLIENT_ID_CONFIG, reporterConfig.getString(clientIdKey));
    // Set batch.size and linger.ms to a big number to have better batching.
    setIfAbsent(producerProps, ProducerConfig.LINGER_MS_CONFIG, "30000");
    setIfAbsent(producerProps, ProducerConfig.BATCH_SIZE_CONFIG, "800000");
    setIfAbsent(producerProps, ProducerConfig.RETRIES_CONFIG, "5");
    setIfAbsent(producerProps, ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    setIfAbsent(producerProps, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    setIfAbsent(producerProps, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
    setIfAbsent(producerProps, ProducerConfig.ACKS_CONFIG, "all");
    _producer = new KafkaProducer<>(producerProps);

    _brokerId = Integer.parseInt((String) configs.get(KafkaConfig.BrokerIdProp()));
    _cruiseControlMetricsTopic =
        reporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG);
    _reportingIntervalMs =
        reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTING_INTERVAL_MS_CONFIG);
}
@Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections to the Kakfa cluster props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; }
/**
 * Creates a producer with byte-array keys and values for the given brokers.
 *
 * @param brokerList the {@code bootstrap.servers} value
 * @return a new {@link KafkaProducer}; the caller is responsible for closing it
 */
private static KafkaProducer<byte[], byte[]> getProducer(String brokerList) {
    final String byteArraySerializer = ByteArraySerializer.class.getCanonicalName();

    final Properties producerSettings = new Properties();
    producerSettings.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    producerSettings.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializer);
    producerSettings.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializer);

    return new KafkaProducer<>(producerSettings);
}
/**
 * Kafka producer config bean.
 * This {@link Map} is used by {@link MessageProducerConfig#producerFactory}.
 *
 * <p>Configures String keys and values, no retries, acks {@code "all"}, and explicit
 * batch size, buffer memory and linger settings.
 *
 * @return kafka properties bean
 */
@Bean
public Map<String, Object> producerConfigs() {
    final Map<String, Object> producerSettings = new HashMap<>();

    // connection and serialization
    producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts);
    producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // delivery and batching
    producerSettings.put(ProducerConfig.RETRIES_CONFIG, 0);
    producerSettings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    producerSettings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    producerSettings.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    producerSettings.put(ProducerConfig.ACKS_CONFIG, "all");

    return producerSettings;
}
/**
 * Builds the Kafka properties for this topology: String serializers, the hosts from
 * {@code config.getKafkaHosts()}, the topology name as group id, and
 * {@code request.required.acks} set to {@code "1"}.
 *
 * @return the assembled properties
 */
private Properties makeKafkaProperties() {
    final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

    final Properties kafkaProps = new Properties();
    kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
    kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
    kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaHosts());
    kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, getTopologyName());
    // NOTE(review): "request.required.acks" looks like a legacy key - confirm the
    // consumer of these properties still honours it.
    kafkaProps.setProperty("request.required.acks", "1");
    return kafkaProps;
}
/**
 * Builds a combined producer/consumer property set for tests: hosts from
 * {@code makeUnboundConfig()}, group id {@code "test"}, auto-commit enabled, String
 * (de)serializers and {@code request.required.acks} set to {@code "1"}.
 *
 * @return the assembled properties
 * @throws ConfigurationException declared by this method; origin not visible here
 * @throws CmdLineException       declared by this method; origin not visible here
 */
protected static Properties kafkaProperties() throws ConfigurationException, CmdLineException {
    final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
    final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

    final Properties kafkaProps = new Properties();
    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, makeUnboundConfig().getKafkaHosts());

    // consumer group behaviour
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

    // keys
    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
    kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);

    // values
    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
    kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);

    kafkaProps.put("request.required.acks", "1");
    return kafkaProps;
}
/**
 * Producer configuration bean: bootstrap servers from {@code bootstrapServers}, String
 * keys and values serialized by {@code AvroSerializer}.
 *
 * @return a mutable map of producer settings
 */
@Bean
public Map<String, Object> producerConfigs() {
    final Map<String, Object> producerSettings = new HashMap<>();
    producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    // serializers for record keys and values
    producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
    return producerSettings;
}