Python kafka 模块,SimpleConsumer() 实例源码
我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用kafka.SimpleConsumer()。
def test_simple_consumer_leader_change(self):
    """A NotLeaderForPartition fetch error must not escape get_messages().

    The consumer is expected to swallow the error and ask the client to
    refresh its topic metadata instead.
    """
    mock_client = MagicMock()
    simple_consumer = SimpleConsumer(
        mock_client,
        group=None,
        topic='topic',
        partitions=[0, 1],
        auto_commit=False,
    )

    def make_not_leader_response(request):
        # Error response for the requested topic/partition carrying the
        # NotLeaderForPartitionError errno and an empty message tuple.
        return FetchResponsePayload(
            request.topic,
            request.partition,
            NotLeaderForPartitionError.errno,
            -1,
            (),
        )

    # fail_requests_factory lives elsewhere on the test class; per the
    # original note it lets only the first request get a valid response
    # (NOTE(review): confirm against the helper's definition).
    fetch_side_effect = self.fail_requests_factory(make_not_leader_response)
    mock_client.send_fetch_request.side_effect = fetch_side_effect

    # Must complete without raising.
    simple_consumer.get_messages(20)

    # The client should have been asked to refresh metadata at least once.
    self.assertGreaterEqual(mock_client.reset_topic_metadata.call_count, 1)
    self.assertGreaterEqual(
        mock_client.load_metadata_for_topics.call_count, 1)
def test_simple_consumer_unknown_topic_partition(self):
    """An UnknownTopicOrPartition fetch error must propagate to the caller."""
    mock_client = MagicMock()
    simple_consumer = SimpleConsumer(
        mock_client,
        group=None,
        topic='topic',
        partitions=[0, 1],
        auto_commit=False,
    )

    def make_unknown_partition_response(request):
        # Error response for the requested topic/partition carrying the
        # UnknownTopicOrPartitionError errno and an empty message tuple.
        return FetchResponsePayload(
            request.topic,
            request.partition,
            UnknownTopicOrPartitionError.errno,
            -1,
            (),
        )

    # fail_requests_factory lives elsewhere on the test class; per the
    # original note it lets only the first request get a valid response
    # (NOTE(review): confirm against the helper's definition).
    fetch_side_effect = self.fail_requests_factory(
        make_unknown_partition_response)
    mock_client.send_fetch_request.side_effect = fetch_side_effect

    # Unlike a leader change, this error is expected to be raised.
    with self.assertRaises(UnknownTopicOrPartitionError):
        simple_consumer.get_messages(20)
def test_simple_consumer_commit_does_not_raise(self):
    """commit() reports failure with a falsy result instead of raising."""
    mock_client = MagicMock()
    mock_client.get_partition_ids_for_topic.return_value = [0, 1]

    def fake_offset_fetch(group, payloads, **kwargs):
        # Answer every requested payload with an offset-fetch response.
        responses = []
        for payload in payloads:
            responses.append(
                OffsetFetchResponsePayload(
                    payload.topic, payload.partition, 0, b'', 0))
        return responses

    def fake_offset_commit(group, payloads, **kwargs):
        # Every commit attempt fails.
        raise FailedPayloadsError(payloads[0])

    # Side effects are installed before the consumer is constructed, as in
    # the original ordering.
    mock_client.send_offset_fetch_request.side_effect = fake_offset_fetch
    mock_client.send_offset_commit_request.side_effect = fake_offset_commit

    simple_consumer = SimpleConsumer(
        mock_client,
        group='foobar',
        topic='topic',
        partitions=[0, 1],
        auto_commit=False,
    )

    # Mock internal commit check
    simple_consumer.count_since_commit = 10

    # The failing commit must surface as a falsy return, not an exception.
    commit_result = simple_consumer.commit(partitions=[0, 1])
    self.assertFalse(commit_result)
