快速连接Kafka并生产消费消息
本章节将为您介绍分布式消息服务Kafka版入门的使用流程,以创建一个开启密文接入、安全协议为SASL_SSL的Kafka实例,客户端使用内网通过同一个VPC连接Kafka实例生产消费消息为例,帮助您快速上手Kafka。
- 准备工作
Kafka实例运行于虚拟私有云(Virtual Private Cloud,简称VPC)中,在创建实例前需要确保有可用的虚拟私有云。
Kafka实例创建后,您需要在弹性云服务器中下载和安装Kafka开源客户端,然后才能进行生产消息和消费消息。
- 创建Kafka实例
在创建实例时,您可以根据需求选择需要的实例规格和数量,并开启密文接入,安全协议设置为“SASL_SSL”。
连接安全协议为“SASL_SSL”的Kafka实例时,使用SASL认证,数据通过SSL证书进行加密传输,安全性更高。
- 创建Topic
Topic用于存储消息,供生产者生产消息以及消费者订阅消息。
本文以在控制台创建Topic为例介绍。
- 连接实例
在连接安全协议为“SASL_SSL”的Kafka实例时,需要下载证书,并在客户端配置文件中设置连接信息。
步骤一:准备工作
- 注册华为账号并实名认证。
在创建Kafka实例前,请先注册华为账号并实名认证,具体步骤请参考注册华为账号并开通华为云和实名认证。
如果您已有一个华为账号并实名认证,请跳过此步骤。
- 为账户充值。
在创建Kafka实例前,确保账户有足够金额。账户充值的具体步骤,请参考账户充值。
- 为用户添加Kafka实例的操作权限。
如果您需要对云上的资源进行精细管理,请使用统一身份认证服务(Identity and Access Management,简称IAM)创建IAM用户及用户组,并授权,以使得IAM用户获得Kafka实例的操作权限。具体操作请参考创建用户并授权使用DMS for Kafka。
- 创建VPC和子网。
在创建Kafka实例前,确保已存在可用的VPC和子网。创建VPC和子网的具体步骤,请参考创建虚拟私有云和子网。
创建的VPC与Kafka实例必须在相同的区域。
- 创建安全组并添加安全组规则。
在创建Kafka实例前,确保已存在可用的安全组。创建安全组的具体步骤,请参考创建安全组。
- 构建生产消费客户端。
本文以Linux系统的弹性云服务器(Elastic Cloud Server,简称ECS)作为生产消费客户端。在创建Kafka实例前,请先创建开启弹性公网IP的ECS、安装JDK、配置环境变量以及下载Kafka开源客户端。
- 登录管理控制台,在左上角单击,选择“计算 > 弹性云服务器”,创建一个ECS实例。
创建ECS的具体步骤,请参考购买弹性云服务器。如果您已有可用的ECS,可重复使用,不需要再次创建。
- 使用root用户登录ECS。
- 安装Java JDK,并配置JAVA_HOME与PATH环境变量。
- 下载Java JDK。
ECS默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本。
- 解压Java JDK。
tar -zxvf jdk-8u321-linux-x64.tar.gz
“jdk-8u321-linux-x64.tar.gz”为JDK的版本,请根据实际情况修改。
- 打开“.bash_profile”文件。
vim ~/.bash_profile
- 添加如下内容。
export JAVA_HOME=/root/jdk1.8.0_321 export PATH=$JAVA_HOME/bin:$PATH
“/root/jdk1.8.0_321”为JDK的安装路径,请根据实际情况修改。
- 按“Esc”,然后输入以下命令,按“Enter”,保存并退出“.bash_profile”文件。
:wq
- 执行如下命令使修改生效。
source .bash_profile
- 查看Java JDK是否安装成功。
java -version
回显信息中包含如下信息,表示Java JDK安装成功。java version "1.8.0_321"
- 下载Java JDK。
- 下载开源的Kafka客户端。
wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
- 解压Kafka客户端文件。
tar -zxf kafka_2.12-2.7.2.tgz
- 登录管理控制台,在左上角单击,选择“计算 > 弹性云服务器”,创建一个ECS实例。
步骤二:创建Kafka实例
- 进入购买Kafka实例页面。
- 设置实例基础信息,如图2所示,配置详情请参考表2。
表2 设置实例基础信息 参数
说明
计费模式
选择“按需计费”,即先使用再付费,按照Kafka实例实际使用时长计费,秒级计费,按小时结算。
区域
不同区域的云服务产品之间内网互不相通。请就近选择靠近您业务的区域,可减少网络时延,提高访问速度。
选择“亚太-新加坡”。
项目
每个区域默认对应一个项目,这个项目由系统预置,用来隔离物理区域间的资源(计算资源、存储资源和网络资源)。
选择“亚太-新加坡(默认)”。
可用区
可用区指在同一区域下,电力、网络隔离的物理区域,可用区之间内网互通,不同可用区之间物理隔离。
选择“可用区1、可用区2、可用区3”。
实例名称
实例名称支持自定义,但需要符合命名规则:长度为4~64个字符,由英文字母开头,只能由英文字母、数字、中划线、下划线组成。
输入“kafka-test”。
企业项目
该参数针对企业用户使用。企业项目是对企业不同项目间资源的分组和管理,属于逻辑隔离。
选择“default”。
规格选择模式
选择“集群版”,创建一个集群Kafka实例。
版本
Kafka的版本号。Kafka实例创建后,版本号不支持修改。
选择“2.7”。
代理规格
根据业务需求选择相应的代理规格。
选择“kafka.2u4g.cluster”。
代理数量
根据业务需求选择相应的代理数量。
选择“3”。
单个代理存储空间
根据实际需要选择存储Kafka数据的磁盘类型和磁盘大小。
实例总存储空间 = 单个代理的存储空间 * 代理数量,Kafka实例创建后,磁盘类型不支持修改。
磁盘类型选择“超高I/O”,磁盘大小设置为“100GB”。
容量阈值策略
选择“自动删除”,即磁盘达到95%的容量阈值后,可以正常生产和消费消息,但是会删除最早的10%的消息,以保证磁盘容量充足。该场景优先保障业务不中断,数据存在丢失的风险。
- 设置实例网络环境信息,如图3所示,配置详情请参考表3。
- 设置实例的访问方式,如图4所示,配置详情请参考表4。
- 不设置“更多配置”,保持默认即可。
- 单击“立即购买”,进入规格确认页面。
- 确认实例信息无误后,提交请求。
- 单击“返回Kafka专享版列表”,查看Kafka实例是否创建成功。
创建实例大约需要3到15分钟,此时实例的“状态”为“创建中”。
- 当实例的“状态”变为“运行中”时,说明实例创建成功。
- 当实例的“状态”变为“创建失败”,请删除创建失败的实例,重新创建。如果重新创建仍然失败,请联系客服。
创建失败的实例,不会占用其他资源。
- 实例创建成功后,单击实例名称,进入实例详情页。
- 在“连接信息”区域,查看并记录连接地址。
图5 使用内网通过同一个VPC访问Kafka实例的连接地址
步骤三:创建Topic
- 在“Kafka专享版”页面,单击Kafka实例的名称,进入实例详情页面。
- 在左侧导航栏单击“Topic管理”,进入Topic列表页。
- 单击“创建Topic”,弹出“创建Topic”对话框。
- 填写Topic名称和配置信息,配置详情请参考表5,单击“确定”,完成创建Topic。
表5 Topic参数说明 参数
说明
Topic名称
名称支持自定义,但需要符合命名规则:以英文字母、数字、下划线开头,且只能由英文字母、数字、句点、中划线、下划线组成,长度为3~200个字符。
名称不能为以下内置Topic:
- __consumer_offsets
- __transaction_state
- __trace
- __connect-status
- __connect-configs
- __connect-offsets
创建Topic后不能修改名称。
输入“topic-01”。
分区数
如果分区数与消费者数一致,分区数越大消费的并发度越大。
输入“3”。
副本数
Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。
输入“3”。
老化时间(小时)
消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。
输入“72”。
同步复制
选择“不开启”,即Leader副本接收到消息并成功写入到本地日志后,就马上向客户端发送写入成功的消息,不需要等待所有Follower副本复制完成。
同步落盘
选择“不开启”,即生产的消息存在内存中,不会立即写入磁盘。
消息时间戳类型
选择“CreateTime”,即生产者创建消息的时间。
批处理消息最大值(字节)
Kafka允许的最大批处理大小,如果在生产客户端配置文件或代码中启用消息压缩,则表示压缩后的最大批处理大小。
输入“10485760”。
描述
不设置。
图6 创建Topic
步骤四:连接实例生产消费消息
- 配置生产消费配置文件。
- 登录Linux系统的ECS。
- 下载client.jks证书并上传到ECS的“/root”目录下。
获取证书的方法如下:在Kafka控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。解压压缩包,获取压缩包中的客户端证书文件:client.jks。
“/root”为证书存放路径,请根据实际情况修改。
- 进入Kafka客户端文件的“/config”目录下。
cd kafka_2.12-2.7.2/config
- 在“consumer.properties”和“producer.properties”文件中分别增加如下行(示例以PLAIN机制为例介绍)。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN security.protocol=SASL_SSL ssl.truststore.location={ssl_truststore_path} ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
参数说明:
- username和password为创建Kafka实例过程中开启密文接入时填入的用户名和密码。
- ssl.truststore.location配置为1.b证书的存放路径。
- ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
- ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
- 进入Kafka客户端文件的“/bin”目录下。
cd ../bin
- 生产消息。
./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties
参数说明如下:
示例如下,“192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093”为Kafka实例连接地址。
执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。
[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-01 --producer.config ../config/producer.properties >Hello >DMS >Kafka! >^C[root@ecs-kafka bin]#
如需停止生产使用Ctrl+C命令退出。
- 消费消息。
./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --from-beginning --consumer.config ../config/consumer.properties
参数说明如下:
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-01 --from-beginning --consumer.config ../config/consumer.properties Hello Kafka! DMS ^CProcessed a total of 3 messages [root@ecs-kafka bin]#
如需停止消费使用Ctrl+C命令退出。
相关信息
- 了解Kafka实例的相关概念,请参考Kafka基本概念。
- 了解Kafka实例的价格,请参考分布式消息服务Kafka版价格详情。
- 在控制台查看消息,请参考查看Kafka消息。
- 查看消费进度,请参考查看Kafka消费进度。
- 查看Kafka实例的监控指标,请参考查看Kafka监控数据。