快速连接Kafka并生产消费消息
本章节将为您介绍分布式消息服务Kafka版入门的使用流程,以创建并连接一个开启SASL的Kafka 2.7实例为例,帮助您快速上手Kafka。
您还可以通过API方式创建Kafka实例、在业务代码中连接Kafka实例。
使用流程
- 环境准备
Kafka实例运行于虚拟私有云(Virtual Private Cloud,以下简称VPC)中,在创建实例前需要确保有可用的虚拟私有云。
Kafka实例创建后,您需要在弹性云服务器中下载和安装Kafka开源客户端,然后才能进行生产消息和消费消息。
- 创建Kafka实例
在创建实例时,您可以根据需求选择需要的实例规格和数量,并开启SASL访问。开启SASL后,数据加密传输,安全性更高。
- (可选)创建Topic
Kafka实例创建成功后,如果没有开启“Kafka自动创建Topic”,需要手动创建Topic,然后才能进行生产消息和消费消息。
- 连接实例
在连接开启SASL的Kafka实例时,需要下载证书,并在客户端配置文件中设置连接信息。
关于Kafka的相关概念,请参考Kafka基本概念。
步骤一:准备环境
- 如果需要对云上的资源进行精细管理,请使用IAM服务创建IAM用户及用户组,并授权,以使得IAM用户获得具体的操作权限。具体操作,请参考创建用户并授权使用DMS for Kafka。
- 在创建Kafka实例前,确保已存在可用的VPC和子网。
Kafka实例可以使用当前账号下已创建的VPC和子网,也可以使用新创建的VPC和子网,请根据实际需要进行配置。创建VPC和子网的操作指导,请参考创建虚拟私有云和子网。请注意:创建的VPC与Kafka实例必须在相同的区域。
- 在创建Kafka实例前,确保已存在可用的安全组。
Kafka实例可以使用当前账号下已创建的安全组,也可以使用新创建的安全组,请根据实际需要进行配置。创建安全组的操作步骤,请参考创建安全组。
- 在连接Kafka实例前,需要先创建弹性云服务器(Elastic Cloud Server,以下简称ECS)、安装JDK、配置环境变量以及下载Kafka开源客户端。本文以Linux系统的ECS为例,Windows系统ECS的JDK安装与环境变量配置可自行在互联网查找相关帮助。
- 登录管理控制台,在左上角单击,选择“计算 > 弹性云服务器”,创建一个ECS实例。
创建ECS的操作步骤,请参考创建弹性云服务器。如果您已有可用的ECS,可重复使用,不需要再次创建。
- 登录ECS。
- 安装Java JDK或JRE,并配置JAVA_HOME与PATH环境变量。
使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。
export JAVA_HOME=/opt/java/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH
执行source .bash_profile命令使修改生效。
ECS默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本。
- 下载开源的Kafka客户端。
wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
- 解压Kafka客户端文件。
tar -zxf kafka_2.12-2.7.2.tgz
- 登录管理控制台,在左上角单击,选择“计算 > 弹性云服务器”,创建一个ECS实例。
步骤二:创建Kafka实例
- 登录分布式消息服务Kafka控制台,单击页面右上方的“购买Kafka实例”。
- “计费模式”选择“按需计费”。
- 在“区域”下拉列表中,选择靠近您应用程序的区域,可降低网络延时、提高访问速度。
- 在“项目”下拉列表中,选择项目。
- 在“可用区”区域,根据实际情况选择1个或者3个及以上可用区。
- 设置“实例名称”和“企业项目”。
- 设置实例信息,配置详情请参考表2。
- 设置实例网络环境信息,配置详情请参考表3。
- 设置实例的访问方式,配置详情请参考表4。
- 设置登录Kafka Manager的用户名和密码。创建实例后,Kafka Manager用户名无法修改。
Kafka Manager是开源的Kafka集群管理工具,实例创建成功后,实例详情页面会展示Kafka Manager登录地址,您可登录Kafka Manager页面,查看Kafka集群的监控、代理等信息。
- 单击“更多配置”,设置更多相关信息,配置详情请参考表5。
- 填写完上述信息后,单击“立即购买”,进入规格确认页面。
- 确认实例信息无误后,提交请求。
- 单击“返回Kafka专享版列表”,查看Kafka实例是否创建成功。
创建实例大约需要3到15分钟,此时实例的“状态”为“创建中”。
- 当实例的“状态”变为“运行中”时,说明实例创建成功。
- 当实例的“状态”变为“创建失败”,请删除创建失败的实例,然后重新创建。如果重新创建仍然失败,请联系客服。
创建失败的实例,不会占用其他资源。
(可选)步骤三:创建Topic
- 在“Kafka专享版”页面,单击Kafka实例的名称,进入实例详情页面。
- 在左侧导航栏单击“Topic管理”,进入Topic列表页。
- 单击“创建Topic”,弹出“创建Topic”对话框。
- 填写Topic名称和配置信息,单击“确定”,完成创建Topic。
表6 Topic参数说明 参数
说明
Topic名称
设置为“topic-01”
创建Topic后不能修改名称。
分区数
设置为“3”
分区数越大消费的并发度越大。
副本数
设置为“3”
Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。
说明:实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。
老化时间(小时)
设置为“72”
消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。
同步复制
不开启
同步落盘
不开启
消息时间戳类型
选择“CreateTime”
批处理消息最大值
保持默认值
步骤四:连接实例生产消费消息
- 获取实例的连接地址。
- 在左侧导航栏单击“基本信息”,进入实例详情页。
- 在“连接信息”区域,查看连接地址。
图2 使用内网通过同一个VPC访问Kafka实例的连接地址
- 配置生产消费配置文件。
- 登录Linux系统的ECS。
- 在ECS的“/etc/hosts”文件中配置host和IP的映射关系,以便客户端能够快速解析实例的Broker。
其中,IP地址必须为实例连接地址(从1中获取的连接地址),host为每个实例主机的名称(您可以自定义主机的名称,但不能重复)。
例如:
10.154.48.120 server01
10.154.48.121 server02
10.154.48.122 server03
- 下载client.truststore.jks证书。
在Kafka控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。解压压缩包,获取压缩包中的客户端证书文件:client.truststore.jks。
- 在“consumer.properties”和“producer.properties”文件中分别增加如下行(示例以PLAIN机制为例介绍):
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN security.protocol=SASL_SSL ssl.truststore.location={ssl_truststore_path} ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
参数说明:
- username和password为创建Kafka实例过程中开启SASL_SSL时填入的用户名和密码。
- ssl.truststore.location配置为2.c证书的存放路径。
- ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
- ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
- 生产消息。
进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息。
./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties
参数说明如下:
示例如下,“192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093”为Kafka实例连接地址。
执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。
[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-demo --producer.config ../config/producer.properties >Hello >DMS >Kafka! >^C[root@ecs-kafka bin]#
如需停止生产使用Ctrl+C命令退出。
- 消费消息。
执行如下命令消费消息。
./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --group ${消费组名称} --from-beginning --consumer.config ../config/consumer.properties
参数说明如下:
- 连接地址:从1中获取的连接地址。
- Topic名称:从4中获取的Topic名称。
- 消费组名称:根据您的业务需求,设定消费组名称。如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。消费组名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties Hello Kafka! DMS ^CProcessed a total of 3 messages [root@ecs-kafka bin]#
如需停止消费使用Ctrl+C命令退出。