def connect(self):
    """Open the Kafka connection and build the consumer.

    Creates a kafka-python KafkaClient from ``self.config`` and a
    SimpleConsumer for ``self.topic`` / ``self.partitions``, storing them
    on ``self.client`` and ``self.kafka_consumer``.
    """
    # Client connected to the configured broker list.
    kafka_client = KafkaClient(
        self.config.broker_list,
        client_id=self.config.client_id
    )
    self.client = kafka_client

    # SimpleConsumer built with the extra arguments supplied by the config.
    self.kafka_consumer = SimpleConsumer(
        client=self.client,
        topic=self.topic,
        partitions=self.partitions,
        **self.config.get_simple_consumer_args()
    )

    # Render the consumer arguments as "name value" pairs for the debug log.
    rendered_args = ','.join(
        ['{0} {1}'.format(arg_name, arg_value)
         for arg_name, arg_value in
         six.iteritems(self.config.get_simple_consumer_args())]
    )
    self.log.debug(
        "Connected to kafka. Topic %s, partitions %s, %s",
        self.topic,
        self.partitions,
        rendered_args
    )

    self.kafka_consumer.provide_partition_info()
def setup_capture_new_messages_consumer(topic):
    """Yield a SimpleConsumer positioned at the tail of ``topic``.

    The consumer is seeked to the tail before it is yielded, so it can be
    used to consume messages from that point on. The Kafka client is
    closed once the generator is resumed, closed, or an error is thrown
    into it at the ``yield``.

    NOTE(review): this generator is presumably wrapped by a
    context-manager or fixture decorator that is outside this view.

    :param topic: name of the topic to consume from
    """
    kafka = KafkaClient(get_config().cluster_config.broker_list)
    try:
        group = str('data_pipeline_clientlib_test')
        consumer = SimpleConsumer(
            kafka, group, topic, max_buffer_size=_ONE_MEGABYTE)
        consumer.seek(0, 2)  # seek to tail, 0 is the offset, and 2 is the tail
        yield consumer
    finally:
        # Close the client even when the consumer setup or the caller's
        # code fails; previously an exception left the connection open.
        kafka.close()
def __init__(self, name, host='web14', port=51092, **kwargs):
    """Create a Kafka-backed queue with its own producer and consumer.

    :param name: queue name, forwarded to ``QueueBase.QueueBase.__init__``
    :param host: Kafka broker host
    :param port: Kafka broker port
    :param kwargs: optional settings; only ``async`` is read (default
        False) and is forwarded to the producer
    """
    QueueBase.QueueBase.__init__(self, name, host, port)
    self.__queue = []
    self.__kafka = KafkaClient('%s:%d' % (host, port))
    # ``async`` is a reserved word from Python 3.7 on, so spelling it as a
    # literal keyword argument is a SyntaxError there. Dict unpacking
    # passes exactly the same argument and parses on every version.
    producer_options = {'async': kwargs.get('async', False)}
    self.__producer = SimpleProducer(self.__kafka, **producer_options)
    # Make sure the topic exists before the consumer is attached to it.
    self.__producer.client.ensure_topic_exists(self.name)
    self.__consumer = SimpleConsumer(
        self.__kafka,
        self.name + '_consumer',
        self.name,
        auto_commit_every_n=1
    )
def test_non_integer_partitions(self):
    """SimpleConsumer rejects partition ids that are not integers."""
    mock_client = MagicMock()
    string_partitions = ['0']
    with self.assertRaises(AssertionError):
        SimpleConsumer(
            mock_client, 'group', 'topic', partitions=string_partitions)
def test_simple_consumer_failed_payloads(self):
    """FailedPayloadsError results in a fetch must not escape get_messages()."""
    mock_client = MagicMock()
    simple_consumer = SimpleConsumer(
        mock_client,
        group=None,
        topic='topic',
        partitions=[0, 1],
        auto_commit=False,
    )

    def make_failed_payload(payload):
        # The error object is returned (not raised) so that it appears as
        # an entry in the mocked fetch result.
        return FailedPayloadsError(payload)

    fetch_side_effect = self.fail_requests_factory(make_failed_payload)
    mock_client.send_fetch_request.side_effect = fetch_side_effect

    # Must complete without raising.
    simple_consumer.get_messages(5)
