Python kafka 模块,SimpleProducer() 实例源码
我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用kafka.SimpleProducer()。
def test_producer_sync_fail_on_error(self):
    """Check that ``sync_fail_on_error`` decides whether a failed send raises.

    ``SimpleClient._send_broker_aware_request`` is patched to return a single
    ``FailedPayloadsError``. With ``sync_fail_on_error=False`` that error is
    expected back as the only response; with ``sync_fail_on_error=True``
    ``send_messages`` is expected to raise ``FailedPayloadsError``.
    """
    error = FailedPayloadsError('failure')
    # One ``with`` statement instead of four nested ones; the patches are
    # still entered in the same order.
    with patch.object(SimpleClient, 'load_metadata_for_topics'), \
            patch.object(SimpleClient, 'ensure_topic_exists'), \
            patch.object(SimpleClient, 'get_partition_ids_for_topic',
                         return_value=[0, 1]), \
            patch.object(SimpleClient, '_send_broker_aware_request',
                         return_value=[error]):
        client = SimpleClient(MagicMock())

        # ``async`` is a reserved word from Python 3.7 on, so synchronous
        # mode is requested through ``async_send`` (kafka-python >= 1.4.3).
        producer = SimpleProducer(client, async_send=False,
                                  sync_fail_on_error=False)
        # This should not raise
        (response,) = producer.send_messages('foobar', b'test message')
        self.assertEqual(response, error)

        producer = SimpleProducer(client, async_send=False,
                                  sync_fail_on_error=True)
        with self.assertRaises(FailedPayloadsError):
            producer.send_messages('foobar', b'test message')
def __init__(self, api):
    """Store the API handle and create an asynchronous Kafka producer.

    Args:
        api: Kept on the instance as ``self.api``.
    """
    self.api = api
    # NOTE(review): super(tweepy.StreamListener, self) starts the MRO lookup
    # *after* StreamListener, so StreamListener.__init__ itself is not run.
    # Left unchanged here; confirm that this is intended.
    super(tweepy.StreamListener, self).__init__()
    client = KafkaClient("localhost:9092")
    # ``async`` is a reserved word from Python 3.7 on; kafka-python >= 1.4.3
    # accepts the same flag under the name ``async_send``.
    self.producer = SimpleProducer(
        client,
        async_send=True,
        batch_send_every_n=1000,
        batch_send_every_t=10,
    )
def __init__(self, topic, server, client=None, **kwargs):
    """Set up a Kafka output bound to ``topic``.

    Args:
        topic: Stored on the instance as ``_topic``.
        server: Passed as the first argument to the client factory.
        client: Optional callable used to build the client; defaults to
            ``kafka.SimpleClient``.
        **kwargs: Forwarded unchanged to the client factory.

    Raises:
        OutputError: If the ``kafka`` module cannot be imported, or if the
            client factory raises any ``Exception``.
    """
    try:
        # Imported here so a missing package surfaces as OutputError.
        import kafka
    except ImportError:
        raise OutputError('Lack of kafka module, try to execute `pip install kafka-python>=1.3.1` install it')

    client = client or kafka.SimpleClient
    self._producer = None
    self._topic = topic
    try:
        self._kafka = client(server, **kwargs)
    except Exception as e:
        # ``except Exception, e`` is Python 2-only syntax; the ``as`` form is
        # accepted by Python 2.6+ and Python 3 alike.
        raise OutputError('kafka client init failed: %s' % e)
    # ``self.producer`` is defined outside this block; it is handed the
    # producer class rather than an instance.
    self.producer(kafka.SimpleProducer)
    super(Kafka, self).__init__()
