/**
 * Sends {@code TOTAL_MSGS} messages to the topic returned by {@code KMQ_CONFIG.getMsgTopic()},
 * pausing 100 ms after each send.
 *
 * <p>Each record has no key; its value is a 4-byte buffer into which the message index was
 * written.
 *
 * @param args command-line arguments (not read)
 * @throws InterruptedException if the thread is interrupted while pausing between sends
 * @throws IOException declared by the signature; no statement here throws it explicitly
 */
public static void main(String[] args) throws InterruptedException, IOException {
    UncaughtExceptionHandling.setup();

    // try-with-resources closes the producer even when a send or the pause fails.
    try (KafkaProducer<ByteBuffer, ByteBuffer> msgProducer =
            KAFKA_CLIENTS.createProducer(ByteBufferSerializer.class, ByteBufferSerializer.class)) {
        LOG.info("Sending ...");

        for (int i = 0; i < TOTAL_MSGS; i++) {
            ByteBuffer data = ByteBuffer.allocate(4).putInt(i);
            msgProducer.send(new ProducerRecord<>(KMQ_CONFIG.getMsgTopic(), data));
            // The method already declares InterruptedException, so it is allowed to propagate
            // instead of being wrapped in a RuntimeException.
            Thread.sleep(100L);
            LOG.info(String.format("Sent message %d", i));
        }
    }

    LOG.info("Sent");
}
/**
 * Sends one million records to the topic {@code "produktion"} (key and value both equal to the
 * loop index), then one final record whose key and value are {@code -1}, and prints how long the
 * send calls took.
 *
 * @param args command-line arguments (not read)
 * @throws InterruptedException declared by the signature; no statement here throws it explicitly
 */
public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ACKS_CONFIG, "all");
    props.put(RETRIES_CONFIG, 0);
    props.put(BATCH_SIZE_CONFIG, 32000);
    props.put(LINGER_MS_CONFIG, 100);
    props.put(BUFFER_MEMORY_CONFIG, 33554432);
    props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");

    // try-with-resources guarantees the producer is closed even if a send throws.
    try (Producer<Long, Long> producer = new KafkaProducer<>(props)) {
        long t1 = System.currentTimeMillis();

        long i = 0;
        for (; i < 1000000; i++) {
            producer.send(new ProducerRecord<>("produktion", i, i));
        }

        // Final record with key/value -1, sent after all numbered records.
        producer.send(new ProducerRecord<Long, Long>("produktion", (long) -1, (long) -1));

        // The time is taken before the producer is closed, so it covers only the send() calls.
        System.out.println("fertig " + i + " Nachrichten in " + (System.currentTimeMillis() - t1 + " ms"));
    }
}
private RealTimeTradeProducer(int index, String broker, String topic, int tradesPerSecond, int keysFrom, int keysTo) throws IOException, URISyntaxException { if (tradesPerSecond <= 0) { throw new RuntimeException("tradesPerSecond=" + tradesPerSecond); } this.index = index; this.topic = topic; this.tradesPerSecond = tradesPerSecond; tickers = new String[keysTo - keysFrom]; Arrays.setAll(tickers, i -> "T-" + Integer.toString(i + keysFrom)); Properties props = new Properties(); props.setProperty("bootstrap.servers", broker); props.setProperty("key.serializer", LongSerializer.class.getName()); props.setProperty("value.serializer", TradeSerializer.class.getName()); this.producer = new KafkaProducer<>(props); }
/**
 * Reads the topic and broker address from the process context, builds the producer
 * configuration and creates the Kafka producer.
 *
 * <p>Any exception raised while doing so is printed to standard error and not rethrown.
 * NOTE(review): in that case {@code producer} may be left unassigned — confirm that later
 * callers cope with that.
 *
 * @param context source of the {@code TOPIC} and {@code BROKERIP} property values
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
    try {
        topic = context.getProperty(TOPIC).getValue();
        brokerIP = context.getProperty(BROKERIP).getValue();

        props = new Properties();

        // Connection and serialization.
        props.put("bootstrap.servers", brokerIP);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Delivery and batching settings.
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);

        producer = new KafkaProducer<>(props);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Writes {@code NUM_MESSAGES} records with a 16-byte zero key and a 1024-byte random value to
 * partition 0 of the given topic, waiting for each send to complete before the next one.
 *
 * <p>Progress is printed every 100 messages.
 *
 * @param args command-line arguments, parsed by {@code parseCommandLine}
 * @throws Exception if argument parsing, producer creation or a send fails
 */