def test_switch_leader_simple_consumer(self):
    """Consuming keeps working after the leader of partition 0 is killed."""
    # ``async`` is a reserved word from Python 3.7 on, so it cannot be
    # written as a literal keyword argument; dict unpacking passes the
    # same argument and parses on every version.
    producer = Producer(self.client, **{'async': False})
    consumer = SimpleConsumer(
        self.client,
        None,
        self.topic,
        partitions=None,
        auto_commit=False,
        iter_timeout=10
    )
    self._send_random_messages(producer, self.topic, 0, 2)
    consumer.get_messages()
    self._kill_leader(self.topic, 0)
    # The second fetch happens after the leader is gone; it must not raise.
    consumer.get_messages()
def assert_message_count(self, topic, check_count, timeout=10,
                         partitions=None, at_least=False):
    """Assert how many messages are pending on ``topic``.

    Polls the pending count through a fresh client and consumer until it
    reaches ``check_count`` or ``timeout`` seconds have elapsed.

    :param topic: topic to inspect
    :param check_count: expected number of pending messages
    :param timeout: maximum time in seconds to keep polling; also passed
        to the consumer as ``iter_timeout``
    :param partitions: partitions to inspect, forwarded to the consumer
    :param at_least: when True, more than ``check_count`` pending
        messages is accepted
    :returns: True when the count is acceptable
    :raises: whatever ``self.fail`` raises when there are too few
        messages, or too many and ``at_least`` is False
    """
    hosts = ','.join(['%s:%d' % (broker.host, broker.port)
                      for broker in self.brokers])

    client = SimpleClient(hosts, timeout=2)
    try:
        consumer = SimpleConsumer(client, None, topic,
                                  partitions=partitions,
                                  auto_commit=False,
                                  iter_timeout=timeout)
        try:
            started_at = time.time()
            pending = -1
            while (pending < check_count
                   and (time.time() - started_at < timeout)):
                try:
                    pending = consumer.pending(partitions)
                except FailedPayloadsError:
                    # Deliberate best effort: keep the previous count and
                    # try again on the next iteration.
                    pass
                time.sleep(0.5)
        finally:
            # Stop the consumer even if polling raised an unexpected error.
            consumer.stop()
    finally:
        # Always release the connection; previously an unexpected
        # exception left both the consumer and the client open.
        client.close()

    if pending < check_count:
        self.fail('Too few pending messages: found %d, expected %d' %
                  (pending, check_count))
    elif pending > check_count and not at_least:
        self.fail('Too many pending messages: found %d, expected %d' %
                  (pending, check_count))
    return True
def get_consumer(containers, topic):
    """Build a SimpleConsumer for ``topic`` using the containers' Kafka connection.

    :param containers: object exposing ``get_kafka_connection()``
    :param topic: topic the consumer reads from
    :returns: a SimpleConsumer in the ``replication_handler_itest`` group
    """
    consumer_group = str('replication_handler_itest')
    kafka_connection = containers.get_kafka_connection()
    return SimpleConsumer(kafka_connection, consumer_group, topic)
def get_message(self, block=True, timeout=0.1):
    """Fetch one message from Kafka.

    Accepts the same arguments as ``get_message`` of the kafka-python
    SimpleConsumer and forwards them unchanged.

    :param block: If True, the API will block till at least a message is
        fetched.
    :type block: boolean
    :param timeout: If block is True, the function will block for the
        specified time (in seconds). If None, it will block forever.
    :returns: a Kafka message, or None when the fetch timed out
    :rtype: Message namedtuple, which consists of: partition number,
        offset, key, and message value
    """
    fetched = self.kafka_consumer.get_message(block, timeout)

    # A timed-out fetch yields None; pass that straight through.
    if fetched is None:
        return None

    # The consumer hands back a (partition, message) pair; the message is
    # indexed as [0] for the offset and [1] for the key/value payload.
    partition_id, offset_and_payload = fetched
    message_offset = offset_and_payload[0]
    payload = offset_and_payload[1]
    return Message(
        partition=partition_id,
        offset=message_offset,
        key=payload.key,
        value=payload.value,
    )