Python客户端使用说明
操作场景
本文以Linux CentOS环境为例,介绍Python版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。
引入Kafka客户端
MQS基于Kafka社区版本1.1.0、2.7,您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。Python开源客户端的版本使用请参见客户端版本使用建议。
执行以下命令,安装对应版本的Python Kafka客户端:
pip install kafka-python==2.0.1
生产消息
- SASL认证方式
from kafka import KafkaProducer import ssl ##连接信息 conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##证书文件 context.load_verify_locations("phy_ca.crt") print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- bootstrap_servers:MQS连接地址和端口。
- topic_name:要生产消息的Topic名称。
- sasl_plain_username和sasl_plain_password:开启SASL_SSL认证时所使用的用户名和密码。
- context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。
- 非SASL认证方式
from kafka import KafkaProducer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', } print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- bootstrap_servers:MQS连接地址和端口。
- topic_name:要生产消息的Topic名称。
消费消息
- SASL认证方式
from kafka import KafkaConsumer import ssl ##连接信息 conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password', 'consumer_id': 'consumer_id' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##证书文件 context.load_verify_locations("phy_ca.crt") print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- bootstrap_servers:MQS连接地址和端口。
- topic_name:要消费消息的Topic名称。
- sasl_plain_username和sasl_plain_password:开启SASL_SSL认证时所使用的用户名和密码。
- consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
- context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。
- 非SASL认证方式
from kafka import KafkaConsumer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'consumer_id': 'consumer_id' } print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- bootstrap_servers:MQS连接地址和端口。
- topic_name:要消费消息的Topic名称。
- consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。