快速连接Kafka并生产消费消息
本章节将为您介绍分布式消息服务Kafka版入门的使用流程,以创建一个开启密文接入、安全协议为SASL_SSL的Kafka实例,客户端使用内网通过同一个VPC连接Kafka实例生产消费消息为例,帮助您快速上手Kafka。
- 准备工作
Kafka实例运行于虚拟私有云(Virtual Private Cloud,简称VPC)中,在创建实例前需要确保有可用的虚拟私有云。
Kafka实例创建后,您需要在弹性云服务器中下载和安装Kafka开源客户端,然后才能进行生产消息和消费消息。
- 创建Kafka实例
在创建实例时,您可以根据需求选择需要的实例规格和数量,并开启密文接入,安全协议设置为“SASL_SSL”。
连接安全协议为“SASL_SSL”的Kafka实例时,使用SASL认证,数据通过SSL证书进行加密传输,安全性更高。
- 创建Topic
Topic用于存储消息,供生产者生产消息以及消费者订阅消息。
本文以在控制台创建Topic为例介绍。
- 连接实例
在连接安全协议为“SASL_SSL”的Kafka实例时,需要下载证书,并在客户端配置文件中设置连接信息。
步骤一:准备工作
- 注册华为账号并实名认证。
在创建Kafka实例前,请先注册华为账号并实名认证,具体步骤请参考注册华为账号并开通华为云和实名认证介绍。
如果您已有一个华为账号并实名认证,请跳过此步骤。
- 为账户充值。
在创建Kafka实例前,确保账户有足够金额。账户充值的具体步骤,请参考账户充值。
- 为用户添加Kafka实例的操作权限。
如果您需要对云上的资源进行精细管理,请使用统一身份认证服务(Identity and Access Management,简称IAM)创建IAM用户及用户组,并授权,以使得IAM用户获得Kafka实例的操作权限。具体操作请参考创建用户并授权使用DMS for Kafka。
- 创建VPC和子网。
在创建Kafka实例前,确保已存在可用的VPC和子网。创建VPC和子网的具体步骤,请参考创建虚拟私有云和子网。
创建的VPC与Kafka实例必须在相同的区域。
- 创建安全组并添加安全组规则。
在创建Kafka实例前,确保已存在可用的安全组。创建安全组的具体步骤,请参考创建安全组。
- 构建生产消费客户端。
本文以Linux系统的弹性云服务器(Elastic Cloud Server,简称ECS)作为生产消费客户端。在创建Kafka实例前,请先创建开启弹性公网IP的ECS、安装JDK、配置环境变量以及下载Kafka开源客户端。
- 登录管理控制台,在左上角单击,选择“计算 > 弹性云服务器”,创建一个ECS实例。
创建ECS的具体步骤,请参考购买弹性云服务器。如果您已有可用的ECS,可重复使用,不需要再次创建。
- 使用root用户登录ECS。
- 安装Java JDK,并配置JAVA_HOME与PATH环境变量。
- 下载Java JDK。
ECS默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本。
- 解压Java JDK。
tar -zxvf jdk-8u321-linux-x64.tar.gz
“jdk-8u321-linux-x64.tar.gz”为JDK的版本,请根据实际情况修改。
- 打开“.bash_profile”文件。
vim ~/.bash_profile
- 添加如下内容。
export JAVA_HOME=/root/jdk1.8.0_321 export PATH=$JAVA_HOME/bin:$PATH
“/root/jdk1.8.0_321”为JDK的安装路径,请根据实际情况修改。
- 按“Esc”,然后输入以下命令,按“Enter”,保存并退出“.bash_profile”文件。
:wq
- 执行如下命令使修改生效。
source .bash_profile
- 查看Java JDK是否安装成功。
java -version
回显信息中包含如下信息,表示Java JDK安装成功。java version "1.8.0_321"
- 下载Java JDK。
- 下载开源的Kafka客户端。
wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
- 解压Kafka客户端文件。
tar -zxf kafka_2.12-2.7.2.tgz
- 登录管理控制台,在左上角单击,选择“计算 > 弹性云服务器”,创建一个ECS实例。
步骤二:创建Kafka实例
- 进入购买Kafka实例页面。
- 在“快速购买”页签,设置实例基础配置,如图2所示,配置详情请参考表2。
- 设置实例规格和存储空间,如图3所示,配置详情请参考表3。
- 设置实例网络环境信息,如图4所示,配置详情请参考表4。
- 设置实例的访问方式,如图5所示,配置详情请参考表5。
表5 设置实例的访问方式 参数
子参数
说明
内网访问
接入方式
选择“密文接入”,即客户端连接Kafka实例时,需要进行SASL认证。
kafka安全协议
选择“SASL_SSL”,即采用SASL方式进行认证,数据通过SSL证书进行加密传输,安全性更高。
内网IP地址
选择“自动分配”,即系统自动分配子网中可用的IP地址。
SSL用户名
用户名输入“test”,密文接入成功开启后,用户名不支持修改。
用户名需要符合以下命名规则:由英文字母开头,且只能由英文字母、数字、中划线、下划线组成,长度为4~64个字符。
密码
设置连接实例的密码,密码需要符合以下命名规则:
- 长度为8~32个字符。
- 至少包含以下字符中的3种:大写字母、小写字母、数字、特殊字符`~!@#$%^&*()-_=+\|[{}];:'",<.>? 和空格,并且不能以-开头。
- 不能与用户名或倒序的用户名相同。
SASL PLAIN机制
勾选“开启SASL PLAIN机制”。密文接入成功开启后,SASL PLAIN机制不支持修改。
开启SASL PLAIN机制后,同时支持SCRAM-SHA-512机制和PLAIN机制,根据实际情况选择其中任意一种配置连接。
公网访问
-
选择“不开启”。
- 设置实例高级配置,如图6所示,配置详情请参考表6,其他参数保持默认设置。
- 单击“确认订单”,进入规格确认页面。
- 确认实例信息无误后,提交请求。
- 单击“返回Kafka专享版列表”,查看Kafka实例是否创建成功。
创建实例大约需要3到15分钟,此时实例的“状态”为“创建中”。
- 当实例的“状态”变为“运行中”时,说明实例创建成功。
- 当实例的“状态”变为“创建失败”,请删除创建失败的实例,重新创建。如果重新创建仍然失败,请联系客服。
创建失败的实例,不会占用其他资源。
- 实例创建成功后,单击实例名称,进入实例详情页。
- 在“连接信息”区域,查看并记录连接地址。
图7 使用内网通过同一个VPC访问Kafka实例的连接地址
步骤三:创建Topic
- 在“Kafka专享版”页面,单击Kafka实例的名称,进入实例详情页面。
- 在左侧导航栏单击“Topic管理”,进入Topic列表页。
- 单击“创建Topic”,弹出“创建Topic”对话框。
- 填写Topic名称和配置信息,配置详情请参考表7,单击“确定”,完成创建Topic。
表7 Topic参数说明 参数
说明
Topic名称
名称支持自定义,但需要符合命名规则:以英文字母、数字、下划线开头,且只能由英文字母、数字、句点、中划线、下划线组成,长度为3~200个字符。
名称不能为以下内置Topic:
- __consumer_offsets
- __transaction_state
- __trace
- __connect-status
- __connect-configs
- __connect-offsets
创建Topic后不能修改名称。
输入“topic-01”。
分区数
如果分区数与消费者数一致,分区数越大消费的并发度越大。
输入“3”。
副本数
Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。
输入“3”。
老化时间(小时)
消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。
输入“72”。
同步复制
选择“不开启”,即Leader副本接收到消息并成功写入到本地日志后,就马上向客户端发送写入成功的消息,不需要等待所有Follower副本复制完成。
同步落盘
选择“不开启”,即生产的消息存在内存中,不会立即写入磁盘。
消息时间戳类型
选择“CreateTime”,即生产者创建消息的时间。
批处理消息最大值(字节)
Kafka允许的最大批处理大小,如果在生产客户端配置文件或代码中启用消息压缩,则表示压缩后的最大批处理大小。
输入“10485760”。
描述
不设置。
图8 创建Topic
步骤四:连接实例生产消费消息
- 配置生产消费配置文件。
- 登录Linux系统的ECS。
- 下载client.jks证书并上传到ECS的“/root”目录下。
获取证书的方法如下:在Kafka控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。解压压缩包,获取压缩包中的客户端证书文件:client.jks。
“/root”为证书存放路径,请根据实际情况修改。
- 进入Kafka客户端文件的“/config”目录下。
cd kafka_2.12-2.7.2/config
- 在“consumer.properties”和“producer.properties”文件中分别增加如下行(示例以PLAIN机制为例介绍)。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN security.protocol=SASL_SSL ssl.truststore.location={ssl_truststore_path} ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
参数说明:
- username和password为创建Kafka实例过程中开启密文接入时填入的用户名和密码。
- ssl.truststore.location配置为1.b证书的存放路径。
- ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
- ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
- 进入Kafka客户端文件的“/bin”目录下。
cd ../bin
- 生产消息。
./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties
参数说明如下:
示例如下,“192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093”为Kafka实例连接地址。
执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。
[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-01 --producer.config ../config/producer.properties >Hello >DMS >Kafka! >^C[root@ecs-kafka bin]#
如需停止生产使用Ctrl+C命令退出。
- 消费消息。
./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --from-beginning --consumer.config ../config/consumer.properties
参数说明如下:
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-01 --from-beginning --consumer.config ../config/consumer.properties Hello Kafka! DMS ^CProcessed a total of 3 messages [root@ecs-kafka bin]#
如需停止消费使用Ctrl+C命令退出。
相关信息
- 了解Kafka实例的相关概念,请参考Kafka基本概念。
- 了解Kafka实例的价格,请参考分布式消息服务Kafka版价格详情。
- 在控制台查看消息,请参考查看Kafka消息。
- 查看消费进度,请参考查看Kafka消费进度。
- 查看Kafka实例的监控指标,请参考查看Kafka监控数据。