kafka-python客户端支持 Kafka 0.9,但显然不包括新的身份验证和加密功能,因此我猜测它仅适用于开放服务器(与以前的版本一样)。无论如何,即使是 Java 客户端也需要一个特殊的消息中心登录模块来连接(或者从示例中看起来是这样),这表明除非有可用于 Python 的类似模块,否则什么都行不通。
我的具体场景是,我想使用同样托管在 Bluemix(Apache Spark 服务)中的 Jupyter 笔记本中的消息中心服务。
确实,kafka-python 库在支持 Kafka 0.9 和旧版本的同时,并没有原生支持 Kafka 的一些新功能,比如 SASL(用于身份验证的简单认证和安全层)和加密(SSL/TLS)。因此,如果你的 Kafka 集群启用了这些功能,kafka-python 可能无法直接连接,尤其是当你使用的 Kafka 集群启用了认证、加密等更为严格的安全机制时。
kafka-python
不过,如果你想通过 kafka-python 连接到启用了认证和加密的 Kafka 服务(例如 Bluemix 上的 Kafka 服务),你仍然可以采取一些措施,利用 SASL 和 SSL/TLS 进行身份验证和加密。
以下是一些方法和思路,帮助你解决这个问题:
confluent-kafka-python
confluent-kafka-python 是由 Confluent 提供的 Kafka 客户端,它支持最新版本的 Kafka,并且内置了对 Kafka 身份验证和加密(SASL 和 SSL/TLS)的支持。这个库是目前最推荐的 Kafka Python 客户端。
你可以通过以下方式安装 confluent-kafka-python:
pip install confluent-kafka
然后,你可以使用这个库来连接到启用了身份验证和加密的 Kafka 集群。以下是一个示例代码,展示如何使用 confluent-kafka-python 连接到启用了 SASL 和 SSL 的 Kafka 集群:
from confluent_kafka import Producer conf = { 'bootstrap.servers': 'your_broker_address', # Kafka broker地址 'security.protocol': 'SASL_SSL', # 安全协议(SASL + SSL) 'sasl.mechanisms': 'PLAIN', # SASL机制,通常为 PLAIN 或 SCRAM-SHA-256 'sasl.username': 'your_username', # 用户名 'sasl.password': 'your_password', # 密码 'ssl.ca.location': '/path/to/ca-cert', # CA证书路径(如果有) 'ssl.cert.location': '/path/to/client-cert', # 客户端证书路径(如果有) 'ssl.key.location': '/path/to/client-key', # 客户端密钥路径(如果有) } # 创建 Kafka 生产者 producer = Producer(conf) # 发送一条消息 producer.produce('your_topic', key='key', value='value') producer.flush() # 确保所有消息都已发送
这个代码示例展示了如何配置 SASL 和 SSL 以进行安全连接,连接到启用了认证和加密的 Kafka 服务。
虽然 kafka-python 本身不直接支持这些功能,但你仍然可以通过传递自定义配置来启用 SSL 和 SASL 认证。你需要安装 kafka-python:
pip install kafka-python
然后,可以按以下方式配置它:
from kafka import KafkaProducer from kafka import KafkaConsumer import ssl # 配置 Kafka producer producer = KafkaProducer( bootstrap_servers='your_broker_address', security_protocol='SASL_SSL', # 使用 SASL_SSL 进行安全连接 sasl_mechanism='PLAIN', # 选择 SASL 机制 sasl_plain_username='your_username', # 用户名 sasl_plain_password='your_password', # 密码 ssl_cafile='/path/to/ca-cert', # CA证书 ssl_certfile='/path/to/client-cert', # 客户端证书 ssl_keyfile='/path/to/client-key', # 客户端密钥 ) # 配置 Kafka consumer consumer = KafkaConsumer( 'your_topic', bootstrap_servers='your_broker_address', security_protocol='SASL_SSL', sasl_mechanism='PLAIN', sasl_plain_username='your_username', sasl_plain_password='your_password', ssl_cafile='/path/to/ca-cert', ssl_certfile='/path/to/client-cert', ssl_keyfile='/path/to/client-key', auto_offset_reset='earliest', ) # 发送消息 producer.send('your_topic', key=b'key', value=b'value') # 消费消息 for msg in consumer: print(msg.value)
这里,我们通过配置 KafkaProducer 和 KafkaConsumer 来支持 SASL 和 SSL 连接。具体来说:
KafkaProducer
KafkaConsumer
security_protocol='SASL_SSL'
sasl_mechanism='PLAIN'
PLAIN
ssl_cafile
ssl_certfile
ssl_keyfile
你可以将上述代码集成到你的 Jupyter Notebook 中,直接进行 Kafka 的生产者和消费者操作。如果 Bluemix 中的 Kafka 服务启用了安全认证和加密,只需在代码中正确配置凭证和证书路径即可。
如果你使用的是 Bluemix 提供的 Kafka 服务,请参考其文档以获取关于如何配置 SSL 和 SASL 的更多信息。