Java code examples for class org.apache.kafka.clients.producer.RecordMetadata

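RecordMetadata is the producer-side acknowledgement for a record appended to a topic partition: it exposes the record's topic, partition, offset, and timestamp. The project snippets below obtain it either by blocking on the Future<RecordMetadata> returned by send() or inside a Callback. As a baseline, here is a minimal self-contained sketch (the broker address and topic name are illustrative placeholders, not taken from any project below) of fetching RecordMetadata synchronously:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class RecordMetadataDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; get() blocks until the broker acknowledges the write
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<>("demo-topic", "key", "value")).get();
            System.out.printf("topic=%s partition=%d offset=%d timestamp=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
        }
    }
}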
Project: java-kafka-client    File: TracingKafkaProducer.java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  /*
  // Create wrappedRecord because the headers of an already-sent record are read-only (if the record is sent a second time)
  ProducerRecord<K, V> wrappedRecord = new ProducerRecord<>(record.topic(),
      record.partition(),
      record.timestamp(),
      record.key(),
      record.value(),
      record.headers());
  */

  try (Scope scope = buildAndInjectSpan(record)) {
    Callback wrappedCallback = new TracingCallback(callback, scope);
    return producer.send(record, wrappedCallback);
  }
}
Project: kafka-0.11.0.0-src-with-comment    File: ProducerBatchTest.java
@Test
public void testBatchCannotCompleteTwice() throws Exception {
    ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
    FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
    batch.done(500L, 10L, null);

    try {
        batch.done(1000L, 20L, null);
        fail("Expected exception from done");
    } catch (IllegalStateException e) {
        // expected
    }

    RecordMetadata recordMetadata = future.get();
    assertEquals(500L, recordMetadata.offset());
    assertEquals(10L, recordMetadata.timestamp());
}
Project: doctorkafka    File: KafkaAvroPublisher.java
public void publish(BrokerStats brokerStats) throws IOException {
  try {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);

    avroEventWriter.write(brokerStats, binaryEncoder);
    binaryEncoder.flush();
    IOUtils.closeQuietly(stream);

    String key = brokerStats.getName() + "_" + System.currentTimeMillis();
    int numPartitions = kafkaProducer.partitionsFor(destTopic).size();
    int partition = brokerStats.getId() % numPartitions;

    Future<RecordMetadata> future = kafkaProducer.send(
        new ProducerRecord(destTopic, partition, key.getBytes(), stream.toByteArray()));
    future.get();

    OpenTsdbMetricConverter.incr("kafka.stats.collector.success", 1, "host=" + HOSTNAME);
  } catch (Exception e) {
    LOG.error("Failed to publish stats", e);
    OpenTsdbMetricConverter.incr("kafka.stats.collector.failure", 1, "host=" + HOSTNAME);
    throw new RuntimeException("Avro serialization failure", e);
  }
}
Project: wechat-mall    File: OrderProducer.java
@Override
public void send(Long k, byte[] v) {
    KafkaProducer<Long, byte[]> p = getWorker();
    p.initTransactions(); // note: initTransactions() should run once per producer instance, not on every send
    p.beginTransaction();
    Future<RecordMetadata> res = p.send(new ProducerRecord<Long, byte[]>(topic, k, v));
    RecordMetadata record;
    try {
        record = res.get();
        offsets.clear();
        offsets.put(new TopicPartition(topic, record.partition()), new OffsetAndMetadata(record.offset()));
        p.sendOffsetsToTransaction(offsets, MallConstants.ORDER_GROUP);
        p.commitTransaction();
    } catch (InterruptedException | ExecutionException e) {
        p.abortTransaction();
    }
}
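As the inline note above says, initTransactions() belongs in producer setup rather than in every send. Here is a self-contained sketch of the standard transactional flow; the broker address, transactional.id, and topic are illustrative assumptions, not taken from the wechat-mall project:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TransactionalSendSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("transactional.id", "order-producer-1"); // required before initTransactions()
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // once per producer instance
            try {
                producer.beginTransaction();
                RecordMetadata meta = producer.send(
                        new ProducerRecord<>("orders", 1L, new byte[]{1})).get();
                producer.commitTransaction();
                System.out.println("committed at offset " + meta.offset());
            } catch (Exception e) {
                producer.abortTransaction(); // roll back everything sent in this transaction
            }
        }
    }
}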
Project: trellis-rosid    File: EventProducer.java
/**
 * Emit messages to the relevant Kafka topics.
 * @return true if the messages were successfully delivered to the Kafka topics; false otherwise
 */
public Boolean emit() {

    try {
        final List<Future<RecordMetadata>> results = new ArrayList<>();

        if (async) {
            results.add(producer.send(new ProducerRecord<>(TOPIC_CACHE, identifier.getIRIString(),
                            serialize(dataset))));
        }

        // Update the containment triples of the parent resource if this is a delete or create operation
        parent.ifPresent(emitToParent(identifier, dataset, results));

        for (final Future<RecordMetadata> result : results) {
            final RecordMetadata res = result.get();
            LOGGER.debug("Sent record to topic: {}, {}", res, res.timestamp());
        }

        return true;
    } catch (final InterruptedException | ExecutionException ex) {
        LOGGER.error("Error sending record to kafka topic: {}", ex.getMessage());
        return false;
    }
}
Project: DBus    File: MetaEventWarningSender.java
public void sendMessage(MetaVersion ver, MetaWrapper newMeta, MetaCompareResult result) {
    ControlMessage message = new ControlMessage(System.currentTimeMillis(), ControlType.G_META_SYNC_WARNING.toString(), "dbus-appender");

    message.addPayload("datasource", GlobalCache.getDatasource().getDsName());
    message.addPayload("schema", ver.getSchema());
    message.addPayload("tableId", ver.getTableId());
    message.addPayload("table", ver.getTable());
    message.addPayload("before", ver.getMeta());
    message.addPayload("after", newMeta);
    message.addPayload("compare-result", JSON.toJSON(result));
    message.addPayload("version", ver.getVersion());

    String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC);
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message.getType(), message.toJSONString());
    Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            logger.error("Failed to send global event: {}", exception.getMessage());
        }
    });
    try {
        future.get(10000, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
Project: DBus    File: FullPullHelper.java
private static void sendAckInfoToCtrlTopic(String dataSourceInfo, String completedTime, String pullStatus) {
    try {
        // Update the full-pull related fields on top of the original dataSourceInfo, then send it back to the src topic
        JSONObject jsonObj = JSONObject.parseObject(dataSourceInfo);
        jsonObj.put(DataPullConstants.FullPullInterfaceJson.FROM_KEY, DataPullConstants.FullPullInterfaceJson.FROM_VALUE);
        jsonObj.put(DataPullConstants.FullPullInterfaceJson.TYPE_KEY, DataPullConstants.FullPullInterfaceJson.TYPE_VALUE);
        // notifyFullPullRequestor
        JSONObject payloadObj = jsonObj.getJSONObject(DataPullConstants.FullPullInterfaceJson.PAYLOAD_KEY);
        // completion time
        payloadObj.put(DataPullConstants.FullPullInterfaceJson.COMPLETE_TIME_KEY, completedTime);
        // flag indicating whether the pull succeeded
        payloadObj.put(DataPullConstants.FullPullInterfaceJson.DATA_STATUS_KEY, pullStatus);
        jsonObj.put(DataPullConstants.FullPullInterfaceJson.PAYLOAD_KEY, payloadObj);
        String ctrlTopic = getFullPullProperties(Constants.ZkTopoConfForFullPull.COMMON_CONFIG, true)
            .getProperty(Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);
        Producer producer = DbusHelper
                .getProducer(getFullPullProperties(Constants.ZkTopoConfForFullPull.BYTE_PRODUCER_CONFIG, true));
        ProducerRecord record = new ProducerRecord<>(ctrlTopic, DataPullConstants.FullPullInterfaceJson.TYPE_VALUE, jsonObj.toString().getBytes());
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata meta = future.get();
    }
    catch (Exception e) {
        Log.error("Error occurred when reporting full data pull status.", e);
        throw new RuntimeException(e);
    }
}
Project: DBus    File: PagedBatchDataFetchingBolt.java
@SuppressWarnings("unchecked")
private void sendMessageToKafka(String key, DbusMessage dbusMessage, AtomicLong sendCnt, AtomicLong recvCnt, AtomicBoolean isError) throws Exception{
    if(stringProducer == null) {
        throw new Exception("producer is null, can't send to kafka!");
    }

    ProducerRecord record = new ProducerRecord<>(resultTopic, key, dbusMessage.toString());
    sendCnt.getAndIncrement();
    stringProducer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
                isError.set(true);
            } else {
                recvCnt.getAndIncrement();
            }
        }
    });
}
Project: kafka-visualizer    File: RestResource.java
@POST
@Path("/topics/{topicName}")
public Response postTopicData(@PathParam("topicName") String topicName,
                              String data) {
    try {
        ImmutableMap<String, String> params = parseUrlEncodedParams(data);
        String key = params.get("key");
        String value = params.get("value");

        if (key == null || value == null) {
            return responseFactory.createBadRequestResponse("One of the required post params 'key' " +
                    "or 'value' are missing");
        }

        if (!doesTopicExist(topicName)) {
            return responseFactory.createNotFoundResponse(String.format("No topic exists with the name [%s]", topicName));
        }

        RecordMetadata metadata = kafkaProducerWrapper.publish(key, value, topicName);
        return responseFactory.createOkResponse(metadata);
    } catch (Exception e) {
        return responseFactory.createServerErrorResponse(e);
    }
}
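kafkaProducerWrapper.publish() is project-specific and not shown here. A plausible minimal implementation, inferred only from the call site (the name, signature, and producer field are assumptions, not the actual kafka-visualizer source), would block on the send future so the REST handler can return the RecordMetadata:

// Hypothetical sketch; 'producer' is assumed to be an already-configured KafkaProducer field.
public RecordMetadata publish(String key, String value, String topicName) throws Exception {
    // get() blocks until the broker acknowledges the write, so failures surface as exceptions
    return producer.send(new ProducerRecord<>(topicName, key, value)).get();
}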
Project: wngn-jms-kafka    File: AsynchronousDeliveryStrategy.java
@Override
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event,
                              final FailedDeliveryCallback<E> failedDeliveryCallback) {
    try {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                }
            }
        });
        return true;
    } catch (BufferExhaustedException e) {
        failedDeliveryCallback.onFailedDelivery(event, e);
        return false;
    }
}
Project: wngn-jms-kafka    File: LatestProducer.java
public static void main(String[] args) {

    Map<String, Object> config = new HashMap<String, Object>();
    config.put("partitioner.class", "com.wngn.kafka.SimpleKeyPartition");
    LatestProducer producer = LatestProducer.getInstance(ProducerConstants.TOPIC_KAFKA_TEST, config);
    ProducerRecord<String, String> record = null;
    long index = 0L;
    boolean controller = true;
    while (controller) {
        controller = false;
        index++;
        System.out.println(index + "------------");
        try {
            String message = "message_" + index;
            RecordMetadata recordMetadata = producer.sendWithSync("1", message);
            System.out.format("PARTITION: %d OFFSET: %d\n", recordMetadata.partition(), recordMetadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    producer.close();
}
Project: trellis-rosid    File: EventProducer.java
private Consumer<Resource> emitToParent(final IRI identifier, final Dataset dataset,
        final List<Future<RecordMetadata>> results) {
    final Boolean isCreate = dataset.contains(of(PreferAudit), null, type, Create);
    final Boolean isDelete = dataset.contains(of(PreferAudit), null, type, Delete);
    final String containmentTopic = isDelete ? TOPIC_LDP_CONTAINMENT_DELETE : TOPIC_LDP_CONTAINMENT_ADD;
    final String membershipTopic = isDelete ? TOPIC_LDP_MEMBERSHIP_DELETE : TOPIC_LDP_MEMBERSHIP_ADD;

    return container -> {
        if (isDelete || isCreate) {
            try {
                LOGGER.info("Sending to parent: {}", container.getIdentifier());
                results.add(producer.send(buildContainmentMessage(containmentTopic, identifier, container,
                                dataset)));

                buildMembershipMessage(membershipTopic, identifier, container, dataset).ifPresent(msg -> {
                        LOGGER.info("Sending to member resource: {}", container.getMembershipResource());
                        results.add(producer.send(msg));
                });
            } catch (final Exception ex) {
                LOGGER.error("Error processing dataset: {}", ex.getMessage());
            }
        }
    };
}
Project: Building-Data-Streaming-Applications-with-Apache-Kafka    File: DemoProducer.java
public static void main(final String[] args) {
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("acks", "all");
    producerProps.put("retries", 1);
    producerProps.put("batch.size", 20000);
    producerProps.put("linger.ms", 1);
    producerProps.put("buffer.memory", 24568545);
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps);

    for (int i = 0; i < 2000; i++) {
        ProducerRecord data = new ProducerRecord<String, String>("test1", "Hello this is record " + i);
        Future<RecordMetadata> recordMetadata = producer.send(data);
    }
    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: TransactionManagerTest.java
@Test
public void resendFailedProduceRequestAfterAbortableError() throws Exception {
    final long pid = 13131L;
    final short epoch = 1;
    doInitTransactions(pid, epoch);
    transactionManager.beginTransaction();

    transactionManager.maybeAddPartitionToTransaction(tp0);

    Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;

    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
    prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
    sender.run(time.milliseconds()); // AddPartitions
    sender.run(time.milliseconds()); // Produce

    assertFalse(responseFuture.isDone());

    transactionManager.transitionToAbortableError(new KafkaException());
    prepareProduceResponse(Errors.NONE, pid, epoch);

    sender.run(time.milliseconds());
    assertTrue(responseFuture.isDone());
    assertNotNull(responseFuture.get());
}
Project: kafka-0.11.0.0-src-with-comment    File: SenderTest.java
@Test
public void testSimple() throws Exception {
    long offset = 0;
    Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
    sender.run(time.milliseconds()); // connect
    sender.run(time.milliseconds()); // send produce request
    assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
    assertTrue(client.hasInFlightRequests());
    client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
    sender.run(time.milliseconds());
    assertEquals("All requests completed.", 0, client.inFlightRequestCount());
    assertFalse(client.hasInFlightRequests());
    sender.run(time.milliseconds());
    assertTrue("Request should be completed", future.isDone());
    assertEquals(offset, future.get().offset());
}
Project: azeroth    File: DefaultTopicProducer.java
private boolean doSyncSend(String topicName, String messageKey, DefaultMessage message) {
    try {
        Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, Object>(
            topicName, messageKey, message.isSendBodyOnly() ? message.getBody() : message));
        RecordMetadata metadata = future.get();
        for (ProducerEventHandler handler : eventHanlders) {
            try {
                handler.onSuccessed(topicName, metadata);
            } catch (Exception e) {
                // swallow handler exceptions so one misbehaving handler cannot break the send path
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("kafka_send_success,topic=" + topicName + ", messageId=" + messageKey
                      + ", partition=" + metadata.partition() + ", offset="
                      + metadata.offset());
        }
        return true;
    } catch (Exception ex) {
        log.error("kafka_send_fail,topic=" + topicName + ",messageId=" + messageKey, ex);
        // synchronous send: propagate the exception directly
        throw new RuntimeException(ex);
    }
}
Project: Lagerta    File: SynchronousPublisher.java
@Override
public void handle(long transactionId, Map<String, Collection<Cache.Entry<?, ?>>> updates)
        throws CacheWriterException {
    List<Future<RecordMetadata>> futures = producers
            .get()
            .stream()
            .map(producer -> producer.send(transactionId, updates))
            .collect(Collectors.toList());
    try {
        for (Future<RecordMetadata> future : futures) {
            future.get();
        }
    } catch (InterruptedException | ExecutionException e) {
        throw new CacheWriterException(e);
    }
}
Project: Lagerta    File: TransactionalKafkaProducerImpl.java
@SuppressWarnings("unchecked")
@Override
public Future<RecordMetadata> send(long transactionId,
    Map<String, Collection<Cache.Entry<?, ?>>> updates) throws CacheWriterException {
    try {
        int partition = partition(transactionId, partitions);
        TransactionScope key = keyTransformer.apply(transactionId, updates);
        List<List> value = valueTransformer.apply(updates);
        ProducerRecord record = new ProducerRecord(dataTopic, partition, transactionId, serializer.serialize(key),
            serializer.serialize(value));
        return producer.send(record);
    }
    catch (Exception e) {
        throw new CacheWriterException(e);
    }
}
Project: doctorkafka    File: KafkaWriter.java
public static void main(String[] args) throws Exception {
  CommandLine commandLine = parseCommandLine(args);
  String zkUrl = commandLine.getOptionValue(ZOOKEEPER);
  String topic = commandLine.getOptionValue(TOPIC);
  int numMessages = Integer.parseInt(commandLine.getOptionValue(NUM_MESSAGES));

  Random random = new Random();
  Properties props = OperatorUtil.createKafkaProducerProperties(zkUrl);
  KafkaProducer kafkaProducer = new KafkaProducer<>(props);

  byte[] key = new byte[16];
  byte[] data = new byte[1024];
  for (int i = 0; i < numMessages; i++) {
    for (int j = 0; j < data.length; j++) {
      data[j] = (byte)random.nextInt();
    }
    ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord(
        topic, 0, System.currentTimeMillis(), key, data);
    Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
    future.get();
    if (i % 100 == 0) {
      System.out.println("Have wrote " + i + " messages to kafka");
    }
  }
}
Project: kafka-0.11.0.0-src-with-comment    File: SenderTest.java
@Test
public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
    final long producerId = 343434L;
    TransactionManager transactionManager = new TransactionManager();
    transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
    setupWithTransactionState(transactionManager);
    client.setNode(new Node(1, "localhost", 33343));

    int maxRetries = 10;
    Metrics m = new Metrics();
    Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
            m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);

    Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
    sender.run(time.milliseconds());  // connect.
    sender.run(time.milliseconds());  // send.

    assertEquals(1, client.inFlightRequestCount());

    client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));

    sender.run(time.milliseconds());
    assertTrue(responseFuture.isDone());
    assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
}
Project: Lagerta    File: SynchronousPublisher.java
@Override public void writeTransaction(long transactionId, Map<String, Collection<Cache.Entry<?, ?>>> updates)
    throws CacheWriterException {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("[M] Writing synchronous replica for transaction {} with size of batch {}",
                transactionId, updates.size());
    }
    Collection<RemoteKafkaProducer> producers = replicaProducersManager.getProducers();

    if (!producers.isEmpty()) {
        List<Future<RecordMetadata>> futures = new ArrayList<>(producers.size());
        for (RemoteKafkaProducer producer : producers) {
            futures.add(producer.writeTransaction(transactionId, updates));
        }
        wait(futures);
    }
}
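The wait(futures) helper is not included in this snippet. Judging from the handle() method of the same class shown earlier, it plausibly blocks on each future and wraps failures, along these lines (a sketch, not the actual Lagerta source):

// Plausible sketch of the wait helper, mirroring the handle() method shown above.
private void wait(List<Future<RecordMetadata>> futures) throws CacheWriterException {
    try {
        for (Future<RecordMetadata> future : futures) {
            future.get(); // surface any send failure
        }
    } catch (InterruptedException | ExecutionException e) {
        throw new CacheWriterException(e);
    }
}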
Project: jkes    File: JkesKafkaProducer.java
public Future<RecordMetadata> send(Object value, Callback callback) {
    DocumentMetadata documentMetadata =
            Metadata.getMetadata().getMetadataMap().get(value.getClass());

    String topic = documentMetadata.getTopic();

    Method method = documentMetadata.getIdMetadata().getMethod();
    try {
        String key = String.valueOf(method.invoke(value));
        return producer.send(new ProducerRecord<>(topic, key, value), callback);
    } catch (IllegalAccessException | InvocationTargetException e) {
        throw new JkesException(
                String.format("Can't invoke method[%s] on object[%s] of class[%s]", method, value, value.getClass()),
                e);
    }
}
Project: Lagerta    File: ProducerProxyRetry.java
@Override
public Future<RecordMetadata> send(final ProducerRecord<K, V> record, final Callback callback) {
    return Retries.tryMe(new IgniteClosure<RetryCallableAsyncOnCallback, Future<RecordMetadata>>() {
        @Override
        public Future<RecordMetadata> apply(final RetryCallableAsyncOnCallback retryCallableAsyncOnCallback) {
            return inner.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    callback.onCompletion(metadata, exception);
                    if (exception != null) {
                        retryCallableAsyncOnCallback.retry(exception);
                    }
                }
            });
        }
    });
}
Project: kafka-0.11.0.0-src-with-comment    File: TransactionManagerTest.java
@Test(expected = ExecutionException.class)
public void testProducerFencedException() throws InterruptedException, ExecutionException {
    final long pid = 13131L;
    final short epoch = 1;

    doInitTransactions(pid, epoch);

    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;

    assertFalse(responseFuture.isDone());
    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
    prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, pid, epoch);
    sender.run(time.milliseconds()); // Add partitions.

    sender.run(time.milliseconds());  // send produce.

    assertTrue(responseFuture.isDone());
    assertTrue(transactionManager.hasError());
    responseFuture.get();
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
    final AtomicInteger attempt = new AtomicInteger(0);
    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    if (attempt.getAndIncrement() == 0) {
                        throw new TimeoutException();
                    }
                    return super.send(record, callback);
                }
            },
            "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
    assertEquals(Long.valueOf(0L), offset);
}
Project: kafka-0.11.0.0-src-with-comment    File: VerifiableProducer.java
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    synchronized (System.out) {
        if (e == null) {
            VerifiableProducer.this.numAcked++;
            System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis()));
        } else {
            System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis()));
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaLog4jAppender.java
@Override
protected void append(LoggingEvent event) {
    String message = subAppend(event);
    LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
    Future<RecordMetadata> response = producer.send(
        new ProducerRecord<byte[], byte[]>(topic, message.getBytes(StandardCharsets.UTF_8)));
    if (syncSend) {
        try {
            response.get();
        } catch (InterruptedException | ExecutionException ex) {
            throw new RuntimeException(ex);
        }
    }
}
Project: kmq    File: KmqClient.java
/**
 * @param record The message which should be acknowledged as processed; an end marker will be sent to the
 *               markers topic.
 * @return Result of the marker send. Usually this can be ignored: we don't need a guarantee that the marker
 * has been sent; worst case, the message will be reprocessed.
 */
public Future<RecordMetadata> processed(ConsumerRecord<K, V> record) {
    // 5. writing an "end" marker. No need to wait for confirmation that it has been sent. It would be
    // nice, though, not to ignore that output completely.
    return markerProducer.send(new ProducerRecord<>(config.getMarkerTopic(),
            MarkerKey.fromRecord(record),
            EndMarker.INSTANCE));
}
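A typical usage pattern, assumed here for illustration (the consumer setup and process() handler are not part of the kmq snippet), calls processed() after handling each record in the consume loop:

// Hypothetical consume loop; consumer construction and process() are assumptions.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        process(record);             // application-specific handling
        kmqClient.processed(record); // writes the "end" marker; the returned Future is ignored
    }
}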
Project: kafka-0.11.0.0-src-with-comment    File: SenderTest.java
@Test
public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
    final long producerId = 343434L;
    TransactionManager transactionManager = new TransactionManager();
    transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
    setupWithTransactionState(transactionManager);
    client.setNode(new Node(1, "localhost", 33343));

    int maxRetries = 10;
    Metrics m = new Metrics();
    Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
            m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);

    Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
    sender.run(time.milliseconds());  // connect.
    sender.run(time.milliseconds());  // send.
    String id = client.requests().peek().destination();
    Node node = new Node(Integer.valueOf(id), "localhost", 0);
    assertEquals(1, client.inFlightRequestCount());
    assertTrue("Client ready status should be true", client.isReady(node, 0L));
    client.disconnect(id);
    assertEquals(0, client.inFlightRequestCount());
    assertFalse("Client ready status should be false", client.isReady(node, 0L));

    transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0));
    sender.run(time.milliseconds()); // receive error
    sender.run(time.milliseconds()); // reconnect
    sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
    assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());

    KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, ""));
    assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);

    assertTrue(responseFuture.isDone());
    assertEquals((long) transactionManager.sequenceNumber(tp0), 0L);
}
Project: kafka-0.11.0.0-src-with-comment    File: IntegrationTestUtils.java
public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                     final Collection<KeyValue<K, V>> records,
                                                                     final Properties producerConfig,
                                                                     final Long timestamp)
    throws ExecutionException, InterruptedException {
    try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
        for (final KeyValue<K, V> record : records) {
            final Future<RecordMetadata> f = producer.send(
                new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
            f.get();
        }
        producer.flush();
    }
}
Project: DBus    File: ControlMessageSender.java
public void send(String topic, ControlMessage msg) throws Exception {
    String key = msg.getType();
    String jsonMessage = msg.toJSONString();
    byte[] message = jsonMessage.getBytes();

    try {
        Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, message), null);
        result.get();
    } finally {
        producer.close();
    }
}
Project: video-stream-analytics    File: VideoEventGenerator.java
@Override
public void onCompletion(RecordMetadata rm, Exception e) {
    if (rm != null) {
        logger.info("cameraId="+ camId + " partition=" + rm.partition());
    }
    if (e != null) {
        e.printStackTrace();
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: SenderTest.java
@Test
public void testSequenceNumberIncrement() throws InterruptedException {
    final long producerId = 343434L;
    TransactionManager transactionManager = new TransactionManager();
    transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
    setupWithTransactionState(transactionManager);
    client.setNode(new Node(1, "localhost", 33343));

    int maxRetries = 10;
    Metrics m = new Metrics();
    Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
            m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);

    Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
    client.prepareResponse(new MockClient.RequestMatcher() {
        @Override
        public boolean matches(AbstractRequest body) {
            if (body instanceof ProduceRequest) {
                ProduceRequest request = (ProduceRequest) body;
                MemoryRecords records = request.partitionRecordsOrFail().get(tp0);
                Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
                assertTrue(batchIterator.hasNext());
                RecordBatch batch = batchIterator.next();
                assertFalse(batchIterator.hasNext());
                assertEquals(0, batch.baseSequence());
                assertEquals(producerId, batch.producerId());
                assertEquals(0, batch.producerEpoch());
                return true;
            }
            return false;
        }
    }, produceResponse(tp0, 0, Errors.NONE, 0));

    sender.run(time.milliseconds());  // connect.
    sender.run(time.milliseconds());  // send.

    sender.run(time.milliseconds());  // receive response
    assertTrue(responseFuture.isDone());
    assertEquals((long) transactionManager.sequenceNumber(tp0), 1L);
}
Project: kafka-0.11.0.0-src-with-comment    File: ErrorLoggingCallback.java
public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null) {
        String keyString = (key == null) ? "null" :
                logAsString ? new String(key, StandardCharsets.UTF_8) : key.length + " bytes";
        String valueString = (valueLength == -1) ? "null" :
                logAsString ? new String(value, StandardCharsets.UTF_8) : valueLength + " bytes";
        log.error("Error when sending message to topic {} with key: {}, value: {} with error:",
                topic, keyString, valueString, e);
    }
}
Project: ja-micro    File: KafkaPublisherTest.java
@Test
public void sendFailsReturnsFalse() {
    KafkaProducer producer = mock(KafkaProducer.class);
    publisher.realProducer = producer;
    RecordMetadata metadata = new RecordMetadata(null, 0, 0,
            0, Long.valueOf(0), 0, 0);
    ArgumentCaptor<Callback> captor = ArgumentCaptor.forClass(Callback.class);
    when(producer.send(any(), captor.capture())).then(
        invocation -> {
            captor.getValue().onCompletion(metadata, new TimeoutException("error"));
            return new CompletableFuture();
        });
    String[] events = { "test" };
    assertThat(publisher.publishEvents(false, null, events)).isFalse();
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    callback.onCompletion(null, new Exception());
                    return null;
                }
            },
            "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.close();
}
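Both RecordCollectorTest cases on this page subclass MockProducer to inject failures into send(). For simpler scenarios, the stock org.apache.kafka.clients.producer.MockProducer can simulate outcomes without a subclass; a brief sketch (topic name and serializers are illustrative):

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// autoComplete=false leaves each send pending so the test decides its outcome explicitly
MockProducer<String, String> mock =
        new MockProducer<>(false, new StringSerializer(), new StringSerializer());
mock.send(new ProducerRecord<>("topic1", "key", "value"));
mock.errorNext(new RuntimeException("simulated broker failure")); // fail the oldest pending send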
Project: kafka-junit    File: ProducedKafkaRecord.java
/**
 * Utility factory.
 * @param recordMetadata Metadata about the produced record.
 * @param producerRecord The original record that was produced.
 * @param <K> Type of key
 * @param <V> Type of message
 * @return A ProducedKafkaRecord that represents metadata about the original record, and the results of it being published.
 */
static <K,V> ProducedKafkaRecord<K,V> newInstance(
    final RecordMetadata recordMetadata,
    final ProducerRecord<K,V> producerRecord) {
    return new ProducedKafkaRecord<>(
        recordMetadata.topic(),
        recordMetadata.partition(),
        recordMetadata.offset(),
        producerRecord.key(),
        producerRecord.value()
    );
}
Project: flume-release-1.7.0    File: KafkaChannel.java
public void onCompletion(RecordMetadata metadata, Exception exception) {
  if (exception != null) {
    log.trace("Error sending message to Kafka due to " + exception.getMessage());
  }
  if (log.isDebugEnabled()) {
    long batchElapsedTime = System.currentTimeMillis() - startTime;
    log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" +
              metadata.partition() + "-" + metadata.offset() + "-" + batchElapsedTime);
  }
}
Project: kafka-streams-machine-learning-examples    File: IntegrationTestUtils.java
/**
 * @param topic          Kafka topic to write the data records to
 * @param records        Data records to write to Kafka
 * @param producerConfig Kafka producer configuration
 * @param <K>            Key type of the data records
 * @param <V>            Value type of the data records
 */
public static <K, V> void produceKeyValuesSynchronously(
    String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
    throws ExecutionException, InterruptedException {
  Producer<K, V> producer = new KafkaProducer<>(producerConfig);
  for (KeyValue<K, V> record : records) {
    Future<RecordMetadata> f = producer.send(
        new ProducerRecord<>(topic, record.key, record.value));
    f.get();
  }
  producer.flush();
  producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: TransactionManagerTest.java
@Test
public void testAllowAbortOnProduceFailure() throws InterruptedException {
    final long pid = 13131L;
    final short epoch = 1;

    doInitTransactions(pid, epoch);

    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;

    assertFalse(responseFuture.isDone());
    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
    prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
    prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);

    sender.run(time.milliseconds());  // Send AddPartitionsRequest
    sender.run(time.milliseconds());  // Send Produce Request, returns OutOfOrderSequenceException.

    TransactionalRequestResult abortResult = transactionManager.beginAbort();
    sender.run(time.milliseconds());  // try to abort
    assertTrue(abortResult.isCompleted());
    assertTrue(abortResult.isSuccessful());
    assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
}