def __init__(self, name, host='web14', port=51092, **kwargs):
    """Create a Kafka-backed queue with its own producer and consumer.

    Args:
        name: Forwarded to ``QueueBase.QueueBase.__init__``. ``self.name`` is
            read afterwards as the topic (presumably set by the base class;
            verify against QueueBase).
        host: Broker host name.
        port: Broker port.
        **kwargs: Only the ``'async'`` key is read; it selects asynchronous
            sending and defaults to ``False``.
    """
    QueueBase.QueueBase.__init__(self, name, host, port)
    self.__queue = []
    self.__kafka = KafkaClient('%s:%d' % (host, port))
    # ``async`` cannot be used as a keyword argument on Python 3.7+, so the
    # flag is passed as ``async_send`` (kafka-python >= 1.4.3). Callers still
    # supply it through the ``'async'`` key of ``kwargs`` as before.
    self.__producer = SimpleProducer(
        self.__kafka,
        async_send=kwargs.get('async', False),
    )
    self.__producer.client.ensure_topic_exists(self.name)
    # Consumer group is "<name>_consumer"; offsets are auto-committed after
    # every message.
    self.__consumer = SimpleConsumer(
        self.__kafka,
        self.name + '_consumer',
        self.name,
        auto_commit_every_n=1,
    )
def produce_example_msg(topic, num_messages=1):
    """Publish ``num_messages`` copies of a fixed payload to ``topic``.

    Each send that fails with ``LeaderNotAvailableError`` is retried exactly
    once after a 10 second pause; a failure of the retry propagates.
    """
    payload = b'some message'
    publisher = SimpleProducer(KafkaToolClient(KAFKA_URL))
    for _ in range(num_messages):
        try:
            publisher.send_messages(topic, payload)
        except LeaderNotAvailableError:
            # Kafka can take a little while to elect a leader for a freshly
            # created topic, so wait and try one more time.
            time.sleep(10)
            publisher.send_messages(topic, payload)
def test_topic_message_types(self):
    """Send one message to a bytes topic and expect a produce request.

    The client is a ``MagicMock`` whose ``get_partition_ids_for_topic``
    reports partitions ``[0, 1]`` for any topic.
    """
    fake_client = MagicMock()
    # Fresh list on every call, whatever topic is asked about.
    fake_client.get_partition_ids_for_topic = lambda topic: [0, 1]

    simple_producer = SimpleProducer(fake_client, random_start=False)
    simple_producer.send_messages(b"test-topic", b'hi')

    assert fake_client.send_produce_request.called
def __init__(self, api):
    """Keep the API handle and build an asynchronous Kafka producer.

    Args:
        api: Stored as ``self.api``.
    """
    self.api = api
    # NOTE(review): passing tweepy.StreamListener as the first argument to
    # super() resolves __init__ from the class *after* StreamListener in the
    # MRO, so StreamListener.__init__ is skipped. Kept as-is; please confirm.
    super(tweepy.StreamListener, self).__init__()
    client = KafkaClient("localhost:9092")
    # Python 3.7 made ``async`` a reserved word, which turns ``async=True``
    # into a SyntaxError; ``async_send`` is the kafka-python >= 1.4.3 name.
    self.producer = SimpleProducer(
        client,
        async_send=True,
        batch_send_every_n=1000,
        batch_send_every_t=10,
    )
def make_kafka_producer(kafka_znode):
    """Return a synchronous ``SimpleProducer`` for the given znode's brokers.

    Args:
        kafka_znode: Passed to ``get_kafka_brokers`` (defined outside this
            block) to obtain the broker list for ``KafkaClient``.

    Returns:
        A ``SimpleProducer`` created with ``req_acks=1`` and
        ``random_start=True``, sending synchronously.
    """
    kafka_brokers = get_kafka_brokers(kafka_znode)
    kafka_client = KafkaClient(kafka_brokers)
    return SimpleProducer(
        kafka_client,
        # ``async`` is reserved from Python 3.7 on; kafka-python >= 1.4.3
        # exposes the flag as ``async_send``.
        async_send=False,
        req_acks=1,
        random_start=True,
    )
def __init__(self, settings):
    """Connect to Kafka and wrap the producer's ``send_messages``.

    Args:
        settings: Mapping-like object; ``KAFKA_HOSTS`` is read for the
            client and ``KAFKA_RETRY_TIME`` (default ``5``) is handed to
            ``failedpayloads_wrapper``.
    """
    self.settings = settings
    self.client = KafkaClient(settings.get("KAFKA_HOSTS"))
    self.producer = SimpleProducer(self.client)

    # Build the decorator first, then replace the bound method on this
    # producer instance with its wrapped version.
    retry_decorator = failedpayloads_wrapper(settings.get("KAFKA_RETRY_TIME", 5))
    self.producer.send_messages = retry_decorator(self.producer.send_messages)

    super(KafkaHandler, self).__init__()