Java class kafka.consumer.ConsumerIterator: example source code
Project: Kafka-Insight
File: KafkaOffsetGetter.java
/**
 * When an object implementing interface <code>Runnable</code> is used
 * to create a thread, starting the thread causes the object's
 * <code>run</code> method to be called in that separately executing
 * thread.
 * <p>
 * The general contract of the method <code>run</code> is that it may
 * take any action whatsoever.
 *
 * @see Thread#run()
 */
@Override
public void run() {
    ConsumerConnector consumerConnector = KafkaUtils.createConsumerConnector(zkAddr, group);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(CONSUMER_OFFSET_TOPIC, 1);
    KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector
            .createMessageStreams(topicCountMap)
            .get(CONSUMER_OFFSET_TOPIC)
            .get(0);

    ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();
    while (true) {
        MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();
        // key versions 0 and 1 are offset-commit entries; version 2+ is group metadata
        if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) {
            try {
                GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key()));
                if (offsetMsg.message() == null) {
                    continue;
                }
                kafka.common.OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message()));
                kafkaConsumerOffsets.put(commitKey, commitValue);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
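readMessageKey(...) and readMessageValue(...) are not shown in this excerpt. A minimal sketch of what the key decoder might do, assuming the documented __consumer_offsets key layout (version, group, topic, partition); the helper and the GroupTopicPartition constructor shape are assumptions, and the class locations vary across Kafka versions:

// Hedged sketch, not the project's code: decode an offset-commit key.
// Assumed layout: version (short), group (string), topic (string), partition (int).
private static GroupTopicPartition readMessageKey(ByteBuffer buf) {
    buf.getShort();                          // key version (0 or 1), already checked by the caller
    String group = readShortString(buf);
    String topic = readShortString(buf);
    int partition = buf.getInt();
    return new GroupTopicPartition(group, topic, partition);   // constructor shape assumed
}

// Strings in the offsets topic are length-prefixed with a short.
private static String readShortString(ByteBuffer buf) {
    short len = buf.getShort();
    byte[] bytes = new byte[len];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
}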
Project: flume-release-1.7.0
File: KafkaConsumer.java
public MessageAndMetadata getNextMessage(String topic) {
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    // there is only a single stream, because only one consumer was requested
    KafkaStream<byte[], byte[]> stream = streams.get(0);
    final ConsumerIterator<byte[], byte[]> it = stream.iterator();
    try {
        if (it.hasNext()) {
            return it.next();
        } else {
            return null;
        }
    } catch (ConsumerTimeoutException e) {
        logger.error("0 messages available to fetch for the topic " + topic);
        return null;
    }
}
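Note that hasNext() on the old high-level consumer blocks indefinitely unless the connector was created with a consumer timeout; only then does it throw the ConsumerTimeoutException caught above. A minimal setup sketch (connection values are placeholders):

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");   // placeholder address
props.put("group.id", "flume-consumer");            // placeholder group
props.put("consumer.timeout.ms", "100");            // bounds hasNext(), enabling ConsumerTimeoutException
ConsumerConnector consumer =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));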
Project: open-kilda
File: Original.java
@Test
public void shouldWriteThenRead() throws Exception {
    // Create a consumer
    ConsumerIterator<String, String> it = buildConsumer(Original.topic);

    // Create a producer
    producer = new KafkaProducer<>(producerProps());

    // send a message
    producer.send(new ProducerRecord<>(Original.topic, "message")).get();

    // read it back
    MessageAndMetadata<String, String> messageAndMetadata = it.next();
    String value = messageAndMetadata.message();
    assertThat(value, is("message"));
}
Project: open-kilda
File: SimpleKafkaTest.java
@Test
public void shouldWriteThenRead() throws Exception {
    // Create a consumer
    ConsumerIterator<String, String> it = buildConsumer(SimpleKafkaTest.topic);

    // Create a producer
    producer = new KafkaProducer<>(producerProps());

    // send a message
    producer.send(new ProducerRecord<>(SimpleKafkaTest.topic, "message")).get();

    // read it back
    MessageAndMetadata<String, String> messageAndMetadata = it.next();
    String value = messageAndMetadata.message();
    assertThat(value, is("message"));
}
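Both open-kilda tests call a buildConsumer(...) helper that the excerpts do not show. A plausible minimal sketch, assuming the old high-level consumer and placeholder connection settings:

// Hedged sketch of buildConsumer(...); every setting below is a placeholder.
private ConsumerIterator<String, String> buildConsumer(String topic) {
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "test-group");
    props.put("auto.offset.reset", "smallest");   // read the topic from the beginning
    ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    StringDecoder decoder = new StringDecoder(new VerifiableProperties());
    Map<String, List<KafkaStream<String, String>>> streams =
            connector.createMessageStreams(Collections.singletonMap(topic, 1), decoder, decoder);
    return streams.get(topic).get(0).iterator();
}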
Project: tasfe-framework
File: KafkaConsumerThread.java
@Override
public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> mam = it.next();
        String jsonStr = "";
        try {
            jsonStr = new String(mam.message());
            JSONObject jsonObject = JSONObject.parseObject(jsonStr);
            LogcenterConfig config = LogConfigCache.getLogConfigCache(jsonObject);
            // dispatch to the storage backend configured for this log entry
            IStorageApi iStorageApi = ServiceRegister.getInstance().getProvider(config.getStorageType());
            iStorageApi.save(jsonObject);
        } catch (Exception e) {
            logger.error("partition[" + mam.partition() + "], offset[" + mam.offset() + "], " + jsonStr, e);
        }
    }
}
Project: wngn-jms-kafka
File: LogbackIntegrationIT.java
@Test
public void testLogging() throws InterruptedException {
    for (int i = 0; i < 1000; ++i) {
        logger.info("message" + i);
    }

    final KafkaStream<byte[], byte[]> log = kafka.createClient()
            .createMessageStreamsByFilter(new Whitelist("logs"), 1).get(0);
    final ConsumerIterator<byte[], byte[]> iterator = log.iterator();

    for (int i = 0; i < 1000; ++i) {
        final String messageFromKafka = new String(iterator.next().message(), UTF8);
        assertThat(messageFromKafka, Matchers.equalTo("message" + i));
    }
}
Project: DataProcessPlatformKafkaJavaSDK
File: KafkaConsumerTransducer.java
@Override
public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(transducer_topic, 1);
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get(transducer_topic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext() && bStartConsume) {
        transducerDataProcessor.newData(it.next().message());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Project: iotdb-jdbc
File: KafkaConsumer.java
public void run() {
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<String, String> consumerIterator = it.next();
        String uploadMessage = consumerIterator.message();
        System.out.println(Thread.currentThread().getName()
                + " from partition[" + consumerIterator.partition() + "]: "
                + uploadMessage);
        try {
            sendDataToIotdb.writeData(uploadMessage); // upload data to the IoTDB database
        } catch (Exception ex) {
            System.out.println("SQLException: " + ex.getMessage());
        }
    }
}
Project: storm-demos
File: KafkaDataSpout.java
public void nextTuple() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TopologyConfig.kafkaTopic, 1);   // one executor - one thread
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = conn.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaTopic);
    ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
    while (true) {
        while (iter.hasNext()) {
            String s = new String(iter.next().message());
            UUID msgId = UUID.randomUUID();
            this.pending.put(msgId, new Values(s));
            // anchor the tuple with its msgId so ack()/fail() fire for it
            collector.emit(new Values(s), msgId);
        }
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            logger.error("Spout: sleep interrupted", e);
        }
    }
}
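The pending map suggests the spout is meant to replay failed tuples; a hedged sketch of the matching ack()/fail() overrides, which the excerpt does not include:

@Override
public void ack(Object msgId) {
    this.pending.remove(msgId);                       // tuple fully processed; forget it
}

@Override
public void fail(Object msgId) {
    collector.emit(this.pending.get(msgId), msgId);   // replay the failed tuple with the same id
}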
Project: koper
File: KafkaReceiver.java
private void processStreamsByTopic(String topicKeys, List<KafkaStream<byte[], byte[]>> streamList) {
    // init stream thread pool
    ExecutorService streamPool = Executors.newFixedThreadPool(partitions);
    String[] topics = StringUtils.split(topicKeys, ",");
    if (log.isDebugEnabled())
        log.debug("Preparing to process KafkaStream list, topic count={}, topics={}, partitions/topic={}", topics.length, topicKeys, partitions);

    // iterate over the streams
    AtomicInteger index = new AtomicInteger(0);
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        Thread streamThread = new Thread() {
            @Override
            public void run() {
                int i = index.getAndAdd(1);
                if (log.isDebugEnabled())
                    log.debug("Processing KafkaStream -- No.={}, partitions={}", i, partitions + ":" + i);
                ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
                processStreamByConsumer(topicKeys, consumerIterator);
            }
        };
        streamPool.execute(streamThread);
    }
}
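processStreamByConsumer(...) is not shown in the excerpt. A minimal sketch of what it plausibly does; handleMessage(...) is a hypothetical hand-off to the application listener:

private void processStreamByConsumer(String topicKeys, ConsumerIterator<byte[], byte[]> consumerIterator) {
    while (consumerIterator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> mam = consumerIterator.next();
        handleMessage(topicKeys, mam.message());   // hypothetical dispatch
    }
}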
Project: light_drtc
File: KafkaMqCollect.java
public void collectMq() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(Constants.kfTopic, 1);
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get(Constants.kfTopic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    MessageAndMetadata<String, String> msgMeta;
    while (it.hasNext()) {
        msgMeta = it.next();
        super.mqTimer.parseMqText(msgMeta.key(), msgMeta.message());
        //System.out.println(msgMeta.key() + "\t" + msgMeta.message());
    }
}
Project: vertx-kafka-service
File: KafkaProducerServiceIntegrationTest.java
private void consumeMessages() {
    final Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TOPIC, 1);
    final StringDecoder decoder = new StringDecoder(new VerifiableProperties());
    final Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, decoder, decoder);
    final KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
    final ConsumerIterator<String, String> iterator = stream.iterator();

    Thread kafkaMessageReceiverThread = new Thread(
            () -> {
                while (iterator.hasNext()) {
                    String msg = iterator.next().message();
                    msg = msg == null ? "<null>" : msg;
                    System.out.println("got message: " + msg);
                    messagesReceived.add(msg);
                }
            },
            "kafkaMessageReceiverThread"
    );
    kafkaMessageReceiverThread.start();
}
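messagesReceived is presumably a thread-safe collection the test later asserts on; a hedged sketch of how the test body might wait for an expected message count (the helper is hypothetical):

private void waitForMessages(int expected, long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (messagesReceived.size() < expected && System.currentTimeMillis() < deadline) {
        Thread.sleep(50);   // poll until the receiver thread catches up or we time out
    }
    assertEquals(expected, messagesReceived.size());
}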
Project: jlogstash-input-plugin
File: KafkaDistributed.java
public void run() {
    try {
        while (true) {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                String m = null;
                try {
                    m = new String(it.next().message(), this.kafkaInput.encoding);
                    Map<String, Object> event = this.decoder.decode(m);
                    if (zkDistributed == null) {
                        this.kafkaInput.process(event);
                    } else {
                        zkDistributed.route(event);
                    }
                } catch (Exception e) {
                    logger.error("process event:{} failed:{}", m, ExceptionUtil.getErrorMessage(e));
                }
            }
        }
    } catch (Exception t) {
        logger.error("Kafka consumer fetch error: {}", ExceptionUtil.getErrorMessage(t));
    }
}
Project: jlogstash-input-plugin
File: Kafka.java
public void run() {
    try {
        while (true) {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                String m = null;
                try {
                    m = new String(it.next().message(), this.kafkaInput.encoding);
                    Map<String, Object> event = this.decoder.decode(m);
                    if (event != null && event.size() > 0) {
                        this.kafkaInput.process(event);
                    }
                } catch (Exception e) {
                    logger.error("process event:{} failed:{}", m, e.getCause());
                }
            }
        }
    } catch (Exception t) {
        logger.error("Kafka consumer fetch error: {}", t.getCause());
    }
}
Project: iote2e
File: KafkaAvroDemo.java
public void run() {
    Iote2eRequestReuseItem iote2eRequestReuseItem = new Iote2eRequestReuseItem();
    ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        String key = new String(messageAndMetadata.key());
        try {
            String summary =
                    "Thread " + threadNumber +
                    ", topic=" + messageAndMetadata.topic() +
                    ", partition=" + messageAndMetadata.partition() +
                    ", key=" + key +
                    ", offset=" + messageAndMetadata.offset() +
                    ", timestamp=" + messageAndMetadata.timestamp() +
                    ", timestampType=" + messageAndMetadata.timestampType() +
                    ", iote2eRequest=" + iote2eRequestReuseItem.fromByteArray(messageAndMetadata.message()).toString();
            logger.info(">>> Consumed: " + summary);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
    logger.info(">>> Shutting down Thread: " + threadNumber);
}
Project: iote2e
File: KafkaStringDemo.java
public void run() {
    ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        String key = new String(messageAndMetadata.key());
        String message = new String(messageAndMetadata.message());
        String summary =
                "Thread " + threadNumber +
                ", topic=" + messageAndMetadata.topic() +
                ", partition=" + messageAndMetadata.partition() +
                ", key=" + key +
                ", message=" + message +
                ", offset=" + messageAndMetadata.offset() +
                ", timestamp=" + messageAndMetadata.timestamp() +
                ", timestampType=" + messageAndMetadata.timestampType();
        logger.info(">>> Consumed: " + summary);
    }
    logger.info(">>> Shutting down Thread: " + threadNumber);
}
Project: iote2e
File: AvroConsumerThread.java
public void run() {
    try {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(User.getClassSchema());
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
            String key = new String(messageAndMetadata.key());
            User user = genericRecordToUser(recordInjection.invert(messageAndMetadata.message()).get());
            // User user = (User) recordInjection.invert(messageAndMetadata.message()).get();
            String summary = "Thread " + m_threadNumber + ", topic=" + messageAndMetadata.topic() + ", partition="
                    + messageAndMetadata.partition() + ", key=" + key + ", user=" + user.toString() + ", offset="
                    + messageAndMetadata.offset() + ", timestamp=" + messageAndMetadata.timestamp()
                    + ", timestampType=" + messageAndMetadata.timestampType();
            System.out.println(summary);
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    } catch (Exception e) {
        System.out.println("Exception in thread " + m_threadNumber);
        System.out.println(e);
        e.printStackTrace();
    }
}
Project: iote2e
File: ConsumerDemoThread.java
public void run() {
    ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        String key = new String(messageAndMetadata.key());
        String message = new String(messageAndMetadata.message());
        String summary =
                "Thread " + threadNumber +
                ", topic=" + messageAndMetadata.topic() +
                ", partition=" + messageAndMetadata.partition() +
                ", key=" + key +
                ", message=" + message +
                ", offset=" + messageAndMetadata.offset() +
                ", timestamp=" + messageAndMetadata.timestamp() +
                ", timestampType=" + messageAndMetadata.timestampType();
        System.out.println(summary);
    }
    System.out.println("Shutting down Thread: " + threadNumber);
}
Project: cloudinsight-platform-docker
File: CollectorTest.java
public void recv() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> streamMap =
            consumer.createMessageStreams(topicMap, new StringDecoder(null), new StringDecoder(null));
    KafkaStream<String, String> stream = streamMap.get(topic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<String, String> mm = it.next();
        System.out.println("<<< Got new message");
        System.out.println("<<< key:" + mm.key());
        System.out.println("<<< m: " + mm.message());
    }
}
Project: watchtower-automation
File: KafkaConsumerRunnable.java
public void run() {
    logger.debug("KafkaChannel {} has stream", this.threadNumber);
    final ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();
    running = true;
    while (running) {
        try {
            if (streamIterator.hasNext()) {
                final byte[] message = streamIterator.next().message();
                // log the decoded payload rather than byte[].toString()
                logger.debug("Thread {}: {}", threadNumber, new String(message));
                consumeMessage(message);
            }
        } catch (ConsumerTimeoutException cte) {
            logger.debug("Timed out when consuming from Kafka", cte);
            KafkaHealthCheck.getInstance().heartAttack(cte.getMessage());
        }
    }
}
Project: sqoop-on-spark
File: KafkaConsumer.java
public MessageAndMetadata getNextMessage(String topic) {
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    // there is only a single stream, because only one consumer was requested
    KafkaStream<byte[], byte[]> stream = streams.get(0);
    final ConsumerIterator<byte[], byte[]> it = stream.iterator();
    try {
        if (it.hasNext()) {
            return it.next();
        } else {
            return null;
        }
    } catch (ConsumerTimeoutException e) {
        logger.error("0 messages available to fetch for the topic " + topic);
        return null;
    }
}
Project: watchtower-workflow
File: KafkaConsumerRunnable.java
public void run() {
    logger.info("KafkaChannel {} has stream", this.threadNumber);
    final ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();
    running = true;
    while (running) {
        try {
            if (streamIterator.hasNext()) {
                MessageAndMetadata<byte[], byte[]> messageAndMetadata = streamIterator.next();
                byte[] key = messageAndMetadata.key();
                byte[] message = messageAndMetadata.message();
                consumeMessage(key, message);
            }
        } catch (ConsumerTimeoutException cte) {
            logger.debug("Timed out when consuming from Kafka", cte);
            KafkaHealthCheck.getInstance().heartAttack(cte.getMessage());
        }
    }
}
Project: data-acquisition
File: KafkaRequestIdQueue.java
/**
 * Modified example from the Kafka site, with some defensive checks added.
 */
private ConsumerIterator<String, String> getStreamIterator() {
    Map<String, Integer> topicCountMap = ImmutableMap.of(topic, TOPIC_COUNT);
    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, msgDecoder);
    List<KafkaStream<String, String>> streams = consumerMap.get(topic);
    Preconditions.checkNotNull(streams, "There is no topic named : " + topic);

    // Copy in case a live list is returned; needed for the index check below.
    ImmutableList<KafkaStream<String, String>> streamsCopy = ImmutableList.copyOf(streams);
    Preconditions.checkElementIndex(FIRST_ELEMENT_INDEX, streamsCopy.size(),
            "Failed to find any KafkaStreams related to topic : " + topic);

    KafkaStream<String, String> stream = streamsCopy.get(FIRST_ELEMENT_INDEX);
    Preconditions.checkNotNull(stream, "Returned kafka stream is null");

    ConsumerIterator<String, String> iterator = stream.iterator();
    Preconditions.checkNotNull(iterator, "Returned kafka iterator is null");
    return iterator;
}
Project: VoltDB
File: KafkaLoader.java
@Override
public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> md = it.next();
        byte[] msg = md.message();
        long offset = md.offset();
        String smsg = new String(msg);
        try {
            m_loader.insertRow(new RowWithMetaData(smsg, offset), m_csvParser.parseLine(smsg));
        } catch (Exception ex) {
            m_log.error("Consumer stopped", ex);
            System.exit(1);
        }
    }
}
Project: kafka-consumer
File: InFluxAvroConsumer.java
/**
 * Kept separate from the caller to avoid piling up try-catch blocks.
 */
public void runrun() {
    log.info("Waiting to consume data");
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    // loop through all messages in the stream
    while (it.hasNext()) {
        byte[] msg = it.next().message();
        if (msg.length < 2) {
            // Valid messages are longer than 2 bytes, since the first byte is a schema id.
            // Some libraries (pypro) used to send a short message first to check whether the
            // Kafka topic was alive; that is what "topic polling" refers to here.
            log.info("ignoring short msg, assuming topic polling");
            continue;
        }
        // log.trace("Thread " + id + ":: " + Arrays.toString(msg));
        process(msg);
    }
    log.info("Shutting down consumer Thread: " + id);
}
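process(msg) is not shown. A hedged sketch of how the schema-id prefix described above might be split from the Avro payload; the one-byte id and the decoding step are assumptions:

private void process(byte[] msg) {
    byte schemaId = msg[0];                                    // assumed one-byte schema id
    byte[] avroPayload = Arrays.copyOfRange(msg, 1, msg.length);
    // look up the Avro schema for schemaId, decode avroPayload,
    // and write the resulting points to InfluxDB (details omitted)
}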
Project: cep
File: Consumer.java
/**
 * Starts the consumer thread.
 */
@Override
public void run() {
    log.debug("Starting consumer for topic {}", topic);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    // For each message present on the partition...
    while (it.hasNext()) {
        Map<String, Object> event = null;

        // Parse it with the parser associated with the topic
        try {
            event = parser.parse(new String(it.next().message(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        // Send it to the source
        if (event != null) {
            source.send(topic.getName(), event);
        }
    }
    log.debug("Finished consumer for topic {}", topic);
}
Project: easyframe-msg
File: KafkaHelper.java
/**
 * Consume messages from the given topic.
 *
 * @param topicName topic (queue) name
 * @param groupId   consumer group name
 * @return a MsgIterator over the consumed messages
 */
static MsgIterator consume(String topicName, String groupId) {
    ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); // (topic, #streams) pairs
    topicCountMap.put(topicName, 1);
    // TODO: support consuming multiple topics

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap); // using the default decoder
    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName); // the list has #streams entries; each stream supports an iterator over message/metadata pairs
    KafkaStream<byte[], byte[]> stream = streamList.get(0);

    // KafkaStream<K, V>: K is the partition key type, V is the message value type
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    MsgIterator iter = new MsgIterator(it);
    return iter;
}
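A minimal usage sketch for the helper above; MsgIterator's hasNext()/next() are assumed to mirror the underlying ConsumerIterator, and the topic and group names are placeholders:

MsgIterator it = KafkaHelper.consume("my-topic", "my-group");
while (it.hasNext()) {
    System.out.println("received: " + it.next());
}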
Project: easyframe-msg
File: KafkaHelper.java
/**
 * Consume messages from the given topic with a given number of streams (threads).
 *
 * @param topicName  topic (queue) name
 * @param numStreams number of streams to return
 * @param groupId    consumer group name
 * @return a list of MsgIterator, each of which provides an iterator over messages from the allowed topics
 */
static List<MsgIterator> consume(String topicName, int numStreams, String groupId) {
    ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); // (topic, #streams) pairs
    topicCountMap.put(topicName, numStreams);
    // TODO: support consuming multiple topics

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap); // using the default decoder
    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName); // the list has #streams entries; each stream supports an iterator over message/metadata pairs

    List<MsgIterator> iterList = new ArrayList<MsgIterator>();
    // KafkaStream<K, V>: K is the partition key type, V is the message value type
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        MsgIterator iter = new MsgIterator(it);
        iterList.add(iter);
    }
    return iterList;
}
Project: datacollector
File: KafkaDestinationSinglePartitionPipelineRunIT.java
@Override
protected int getRecordsInTarget() {
    int expectedRecordsInTarget = 0;
    for (KafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        try {
            while (it.hasNext()) {
                expectedRecordsInTarget++;
                it.next();
            }
        } catch (kafka.consumer.ConsumerTimeoutException e) {
            // no-op
        }
    }
    return expectedRecordsInTarget;
}
Project: datacollector
File: KafkaDestinationMultiPartitionPipelineRunIT.java
@Override
protected int getRecordsInTarget() {
    int expectedRecordsInTarget = 0;
    for (KafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        try {
            while (it.hasNext()) {
                expectedRecordsInTarget++;
                it.next();
            }
        } catch (kafka.consumer.ConsumerTimeoutException e) {
            // no-op
        }
    }
    return expectedRecordsInTarget;
}
Project: netty-kafka-producer
File: DataBrokerTest.java
@Test
public void test_sendMessage() throws Exception {
    createTopic(topic);

    CountDownLatch latch = new CountDownLatch(1);
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    DataKafkaBroker dataChannel = new DataKafkaBroker("localhost", START_PORT, 0, topic, new NioEventLoopGroup(), properties);
    dataChannel.connect().sync();

    dataChannel.send(freeLaterBuffer("1".getBytes()), 0, freeLaterBuffer(TEST_MESSAGE.getBytes()));

    final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
    dataChannel.disconnect();
}
Project: netty-kafka-producer
File: ListenerTest.java
@Test
public void test_no_acks_send_message() throws Exception {
    String topic = "test_no_acks_send_message";
    createTopic(topic, 1);

    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    properties.override(ProducerProperties.DATA_ACK, Acknowledgment.WAIT_FOR_NO_ONE);
    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    KafkaTopic kafkaTopic = producer.topic();

    kafkaTopic.send(null, freeLaterBuffer(TEST_MESSAGE.getBytes()));

    final List<KafkaStream<byte[], byte[]>> consume = consume(topic);
    final KafkaStream<byte[], byte[]> stream = consume.get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    Assert.assertThat(TEST_MESSAGE, is(new String(messages.next().message())));
    producer.disconnect().sync();
}
Project: netty-kafka-producer
File: KafkaTopicSingleBrokerTest.java
@Test
public void test_producer() throws Exception {
    String topic = "test";
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    createTopic(topic);

    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    KafkaTopic kafkaTopic = producer.topic();

    kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "01").getBytes()));
    kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "02").getBytes()));
    kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "03").getBytes()));

    final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "01"));
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "02"));
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "03"));

    producer.disconnect().sync();
}
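All three netty-kafka-producer tests rely on a consume(topic) helper that the excerpts omit. A hedged sketch returning the raw streams; the ZooKeeper address and group id are placeholders:

private List<KafkaStream<byte[], byte[]>> consume(String topic) {
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");   // placeholder
    props.put("group.id", "test-consumer");             // placeholder
    props.put("auto.offset.reset", "smallest");         // see messages produced before the consumer started
    ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    return connector.createMessageStreams(Collections.singletonMap(topic, 1)).get(topic);
}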
Project: yuzhouwan
File: ConsumerWorker.java
@Override
public void run() {
    ConsumerIterator<byte[], byte[]> iter = kafkaStream.iterator();
    MessageAndMetadata<byte[], byte[]> msg;
    int total = 0, fail = 0, success = 0;
    long start = System.currentTimeMillis();
    while (iter.hasNext()) {
        try {
            msg = iter.next();
            _log.info("Thread {}: {}", threadNum, new String(msg.message(), "utf-8"));
            _log.info("partition: {}, offset: {}", msg.partition(), msg.offset());
            success++;
        } catch (Exception e) {
            _log.error("{}", e);
            fail++;
        }
        _log.info("Count [fail/success/total]: [{}/{}/{}], Time: {}s", fail, success, ++total,
                (System.currentTimeMillis() - start) / 1000);
    }
}
Project: java-kafka
File: KafkaLogAppenderTest.java
@Test
public void testKafkaLogAppender() {
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeper);
    consumerProps.put("group.id", "kafka-log-appender-test");
    consumerProps.put("auto.offset.reset", "smallest");
    consumerProps.put("schema.registry.url", schemaRegistry);

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);

    ConsumerIterator<String, Object> iterator = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps))
            .createMessageStreams(topicMap, new StringDecoder(null), new KafkaAvroDecoder(new VerifiableProperties(consumerProps)))
            .get(topic).get(0).iterator();

    String testMessage = "I am a test message";
    logger.info(testMessage);

    MessageAndMetadata<String, Object> messageAndMetadata = iterator.next();
    GenericRecord logLine = (GenericRecord) messageAndMetadata.message();
    assertEquals(logLine.get("line").toString(), testMessage);
    assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
    assertNotNull(logLine.get("source"));
    assertEquals(((Map<CharSequence, Object>) logLine.get("timings")).size(), 1);
    assertEquals(((Map<CharSequence, Object>) logLine.get("tag")).size(), 2);
}
Project: LogRTA
File: KafkaSpoutTest.java
public void activate() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, Integer> topickMap = new HashMap<String, Integer>();
    topickMap.put(topic, 1);
    System.out.println("*********Results********topic:" + topic);

    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
    KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        String value = new String(it.next().message());
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
        Date curDate = new Date(System.currentTimeMillis());
        String str = formatter.format(curDate);
        System.out.println("Storm received a message from Kafka ---> " + value);
        collector.emit(new Values(value, 1, str), value);
    }
}
Project: java-kafka-client-libs
File: ConsumerThread.java
@Override
public void run() {
    LOG.info("Consuming thread started");
    try {
        ConsumerIterator<byte[], byte[]> it = _stream.iterator();
        while (it.hasNext()) {
            long start = System.currentTimeMillis();
            byte[] message = it.next().message();
            LOG.debug("message received: {}", new String(message));
            _handler.onMessage(message);
            long time = System.currentTimeMillis() - start;
            KruxStdLib.STATSD.time("message_received." + _topic, time);
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            LOG.warn("Consumer group threads interrupted, shutting down");
        } else {
            LOG.error("no longer fetching messages", e);
        }
    }
}
Project: java.study
File: SimpleHLConsumer.java
public void testConsumer() throws Exception {
    String fileName = "logX.txt";
    BufferedWriter out = new BufferedWriter(new FileWriter(fileName));

    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    topicCount.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String msg = new String(it.next().message(), Charset.forName("UTF-8"));
            System.out.println("Message from Single Topic: " + msg);
            out.write(msg, 0, msg.length());
            out.write('\n');
            out.flush();
        }
    }
    if (consumer != null) {
        consumer.shutdown();
    }
    if (null != out) {
        out.close();
    }
}
Project: monasca-thresh
File: KafkaSpout.java
@Override
public void run() {
    while (this.shouldContinue) {
        final ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        if (it.hasNext()) {
            final byte[] message = it.next().message();
            synchronized (this) {
                this.message = message;
                // Wake up getMessage() if it is waiting
                if (this.waiting) {
                    notify();
                }
                // Block until getMessage() has taken the current message
                while (this.message != null && this.shouldContinue) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        logger.info("Wait interrupted", e);
                    }
                }
            }
        }
    }
    logger.info("readerThread {} exited", this.readerThread.getName());
    this.readerThread = null;
}
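The getMessage() counterpart of this wait/notify handoff is not shown; a hedged sketch of what it might look like, using the same fields as run() above:

private synchronized byte[] getMessage() {
    while (this.message == null) {
        this.waiting = true;
        try {
            wait();                       // woken by run() once a message arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
    this.waiting = false;
    final byte[] result = this.message;
    this.message = null;
    notify();                             // let run() fetch the next message
    return result;
}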