public static void main(String[] args) throws Exception {
    CommandLine commandLine = parseCommandLine(args);
    String zkUrl = commandLine.getOptionValue(ZOOKEEPER);
    String topic = commandLine.getOptionValue(TOPIC);
    int numMessages = Integer.parseInt(commandLine.getOptionValue(NUM_MESSAGES));

    Random random = new Random();
    Properties props = OperatorUtil.createKafkaProducerProperties(zkUrl);

    byte[] key = new byte[16];
    byte[] data = new byte[1024];

    // Typed producer (no raw types), closed automatically even when a send fails.
    try (KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(props)) {
        for (int i = 0; i < numMessages; i++) {
            // Refill the payload; safe to reuse the array because the previous send was awaited.
            random.nextBytes(data);

            ProducerRecord<byte[], byte[]> producerRecord =
                    new ProducerRecord<>(topic, 0, System.currentTimeMillis(), key, data);
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            future.get();

            if (i % 100 == 0) {
                System.out.println("Have wrote " + i + " messages to kafka");
            }
        }
    }
}
/**
 * Sends 50 records with a {@code null} key to the channel's topic, pulls 50 events back out of
 * the channel and asserts that none of them carries a {@code KEY_HEADER} header.
 *
 * @throws Exception if sending, pulling or waiting for the submitted tasks fails
 */
private void doTestNullKeyNoHeader() throws Exception {
    final KafkaChannel channel = startChannel(false);
    try {
        Properties props = channel.getProducerProps();

        // The producer is only needed for seeding the topic; close it as soon as that is done.
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props)) {
            for (int i = 0; i < 50; i++) {
                ProducerRecord<String, byte[]> data =
                        new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
                producer.send(data).get();
            }
        }

        ExecutorCompletionService<Void> submitterSvc =
                new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
        List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
        wait(submitterSvc, 5);

        List<String> finals = new ArrayList<String>(50);
        for (int i = 0; i < 50; i++) {
            finals.add(i, events.get(i).getHeaders().get(KEY_HEADER));
        }
        for (int i = 0; i < 50; i++) {
            Assert.assertNull(finals.get(i));
        }
    } finally {
        // Stop the channel even when sending or an assertion fails.
        channel.stop();
    }
}
@Test public void test_create_tracing_serializer() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092");//该地址是集群的子集,用来探测集群。 props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化 props.put("retries", 3);// 请求失败重试的次数 props.put("batch.size", 16384);// batch的大小 props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据 props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1000; i++) { producer.send(new ProducerRecord<>("test", "hello", "kafka - " + i)); Thread.sleep(10000); } }
/**
 * Returns the shared producer, creating it on first use from the host and port of the given
 * configuration.
 *
 * <p>Creation happens while holding the {@code KafkaOutput.class} monitor, and the field is
 * re-checked inside it. NOTE(review): the unsynchronized first read is only safe if the
 * {@code producer} field is declared {@code volatile} — confirm at the field declaration.
 *
 * @param configuration supplies the broker host and port; only read when the producer is created
 * @return the shared producer instance
 */
static Producer<String, String> getProducer(KafkaOutputConfiguration configuration) {
    // Fast path: already created.
    if (producer != null) {
        return producer;
    }

    synchronized (KafkaOutput.class) {
        // Re-check under the lock; another thread may have created it meanwhile.
        if (producer == null) {
            Properties config = new Properties();
            config.put("bootstrap.servers", configuration.getHost() + ":" + configuration.getPort());
            config.put("acks", "all");
            config.put("retries", 0);
            config.put("request.required.acks", "0");
            config.put("batch.size", 64);
            config.put("linger.ms", 1);
            config.put("buffer.memory", 1024);
            config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(config);
        }
        return producer;
    }
}
/**
 * Sends one record inside a Kafka transaction and commits it together with the record's own
 * partition/offset as a consumer-group offset for {@code MallConstants.ORDER_GROUP}.
 *
 * <p>If waiting for the send result fails or is interrupted, the transaction is aborted and the
 * failure is not reported to the caller.
 *
 * <p>NOTE(review): {@code initTransactions()} is invoked on every call; Kafka documents it as a
 * once-per-producer call — confirm that {@code getWorker()} hands out a producer for which this
 * is valid.
 *
 * @param k record key
 * @param v record value
 */
