@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { /* // Create wrappedRecord because headers can be read only in record (if record is sent second time) ProducerRecord<K, V> wrappedRecord = new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), record.value(), record.headers()); */ try (Scope scope = buildAndInjectSpan(record)) { Callback wrappedCallback = new TracingCallback(callback, scope); return producer.send(record, wrappedCallback); } }
@Test public void testBatchCannotCompleteTwice() throws Exception { ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now); batch.done(500L, 10L, null); try { batch.done(1000L, 20L, null); fail("Expected exception from done"); } catch (IllegalStateException e) { // expected } RecordMetadata recordMetadata = future.get(); assertEquals(500L, recordMetadata.offset()); assertEquals(10L, recordMetadata.timestamp()); }
public void publish(BrokerStats brokerStats) throws IOException { try { ByteArrayOutputStream stream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null); avroEventWriter.write(brokerStats, binaryEncoder); binaryEncoder.flush(); IOUtils.closeQuietly(stream); String key = brokerStats.getName() + "_" + System.currentTimeMillis(); int numPartitions = kafkaProducer.partitionsFor(destTopic).size(); int partition = brokerStats.getId() % numPartitions; Future<RecordMetadata> future = kafkaProducer.send( new ProducerRecord(destTopic, partition, key.getBytes(), stream.toByteArray())); future.get(); OpenTsdbMetricConverter.incr("kafka.stats.collector.success", 1, "host=" + HOSTNAME); } catch (Exception e) { LOG.error("Failure in publish stats", e); OpenTsdbMetricConverter.incr("kafka.stats.collector.failure", 1, "host=" + HOSTNAME); throw new RuntimeException("Avro serialization failure", e); } }
@Override public void send(Long k, byte[] v) { KafkaProducer<Long, byte[]> p = getWorker(); p.initTransactions(); p.beginTransaction(); Future<RecordMetadata> res = worker.send(new ProducerRecord<Long, byte[]>(topic, k, v)); RecordMetadata record; try { record = res.get(); offsets.clear(); offsets.put(new TopicPartition(topic, record.partition()), new OffsetAndMetadata(record.offset())); p.sendOffsetsToTransaction(offsets, MallConstants.ORDER_GROUP); p.commitTransaction(); } catch (InterruptedException | ExecutionException e) { p.abortTransaction(); } }
/** * Emit messages to the relevant kafka topics * @return true if the messages were successfully delivered to the kafka topics; false otherwise */ public Boolean emit() { try { final List<Future<RecordMetadata>> results = new ArrayList<>(); if (async) { results.add(producer.send(new ProducerRecord<>(TOPIC_CACHE, identifier.getIRIString(), serialize(dataset)))); } // Update the containment triples of the parent resource if this is a delete or create operation parent.ifPresent(emitToParent(identifier, dataset, results)); for (final Future<RecordMetadata> result : results) { final RecordMetadata res = result.get(); LOGGER.debug("Send record to topic: {}, {}", res, res.timestamp()); } return true; } catch (final InterruptedException | ExecutionException ex) { LOGGER.error("Error sending record to kafka topic: {}", ex.getMessage()); return false; } }
public void sendMessage(MetaVersion ver, MetaWrapper newMeta, MetaCompareResult result) { ControlMessage message = new ControlMessage(System.currentTimeMillis(), ControlType.G_META_SYNC_WARNING.toString(), "dbus-appender"); message.addPayload("datasource", GlobalCache.getDatasource().getDsName()); message.addPayload("schema", ver.getSchema()); message.addPayload("tableId", ver.getTableId()); message.addPayload("table", ver.getTable()); message.addPayload("before", ver.getMeta()); message.addPayload("after", newMeta); message.addPayload("compare-result", JSON.toJSON(result)); message.addPayload("version", ver.getVersion()); String topic = PropertiesHolder.getProperties(Constants.Properties.CONFIGURE, Constants.ConfigureKey.GLOBAL_EVENT_TOPIC); ProducerRecord<String, String> record = new ProducerRecord<>(topic, message.getType(), message.toJSONString()); Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> { if (exception != null) { logger.error("Send global event error.{}", exception.getMessage()); } }); try { future.get(10000, TimeUnit.MILLISECONDS); } catch (Exception e) { logger.error(e.getMessage(), e); } }
private static void sendAckInfoToCtrlTopic(String dataSourceInfo, String completedTime, String pullStatus) { try { // 在源dataSourceInfo的基础上,更新全量拉取相关信息。然后发回src topic JSONObject jsonObj = JSONObject.parseObject(dataSourceInfo); jsonObj.put(DataPullConstants.FullPullInterfaceJson.FROM_KEY, DataPullConstants.FullPullInterfaceJson.FROM_VALUE); jsonObj.put(DataPullConstants.FullPullInterfaceJson.TYPE_KEY, DataPullConstants.FullPullInterfaceJson.TYPE_VALUE); // notifyFullPullRequestor JSONObject payloadObj = jsonObj.getJSONObject(DataPullConstants.FullPullInterfaceJson.PAYLOAD_KEY); // 完成时间 payloadObj.put(DataPullConstants.FullPullInterfaceJson.COMPLETE_TIME_KEY, completedTime); // 拉取是否成功标志位 payloadObj.put(DataPullConstants.FullPullInterfaceJson.DATA_STATUS_KEY, pullStatus); jsonObj.put(DataPullConstants.FullPullInterfaceJson.PAYLOAD_KEY, payloadObj); String ctrlTopic = getFullPullProperties(Constants.ZkTopoConfForFullPull.COMMON_CONFIG, true) .getProperty(Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC); Producer producer = DbusHelper .getProducer(getFullPullProperties(Constants.ZkTopoConfForFullPull.BYTE_PRODUCER_CONFIG, true)); ProducerRecord record = new ProducerRecord<>(ctrlTopic, DataPullConstants.FullPullInterfaceJson.TYPE_VALUE, jsonObj.toString().getBytes()); Future<RecordMetadata> future = producer.send(record); RecordMetadata meta = future.get(); } catch (Exception e) { Log.error("Error occurred when report full data pulling status.", e); throw new RuntimeException(e); } }
@SuppressWarnings("unchecked") private void sendMessageToKafka(String key, DbusMessage dbusMessage, AtomicLong sendCnt, AtomicLong recvCnt, AtomicBoolean isError) throws Exception{ if(stringProducer == null) { throw new Exception("producer is null, can't send to kafka!"); } ProducerRecord record = new ProducerRecord<>(resultTopic, key, dbusMessage.toString()); sendCnt.getAndIncrement(); stringProducer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { e.printStackTrace(); isError.set(true); }else{ recvCnt.getAndIncrement(); } } }); }
@POST @Path("/topics/{topicName}") public Response postTopicData(@PathParam("topicName") String topicName, String data) { try { ImmutableMap<String, String> params = parseUrlEncodedParams(data); String key = params.get("key"); String value = params.get("value"); if (key == null || value == null) { return responseFactory.createBadRequestResponse("One of the required post params 'key' " + "or 'value' are missing"); } if (!doesTopicExist(topicName)) { return responseFactory.createNotFoundResponse(String.format("No topic exists with the name [%s]", topicName)); } RecordMetadata metadata = kafkaProducerWrapper.publish(key, value, topicName); return responseFactory.createOkResponse(metadata); } catch (Exception e) { return responseFactory.createServerErrorResponse(e); } }
@Override public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event, final FailedDeliveryCallback<E> failedDeliveryCallback) { try { producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { failedDeliveryCallback.onFailedDelivery(event, exception); } } }); return true; } catch (BufferExhaustedException e) { failedDeliveryCallback.onFailedDelivery(event, e); return false; } }
public static void main(String[] args) { Map<String, Object> config = new HashMap<String, Object>(); config.put("partitioner.class", "com.wngn.kafka.SimpleKeyPartition"); LatestProducer producer = LatestProducer.getInstance(ProducerConstants.TOPIC_KAFKA_TEST, config); ProducerRecord<String, String> record = null; long index = 0L; boolean controller = true; while (controller) { controller = false; index++; System.out.println(index + "------------"); try { String message = "message_" + index; RecordMetadata recordMetadata = producer.sendWithSync("1", message); System.out.format("PARTITION: %d OFFSET: %d\n", recordMetadata.partition(), recordMetadata.offset()); } catch (Exception e) { e.printStackTrace(); } } producer.close(); }
private Consumer<Resource> emitToParent(final IRI identifier, final Dataset dataset, final List<Future<RecordMetadata>> results) { final Boolean isCreate = dataset.contains(of(PreferAudit), null, type, Create); final Boolean isDelete = dataset.contains(of(PreferAudit), null, type, Delete); final String containmentTopic = isDelete ? TOPIC_LDP_CONTAINMENT_DELETE : TOPIC_LDP_CONTAINMENT_ADD; final String membershipTopic = isDelete ? TOPIC_LDP_MEMBERSHIP_DELETE : TOPIC_LDP_MEMBERSHIP_ADD; return container -> { if (isDelete || isCreate) { try { LOGGER.info("Sending to parent: {}", container.getIdentifier()); results.add(producer.send(buildContainmentMessage(containmentTopic, identifier, container, dataset))); buildMembershipMessage(membershipTopic, identifier, container, dataset).ifPresent(msg -> { LOGGER.info("Sending to member resource: {}", container.getMembershipResource()); results.add(producer.send(msg)); }); } catch (final Exception ex) { LOGGER.error("Error processing dataset: {}", ex.getMessage()); } } }; }
public static void main(final String[] args) { Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "localhost:9092"); producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("acks", "all"); producerProps.put("retries", 1); producerProps.put("batch.size", 20000); producerProps.put("linger.ms", 1); producerProps.put("buffer.memory", 24568545); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps); for (int i = 0; i < 2000; i++) { ProducerRecord data = new ProducerRecord<String, String>("test1", "Hello this is record " + i); Future<RecordMetadata> recordMetadata = producer.send(data); } producer.close(); }
@Test public void resendFailedProduceRequestAfterAbortableError() throws Exception { final long pid = 13131L; final short epoch = 1; doInitTransactions(pid, epoch); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch); sender.run(time.milliseconds()); // AddPartitions sender.run(time.milliseconds()); // Produce assertFalse(responseFuture.isDone()); transactionManager.transitionToAbortableError(new KafkaException()); prepareProduceResponse(Errors.NONE, pid, epoch); sender.run(time.milliseconds()); assertTrue(responseFuture.isDone()); assertNotNull(responseFuture.get()); }
@Test public void testSimple() throws Exception { long offset = 0; Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); assertTrue(client.hasInFlightRequests()); client.respond(produceResponse(tp0, offset, Errors.NONE, 0)); sender.run(time.milliseconds()); assertEquals("All requests completed.", 0, client.inFlightRequestCount()); assertFalse(client.hasInFlightRequests()); sender.run(time.milliseconds()); assertTrue("Request should be completed", future.isDone()); assertEquals(offset, future.get().offset()); }
private boolean doSyncSend(String topicName, String messageKey, DefaultMessage message) { try { Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, Object>( topicName, messageKey, message.isSendBodyOnly() ? message.getBody() : message)); RecordMetadata metadata = future.get(); for (ProducerEventHandler handler : eventHanlders) { try { handler.onSuccessed(topicName, metadata); } catch (Exception e) { } } if (log.isDebugEnabled()) { log.debug("kafka_send_success,topic=" + topicName + ", messageId=" + messageKey + ", partition=" + metadata.partition() + ", offset=" + metadata.offset()); } return true; } catch (Exception ex) { log.error("kafka_send_fail,topic=" + topicName + ",messageId=" + messageKey, ex); //同步发送直接抛异常 throw new RuntimeException(ex); } }
@Override public void handle(long transactionId, Map<String, Collection<Cache.Entry<?, ?>>> updates) throws CacheWriterException { List<Future<RecordMetadata>> futures = producers .get() .stream() .map(producer -> producer.send(transactionId, updates)) .collect(Collectors.toList()); try { for (Future<RecordMetadata> future : futures) { future.get(); } } catch (InterruptedException | ExecutionException e) { throw new CacheWriterException(e); } }
@SuppressWarnings("unchecked") @Override public Future<RecordMetadata> send(long transactionId, Map<String, Collection<Cache.Entry<?, ?>>> updates) throws CacheWriterException { try { int partition = partition(transactionId, partitions); TransactionScope key = keyTransformer.apply(transactionId, updates); List<List> value = valueTransformer.apply(updates); ProducerRecord record = new ProducerRecord(dataTopic, partition, transactionId, serializer.serialize(key), serializer.serialize(value)); return producer.send(record); } catch (Exception e) { throw new CacheWriterException(e); } }
public static void main(String[] args) throws Exception { CommandLine commandLine = parseCommandLine(args); String zkUrl = commandLine.getOptionValue(ZOOKEEPER); String topic = commandLine.getOptionValue(TOPIC); int numMessages = Integer.parseInt(commandLine.getOptionValue(NUM_MESSAGES)); Random random = new Random(); Properties props = OperatorUtil.createKafkaProducerProperties(zkUrl); KafkaProducer kafkaProducer = new KafkaProducer<>(props); byte[] key = new byte[16]; byte[] data = new byte[1024]; for (int i = 0; i < numMessages; i++) { for (int j = 0; j < data.length; j++) { data[j] = (byte)random.nextInt(); } ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord( topic, 0, System.currentTimeMillis(), key, data); Future<RecordMetadata> future = kafkaProducer.send(producerRecord); future.get(); if (i % 100 == 0) { System.out.println("Have wrote " + i + " messages to kafka"); } } }
@Test public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0)); setupWithTransactionState(transactionManager); client.setNode(new Node(1, "localhost", 33343)); int maxRetries = 10; Metrics m = new Metrics(); Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect. sender.run(time.milliseconds()); // send. assertEquals(1, client.inFlightRequestCount()); client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0)); sender.run(time.milliseconds()); assertTrue(responseFuture.isDone()); assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId()); }
@Override public void writeTransaction(long transactionId, Map<String, Collection<Cache.Entry<?, ?>>> updates) throws CacheWriterException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[M] Writing synchronous replica for transaction {} with size of batch {}", transactionId, updates.size()); } Collection<RemoteKafkaProducer> producers = replicaProducersManager.getProducers(); if (!producers.isEmpty()) { List<Future<RecordMetadata>> futures = new ArrayList<>(producers.size()); for (RemoteKafkaProducer producer : producers) { futures.add(producer.writeTransaction(transactionId, updates)); } wait(futures); } }
public Future<RecordMetadata> send(Object value, Callback callback) { DocumentMetadata documentMetadata = Metadata.getMetadata().getMetadataMap().get(value.getClass()); String topic = documentMetadata.getTopic(); Method method = documentMetadata.getIdMetadata().getMethod(); try { String key = String.valueOf(method.invoke(value)); return producer.send(new ProducerRecord<>(topic, key, value), callback); } catch (IllegalAccessException | InvocationTargetException e) { throw new JkesException( String.format("Can't invoke method[%s] on object[%s] of class[%s]", method, value, value.getClass()), e); } }
@Override public Future<RecordMetadata> send(final ProducerRecord<K, V> record, final Callback callback) { return Retries.tryMe(new IgniteClosure<RetryCallableAsyncOnCallback, Future<RecordMetadata>>() { @Override public Future<RecordMetadata> apply(final RetryCallableAsyncOnCallback retryCallableAsyncOnCallback) { return inner.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { callback.onCompletion(metadata, exception); if (exception != null) { retryCallableAsyncOnCallback.retry(exception); } } }); } }); }
@Test(expected = ExecutionException.class) public void testProducerFencedException() throws InterruptedException, ExecutionException { final long pid = 13131L; final short epoch = 1; doInitTransactions(pid, epoch); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; assertFalse(responseFuture.isDone()); prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, pid, epoch); sender.run(time.milliseconds()); // Add partitions. sender.run(time.milliseconds()); // send produce. assertTrue(responseFuture.isDone()); assertTrue(transactionManager.hasError()); responseFuture.get(); }
@SuppressWarnings("unchecked") @Test public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception { final AtomicInteger attempt = new AtomicInteger(0); final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { if (attempt.getAndIncrement() == 0) { throw new TimeoutException(); } return super.send(record, callback); } }, "test"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); final Long offset = collector.offsets().get(new TopicPartition("topic1", 0)); assertEquals(Long.valueOf(0L), offset); }
public void onCompletion(RecordMetadata recordMetadata, Exception e) { synchronized (System.out) { if (e == null) { VerifiableProducer.this.numAcked++; System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis())); } else { System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis())); } } }
@Override protected void append(LoggingEvent event) { String message = subAppend(event); LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message); Future<RecordMetadata> response = producer.send( new ProducerRecord<byte[], byte[]>(topic, message.getBytes(StandardCharsets.UTF_8))); if (syncSend) { try { response.get(); } catch (InterruptedException | ExecutionException ex) { throw new RuntimeException(ex); } } }
/** * @param record The message for which should be acknowledged as processed; an end marker will be send to the * markers topic. * @return Result of the marker send. Usually can be ignored, we don't need a guarantee the marker has been sent, * worst case the message will be reprocessed. */ public Future<RecordMetadata> processed(ConsumerRecord<K, V> record) { // 5. writing an "end" marker. No need to wait for confirmation that it has been sent. It would be // nice, though, not to ignore that output completely. return markerProducer.send(new ProducerRecord<>(config.getMarkerTopic(), MarkerKey.fromRecord(record), EndMarker.INSTANCE)); }
@Test public void testAbortRetryWhenProducerIdChanges() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0)); setupWithTransactionState(transactionManager); client.setNode(new Node(1, "localhost", 33343)); int maxRetries = 10; Metrics m = new Metrics(); Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect. sender.run(time.milliseconds()); // send. String id = client.requests().peek().destination(); Node node = new Node(Integer.valueOf(id), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); client.disconnect(id); assertEquals(0, client.inFlightRequestCount()); assertFalse("Client ready status should be false", client.isReady(node, 0L)); transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0)); sender.run(time.milliseconds()); // receive error sender.run(time.milliseconds()); // reconnect sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors. assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount()); KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, "")); assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0); assertTrue(responseFuture.isDone()); assertEquals((long) transactionManager.sequenceNumber(tp0), 0L); }
public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Long timestamp) throws ExecutionException, InterruptedException { try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) { for (final KeyValue<K, V> record : records) { final Future<RecordMetadata> f = producer.send( new ProducerRecord<>(topic, null, timestamp, record.key, record.value)); f.get(); } producer.flush(); } }
public void send(String topic, ControlMessage msg) throws Exception { String key = msg.getType(); String jsonMessage = msg.toJSONString(); byte[] message = jsonMessage.getBytes(); try { Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, message), null); result.get(); } finally { producer.close(); } }
@Override public void onCompletion(RecordMetadata rm, Exception e) { if (rm != null) { logger.info("cameraId="+ camId + " partition=" + rm.partition()); } if (e != null) { e.printStackTrace(); } }
@Test public void testSequenceNumberIncrement() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0)); setupWithTransactionState(transactionManager); client.setNode(new Node(1, "localhost", 33343)); int maxRetries = 10; Metrics m = new Metrics(); Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { if (body instanceof ProduceRequest) { ProduceRequest request = (ProduceRequest) body; MemoryRecords records = request.partitionRecordsOrFail().get(tp0); Iterator<MutableRecordBatch> batchIterator = records.batches().iterator(); assertTrue(batchIterator.hasNext()); RecordBatch batch = batchIterator.next(); assertFalse(batchIterator.hasNext()); assertEquals(0, batch.baseSequence()); assertEquals(producerId, batch.producerId()); assertEquals(0, batch.producerEpoch()); return true; } return false; } }, produceResponse(tp0, 0, Errors.NONE, 0)); sender.run(time.milliseconds()); // connect. sender.run(time.milliseconds()); // send. sender.run(time.milliseconds()); // receive response assertTrue(responseFuture.isDone()); assertEquals((long) transactionManager.sequenceNumber(tp0), 1L); }
public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { String keyString = (key == null) ? "null" : logAsString ? new String(key, StandardCharsets.UTF_8) : key.length + " bytes"; String valueString = (valueLength == -1) ? "null" : logAsString ? new String(value, StandardCharsets.UTF_8) : valueLength + " bytes"; log.error("Error when sending message to topic {} with key: {}, value: {} with error:", topic, keyString, valueString, e); } }
@Test public void sendFailsReturnsFalse() { KafkaProducer producer = mock(KafkaProducer.class); publisher.realProducer = producer; RecordMetadata metadata = new RecordMetadata(null, 0, 0, 0, Long.valueOf(0), 0, 0); ArgumentCaptor<Callback> captor = ArgumentCaptor.forClass(Callback.class); when(producer.send(any(), captor.capture())).then( invocation -> { captor.getValue().onCompletion(metadata, new TimeoutException("error")); return new CompletableFuture(); }); String[] events = { "test" }; assertThat(publisher.publishEvents(false, null, events)).isFalse(); }
@SuppressWarnings("unchecked") @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { callback.onCompletion(null, new Exception()); return null; } }, "test"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.close(); }
/** * Utility factory. * @param recordMetadata Metadata about the produced record. * @param producerRecord The original record that was produced. * @param <K> Type of key * @param <V> Type of message * @return A ProducedKafkaRecord that represents metadata about the original record, and the results of it being published. */ static <K,V> ProducedKafkaRecord<K,V> newInstance( final RecordMetadata recordMetadata, final ProducerRecord<K,V> producerRecord) { return new ProducedKafkaRecord<>( recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), producerRecord.key(), producerRecord.value() ); }
public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { log.trace("Error sending message to Kafka due to " + exception.getMessage()); } if (log.isDebugEnabled()) { long batchElapsedTime = System.currentTimeMillis() - startTime; log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset() + "-" + batchElapsedTime); } }
/** * @param topic Kafka topic to write the data records to * @param records Data records to write to Kafka * @param producerConfig Kafka producer configuration * @param <K> Key type of the data records * @param <V> Value type of the data records */ public static <K, V> void produceKeyValuesSynchronously( String topic, Collection<KeyValue<K, V>> records, Properties producerConfig) throws ExecutionException, InterruptedException { Producer<K, V> producer = new KafkaProducer<>(producerConfig); for (KeyValue<K, V> record : records) { Future<RecordMetadata> f = producer.send( new ProducerRecord<>(topic, record.key, record.value)); f.get(); } producer.flush(); producer.close(); }
@Test public void testAllowAbortOnProduceFailure() throws InterruptedException { final long pid = 13131L; final short epoch = 1; doInitTransactions(pid, epoch); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; assertFalse(responseFuture.isDone()); prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch); prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch); sender.run(time.milliseconds()); // Send AddPartitionsRequest sender.run(time.milliseconds()); // Send Produce Request, returns OutOfOrderSequenceException. TransactionalRequestResult abortResult = transactionManager.beginAbort(); sender.run(time.milliseconds()); // try to abort assertTrue(abortResult.isCompleted()); assertTrue(abortResult.isSuccessful()); assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now. }