Example source code for the Java class org.apache.kafka.clients.producer.Producer

Project: oryx2    File: ProduceData.java
public void start() throws InterruptedException {
  RandomGenerator random = RandomManager.getRandom();

  Properties props = ConfigUtils.keyValueToProperties(
      "bootstrap.servers", "localhost:" + kafkaPort,
      "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
      "compression.type", "gzip",
      "batch.size", 0,
      "acks", 1,
      "max.request.size", 1 << 26 // TODO
  );
  try (Producer<String,String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < howMany; i++) {
      Pair<String,String> datum = datumGenerator.generate(i, random);
      ProducerRecord<String,String> record =
          new ProducerRecord<>(topic, datum.getFirst(), datum.getSecond());
      producer.send(record);
      log.debug("Sent datum {} = {}", record.key(), record.value());
      if (intervalMsec > 0) {
        Thread.sleep(intervalMsec);
      }
    }
  }
}
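The loop above is fire-and-forget: the Future returned by send() is never inspected, so broker-side failures pass silently. A minimal sketch of the same send using the client's standard callback overload, reusing the snippet's producer, record, and log:

producer.send(record, (metadata, exception) -> {
  if (exception != null) {
    log.warn("Failed to send datum {}", record.key(), exception);
  } else {
    log.debug("Sent datum {} to partition {} offset {}", record.key(), metadata.partition(), metadata.offset());
  }
});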
Project: java-kafka-client    File: TracingKafkaTest.java
@Test
public void nullKey() throws Exception {
  Producer<Integer, String> producer = createProducer();

  ProducerRecord<Integer, String> record = new ProducerRecord<>("messages", "test");
  producer.send(record);

  final Map<String, Object> consumerProps = KafkaTestUtils
      .consumerProps("sampleRawConsumer", "false", embeddedKafka);
  consumerProps.put("auto.offset.reset", "earliest");

  final CountDownLatch latch = new CountDownLatch(1);
  createConsumer(latch, null);
  assertTrue(latch.await(30, TimeUnit.SECONDS)); // block until the consumer has received the record

  producer.close();
}
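With a null key the default partitioner does not hash: depending on the client version, records are spread round-robin or via sticky batching across the topic's partitions.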
Project: cruise-control    File: CruiseControlMetricsReporterTest.java
@Before
public void setUp() {
  super.setUp();
  Properties props = new Properties();
  props.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
  AtomicInteger failed = new AtomicInteger(0);
  try (Producer<String, String> producer = createProducer(props)) {
    for (int i = 0; i < 10; i++) {
      producer.send(new ProducerRecord<>("TestTopic", Integer.toString(i)), (recordMetadata, e) -> {
        if (e != null) {
          failed.incrementAndGet();
        }
      });
    }
  }
  assertEquals(0, failed.get());
}
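Note that the try-with-resources block closes the producer before the assertion runs, and KafkaProducer.close() waits for in-flight sends to complete and their callbacks to fire, so failed holds its final value when assertEquals executes.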
Project: trellis-rosid    File: AbstractResourceService.java
/**
 * Create an AbstractResourceService with the given producer
 * @param baseUrl the base URL
 * @param producer the kafka producer
 * @param curator the zookeeper curator
 * @param notifications the event service
 * @param idSupplier a supplier of new identifiers
 * @param async write cached resources asynchronously if true, synchronously if false
 */
public AbstractResourceService(final String baseUrl, final Producer<String, String> producer,
        final CuratorFramework curator, final EventService notifications, final Supplier<String> idSupplier,
        final Boolean async) {

    this.baseUrl = baseUrl;
    this.notifications = notifications;
    this.async = async;
    this.idSupplier = idSupplier;
    this.producer = producer;
    this.curator = curator;

    try {
        this.curator.createContainers(ZNODE_COORDINATION);
    } catch (final Exception ex) {
        LOGGER.error("Could not create zk session node: {}", ex.getMessage());
        throw new RuntimeTrellisException(ex);
    }
}
Project: DBus    File: FullPullHelper.java
private static void sendAckInfoToCtrlTopic(String dataSourceInfo, String completedTime, String pullStatus) {
    try {
        // Update the full-pull status fields on top of the original dataSourceInfo, then send it back to the src topic
        JSONObject jsonObj = JSONObject.parseObject(dataSourceInfo);
        jsonObj.put(DataPullConstants.FullPullInterfaceJson.FROM_KEY, DataPullConstants.FullPullInterfaceJson.FROM_VALUE);
        jsonObj.put(DataPullConstants.FullPullInterfaceJson.TYPE_KEY, DataPullConstants.FullPullInterfaceJson.TYPE_VALUE);
        // notifyFullPullRequestor
        JSONObject payloadObj = jsonObj.getJSONObject(DataPullConstants.FullPullInterfaceJson.PAYLOAD_KEY);
        // completion time
        payloadObj.put(DataPullConstants.FullPullInterfaceJson.COMPLETE_TIME_KEY, completedTime);
        // flag indicating whether the pull succeeded
        payloadObj.put(DataPullConstants.FullPullInterfaceJson.DATA_STATUS_KEY, pullStatus);
        jsonObj.put(DataPullConstants.FullPullInterfaceJson.PAYLOAD_KEY, payloadObj);
        String ctrlTopic = getFullPullProperties(Constants.ZkTopoConfForFullPull.COMMON_CONFIG, true)
            .getProperty(Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);
        Producer<String, byte[]> producer = DbusHelper
                .getProducer(getFullPullProperties(Constants.ZkTopoConfForFullPull.BYTE_PRODUCER_CONFIG, true));
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(ctrlTopic, DataPullConstants.FullPullInterfaceJson.TYPE_VALUE, jsonObj.toString().getBytes());
        Future<RecordMetadata> future = producer.send(record);
        future.get(); // block until the broker acknowledges the write
    }
    catch (Exception e) {
        Log.error("Error occurred when report full data pulling status.", e);
        throw new RuntimeException(e);
    }
}
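A variation on the synchronous send above: future.get() with no timeout can block indefinitely if the broker never answers, so a bounded wait (java.util.concurrent.TimeUnit) is a common hardening step; the 30-second value is illustrative:

Future<RecordMetadata> future = producer.send(record);
future.get(30, TimeUnit.SECONDS); // throws TimeoutException instead of hanging forever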
Project: DBus    File: DataShardsSplittingBolt.java
private void loadRunningConf(String reloadMsgJson) {
    String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded";
    String loadResultMsg = null;
    try {
        this.confMap = FullPullHelper.loadConfProps(zkconnect, topologyId, zkTopoRoot, null);
        this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);
        this.byteProducer = (Producer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_BYTE_PRODUCER);
        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);
        loadResultMsg = "Running Config is " + notifyEvtName + " successfully for DataShardsSplittingBolt!";
        LOG.info(loadResultMsg);
    } catch (Exception e) {
        loadResultMsg = e.getMessage();
        LOG.error("Running config could not be " + notifyEvtName + " for DataShardsSplittingBolt!", e);
    } finally {
        if (reloadMsgJson != null) {
            FullPullHelper.saveReloadStatus(reloadMsgJson, "splitting-bolt", false, zkconnect);
        }
    }
}
Project: kafka-docker-demo    File: ProducerDemo.java
public static void main(String[] args) throws Exception {

        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.77.7:9094,192.168.77.7:9093,192.168.77.7:9092");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for(int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test", Long.toString(System.currentTimeMillis()), Integer.toString(i)));
            System.out.println("Sent message: " + i);
        }

        producer.close();
    }
Project: beam-portability-demo    File: Injector.java
/**
 * Publish 'numMessages' arbitrary events from live users with the provided delay, to a
 * Kafka topic.
 */
public static void publishDataToKafka(int numMessages, int delayInMillis)
    throws IOException {

  Producer<String, String> producer = new KafkaProducer<>(kafkaProps);

  for (int i = 0; i < Math.max(1, numMessages); i++) {
    Long currTime = System.currentTimeMillis();
    String message = generateEvent(currTime, delayInMillis);
    producer.send(new ProducerRecord<String, String>("game", null, message)); //TODO(fjp): Generalize
    // TODO(fjp): How do we get late data working?
    // if (delayInMillis != 0) {
    //   System.out.println(pubsubMessage.getAttributes());
    //   System.out.println("late data for: " + message);
    // }
    // pubsubMessages.add(pubsubMessage);
  }

  producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamThreadTest.java
TestStreamTask(final TaskId id,
               final String applicationId,
               final Collection<TopicPartition> partitions,
               final ProcessorTopology topology,
               final Consumer<byte[], byte[]> consumer,
               final Producer<byte[], byte[]> producer,
               final Consumer<byte[], byte[]> restoreConsumer,
               final StreamsConfig config,
               final StreamsMetrics metrics,
               final StateDirectory stateDirectory) {
    super(id,
        applicationId,
        partitions,
        topology,
        consumer,
        new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000),
        config,
        metrics,
        stateDirectory,
        null,
        new MockTime(),
        producer);
}
Project: trellis-rosid-file    File: FileResourceService.java
/**
 * Create a File-based repository service
 * @param partitionData the partition data configuration
 * @param partitionUrls the partition URL configuration
 * @param curator the curator framework
 * @param producer the kafka producer
 * @param notifications the notification service
 * @param idSupplier an identifier supplier for new resources
 * @param async generate cached resources asynchronously if true, synchronously if false
 * @throws IOException if the directory is not writable
 */
public FileResourceService(final Map<String, String> partitionData, final Map<String, String> partitionUrls,
        final CuratorFramework curator, final Producer<String, String> producer, final EventService notifications,
        final Supplier<String> idSupplier, final Boolean async) throws IOException {
    super(partitionUrls, producer, curator, notifications, idSupplier, async);

    requireNonNull(partitionData, "partition data configuration may not be null!");

    RESERVED_PARTITION_NAMES.stream().filter(partitionData::containsKey).findAny().ifPresent(name -> {
        throw new IllegalArgumentException("Invalid partition name: " + name);
    });

    this.partitionData = partitionData;

    init();
}
Project: spark2.0    File: KafkaSendMessage.java
public static void  sendStringMessage() throws Exception{
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

    // topic has no explicit partitions (defaults to a single partition); send messages
    int i=0;
    while(i<1000){
        Thread.sleep(1000L);
        String message = "zhangsan"+i;
        producer.send(new ProducerRecord<>("NL_U_APP_ALARM_APP_STRING",message));
        i++;
        producer.flush();
    }
    producer.close();
}
Project: apache-kafka-demos    File: SimpleProducer.java
public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ACKS_CONFIG, "all");
        props.put(RETRIES_CONFIG, 0);
        props.put(BATCH_SIZE_CONFIG, 16384);
        props.put(LINGER_MS_CONFIG, 0);
        props.put(BUFFER_MEMORY_CONFIG, 33554432);
        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");


        org.apache.kafka.clients.producer.Producer<Long, String> producer = new KafkaProducer<>(props);

        System.out.println("Start sending!");
        for(int i = 1; i <= 12; i++) {
            producer.send(new ProducerRecord<>("produktion", round(random() * 6) + 1, "Message: " + i));
        }
        System.out.println("done!");

        producer.close();
    }
Project: logback-kafka-appender    File: TransporterFactory.java
/**
 * Prepare a {@link Transporter} based on the delivery type {@link DeliveryType}.
 * <p/>
 * The given {@link Producer} is handed to the transporter, which uses it to
 * communicate with and send data to the Kafka broker.
 *
 * @param deliveryType the configured delivery type
 * @param producer     the producer used to send data to Kafka
 * @return a transporter matching the delivery type
 */
public static Transporter getTransporter(DeliveryType deliveryType, Producer<byte[], byte[]> producer) {
    Transporter transporter = null;
    switch (deliveryType) {
        case NORMAL:
            transporter = new NormalTransporter(producer);
            break;

        case YIElD:
            transporter = new YieldTransporter(producer);
            break;

        default:
            transporter = new NormalTransporter(producer);
            break;
    }
    return transporter;
}
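On Java 14 and later the same dispatch can be written as a switch expression, removing the duplicated NORMAL branch (a sketch; YIElD follows the project's enum spelling):

Transporter transporter = switch (deliveryType) {
    case YIElD -> new YieldTransporter(producer);
    default -> new NormalTransporter(producer);
};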
Project: Lagerta    File: QuasiKafkaStoreConfiguration.java
/** {@inheritDoc} */
@Override public void afterPropertiesSet() throws Exception {
    bindings.put(Exporter.class, FileExporter.class);
    bindings.put(Serializer.class, JavaSerializer.class);
    bindings.put(KeyValueManager.class, KeyValueManagerImpl.class);
    bindings.put(MetadataProvider.class, MetadataProviderImpl.class);

    bindings.put(MetadataManager.class, InMemoryMetadataManager.class);
    bindings.put(KeyValueProvider.class, QuasiKafkaKeyValueProvider.class);
    bindings.put(KeyValueReader.class, SnapshotAwareKeyValueReaderListener.class);
    bindings.put(IdSequencer.class, InMemoryIdSequencer.class);

    if (producer != null) {
        factories.put(Producer.class, factoryOf((Serializable)producer));
    } else {
        factories.put(Producer.class, producerFactory);
    }

    List classes = Collections.singletonList(SnapshotAwareKeyValueReaderListener.class);
    factories.put(List.class, new Injection.ListOf<>(classes));
}
Project: video-stream-analytics    File: VideoStreamCollector.java
public static void main(String[] args) throws Exception {

    // set producer properties
    Properties prop = PropertyFileReader.readPropertyFile();    
    Properties properties = new Properties();
    properties.put("bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"));
    properties.put("acks", prop.getProperty("kafka.acks"));
    properties.put("retries",prop.getProperty("kafka.retries"));
    properties.put("batch.size", prop.getProperty("kafka.batch.size"));
    properties.put("linger.ms", prop.getProperty("kafka.linger.ms"));
    properties.put("max.request.size", prop.getProperty("kafka.max.request.size"));
    properties.put("compression.type", prop.getProperty("kafka.compression.type"));
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // generate event
    Producer<String, String> producer = new KafkaProducer<String, String>(properties);
    generateIoTEvent(producer,prop.getProperty("kafka.topic"),prop.getProperty("camera.id"),prop.getProperty("camera.url"));
}
Project: spark-cassandra-poc    File: KafkaDataProducer.java
public void sendData(String data) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        System.out.println(metrics);

        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("video_view", data));

        producer.close();

    }
Project: talk-kafka-messaging-logs    File: ProduceConsumeLongByteArrayRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Long, byte[]> producer = new KafkaProducer<>(properties);

    LongStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC, //topic
                            number, //key
                            String.format("record-%s", number.toString()).getBytes())) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeStringAvroRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<String, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number -> new ProducerRecord<>(
                    TOPIC, //topic
                    number.toString(), //key
                    UserAvroSerdes.serialize(new User(String.format("user-%s", number.toString()))))) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeIntegerStringRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            number, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Compaction.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Retention.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream
            .rangeClosed(1, 100000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Compaction.java
private static void produceRecords() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Integer, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            KafkaProducerUtil.createMessage(1000))) //Value
            .forEach(record -> {
                producer.send(record);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    producer.close();
}
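printStackTrace inside the lambda discards the interrupt; the conventional alternative restores the thread's interrupt status so callers can react (a sketch of the catch block only):

                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt status
                }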
Project: talk-kafka-messaging-logs    File: KafkaSlowProducer.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            number, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                producer.send(record);
            });
    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamThreadTest.java
@Test
public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
    builder.addSource("source1", "someTopic");

    final StreamThread thread = new StreamThread(
        builder,
        config,
        clientSupplier,
        applicationId,
        clientId,
        processId,
        metrics,
        mockTime,
        new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
        0);

    final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
    assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
    assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
    thread.setPartitionAssignor(new MockStreamsPartitionAssignor(assignment));

    thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));

    assertEquals(1, clientSupplier.producers.size());
    final Producer globalProducer = clientSupplier.producers.get(0);
    assertSame(globalProducer, thread.threadProducer);
    for (final StreamTask task : thread.tasks().values()) {
        assertSame(globalProducer, ((RecordCollectorImpl) task.recordCollector()).producer());
    }
    assertSame(clientSupplier.consumer, thread.consumer);
    assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
}
Project: java-kafka-client    File: TracingKafkaStreamsTest.java
@Test
public void test() throws Exception {
  Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

  Properties config = new Properties();
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, senderProps.get("bootstrap.servers"));
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

  Producer<Integer, String> producer = createProducer();
  ProducerRecord<Integer, String> record = new ProducerRecord<>("stream-test", 1, "test");
  producer.send(record);

  final Serde<String> stringSerde = Serdes.String();
  final Serde<Integer> intSerde = Serdes.Integer();

  KStreamBuilder builder = new KStreamBuilder();
  KStream<Integer, String> kStream = builder
      .stream(intSerde, stringSerde, "stream-test");

  kStream.map((key, value) -> new KeyValue<>(key, value + "map")).to("stream-out");

  KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(config),
      new TracingKafkaClientSupplier(mockTracer));
  streams.start();

  await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), equalTo(3));

  streams.close();
  producer.close();

  List<MockSpan> spans = mockTracer.finishedSpans();
  assertEquals(3, spans.size());
  checkSpans(spans);

  assertNull(mockTracer.activeSpan());
}
Project: kafka-0.11.0.0-src-with-comment    File: MockClientSupplier.java
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
    if (applicationId != null) {
        assertThat((String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), startsWith(applicationId + "-"));
    } else {
        assertFalse(config.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
    }
    final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
    producers.add(producer);
    return producer;
}
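Because MockProducer captures every send in memory, producer-side logic can be asserted without a broker; a minimal sketch using the same constructor as above (topic and payload are illustrative):

MockProducer<byte[], byte[]> mock = new MockProducer<>(true, new ByteArraySerializer(), new ByteArraySerializer());
mock.send(new ProducerRecord<>("some-topic", "payload".getBytes()));
assertEquals(1, mock.history().size()); // each sent record is recorded for assertions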
Project: nighthawk    File: KafkaTracingProducerFactoryBean.java
@Override
public Producer getObject() throws Exception {
    if (producer == null) {
        afterPropertiesSet();
    }
    return producer;
}
Project: spark2.0    File: KafkaSendMessage.java
public static void  sendWrapperMessage() throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "com.gochinatv.spark.kafka.SerializedMessage");
    Producer<String, WrapperAppMessage> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

    //case 1:
    // topic has no explicit partitions (defaults to a single partition); send messages
    int i=0;
    while(i<1000){
        Thread.sleep(1000L);
        WrapperAppMessage message = new WrapperAppMessage();
        message.setAgreeId((i+1)%5);
        message.setCityId((i+1)%3);
        message.setConnectType((i+1)%4);
        message.setCount((i+100)%10);
        message.setInstanceId((i+1)%6);
        message.setProvinceId((i+1)%4);
        message.setTimestamp(System.currentTimeMillis());
        message.setValue((float)((i+200)%4));
        producer.send(new ProducerRecord<>("NL_U_APP_ALARM_APP",message));
        System.out.println(message.toString());
        i++;
        producer.flush();
    }
    producer.close();
}
Project: Re-Collector    File: KafkaOutput.java
@Override
public void write(Message message) {
    Uninterruptibles.awaitUninterruptibly(transportInitialized);
    LOG.debug("Sending message: {}", message);
    try {
        Producer<String, String> producer = getProducer(configuration);
        producer.send(new ProducerRecord<>(String.valueOf(configuration.getTopic()),
                "message", message.getMessage()));
    } catch (Exception e) {
        LOG.error("Failed to send message", e);
    }
}
Project: DBus    File: KafkaContainer.java
public Producer getProducer(Properties props) {
    if (producerMap.containsKey(props)) {
        return producerMap.get(props);
    } else {
        Producer producer = new KafkaProducer<>(props);
        producerMap.put(props, producer);
        return producer;
    }
}
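If producerMap is shared across threads, the check-then-put above can race and create duplicate producers; with a ConcurrentHashMap the caching becomes atomic (a sketch under that assumption):

public Producer getProducer(Properties props) {
    // creates the producer at most once per distinct Properties key
    return producerMap.computeIfAbsent(props, p -> new KafkaProducer<>(p));
}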
Project: video-stream-analytics    File: VideoStreamCollector.java
private static void generateIoTEvent(Producer<String, String> producer, String topic, String camId, String videoUrl) throws Exception {
    String[] urls = videoUrl.split(",");
    String[] ids = camId.split(",");
    if (urls.length != ids.length) {
        throw new Exception("There must be the same number of camera ids and urls");
    }
    logger.info("Total urls to process: " + urls.length);
    for (int i = 0; i < urls.length; i++) {
        Thread t = new Thread(new VideoEventGenerator(ids[i].trim(), urls[i].trim(), producer, topic));
        t.start();
    }
}
Project: DBus    File: WrapperBolt.java
private Producer<String, String> createProducer() throws Exception {
    Properties props = PropertiesHolder.getProperties(Constants.Properties.PRODUCER_CONFIG);
    props.setProperty("client.id", this.topologyId + "_wrapper_" + context.getThisTaskId());

    Producer<String, String> producer = new KafkaProducer<>(props);
    return producer;
}
Project: DBus    File: DbusKafkaWriterBolt.java
private Producer<String, String> createProducer() throws Exception {
    Properties props = PropertiesHolder.getProperties(Constants.Properties.PRODUCER_CONFIG);
    props.setProperty("client.id", this.topologyId + "_writer_" + context.getThisTaskId());

    Producer<String, String> producer = new KafkaProducer<>(props);
    return producer;
}
Project: DBus    File: DbusKafkaSpout.java
private Producer<String, byte[]> createProducer() throws Exception {
    Properties props = PropertiesHolder.getProperties(Constants.Properties.PRODUCER_CONTROL);
    props.setProperty("client.id", this.topologyId + "_control_" + context.getThisTaskId());

    Producer<String, byte[]> producer = new KafkaProducer<>(props);
    return producer;
}
Project: DBus    File: MetaEventWarningSender.java
private static Producer<String, String> createProducer() throws Exception {
    Properties props = PropertiesHolder.getProperties(Constants.Properties.PRODUCER_CONFIG);
    props.setProperty("client.id", "meta-event-warning");

    Producer<String, String> producer = new KafkaProducer<>(props);
    return producer;
}
Project: DBus    File: PagedBatchDataFetchingBolt.java
private void loadRunningConf(String reloadMsgJson) {
    String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded";
    String loadResultMsg = null;
    try {
        this.confMap = FullPullHelper.loadConfProps(zkconnect, topologyId, zkTopoRoot, null);
        this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);
        this.stringProducer = (Producer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_STRING_PRODUCER);
        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);
        this.stringProducerProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_STRING_PRODUCER_PROPS);

        String sendBatchSizeStr = stringProducerProps.getProperty(DataPullConstants.KAFKA_SEND_BATCH_SIZE);
        String sendRowsStr = stringProducerProps.getProperty(DataPullConstants.KAFKA_SEND_ROWS);
        if (StringUtils.isNotBlank(sendBatchSizeStr) && (Long.valueOf(sendBatchSizeStr) != kafkaSendBatchSize.get())) {
            kafkaSendBatchSize.set(Long.valueOf(sendBatchSizeStr));
        }
        if (StringUtils.isNotBlank(sendRowsStr) && (Long.valueOf(sendRowsStr) != kafkaSendRows.get())) {
            kafkaSendRows.set(Long.valueOf(sendRowsStr));
        }
        loadResultMsg = "Running Config is " + notifyEvtName + " successfully for PagedBatchDataFetchingBolt!";
        LOG.info(loadResultMsg);
    } catch (Exception e) {
        loadResultMsg = e.getMessage();
        LOG.error("Running config could not be " + notifyEvtName + " for PagedBatchDataFetchingBolt!", e);
        throw e;
    } finally {
        if (reloadMsgJson != null) {
            FullPullHelper.saveReloadStatus(reloadMsgJson, "pulling-dataFetching-bolt", false, zkconnect);
        }
    }
}
Project: docker-kafka-demo    File: KafkaProducer.java
public static void main(String[] args) throws IOException, ParseException {
    //Kafka Part
    Properties properties = new Properties();

    //set the Kafka bootstrap servers
    properties.setProperty("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL);
    //tell the client how keys and values are serialized (here: plain strings)
    properties.setProperty("key.serializer", StringSerializer.class.getName());
    properties.setProperty("value.serializer", StringSerializer.class.getName());
    //set the producer acknowledgement level (-1, 0, or 1)
    properties.setProperty("acks", "1");
    //how many times the client retries a failed send before giving up
    properties.setProperty("retries", "3");
    //batch messages for up to 1 ms before sending; otherwise use producer.flush() below where marked
    properties.setProperty("linger.ms", "1");
    //use a truststore and TLS
    properties.setProperty("security.protocol", KafkaProperties.SECURITY_PROTOCOL);
    properties.setProperty("ssl.truststore.location", KafkaProperties.TRUSTSTORE_LOCATION);
    properties.setProperty("ssl.truststore.password", KafkaProperties.TRUSTSTORE_PASSWORD);
    properties.setProperty("ssl.endpoint.identification.algorithm", KafkaProperties.ENDPOINT_ALGORITHM);

    Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);

    //single-message alternative to the for loop: ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("foobar", "2", "Huh!");
    for (int key = 0; key < 10; key++) {
        //change the topic here
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(KafkaProperties.TOPIC, Integer.toString(key), "My new keys are here: " + Integer.toString(key));
        producer.send(producerRecord);
    }

    //alternatively, call producer.flush() here to push out any buffered messages
    producer.close();
}
Project: kafka-streams-machine-learning-examples    File: IntegrationTestUtils.java
/**
 * @param topic          Kafka topic to write the data records to
 * @param records        Data records to write to Kafka
 * @param producerConfig Kafka producer configuration
 * @param <K>            Key type of the data records
 * @param <V>            Value type of the data records
 */
public static <K, V> void produceKeyValuesSynchronously(
    String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
    throws ExecutionException, InterruptedException {
  Producer<K, V> producer = new KafkaProducer<>(producerConfig);
  for (KeyValue<K, V> record : records) {
    Future<RecordMetadata> f = producer.send(
        new ProducerRecord<>(topic, record.key, record.value));
    f.get();
  }
  producer.flush();
  producer.close();
}
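Because every Future is awaited with f.get(), each record is fully acknowledged before the next is sent; the trailing flush() is therefore redundant (close() flushes as well) but harmless.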
Project: SkyEye    File: LazySingletonProducer.java
/**
 * Instantiate the singleton producer.
 * @param config producer configuration map
 * @return the shared producer instance
 */
public static Producer<byte[], String> getInstance(Map<String, Object> config) {
    if (producer == null) {
        synchronized(LazySingletonProducer.class) {
            if (producer == null) {
                producer = new KafkaProducer<byte[], String>(config);
            }
        }
    }
    return producer;
}
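Double-checked locking is only safe when the field is volatile; without it, another thread may observe a partially constructed KafkaProducer. A corrected sketch, assuming the field declaration can be changed:

private static volatile Producer<byte[], String> producer;

public static Producer<byte[], String> getInstance(Map<String, Object> config) {
    Producer<byte[], String> local = producer; // one volatile read on the fast path
    if (local == null) {
        synchronized (LazySingletonProducer.class) {
            local = producer;
            if (local == null) {
                producer = local = new KafkaProducer<byte[], String>(config);
            }
        }
    }
    return local;
}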
Project: library    File: MediatorServiceImpl.java
@Override
public void addBook(AddBookCommand command) {
    Producer<String, String> producer = new KafkaProducer<>(properties);

    //TODO: move topic to configuration file
    //TODO: decide what to use for key
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-topic", "key", command.toString());
    producer.send(producerRecord);
    log.debug("Record sent={}", producerRecord);
    producer.close();
}
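KafkaProducer is thread-safe and intended to be shared, so constructing and closing a producer on every addBook call works but is expensive; a single long-lived instance per application is the usual choice.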