@Override
public void send(Long k, byte[] v) {
    KafkaProducer<Long, byte[]> p = getWorker();
    p.initTransactions();
    p.beginTransaction();

    // Send on the same producer that owns the transaction (previously the `worker` field was
    // used here while the transaction was driven through `p`).
    Future<RecordMetadata> res = p.send(new ProducerRecord<Long, byte[]>(topic, k, v));

    try {
        RecordMetadata record = res.get();

        offsets.clear();
        offsets.put(new TopicPartition(topic, record.partition()), new OffsetAndMetadata(record.offset()));
        p.sendOffsetsToTransaction(offsets, MallConstants.ORDER_GROUP);
        p.commitTransaction();
    } catch (InterruptedException e) {
        try {
            p.abortTransaction();
        } finally {
            // Restore the interrupt status only after the abort, so the abort itself is not
            // cut short by the pending interrupt.
            Thread.currentThread().interrupt();
        }
    } catch (ExecutionException e) {
        p.abortTransaction();
    }
}
/**
 * Creates a producer for {@code localhost:9092}, prints its metrics map and sends the given
 * value 100 times to the topic {@code "video_view"}.
 *
 * <p>A new producer is created and closed on every call.
 *
 * @param data record value to send
 */
public void sendData(String data) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // try-with-resources closes the producer even if a send throws.
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        System.out.println(metrics);

        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("video_view", data));
        }
    }
}
@Test public void shouldWriteThenRead() throws Exception { //Create a consumer ConsumerIterator<String, String> it = buildConsumer(Original.topic); //Create a producer producer = new KafkaProducer<>(producerProps()); //send a message producer.send(new ProducerRecord<>(Original.topic, "message")).get(); //read it back MessageAndMetadata<String, String> messageAndMetadata = it.next(); String value = messageAndMetadata.message(); assertThat(value, is("message")); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Long, byte[]> producer = new KafkaProducer<>(properties); LongStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number, //key String.format("record-%s", number.toString()).getBytes())) //value .forEach(record -> producer.send(record)); producer.close(); }
public void start() throws InterruptedException { RandomGenerator random = RandomManager.getRandom(); Properties props = ConfigUtils.keyValueToProperties( "bootstrap.servers", "localhost:" + kafkaPort, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer", "value.serializer", "org.apache.kafka.common.serialization.StringSerializer", "compression.type", "gzip", "batch.size", 0, "acks", 1, "max.request.size", 1 << 26 // TODO ); try (Producer<String,String> producer = new KafkaProducer<>(props)) { for (int i = 0; i < howMany; i++) { Pair<String,String> datum = datumGenerator.generate(i, random); ProducerRecord<String,String> record = new ProducerRecord<>(topic, datum.getFirst(), datum.getSecond()); producer.send(record); log.debug("Sent datum {} = {}", record.key(), record.value()); if (intervalMsec > 0) { Thread.sleep(intervalMsec); } } } }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Integer, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key KafkaProducerUtil.createMessage(1000))) //Value .forEach(record -> { producer.send(record); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); producer.close(); }
/**
 * Creates the event for one topic: opens a consumer assigned to partition 0 of the topic,
 * seeks it to the end, registers it in {@code KafkaConsumerContainer} and opens the statistics
 * producer.
 *
 * <p>Any exception during that setup is printed to standard error and not rethrown; the start
 * time is recorded afterwards in every case.
 *
 * @param topic topic whose partition 0 is consumed
 */
public KafkaConsumerEvent(String topic) {
    super(0L);
    this.topic = topic;

    Properties consumerConfig = HeartBeatConfigContainer.getInstance().getKafkaConsumerConfig();
    Properties statProducerConfig = HeartBeatConfigContainer.getInstance().getKafkaProducerConfig();

    try {
        dataConsumer = new KafkaConsumer<>(consumerConfig);

        // Read only partition 0, starting from its current end.
        partition0 = new TopicPartition(this.topic, 0);
        dataConsumer.assign(Arrays.asList(partition0));
        dataConsumer.seekToEnd(Arrays.asList(partition0));

        KafkaConsumerContainer.getInstances().putConsumer(this.topic, dataConsumer);

        statProducer = new KafkaProducer<>(statProducerConfig);
    } catch (Exception e) {
        // NOTE(review): a failure here leaves the fields assigned after the failing statement
        // unset — confirm callers tolerate that.
        e.printStackTrace();
    }

    startTime = System.currentTimeMillis();
}
private synchronized Producer<K,M> getProducer() { // Lazy init; also handles case where object has been serialized and Producer // needs to be recreated if (producer == null) { producer = new KafkaProducer<>(ConfigUtils.keyValueToProperties( "bootstrap.servers", updateBroker, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer", "value.serializer", "org.apache.kafka.common.serialization.StringSerializer", "linger.ms", 1000, // Make configurable? "batch.size", async ? 1 << 14 : 0, "compression.type", "gzip", "acks", 1, "max.request.size", 1 << 26 // TODO )); } return producer; }
/**
 * Creates the Kafka producer used by this instance.
 *
 * <p>The broker list is read from the system property named by
 * {@code KAFKA_CLUSTER_ENV_VAR_NAME} and falls back to {@code localhost:9092}. Keys and values
 * are serialized as strings and {@code acks} is set to {@code "0"}.
 */
public Producer() {
    LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName());

    // NOTE(review): despite the constant's name this is a JVM system property lookup, not an
    // environment variable lookup — confirm that is intended.
    final String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, "localhost:9092");
    LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster);

    final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
    final Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
    config.put(ProducerConfig.ACKS_CONFIG, "0");

    this.kafkaProducer = new KafkaProducer<>(config);
}
/**
 * Sends 2000 keyless records {@code "Hello this is record <i>"} to the topic {@code "test1"} on
 * {@code localhost:9092}.
 *
 * @param args command-line arguments (not read)
 */
public static void main(final String[] args) {
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("acks", "all");
    producerProps.put("retries", 1);
    producerProps.put("batch.size", 20000);
    producerProps.put("linger.ms", 1);
    producerProps.put("buffer.memory", 24568545);

    // try-with-resources closes the producer even if a send throws.
    try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps)) {
        for (int i = 0; i < 2000; i++) {
            // Fully typed record (was a raw ProducerRecord); the returned Future was never read.
            ProducerRecord<String, String> data =
                    new ProducerRecord<String, String>("test1", "Hello this is record " + i);
            producer.send(data);
        }
    }
}
/**
 * Sends ten records to the topic {@code "test"}; each key is the current time in milliseconds
 * and each value is the loop index, both as strings.
 *
 * @param args command-line arguments (not read)
 * @throws Exception declared by the signature; no statement here throws a checked exception
 *                   explicitly
 */
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.77.7:9094,192.168.77.7:9093,192.168.77.7:9092");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // try-with-resources closes the producer even if a send throws.
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test", Long.toString(System.currentTimeMillis()), Integer.toString(i)));
            System.out.println("Sent message: " + i);
        }
    }
}
public static void sendStringMessage() throws Exception{ Properties props = new Properties(); props.put("bootstrap.servers", servers); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); //没有任何分区,默认1个分区,发送消息 int i=0; while(i<1000){ Thread.sleep(1000L); String message = "zhangsan"+i; producer.send(new ProducerRecord<>("NL_U_APP_ALARM_APP_STRING",message)); i++; producer.flush(); } producer.close(); }
public static void main(String[] args) throws Exception { // set producer properties Properties prop = PropertyFileReader.readPropertyFile(); Properties properties = new Properties(); properties.put("bootstrap.servers", prop.getProperty("kafka.bootstrap.servers")); properties.put("acks", prop.getProperty("kafka.acks")); properties.put("retries",prop.getProperty("kafka.retries")); properties.put("batch.size", prop.getProperty("kafka.batch.size")); properties.put("linger.ms", prop.getProperty("kafka.linger.ms")); properties.put("max.request.size", prop.getProperty("kafka.max.request.size")); properties.put("compression.type", prop.getProperty("kafka.compression.type")); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // generate event Producer<String, String> producer = new KafkaProducer<String, String>(properties); generateIoTEvent(producer,prop.getProperty("kafka.topic"),prop.getProperty("camera.id"),prop.getProperty("camera.url")); }
/**
 * Fills {@code properties} with the producer settings for the broker at {@code 127.0.0.1:9092}
 * and creates the producer.
 *
 * @throws RuntimeException wrapping any exception raised while creating the producer (the
 *                          failure is logged first)
 */
@PostConstruct
public void init() {
    // Connection and serialization.
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Delivery and batching settings.
    properties.put("acks", "-1");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("linger.ms", 0);
    properties.put("buffer.memory", 33554432);

    try {
        this.producer = new KafkaProducer<>(properties);
    } catch (Exception cause) {
        log.error("Failed to start kafka producer", cause);
        throw new RuntimeException(cause);
    }

    log.info("Kafka Producer is started....");
}
/**
 * Reads the input returned by {@code readFile()} line by line, rewrites each line with
 * {@code getNewRecordWithRandomIP}, strips {@code [} and {@code ]} characters and sends the
 * result as a keyless record to the configured topic.
 *
 * <p>An {@link IOException} while reading or closing the input is printed to standard error;
 * the producer and the reader are closed in every case.
 */
@Override
public void run() {
    PropertyReader propertyReader = new PropertyReader();

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", propertyReader.getPropertyValue("broker.list"));
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("auto.create.topics.enable", "true");

    // Both resources are closed automatically (reader first, then producer), also on failure.
    try (KafkaProducer<String, String> ipProducer = new KafkaProducer<String, String>(producerProps);
         BufferedReader br = readFile()) {
        // The topic does not change per line, so look it up once.
        String topic = propertyReader.getPropertyValue("topic");

        String oldLine;
        while ((oldLine = br.readLine()) != null) {
            String line = getNewRecordWithRandomIP(oldLine).replace("[", "").replace("]", "");
            ipProducer.send(new ProducerRecord<String, String>(topic, line));
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/** * @param args */ public static void main(String[] args) { Properties props=new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String,String> sampleProducer= new KafkaProducer<String,String>(props); // ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, value); // sampleProducer.send(record); for (int i = 0; i < 10; i++) sampleProducer.send(new ProducerRecord<String, String>("demo-topic1","Data:"+ Integer.toString(i))); sampleProducer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, number, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
protected void init(AbstractConfiguration config) { BROKER_TOPIC_PREFIX = config.getString("communicator.broker.topic"); APPLICATION_TOPIC = config.getString("communicator.application.topic"); logger.trace("Initializing Kafka producer ..."); // producer config Properties props = new Properties(); props.put("bootstrap.servers", config.getString("bootstrap.servers")); props.put("acks", config.getString("acks")); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", InternalMessageSerializer.class.getName()); // producer this.producer = new KafkaProducer<>(props); // consumer executor this.executor = Executors.newSingleThreadExecutor(); }
public void publishDummyDataNumbers() { final String topic = "NumbersTopic"; // Create publisher final Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(config); for (int value = 0; value < 10000; value++) { producer.send(new ProducerRecord<>(topic, value, value)); } producer.flush(); producer.close(); }
/**
 * Creates the producer for the broker at {@code HOST:serverPort} with {@code acks} set to
 * {@code "1"}, string keys and byte-array values.
 */
private void initProducer() {
    final Properties config = new Properties();
    config.put("bootstrap.servers", HOST + ":" + serverPort);
    config.put("acks", "1");

    // Serializers are passed as instances rather than through the configuration.
    final StringSerializer keySerializer = new StringSerializer();
    final ByteArraySerializer valueSerializer = new ByteArraySerializer();
    producer = new KafkaProducer<String,byte[]>(config, keySerializer, valueSerializer);
}
private Producer<K, V> createProducer() { // Always require producer acks to all to ensure durable writes producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all"); // Don't allow more than one in-flight request to prevent reordering on retry (if enabled) producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); return new KafkaProducer<>(producerConfigs); }
/**
 * Generates random comma-separated events of the form
 * {@code <phoneNumber>,<bin>,<bout>,<currentTimeMillis>}, prints each one and sends it as a
 * keyless record to the topic {@code "device-data"}.
 *
 * @param args {@code args[0]} is the number of events to generate
 */
public static void main(String args[]) {
    // Parse the argument first so a bad value fails before a producer is opened.
    int nbrOfEventsRequired = Integer.parseInt(args[0]);

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "1");

    // Key type matches the configured StringSerializer (records are sent without a key);
    // try-with-resources closes the producer even if a send throws.
    try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
        for (int counter = 0; counter < nbrOfEventsRequired; counter++) {
            long phoneNumber = ThreadLocalRandom.current().nextLong(9999999950L, 9999999999L);
            int bin = ThreadLocalRandom.current().nextInt(100000, 9999999);
            int bout = ThreadLocalRandom.current().nextInt(100000, 9999999);

            String event = phoneNumber + "," + bin + "," + bout + "," + System.currentTimeMillis();
            System.out.println(event);

            producer.send(new ProducerRecord<String, String>("device-data", event));
        }
    }
}
/**
 * Generates random comma-separated events of the form
 * {@code <phoneNumber>,<bin>,<bout>,<date>}, prints each one and sends it as a keyless record to
 * the topic {@code "storm-trident-diy"}. The date is built from a random {@code long}.
 *
 * @param args {@code args[0]} is the number of events to generate
 */
public static void main(String args[]) {
    // Parse the argument first so a bad value fails before a producer is opened.
    int nbrOfEventsRequired = Integer.parseInt(args[0]);

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "1");

    // Key type matches the configured StringSerializer (records are sent without a key);
    // try-with-resources closes the producer even if a send throws.
    try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
        for (int counter = 0; counter < nbrOfEventsRequired; counter++) {
            long phoneNumber = ThreadLocalRandom.current().nextLong(9999999950L, 9999999960L);
            int bin = ThreadLocalRandom.current().nextInt(1000, 9999);
            int bout = ThreadLocalRandom.current().nextInt(1000, 9999);

            String event = phoneNumber + "," + bin + "," + bout + ","
                    + new Date(ThreadLocalRandom.current().nextLong());
            System.out.println(event);

            producer.send(new ProducerRecord<String, String>("storm-trident-diy", event));
        }
    }
}
/**
 * Get all partitions for a given topic.
 *
 * <p>The supplied producer is used only for the metadata lookup and is closed before this
 * method returns, whether or not the lookup succeeds.
 *
 * @param dummy     producer used to look up the topic's partitions; closed by this method
 * @param topicName The topic to get partitions for.
 * @return {@link List} of {@link TopicPartition} values corresponding to the topic.
 */
List<TopicPartition> getAllPartitions(KafkaProducer<String, byte[]> dummy, String topicName) {
    try {
        return dummy.partitionsFor(topicName)
            .stream()
            .map(i -> new TopicPartition(i.topic(), i.partition()))
            .collect(Collectors.toList());
    } finally {
        // Previously the producer stayed open when partitionsFor() threw.
        dummy.close();
    }
}
/**
 * Creates a producer for the broker at {@code BROKER_1_CONNECTION_STRING} with string
 * serializers for key and value and {@code acks} set to {@code "1"}.
 *
 * @return the newly created producer
 */
private static KafkaProducer<Integer, String> configureKafka() {
    final String stringSerializer = StringSerializer.class.getName();

    final Properties config = new Properties();
    config.put("bootstrap.servers", BROKER_1_CONNECTION_STRING);
    config.put("key.serializer", stringSerializer);
    config.put("value.serializer", stringSerializer);
    // NOTE(review): auto.offset.reset looks like a consumer-side setting — confirm whether it
    // is needed for a producer.
    config.put("auto.offset.reset", "smallest");
    config.put("acks", "1");

    return new KafkaProducer<Integer, String>(config);
}
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ACKS_CONFIG, "all"); props.put(RETRIES_CONFIG, 0); props.put(BATCH_SIZE_CONFIG, 25000); // nicht warten props.put(LINGER_MS_CONFIG, 200); props.put(BUFFER_MEMORY_CONFIG, 33554432); props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); StringBuffer muellBuffer = new StringBuffer(); for(int j = 0; j < 100000; j++) { muellBuffer.append(j); } String muell = muellBuffer.toString(); Producer<String, String> producer = new KafkaProducer<>(props); System.out.println("Start sending!"); for(int i = 1; i <= 10000; i++) { int key = i % 10; producer.send(new ProducerRecord<>("produktion", Integer.toString(i % 12), muell)); if(i % 500 == 0) { Thread.sleep(1000); System.out.println("i = " + i); } } System.out.println("done!"); producer.close(); }
/**
 * Creates the client and opens a producer for the brokers in {@code kafkaHosts}, with string
 * keys and byte-array values.
 *
 * @param clients          value stored in the {@code clients} field
 * @param intervalInMillis value stored in the {@code intervalInMillis} field
 */
public KafkaProducerClient(final List<String> clients, final long intervalInMillis) {
    this.clients = clients;
    this.intervalInMillis = intervalInMillis;

    final Map<String, Object> producerConfig = new HashMap<>();
    producerConfig.put("bootstrap.servers", kafkaHosts);
    producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    producer = new KafkaProducer<>(producerConfig);
}
/**
 * Generates random comma-separated events of the form
 * {@code <phoneNumber>,<bin>,<bout>,<date>}, prints each one and sends it as a keyless record to
 * the topic {@code "storm-diy"}. The date is built from a random {@code long}.
 *
 * @param args {@code args[0]} is the number of events to generate
 */
public static void main(String args[]) {
    // Parse the argument first so a bad value fails before a producer is opened.
    int nbrOfEventsRequired = Integer.parseInt(args[0]);

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "1");

    // Key type matches the configured StringSerializer (records are sent without a key);
    // try-with-resources closes the producer even if a send throws.
    try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
        for (int counter = 0; counter < nbrOfEventsRequired; counter++) {
            long phoneNumber = ThreadLocalRandom.current().nextLong(9999999950L, 9999999999L);
            int bin = ThreadLocalRandom.current().nextInt(1000, 9999);
            int bout = ThreadLocalRandom.current().nextInt(1000, 9999);

            String event = phoneNumber + "," + bin + "," + bout + ","
                    + new Date(ThreadLocalRandom.current().nextLong());
            System.out.println(event);

            producer.send(new ProducerRecord<String, String>("storm-diy", event));
        }
    }
}
private synchronized Producer<K,M> getProducer() { // Lazy init if (producer == null) { producer = new KafkaProducer<>(ConfigUtils.keyValueToProperties( "bootstrap.servers", updateBroker, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer", "value.serializer", "org.apache.kafka.common.serialization.StringSerializer", "linger.ms", 1000, // Make configurable? "compression.type", "gzip", "acks", 1, "max.request.size", 1 << 26 // TODO )); } return producer; }
/**
 * Creates the repository together with its two producers: one for operations and one for
 * published events. Both use string keys and JSON values serialized with the given mapper, and
 * each is built from its own {@code kafkaProperties.buildProducerProperties()} result.
 *
 * @param objectMapper mapper handed to both JSON value serializers
 * @return a repository wired with both producers and the consumer group id
 */
public KafkaOperationRepository createKafkaOperationRepository(ObjectMapper objectMapper) {
    // Producer for operations.
    KafkaProducer<String, Operation> operationProducer = new KafkaProducer<>(
            kafkaProperties.buildProducerProperties(),
            new StringSerializer(),
            new JsonSerializer<>(objectMapper));

    // Producer for published events.
    KafkaProducer<String, PublishedEventWrapper> eventProducer = new KafkaProducer<>(
            kafkaProperties.buildProducerProperties(),
            new StringSerializer(),
            new JsonSerializer<>(objectMapper));

    return new KafkaOperationRepository(
            operationProducer,
            eventProducer,
            kafkaProperties.getConsumer().getGroupId());
}