分布式消息服务Kafka版
分布式消息服务Kafka版
- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
- 用户指南
- 最佳实践
- 开发指南
- API参考
- SDK参考
-
常见问题
-
实例问题
- 为什么可用区不能选择2个?
- 创建实例时为什么无法查看子网和安全组等信息?
- 如何选择Kafka实例的存储空间?
- Kafka实例的超高IO和高IO如何选择?
- 如何选择Kafka实例存储容量阈值策略?
- Kafka服务端支持版本是多少?
- Kafka实例的ZK地址是什么?
- 创建的Kafka实例是集群模式么?
- Kafka实例是否支持修改访问端口?
- Kafka实例的SSL证书有效期多长?
- 如何将Kafka实例中的数据同步到另一个Kafka实例中?
- Kafka实例的SASL_SSL开关如何修改?
- SASL认证机制如何修改?
- 如何修改安全协议?
- 修改企业项目,是否会导致Kafka重启?
- Kafka服务和ZK是部署在相同的虚拟机中,还是分开部署?
- Kafka包周期实例支持删除吗?
- Kafka支持哪些加密套件?
- 购买实例时选择的单AZ,怎样可以扩展为多AZ?
- Kafka是否支持跨AZ容灾?已经购买的实例在哪里查看是否为跨AZ?
- Kafka支持磁盘加密吗?
- Kafka实例创建后,能修改VPC和子网吗?
- 有没有Kafka Stream的案例?
- Kafka实例版本可以升级吗?
- 怎样重新绑定公网IP?
- 实例规格变更问题
-
连接问题
- 选择和配置安全组
- Kafka实例是否支持公网访问?
- Kafka实例的连接地址默认有多少个?
- 是否支持跨Region访问?
- Kafka实例是否支持跨VPC访问?
- Kafka实例是否支持不同的子网?
- Kafka是否支持Kerberos认证,如何开启认证?
- Kafka实例是否支持无密码访问?
- 开启公网访问后,在哪查看公网IP地址?
- Kafka支持服务端认证客户端吗?
- 连接开启SASL_SSL的Kafka实例时,ssl truststore文件可以用PEM格式的吗?
- 下载的证书JKS和CRT有什么区别?
- Kafka支持哪个版本的TLS?
- Kafka实例连接数有限制吗?
- 客户端单IP连接的个数为多少?
- Kafka实例的内网连接地址可以修改吗?
- 不同实例中,使用的SSL证书是否一样?
- 为什么不建议使用Sarama客户端收发消息?
- Topic和分区问题
- 消费组问题
- 消息问题
-
Kafka Manager问题
- 登录Kafka Manager的账号是否可以设置为只读账号?
- 登录到Kafka Manager页面,为什么获取不到节点信息?
- Yikes! Insufficient partition balance when creating topic : projectman_project_enterprise_project Try again.
- Kafka Manager能否查询到消息的正文?
- Kafka Manager WebUI的端口能否修改?
- 在Kafka Manager上支持修改Topic的哪些属性?
- Kafka Manager和云监控显示的信息不一致
- Kafka Manager如何修改Topic的分区Leader?
- 实例版本在控制台和Kafka Manager上显示不一致?
- 为什么实例中存在默认名为__trace和__consumer_offsets的Topic?
- 客户端删除消费组后,在Kafka Manager中仍可以看到此消费组?
- 监控告警问题
-
实例问题
- 故障排除
-
更多文档
-
用户指南 (阿布扎比区域)
- 产品简介
- 快速入门
- 权限管理
- 准备实例依赖资源
- 购买实例
- 连接Kafka
- 实例管理
- Topic管理
- 消息管理
- 用户管理
- 消费组管理
- 流控管理
- 修改配置参数
- 调整资源配额
- 监控
- 云审计服务支持的关键操作
-
常见问题
-
实例问题
- 为什么可用区不能选择2个?
- 创建实例时为什么无法查看子网和安全组等信息?
- 如何选择Kafka实例的存储空间?
- Kafka实例的超高IO和高IO如何选择?
- 如何选择Kafka实例存储容量阈值策略?
- Kafka服务端支持版本是多少?
- Kafka实例的ZK地址是什么?
- 创建的Kafka实例是集群模式么?
- Kafka实例是否支持修改访问端口?
- Kafka实例的SSL证书有效期多长?
- 如何将Kafka实例中的数据同步到另一个Kafka实例中?
- Kafka实例的SASL_SSL开关如何修改?
- SASL认证机制如何修改?
- 修改企业项目,是否会导致Kafka重启?
- Kafka服务和ZK是部署在相同的虚拟机中,还是分开部署?
- Kafka支持哪些加密套件?
- 购买实例时选择的单AZ,怎样可以扩展为多AZ?
- Kafka是否支持跨AZ容灾?已经购买的实例在哪里查看是否为跨AZ?
- Kafka支持磁盘加密吗?
- Kafka扩容会影响业务吗?
- Kafka实例创建后,能修改VPC和子网吗?
- 有没有Kafka Stream的案例?
- Kafka实例版本可以升级吗?
- 实例版本在控制台和Kafka Manager上显示不一致?
- 怎样重新绑定公网IP?
-
连接问题
- 选择和配置安全组
- Kafka实例是否支持公网访问?
- Kafka实例的连接地址默认有多少个?
- 是否支持跨Region访问?
- Kafka实例是否支持跨VPC访问?
- Kafka实例是否支持不同的子网?
- Kafka是否支持Kerberos认证,如何开启认证?
- Kafka实例是否支持无密码访问?
- 开启公网访问后,在哪查看公网IP地址?
- Kafka支持服务端认证客户端吗?
- 连接开启SASL_SSL的Kafka实例时,ssl truststore文件可以用PEM格式的吗?
- 下载的证书JKS和CRT有什么区别?
- Kafka支持哪个版本的TLS?
- Kafka实例连接数有限制吗?
- 客户端单IP连接的个数为多少?
- Kafka实例的内网连接地址可以修改吗?
- 不同实例中,使用的SSL证书是否一样?
-
Topic和分区问题
- Kafka实例的Topic数量是否有限制?
- 为什么限制Topic的总分区数?
- Kafka支持增加减少分区数吗?
- Kafka实例创建Topic失败
- Kafka实例支持批量导入Topic功能么?或者是自动生成Topic功能?
- 为什么删除Topic不生效?删除后该Topic仍然存在
- Kafka实例是否支持查看单个Topic占用磁盘空间?
- Topic是否支持ACL权限配置?
- 消息被消费后,没有删除,导致Kafka存储空间占满?
- 如何扩总分区?
- 修改自动创建Topic的配置,会触发重启吗?
- 如何关闭自动创建Topic功能?
- Kafka可以删除消费组下不用的Topic吗?
- 消费者消费Topic失败,提示没有权限?
- 为什么实例中存在默认名为__trace和__consumer_offsets的Topic?
- 消费组问题
- 消息问题
-
Kafka Manager问题
- 登录Kafka Manager的帐号是否可以设置为只读帐号?
- 登录到Kafka Manager页面,为什么获取不到节点信息?
- Yikes! Insufficient partition balance when creating topic : projectman_project_enterprise_project Try again.
- Kafka Manager能否查询到消息的正文?
- Kafka Manager WebUI的端口能否修改?
- 在Kafka Manager上支持修改Topic的哪些属性?
- Kafka Manager和云监控显示的信息不一致
- Kafka Manager如何修改Topic的分区Leader?
- 监控告警问题
-
实例问题
- 故障排除
- 修订记录
- API参考(阿布扎比区域)
-
用户指南(巴黎区域)
- 产品简介
- 快速入门
- 权限管理
- 准备实例依赖资源
- 创建实例
- 连接Kafka
- 实例管理
- Topic管理
- 消息管理
- 用户管理
- 消费组管理
- 流控管理
- 修改配置参数
- 调整资源配额
- 监控
- 云审计服务支持的关键操作
-
常见问题
-
实例问题
- 为什么可用区不能选择2个?
- 创建实例时为什么无法查看子网和安全组等信息?
- 如何选择Kafka实例的存储空间?
- Kafka实例的超高IO和高IO如何选择?
- 如何选择Kafka实例存储容量阈值策略?
- Kafka服务端支持版本是多少?
- Kafka实例的ZK地址是什么?
- 创建的Kafka实例是集群模式么?
- Kafka实例是否支持修改访问端口?
- Kafka实例的SSL证书有效期多长?
- 如何将Kafka实例中的数据同步到另一个Kafka实例中?
- Kafka实例的SASL_SSL开关如何修改?
- SASL认证机制如何修改?
- 修改企业项目,是否会导致Kafka重启?
- Kafka服务和ZK是部署在相同的虚拟机中,还是分开部署?
- Kafka支持哪些加密套件?
- 创建实例时选择的单AZ,怎样可以扩展为多AZ?
- Kafka是否支持跨AZ容灾?已经创建的实例在哪里查看是否为跨AZ?
- Kafka支持磁盘加密吗?
- Kafka实例创建后,能修改VPC和子网吗?
- 有没有Kafka Stream的案例?
- Kafka实例版本可以升级吗?
- 实例版本在控制台和Kafka Manager上显示不一致?
- 怎样重新绑定公网IP?
- 实例规格变更问题
-
连接问题
- 选择和配置安全组
- Kafka实例是否支持公网访问?
- Kafka实例的连接地址默认有多少个?
- 是否支持跨Region访问?
- Kafka实例是否支持跨VPC访问?
- Kafka实例是否支持不同的子网?
- Kafka是否支持Kerberos认证,如何开启认证?
- Kafka实例是否支持无密码访问?
- 开启公网访问后,在哪查看公网IP地址?
- Kafka支持服务端认证客户端吗?
- 连接开启SASL_SSL的Kafka实例时,ssl truststore文件可以用PEM格式的吗?
- 下载的证书JKS和CRT有什么区别?
- Kafka支持哪个版本的TLS?
- Kafka实例连接数有限制吗?
- 客户端单IP连接的个数为多少?
- Kafka实例的内网连接地址可以修改吗?
- 不同实例中,使用的SSL证书是否一样?
- 为什么不建议使用Sarama客户端收发消息?
-
Topic和分区问题
- Kafka实例的Topic数量是否有限制?
- 为什么限制Topic的总分区数?
- Kafka支持减少分区数吗?
- Kafka实例创建Topic失败
- Kafka实例支持批量导入Topic功能么?或者是自动生成Topic功能?
- 为什么删除Topic不生效?删除后该Topic仍然存在
- Kafka实例是否支持查看单个Topic占用磁盘空间?
- Topic是否支持ACL权限配置?
- 消息被消费后,没有删除,导致Kafka存储空间占满?
- 如何扩总分区?
- 修改自动创建Topic的配置,会触发重启吗?
- 如何关闭自动创建Topic功能?
- Kafka可以删除消费组下不用的Topic吗?
- 消费者消费Topic失败,提示没有权限?
- 为什么实例中存在默认名为__trace和__consumer_offsets的Topic?
- 消费组问题
- 消息问题
- Kafka Manager问题
- 监控告警问题
-
实例问题
- 故障排除
- 修订记录
- API参考(巴黎区域)
- 开发指南(巴黎区域)
-
用户指南(吉隆坡区域)
- 产品简介
- 快速入门
- Kafka业务使用流程
- 权限管理
- 购买Kafka实例
- 配置Topic
- 连接实例
- 管理消息
- 管理消费组
- 配置流控
- 管理实例
- 变更实例规格
- 迁移数据
- 申请扩大Kafka配额
- 查看监控指标与配置告警
- 查看Kafka审计日志
-
常见问题
-
实例问题
- 为什么可用区不能选择2个?
- 创建实例时为什么无法查看子网和安全组等信息?
- 如何选择Kafka实例的存储空间?
- Kafka实例的超高IO和高IO如何选择?
- 如何选择Kafka实例存储容量阈值策略?
- Kafka服务端支持版本是多少?
- Kafka实例的ZK地址是什么?
- 创建的Kafka实例是集群模式么?
- Kafka实例是否支持修改访问端口?
- Kafka实例的SSL证书有效期多长?
- 如何将Kafka实例中的数据同步到另一个Kafka实例中?
- Kafka实例的SASL_SSL开关如何修改?
- SASL认证机制如何修改?
- 修改企业项目,是否会导致Kafka重启?
- Kafka服务和ZK是部署在相同的虚拟机中,还是分开部署?
- Kafka支持哪些加密套件?
- 购买实例时选择的单AZ,怎样可以扩展为多AZ?
- Kafka是否支持跨AZ容灾?已经购买的实例在哪里查看是否为跨AZ?
- Kafka支持磁盘加密吗?
- Kafka实例创建后,能修改VPC和子网吗?
- 有没有Kafka Stream的案例?
- Kafka实例版本可以升级吗?
- 配置了“自动删除”策略,磁盘容量到达95%时,为什么没有自动删除最早的消息?
- 实例规格变更问题
-
连接问题
- 选择和配置安全组
- Kafka实例是否支持公网访问?
- Kafka实例的连接地址默认有多少个?
- 是否支持跨Region访问?
- Kafka实例是否支持跨VPC访问?
- Kafka实例是否支持不同的子网?
- Kafka是否支持Kerberos认证,如何开启认证?
- Kafka实例是否支持无密码访问?
- Kafka支持服务端认证客户端吗?
- 连接开启SASL_SSL的Kafka实例时,ssl truststore文件可以用PEM格式的吗?
- 下载的证书JKS和CRT有什么区别?
- Kafka支持哪个版本的TLS?
- Kafka实例连接数有限制吗?
- 客户端单IP连接的个数为多少?
- Kafka实例的内网连接地址可以修改吗?
- 不同实例中,使用的SSL证书是否一样?
- 为什么不建议使用Sarama客户端收发消息?
- Topic和分区问题
- 消费组问题
- 消息问题
-
Kafka Manager问题
- 登录Kafka Manager的账号是否可以设置为只读账号?
- 登录到Kafka Manager页面,为什么获取不到节点信息?
- Yikes! Insufficient partition balance when creating topic : projectman_project_enterprise_project Try again.
- Kafka Manager能否查询到消息的正文?
- Kafka Manager WebUI的端口能否修改?
- 在Kafka Manager上支持修改Topic的哪些属性?
- Kafka Manager和云监控显示的信息不一致
- Kafka Manager如何修改Topic的分区Leader?
- 实例版本在控制台和Kafka Manager上显示不一致?
- 为什么实例中存在默认名为__trace和__consumer_offsets的Topic?
- 客户端删除消费组后,在Kafka Manager中仍可以看到此消费组?
- 监控告警问题
-
实例问题
- 故障排除
- 修订记录
- API参考(吉隆坡区域)
-
用户指南 (阿布扎比区域)
- 通用参考
链接复制成功!
Python
本文以Linux CentOS环境为例,介绍Python版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。
使用前请参考收集连接信息收集Kafka所需的连接信息。
准备环境
- Python
一般系统预装了Python,您可以在命令行输入python或者python3,查看Python是否已经安装。python命令只能查询Python 2.x版本,python3命令只能查询Python 3.x版本,如果无法确认Python版本,请分别输入两个命令查看结果。
以Python 3.x为例,得到如下回显,说明Python已安装。
[root@ecs-test python-kafka]# python3 Python 3.7.1 (default, Jul 5 2020, 14:37:24) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>>
如果未安装Python,请使用以下命令安装:
yum install python
- Python版的Kafka客户端
- Python 2.x版本:pip install kafka-python==2.0.1
- Python 3.x版本:pip3 install kafka-python==2.0.1
生产消息
- 在客户端创建一个文件,用于存放生产消息的代码示例。
touch producer.py
producer.py表示文件名,您可以自定义文件名。
- 执行以下命令,编辑文件。
vim producer.py
- 将以下生产消息的代码示例写入文件中,并保存。
- SASL认证方式
from kafka import KafkaProducer import ssl ##连接信息 conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_username': 'username', 'sasl_password': 'password' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ##如果Kafka安全协议设置为“SASL_PLAINTEXT”,请注释以下参数。 context.verify_mode = ssl.CERT_REQUIRED ##证书文件,SSL证书参考“收集连接信息”章节获取。如果Kafka安全协议设置为“SASL_PLAINTEXT”,请注释以下参数。 context.load_verify_locations("phy_ca.crt") print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_username'], sasl_plain_password=conf['sasl_password']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- bootstrap_servers:实例连接地址与端口。
- topic_name:Topic名称。
- sasl_plain_username/sasl_plain_password:首次开启密文接入时设置的用户名与密码,或者创建用户时设置的用户名和密码。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。
- context.load_verify_locations:证书文件。如果Kafka安全协议设置为“SASL_SSL”,需要设置此参数。使用Python语言连接实例时,需要用CRT格式的证书。
- sasl_mechanism:SASL认证机制。在Kafka实例控制台的基本信息页面中获取。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。
- security_protocol:Kafka的安全协议。在Kafka实例控制台的基本信息页面中获取。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。
- 安全协议设置为“SASL_SSL”时,采用SASL方式进行认证,数据通过SSL证书进行加密传输,安全性更高。此时需要配置连接实例的用户名和密码,以及证书文件。
- 安全协议设置为“SASL_PLAINTEXT”时,采用SASL方式进行认证,数据通过明文传输,性能更好。此时需要配置连接实例的用户名和密码,无需配置证书文件。
- 非SASL认证方式
from kafka import KafkaProducer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic-name', } print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- bootstrap_servers:实例连接地址与端口。
- topic_name:Topic名称。
- SASL认证方式
- 执行以下命令,运行生产消息的代码示例。
# Python 2.x版本 python producer.py # Python 3.x版本 python3 producer.py
运行成功后,返回如下回显。
[root@ecs-test ~]# python3 producer.py start producer end producer [root@ecs-test ~]#
消费消息
- 在客户端创建一个文件,用于存放消费消息的代码示例。
touch consumer.py
consumer.py表示文件名,您可以自定义文件名。
- 执行以下命令,编辑文件。
vim consumer.py
- 将以下消费消息的代码示例写入文件中,并保存。
- SASL认证方式
from kafka import KafkaConsumer import ssl ##连接信息 conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_username': 'username', 'sasl_password': 'password', 'consumer_id': 'consumer_id' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ##如果Kafka安全协议设置为“SASL_PLAINTEXT”,请注释以下参数。 context.verify_mode = ssl.CERT_REQUIRED ##证书文件,SSL证书参考“收集连接信息”章节获取。如果Kafka安全协议设置为“SASL_PLAINTEXT”,请注释以下参数。 context.load_verify_locations("phy_ca.crt") print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_username'], sasl_plain_password=conf['sasl_password']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- bootstrap_servers:实例连接地址与端口。
- topic_name:Topic名称。
- sasl_plain_username/sasl_plain_password:首次开启密文接入时设置的用户名与密码,或者创建用户时设置的用户名和密码。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。
- consumer_id:消费组名称。根据业务需求,自定义消费组名称,如果设置的消费组不存在,Kafka会自动创建。
- context.load_verify_locations:证书文件。如果Kafka安全协议设置为“SASL_SSL”,需要设置此参数。使用Python语言连接实例时,需要用CRT格式的证书。
- sasl_mechanism:SASL认证机制。在Kafka实例控制台的基本信息页面中获取。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。
- security_protocol:Kafka的安全协议。在Kafka实例控制台的基本信息页面中获取。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。
- 安全协议设置为“SASL_SSL”时,采用SASL方式进行认证,数据通过SSL证书进行加密传输,安全性更高。此时需要配置连接实例的用户名和密码,以及证书文件。
- 安全协议设置为“SASL_PLAINTEXT”时,采用SASL方式进行认证,数据通过明文传输,性能更好。此时需要配置连接实例的用户名和密码,无需配置证书文件。
- 非SASL认证方式
from kafka import KafkaConsumer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic-name', 'consumer_id': 'consumer-id' } print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- bootstrap_servers:实例连接地址与端口。
- topic_name:Topic名称。
- consumer_id:消费组名称。根据业务需求,自定义消费组名称,如果设置的消费组不存在,Kafka会自动创建。
- SASL认证方式
- 执行以下命令,运行消费消息的代码示例。
# Python 2.x版本 python consumer.py # Python 3.x版本 python3 consumer.py
运行成功后,返回如下回显。
[root@ecs-test ~]# python3 consumer.py start consumer
如需停止消费使用Ctrl+C命令退出。