Java source examples for the org.apache.kafka.clients.producer.Callback class
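
All of the snippets below implement or pass org.apache.kafka.clients.producer.Callback. As a self-contained baseline for comparison, here is a minimal sketch of the asynchronous send-with-callback pattern they share; the bootstrap server and topic name are placeholders, not values from any project below.

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class MinimalCallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // Exactly one of metadata/exception is non-null.
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("acked %s-%d@%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                }
            });
        }
    }
}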

Project: DBus    File: KafkaProducerBolt.java
private void sendDataToKafka(long batchId, byte[] data, Tuple input) {
    @SuppressWarnings("rawtypes")
    ProducerRecord record = new ProducerRecord<>(outputTopic, "", data);

    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            synchronized (collector) {
                if (e != null) {
                    collector.fail(input);
                    logger.error("kafka ack failed for the message with batchId " + batchId, e);
                } else {
                    collector.ack(input);
                    logger.debug("kafka ack received for the message with batchId " + batchId);
                }
            }
        }
    });
}
Project: java-kafka-client    File: TracingKafkaProducer.java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  /*
  // Create wrappedRecord because headers can be read only in record (if record is sent second time)
  ProducerRecord<K, V> wrappedRecord = new ProducerRecord<>(record.topic(),
      record.partition(),
      record.timestamp(),
      record.key(),
      record.value(),
      record.headers());
  */

  try (Scope scope = buildAndInjectSpan(record)) {
    Callback wrappedCallback = new TracingCallback(callback, scope);
    return producer.send(record, wrappedCallback);
  }
}
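The TracingCallback class is not shown in this listing. A plausible minimal sketch of what such a wrapper does (an inference, not the library's actual source; the real class receives a Scope, while this sketch holds a Span directly):

// Inferred sketch only: finishes the tracing span once the broker responds,
// then forwards the result to the user's callback. Names are assumptions.
class TracingCallbackSketch implements Callback {
    private final Callback userCallback;     // may be null
    private final io.opentracing.Span span;  // span opened around send()

    TracingCallbackSketch(Callback userCallback, io.opentracing.Span span) {
        this.userCallback = userCallback;
        this.span = span;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        try {
            if (exception != null) {
                span.setTag("error", true); // mark the span as failed
            }
            if (userCallback != null) {
                userCallback.onCompletion(metadata, exception);
            }
        } finally {
            span.finish(); // always close the span, success or failure
        }
    }
}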
Project: java-kafka-client    File: TracingKafkaTest.java
@Test
public void test() throws Exception {
  Producer<Integer, String> producer = createProducer();

  // Send 1
  producer.send(new ProducerRecord<>("messages", 1, "test"));

  // Send 2
  producer.send(new ProducerRecord<>("messages", 1, "test"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      assertEquals("messages", metadata.topic());
    }
  });

  final CountDownLatch latch = new CountDownLatch(2);
  createConsumer(latch, 1);

  producer.close();

  List<MockSpan> mockSpans = mockTracer.finishedSpans();
  assertEquals(4, mockSpans.size());
  checkSpans(mockSpans);
  assertNull(mockTracer.activeSpan());
}
Project: cruise-control    File: CruiseControlMetricsReporter.java
/**
 * Send a CruiseControlMetric to the Kafka topic.
 * @param ccm the Cruise Control metric to send.
 */
public void sendCruiseControlMetric(CruiseControlMetric ccm) {
  // Use the topic name as the key if present, so that the same sampler can collect all the information
  // of a topic.
  String key = ccm.metricClassId() == CruiseControlMetric.MetricClassId.TOPIC_METRIC ?
      ((TopicMetric) ccm).topic() : Integer.toString(ccm.brokerId());
  ProducerRecord<String, CruiseControlMetric> producerRecord =
      new ProducerRecord<>(_cruiseControlMetricsTopic, null, ccm.time(), key, ccm);
  LOG.debug("Sending Cruise Control metric {}.", ccm);
  _producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      if (e != null) {
        LOG.warn("Failed to send Cruise Control metric {}", ccm);
        _numMetricSendFailure++;
      }
    }
  });
}
Project: cruise-control    File: CruiseControlMetricsReporterTest.java
@Before
public void setUp() {
  super.setUp();
  Properties props = new Properties();
  props.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
  AtomicInteger failed = new AtomicInteger(0);
  try (Producer<String, String> producer = createProducer(props)) {
    for (int i = 0; i < 10; i++) {
      producer.send(new ProducerRecord<>("TestTopic", Integer.toString(i)), new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
          if (e != null) {
            failed.incrementAndGet();
          }
        }
      });
    }
  }
  assertEquals(0, failed.get());
}
Project: DBus    File: MessageProcessor.java
/**
 * Send stat info to the statistic topic; we do not care whether the send succeeds.
 * @param message the statistic message to send
 */
private void sendTableStatInfo(StatMessage message) {

    String key = String.format("%s.%s.%s.%s.%s", message.getDsName(), message.getSchemaName(), message.getTableName(),
            message.getType(), message.getTxTimeMS());
    String value = message.toJSONString();

    Callback callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata ignored, Exception e) {
            if (e != null) {
                logger.error(String.format("Send statistic FAIL: toTopic=%s, key=%s", statTopic, key), e);
            } else {
                logger.info(String.format("Send statistic successful: toTopic=%s, key=%s", statTopic, key));
            }
        }
    };

    producer.send(new ProducerRecord<>(statTopic, key, value), callback);
}
Project: DBus    File: PagedBatchDataFetchingBolt.java
@SuppressWarnings("unchecked")
private void sendMessageToKafka(String key, DbusMessage dbusMessage, AtomicLong sendCnt, AtomicLong recvCnt, AtomicBoolean isError) throws Exception{
    if(stringProducer == null) {
        throw new Exception("producer is null, can't send to kafka!");
    }

    ProducerRecord record = new ProducerRecord<>(resultTopic, key, dbusMessage.toString());
    sendCnt.getAndIncrement();
    stringProducer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
                isError.set(true);
            }else{
                recvCnt.getAndIncrement();
            }
        }
    });
}
Project: kafka-javaee-concurrency-utilities    File: Producer.java
@Override
public void run() {
    System.out.println("Producing to topic " + topic);
    String numPartitions = System.getenv().getOrDefault("NUM_PARTITIONS", "1");
    System.out.println("Total Partitions " + numPartitions);

    while (true) {
        try {
            producer.send(new ProducerRecord<>(topic, "key-" + rnd.nextInt(10), "val-" + rnd.nextInt(10)),
            new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // metadata is null on failure, so log the error instead of dereferencing it
                        Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, exception);
                    } else {
                        System.out.println("Sent data to Offset " + metadata.offset()
                                + " in Partition " + metadata.partition());
                    }
                }
            });

            Thread.sleep(Long.valueOf(producerPause));
        } catch (Exception ex) {
            Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

}
Project: wngn-jms-kafka    File: AsynchronousDeliveryStrategy.java
@Override
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event,
                              final FailedDeliveryCallback<E> failedDeliveryCallback) {
    try {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                }
            }
        });
        return true;
    } catch (BufferExhaustedException e) {
        failedDeliveryCallback.onFailedDelivery(event, e);
        return false;
    }
}
Project: Lagerta    File: ProducerProxyRetry.java
@Override
public Future<RecordMetadata> send(final ProducerRecord<K, V> record, final Callback callback) {
    return Retries.tryMe(new IgniteClosure<RetryCallableAsyncOnCallback, Future<RecordMetadata>>() {
        @Override
        public Future<RecordMetadata> apply(final RetryCallableAsyncOnCallback retryCallableAsyncOnCallback) {
            return inner.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    callback.onCompletion(metadata, exception);
                    if (exception != null) {
                        retryCallableAsyncOnCallback.retry(exception);
                    }
                }
            });
        }
    });
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaStatusBackingStoreTest.java
@Test
public void putSafeWithNoPreviousValueIsPropagated() {
    final Converter converter = mock(Converter.class);
    final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
    final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);

    final byte[] value = new byte[0];

    final Capture<Struct> statusValueStruct = newCapture();
    converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), capture(statusValueStruct));
    EasyMock.expectLastCall().andReturn(value);

    kafkaBasedLog.send(eq("status-connector-" + CONNECTOR), eq(value), anyObject(Callback.class));
    expectLastCall();

    replayAll();

    final ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.FAILED, WORKER_ID, 0);
    store.putSafe(status);

    verifyAll();

    assertEquals(status.state().toString(), statusValueStruct.getValue().get(KafkaStatusBackingStore.STATE_KEY_NAME));
    assertEquals(status.workerId(), statusValueStruct.getValue().get(KafkaStatusBackingStore.WORKER_ID_KEY_NAME));
    assertEquals(status.generation(), statusValueStruct.getValue().get(KafkaStatusBackingStore.GENERATION_KEY_NAME));
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
    final AtomicInteger attempt = new AtomicInteger(0);
    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    if (attempt.getAndIncrement() == 0) {
                        throw new TimeoutException();
                    }
                    return super.send(record, callback);
                }
            },
            "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
    assertEquals(Long.valueOf(0L), offset);
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordAccumulator.java
/**
 *  Try to append to a ProducerBatch.
 *
 *  If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
 *  resources like compression buffers. The batch will be fully closed (i.e. the record batch headers will be written
 *  and memory records built) in one of the following cases (whichever comes first): right before send,
 *  if it is expired, or when the producer is closed.
 */
// Look up the last ProducerBatch of the deque for this partition in the batches map
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
    // Grab the last batch in the queue
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        // Call ProducerBatch.tryAppend, which returns a FutureRecordMetadata when the MemoryRecordsBuilder still has room
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);

    }
    return null;
}
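To make the control flow concrete: when tryAppend returns null, the caller allocates a fresh batch and appends it to the deque. The following self-contained toy model (not Kafka's actual code; Batch stands in for ProducerBatch) shows that pattern:

import java.util.ArrayDeque;
import java.util.Deque;

public class BatchingSketch {
    static final int BATCH_CAPACITY = 3;

    static class Batch {
        int count;
        boolean tryAppend(String record) {
            if (count >= BATCH_CAPACITY) {
                return false; // full: mirrors ProducerBatch.tryAppend() returning null
            }
            count++;
            return true;
        }
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            Batch last = deque.peekLast();
            if (last == null || !last.tryAppend("record-" + i)) {
                Batch fresh = new Batch();      // last batch was full (or absent)
                fresh.tryAppend("record-" + i); // the first record always fits
                deque.addLast(fresh);
            }
        }
        System.out.println("batches created: " + deque.size()); // 10 records / capacity 3 -> 4
    }
}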
Project: jkes    File: JkesKafkaProducer.java
public Future<RecordMetadata> send(Object value, Callback callback) {
    DocumentMetadata documentMetadata =
            Metadata.getMetadata().getMetadataMap().get(value.getClass());

    String topic = documentMetadata.getTopic();

    Method method = documentMetadata.getIdMetadata().getMethod();
    try {
        String key = String.valueOf(method.invoke(value));
        return producer.send(new ProducerRecord<>(topic, key, value), callback);
    } catch (IllegalAccessException | InvocationTargetException e) {
        throw new JkesException(
                String.format("Can't invoke method[%s] on object[%s] of class[%s]", method, value, value.getClass()),
                e);
    }
}
Project: likafka-clients    File: LiKafkaProducerImpl.java
public ErrorLoggingCallback(UUID messageId,
                            K key,
                            V value,
                            String topic,
                            Long timestamp,
                            Integer serializedSize,
                            Auditor<K, V> auditor,
                            Callback userCallback) {
  _messageId = messageId;
  _value = value;
  _key = key;
  _topic = topic;
  _timestamp = timestamp;
  _serializedSize = serializedSize;
  _auditor = auditor;
  _userCallback = userCallback;
}
Project: likafka-clients    File: LiKafkaConsumerIntegrationTest.java
@Override
public void run() {
  final Set<String> ackedMessages = new HashSet<>();
  for (int i = 0; i < MESSAGE_COUNT; i++) {
    // The message size is set to 100 - 1124, so most of the messages should be large messages
    // while some are still of ordinary size.
    int messageSize = 100 + _random.nextInt() % 1024;
    final String messageId = UUID.randomUUID().toString().replace("-", "");
    final String message = messageId + TestUtils.getRandomString(messageSize);

    _producer.send(new ProducerRecord<String, String>(_topic, null, (long) i, null, message),
                   new Callback() {
                     @Override
                     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                       // The callback should have been invoked only once.
                       assertFalse(ackedMessages.contains(messageId));
                       if (e == null) {
                         ackedMessages.add(messageId);
                       }
                       _messages.put(recordMetadata.topic() + "-" + recordMetadata.partition() + "-" + recordMetadata.offset(), message);
                     }
                   });
  }
}
Project: likafka-clients    File: LageMessageCallbackTest.java
@Test
public void testLargeMessageCallbackWithoutException() {
  final AtomicInteger callbackFired = new AtomicInteger(0);
  LargeMessageCallback callback = new LargeMessageCallback(numSegments, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      callbackFired.incrementAndGet();
      assertEquals("No exception should be there.", e, null);
    }
  });

  for (int i = 0; i < numSegments - 1; i++) {
    callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), null);
    assertTrue("The user callback should not be fired.", callbackFired.get() == 0);
  }
  callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), null);
  assertTrue("The user callback should not be fired.", callbackFired.get() == 1);
}
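LargeMessageCallback itself is not part of this listing. From the test's behavior one can infer that it counts down acknowledged segments and fires the user callback only on the last one. A hedged sketch of that behavior (inferred from the test, not copied from the library; the real class presumably also remembers the first exception seen across segments):

// Sketch inferred from the test above; not the actual library class.
// Requires java.util.concurrent.atomic.AtomicInteger.
class LargeMessageCallbackSketch implements Callback {
    private final AtomicInteger segmentsLeft;
    private final Callback userCallback;

    LargeMessageCallbackSketch(int numSegments, Callback userCallback) {
        this.segmentsLeft = new AtomicInteger(numSegments);
        this.userCallback = userCallback;
    }

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // Fire the user callback exactly once, when the last segment is acknowledged.
        if (segmentsLeft.decrementAndGet() == 0) {
            userCallback.onCompletion(recordMetadata, e);
        }
    }
}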
Project: kafka    File: RecordBatch.java
/**
 * Append the record to the current record set and return the relative offset within that record set
 * 
 * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    if (!this.records.hasRoomFor(key, value)) {
        return null;
    } else {
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
Project: kafka    File: RecordAccumulatorTest.java
@Test
public void testAbortIncompleteBatches() throws Exception {
    long lingerMs = Long.MAX_VALUE;
    final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
    final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
    class TestCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            assertTrue(exception.getMessage().equals("Producer is closed forcefully."));
            numExceptionReceivedInCallback.incrementAndGet();
        }
    }
    for (int i = 0; i < 100; i++)
        accum.append(new TopicPartition(topic, i % 3), 0L, key, value, new TestCallback(), maxBlockTimeMs);
    RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
    assertEquals("No nodes should be ready.", 0, result.readyNodes.size());

    accum.abortIncompleteBatches();
    assertEquals(100, numExceptionReceivedInCallback.get());
    assertFalse(accum.hasUnsent());

}
Project: li-apache-kafka-clients    File: LageMessageCallbackTest.java
@Test
public void testLargeMessageCallbackWithoutException() {
  final AtomicInteger callbackFired = new AtomicInteger(0);
  LargeMessageCallback callback = new LargeMessageCallback(numSegments, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      callbackFired.incrementAndGet();
      assertEquals("No exception should be there.", e, null);
    }
  });

  for (int i = 0; i < numSegments - 1; i++) {
    callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), null);
    assertTrue("The user callback should not be fired.", callbackFired.get() == 0);
  }
  callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), null);
  assertTrue("The user callback should not be fired.", callbackFired.get() == 1);
}
Project: gameon-map    File: Kafka.java
public void publishMessage(String topic, String key, String message){
    final Callback callback = (RecordMetadata m, Exception e) -> {
        if ( e == null ) {
            Log.log(Level.FINER, this, "Published Event");
        } else {
            Log.log(Level.FINER, this, "Error publishing event", e);
        }
    };

    if(producer!=null){
        Log.log(Level.FINER, this, "Publishing Event {0} {1} {2}",topic,key,message);
        ProducerRecord<String,String> pr = new ProducerRecord<>(topic, key, message);
        producer.send(pr, callback);
    }else{
        Log.log(Level.FINER, this, "Kafka Unavailable, ignoring event {0} {1} {2}",topic,key,message);
    }
}
Project: change-data-capture    File: Producer.java
public void send(final String topic, final byte[] key, final byte[] msg)
        throws InterruptedException, ExecutionException {
    ProducerRecord<byte[], byte[]> rec = new ProducerRecord<byte[], byte[]>(
            topic, key, msg);
    Future<RecordMetadata> res = producer.send(rec, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                logger.error(" failed to send record to {}: {}", topic, e);
                logger.debug("Failed record: topic {},  key {}, value {}",
                        topic, key, msg);
            } else {
                logger.trace(
                        "Wrote record successfully: topic {} partition {} offset {}",
                        recordMetadata.topic(), recordMetadata.partition(),
                        recordMetadata.offset());
            }
        }
    });
    if (sync) {
        res.get();
    }

}
Project: registry    File: KafkaAvroSerDesWithKafkaServerTest.java
private String produceMessage(String topicName, Object msg) {
    String bootstrapServers = CLUSTER.bootstrapServers();
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.putAll(SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.exportClientConf(true));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

    final Producer<String, Object> producer = new KafkaProducer<>(config);
    final Callback callback = new ProducerCallback();
    LOG.info("Sending message: [{}] to topic: [{}]", msg, topicName);
    ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topicName, getKey(msg), msg);
    producer.send(producerRecord, callback);
    producer.flush();
    LOG.info("Message successfully sent to topic: [{}]", topicName);
    producer.close(5, TimeUnit.SECONDS);

    return bootstrapServers;
}
Project: HeliosStreams    File: KafkaProducerService.java
/**
 * {@inheritDoc}
 * @see org.apache.kafka.clients.producer.Producer#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback)
 */
@Override
public Future<RecordMetadata> send(final ProducerRecord<K, V> record, final Callback callback) {
    final long startTime = System.nanoTime();       
    try { 
        final Future<RecordMetadata> f = producer.send(record, new Callback(){
            @Override
            public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                sendMessageResponse.update(System.nanoTime()-startTime, TimeUnit.NANOSECONDS);
                if(callback!=null) {
                    callback.onCompletion(metadata, exception);
                }
            }
        });
        sendCounter.inc();
        sendMessage.update(System.nanoTime()-startTime, TimeUnit.NANOSECONDS);
        getTopicCounter(record.topic()).increment();
        return f;
    } catch (Exception ex) {
        droppedMessages.inc();
        log.error("Failed to send producer record", ex);
        return failedFuture(ex);
    }
}
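The failedFuture(ex) helper called in the catch block is not shown. A hypothetical implementation consistent with its use (an assumption about the helper, not HeliosStreams source) returns a Future that is already completed exceptionally:

// Hypothetical sketch of the failedFuture(...) helper referenced above.
// Requires java.util.concurrent.CompletableFuture and java.util.concurrent.Future.
private static Future<RecordMetadata> failedFuture(final Exception ex) {
    final CompletableFuture<RecordMetadata> f = new CompletableFuture<>();
    f.completeExceptionally(ex); // f.get() will throw an ExecutionException wrapping ex
    return f;
}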
Project: flink    File: FlinkKafkaProducerBaseTest.java
/**
 * This test is meant to assure that testAtLeastOnceProducer is valid by testing that, if flushing is disabled,
 * the snapshot method does indeed finish without waiting for pending records;
 * we set a timeout because the test will not finish if the logic is broken.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
        FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
    producer.setFlushOnCheckpoint(false);

    final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();

    final OneInputStreamOperatorTestHarness<String, Object> testHarness =
        new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));

    testHarness.open();

    testHarness.processElement(new StreamRecord<>("msg"));

    // make sure that none of the callbacks have been completed yet
    verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));

    // should return even if there are pending records
    testHarness.snapshot(123L, 123L);

    testHarness.close();
}
Project: flink    File: FlinkKafkaProducerBaseTest.java
@SuppressWarnings("unchecked")
DummyFlinkKafkaProducer(Properties producerConfig, KeyedSerializationSchema<T> schema, FlinkKafkaPartitioner partitioner) {

    super(DUMMY_TOPIC, schema, producerConfig, partitioner);

    this.mockProducer = mock(KafkaProducer.class);
    when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
            pendingCallbacks.add(invocationOnMock.getArgumentAt(1, Callback.class));
            return null;
        }
    });

    this.pendingCallbacks = new ArrayList<>();
    this.flushLatch = new MultiShotLatch();
}
Project: distributedlog    File: DLFutureRecordMetadata.java
DLFutureRecordMetadata(final String topic,
                       com.twitter.util.Future<DLSN> dlsnFuture,
                       final Callback callback) {
    this.topic = topic;
    this.dlsnFuture = dlsnFuture;
    this.callback = callback;

    this.dlsnFuture.addEventListener(new FutureEventListener<DLSN>() {
        @Override
        public void onFailure(Throwable cause) {
            callback.onCompletion(null, new IOException(cause));
        }

        @Override
        public void onSuccess(DLSN value) {
            callback.onCompletion(new RecordMetadata(new TopicPartition(topic, 0), -1L, -1L), null);
        }
    });
}
Project: Camel    File: KafkaProducerTest.java
@Test
public void processAsyncSendsMessageWithException() throws Exception {

    endpoint.setTopic("sometopic");
    Mockito.when(exchange.getIn()).thenReturn(in);

    // setup the exception here
    org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
    Mockito.when(kp.send(Mockito.any(ProducerRecord.class), Mockito.any(Callback.class))).thenThrow(new ApiException());

    in.setHeader(KafkaConstants.PARTITION_KEY, "4");

    producer.process(exchange, callback);

    Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
    Mockito.verify(exchange).setException(Matchers.isA(ApiException.class));
    Mockito.verify(callback).done(Matchers.eq(true));
}
Project: qbit-extensions    File: KafkaServiceBuilder.java
public Supplier<Callback> getCallbackSupplier() {

        if (callbackSupplier == null) {

            final Logger logger = getLogger();
            final boolean debug = logger.isDebugEnabled();
            callbackSupplier = () -> (metadata, exception) -> {
                if (debug) {
                    if (metadata != null) {
                        logger.debug(metadata.toString());
                    }
                }

                if (exception != null) {
                    logger.error("Unable to send message to kafka", exception);
                }
            };
        }
        return callbackSupplier;
    }
Project: apex-malhar    File: KafkaSinglePortExactlyOnceOutputOperator.java
protected void sendTuple(T tuple)
{
  if (alreadyInKafka(tuple)) {
    return;
  }

  getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback()
  {
    public void onCompletion(RecordMetadata metadata, Exception e)
    {
      if (e != null) {
        logger.info("Wrting to Kafka failed with an exception {}" + e.getMessage());
        throw new RuntimeException(e);
      }
    }
  });
}
Project: Precipice    File: KafkaService.java
public PrecipiceFuture<ProduceStatus, RecordMetadata> sendRecordAction(ProducerRecord<K, V> record) {
    final PrecipicePromise<ProduceStatus, RecordMetadata> promise = Asynchronous.acquirePermitsAndPromise(guardRail, 1L);

    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                promise.complete(ProduceStatus.SUCCESS, metadata);
            } else {
                if (exception instanceof TimeoutException) {
                    promise.completeExceptionally(ProduceStatus.TIMEOUT, exception);
                } else if (exception instanceof NetworkException) {
                    promise.completeExceptionally(ProduceStatus.NETWORK_EXCEPTION, exception);
                } else {
                    promise.completeExceptionally(ProduceStatus.OTHER_ERROR, exception);
                }
            }
        }
    });

    return promise.future();
}
Project: gameon-player    File: Kafka.java
public void publishMessage(String topic, String key, String message){
    final Callback callback = (RecordMetadata m, Exception e) -> {
        if ( e == null ) {
            Log.log(Level.FINER, this, "Published Event");
        } else {
            Log.log(Level.FINER, this, "Error publishing event", e);
        }
    };

    if(producer!=null){
        Log.log(Level.FINER, this, "Publishing Event {0} {1} {2}",topic,key,message);
        ProducerRecord<String,String> pr = new ProducerRecord<String,String>(topic, key, message);
        producer.send(pr, callback);
    } else{
        Log.log(Level.FINER, this, "Kafka Unavailable, ignoring event {0} {1} {2}",topic,key,message);
    }
}
Project: Aletheia    File: KafkaCallbackTransformer.java
/**
 * Converts {@link com.outbrain.aletheia.datum.production.DeliveryCallback} to {@link  org.apache.kafka.clients.producer.Callback}
 *
 * @param deliveryCallback The callback provided by the user.
 * @param endpoint The endpoint for which the delivery callback will be invoked.
 *
 */
public Callback transform(final DeliveryCallback deliveryCallback,
                          final EndPoint endpoint){
  return new Callback() {
    @Override
    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
      // It's guaranteed in Kafka API that exactly one of the arguments will be null
      if (metadata != null){
        deliveryCallback.onSuccess(
                new EndpointDeliveryMetadata(endpoint));
      } else {
        kafkaDeliveryFailure.inc();
        deliveryCallback.onError(
                new EndpointDeliveryMetadata(endpoint),
                exception);
      }
    }
  };
}
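A short usage sketch for this transformer (the transformer, deliveryCallback, endpoint, producer, and record variables are placeholders for the caller's objects, not names from the Aletheia source):

// Hypothetical caller-side usage of the transform(...) method above.
Callback kafkaCallback = transformer.transform(deliveryCallback, endpoint);
producer.send(record, kafkaCallback); // onSuccess/onError fires when the broker responds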
Project: kaa    File: KafkaLogEventDao.java
@Override
public List<Future<RecordMetadata>> save(List<KafkaLogEventDto> logEventDtoList,
                                         GenericAvroConverter<GenericRecord> eventConverter, GenericAvroConverter<GenericRecord> headerConverter,
                                         Callback callback) throws IOException {
  List<Future<RecordMetadata>> results = new ArrayList<Future<RecordMetadata>>();
  LOG.info("[{}] Sending events to Kafka using {} key defining strategy", topicName, configuration
      .getKafkaKeyType().toString());
  for (KafkaLogEventDto dto : logEventDtoList) {
    ProducerRecord<String, String> recordToWrite;
    if (configuration.getUseDefaultPartitioner()) {
      recordToWrite = new ProducerRecord<String, String>(topicName, getKey(dto), formKafkaJson(dto,
          eventConverter, headerConverter));
    } else {
      recordToWrite = new ProducerRecord<String, String>(topicName, calculatePartitionId(dto), getKey(dto),
          formKafkaJson(dto, eventConverter, headerConverter));
    }
    results.add(producer.send(recordToWrite, callback));
  }
  return results;
}
Project: ddth-kafka    File: QndMultithreadAsyncProducerConsumerListener.java
private static Thread[] createProducerThreads(KafkaClient kafkaClient, String topic,
        AtomicLong counterSent, int numThreads, final int numMsgs) {
    Thread[] result = new Thread[numThreads];
    for (int i = 0; i < numThreads; i++) {
        result[i] = new Thread("Producer - " + i) {
            public void run() {
                Callback callback = new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            counterSent.incrementAndGet();
                        }
                    }
                };
                for (int i = 0; i < numMsgs; i++) {
                    String content = i + ":" + idGen.generateId128Hex();
                    KafkaMessage msg = new KafkaMessage(topic, content);
                    kafkaClient.sendMessageRaw(msg, callback);
                }
            }
        };
    }
    return result;
}
Project: ddth-kafka    File: QndMultithreadAsyncProducerConsumer.java
private static Thread[] createProducerThreads(KafkaClient kafkaClient, String topic,
        AtomicLong counterSent, int numThreads, final int numMsgs) {
    Thread[] result = new Thread[numThreads];
    for (int i = 0; i < numThreads; i++) {
        result[i] = new Thread("Producer - " + i) {
            public void run() {
                Callback callback = new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            counterSent.incrementAndGet();
                        }
                    }
                };
                for (int i = 0; i < numMsgs; i++) {
                    String content = i + ":" + idGen.generateId128Hex();
                    KafkaMessage msg = new KafkaMessage(topic, content);
                    kafkaClient.sendMessageRaw(msg, callback);
                }
            }
        };
    }
    return result;
}
Project: logging-log4j2    File: KafkaManager.java
public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
    if (producer != null) {
        byte[] newKey = null;

        if(key != null && key.contains("${")) {
            newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
        } else if (key != null) {
            newKey = key.getBytes(StandardCharsets.UTF_8);
        }

        final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
        if (syncSend) {
            final Future<RecordMetadata> response = producer.send(newRecord);
            response.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } else {
            producer.send(newRecord, new Callback() {
                @Override
                public void onCompletion(final RecordMetadata metadata, final Exception e) {
                    if (e != null) {
                        LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                    }
                }
            });
        }
    }
}
Project: fiery    File: ProviderThread.java
public ProviderThread(String kafkaTopic, String serverList, ConcurrentLinkedQueue<String> metaLogQueue, ConcurrentLinkedQueue<String> commonLogQueue) {

        if (metaLogQueue == null) {
            log.error("meta queue obj is null...");
            System.exit(7);
        }

        if (commonLogQueue == null) {
            log.error("common queue obj is null...");
            System.exit(8);
        }

        if (serverList == null || serverList.length() == 0) {
            log.error("kafka server list is empty...");
            System.exit(9);
        }

        if (kafkaTopic == null || kafkaTopic.length() == 0) {
            log.error("kafka topic is empty...");
            System.exit(4);
        }

        callback = new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    // metadata is null when the send fails, so only log the exception
                    log.error("message send failed", e);
                }
            }
        };

        this.metaLogQueue = metaLogQueue;
        this.commonLogQueue = commonLogQueue;
        this.serverList = serverList;
        this.kafkaTopic = kafkaTopic;

    }
Project: ja-micro    File: KafkaPublisherTest.java
@Test
public void sendFailsReturnsFalse() {
    KafkaProducer producer = mock(KafkaProducer.class);
    publisher.realProducer = producer;
    RecordMetadata metadata = new RecordMetadata(null, 0, 0,
            0, Long.valueOf(0), 0, 0);
    ArgumentCaptor<Callback> captor = ArgumentCaptor.forClass(Callback.class);
    when(producer.send(any(), captor.capture())).then(
        invocation -> {
            captor.getValue().onCompletion(metadata, new TimeoutException("error"));
            return new CompletableFuture();
        });
    String[] events = { "test" };
    assertThat(publisher.publishEvents(false, null, events)).isFalse();
}