更新时间:2024-05-29 GMT+08:00

快速连接Kafka并生产消费消息

本章节将为您介绍分布式消息服务Kafka版入门的使用流程,以创建并连接一个开启SASL的Kafka 2.7实例为例,帮助您快速上手Kafka。

您还可以通过API方式创建Kafka实例、在业务代码中连接Kafka实例。

使用流程

图1 Kafka使用流程
  1. 环境准备

    Kafka实例运行于虚拟私有云(Virtual Private Cloud,以下简称VPC)中,在创建实例前需要确保有可用的虚拟私有云。

    Kafka实例创建后,您需要在弹性云服务器中下载和安装Kafka开源客户端,然后才能进行生产消息和消费消息。

  2. 创建Kafka实例

    在创建实例时,您可以根据需求选择需要的实例规格和数量,并开启SASL访问。开启SASL后,数据加密传输,安全性更高。

  3. (可选)创建Topic

    Kafka实例创建成功后,如果没有开启“Kafka自动创建Topic”,需要手动创建Topic,然后才能进行生产消息和消费消息。

  4. 连接实例

    在连接开启SASL的Kafka实例时,需要下载证书,并在客户端配置文件中设置连接信息。

关于Kafka的相关概念,请参考Kafka基本概念

步骤一:准备环境

  1. 如果需要对云上的资源进行精细管理,请使用IAM服务创建IAM用户及用户组,并授权,以使得IAM用户获得具体的操作权限。具体操作,请参考创建用户并授权使用DMS for Kafka
  2. 在创建Kafka实例前,确保已存在可用的VPC和子网。

    Kafka实例可以使用当前账号下已创建的VPC和子网,也可以使用新创建的VPC和子网,请根据实际需要进行配置。创建VPC和子网的操作指导,请参考创建虚拟私有云和子网。请注意:创建的VPC与Kafka实例必须在相同的区域。

  3. 在创建Kafka实例前,确保已存在可用的安全组。

    Kafka实例可以使用当前账号下已创建的安全组,也可以使用新创建的安全组,请根据实际需要进行配置。创建安全组的操作步骤,请参考创建安全组

    连接Kafka实例前,请添加表1所示安全组规则,其他规则请根据实际需要添加。
    表1 安全组规则

    方向

    协议

    端口

    源地址

    说明

    入方向

    TCP

    9093

    0.0.0.0/0

    使用内网通过同一个VPC访问Kafka实例(开启SSL加密)。

    安全组创建后,系统默认添加入方向“允许安全组内的弹性云服务器彼此通信”规则和出方向“放通全部流量”规则。此时使用内网通过同一个VPC访问Kafka实例,无需添加表1的规则。

  4. 在连接Kafka实例前,需要先创建弹性云服务器(Elastic Cloud Server,以下简称ECS)、安装JDK、配置环境变量以及下载Kafka开源客户端。本文以Linux系统的ECS为例,Windows系统ECS的JDK安装与环境变量配置可自行在互联网查找相关帮助。

    1. 登录管理控制台,在左上角单击,选择“计算 > 弹性云服务器”,创建一个ECS实例。

      创建ECS的操作步骤,请参考创建弹性云服务器。如果您已有可用的ECS,可重复使用,不需要再次创建。

    2. 登录ECS。
    3. 安装Java JDK或JRE,并配置JAVA_HOME与PATH环境变量。
      使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。
      export JAVA_HOME=/opt/java/jdk1.8.0_151 
      export PATH=$JAVA_HOME/bin:$PATH

      执行source .bash_profile命令使修改生效。

      ECS默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本

    4. 下载开源的Kafka客户端。
      wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
    5. 解压Kafka客户端文件。
      tar -zxf kafka_2.12-2.7.2.tgz

步骤二:创建Kafka实例

  1. 登录分布式消息服务Kafka控制台,单击页面右上方的“购买Kafka实例”。
  2. “计费模式”选择“按需计费”。
  3. 在“区域”下拉列表中,选择靠近您应用程序的区域,可降低网络延时、提高访问速度。
  4. 在“项目”下拉列表中,选择项目。
  5. 在“可用区”区域,根据实际情况选择1个或者3个及以上可用区。
  6. 设置“实例名称”和“企业项目”。
  7. 设置实例信息,配置详情请参考表2

    表2 设置实例信息

    参数

    配置说明

    版本

    选择“2.7”

    Kafka实例创建后,版本号不支持修改。

    CPU架构

    选择“x86计算”

    代理规格

    选择“kafka.2u4g.cluster”

    代理数量

    选择“3”

    单个代理存储空间

    选择“超高I/O 100GB”

    实例总存储空间 = 单个代理的存储空间 * 代理数量,Kafka实例创建后,磁盘类型不支持修改。

    容量阈值策略

    选择“自动删除”

  8. 设置实例网络环境信息,配置详情请参考表3

    表3 设置实例网络环境信息

    参数

    配置说明

    虚拟私有云

    选择已经创建好的虚拟私有云和子网。

    虚拟私有云和子网在Kafka实例创建完成后,不支持修改。

    安全组

    选择已经创建好的安全组。

  9. 设置实例的访问方式,配置详情请参考表4

    表4 设置实例的访问方式

    参数

    配置说明

    Kafka SASL_SSL

    开启SASL_SSL

    Kafka实例创建后,Kafka SASL_SSL开关不支持修改。

    SASL PLAIN机制

    开启SASL PLAIN机制

    开启“SASL PLAIN机制”后,同时支持SCRAM-SHA-512机制和PLAIN机制,在客户端连接Kafka实例时,选择其中任意一种机制进行连接。

    用户名

    设置客户端用于连接Kafka实例的用户名。

    密码

    设置客户端用于连接Kafka实例的密码。

  10. 设置登录Kafka Manager的用户名和密码。创建实例后,Kafka Manager用户名无法修改。

    Kafka Manager是开源的Kafka集群管理工具,实例创建成功后,实例详情页面会展示Kafka Manager登录地址,您可登录Kafka Manager页面,查看Kafka集群的监控、代理等信息。

  11. 单击“更多配置”,设置更多相关信息,配置详情请参考表5

    表5 更多配置

    参数

    配置说明

    Kafka自动创建Topic

    不开启

    标签

    不设置

    描述

    不设置

  12. 填写完上述信息后,单击“立即购买”,进入规格确认页面。
  13. 确认实例信息无误后,提交请求。
  14. 单击“返回Kafka专享版列表”,查看Kafka实例是否创建成功。

    创建实例大约需要3到15分钟,此时实例的“状态”为“创建中”。

    • 当实例的“状态”变为“运行中”时,说明实例创建成功。
    • 当实例的“状态”变为“创建失败”,请删除创建失败的实例,然后重新创建。如果重新创建仍然失败,请联系客服。

      创建失败的实例,不会占用其他资源。

(可选)步骤三:创建Topic

  1. 在“Kafka专享版”页面,单击Kafka实例的名称,进入实例详情页面。
  2. 在左侧导航栏单击“Topic管理”,进入Topic列表页。
  3. 单击“创建Topic”,弹出“创建Topic”对话框。
  4. 填写Topic名称和配置信息,单击“确定”,完成创建Topic。

    表6 Topic参数说明

    参数

    说明

    Topic名称

    设置为“topic-01”

    创建Topic后不能修改名称。

    分区数

    设置为“3”

    分区数越大消费的并发度越大。

    副本数

    设置为“3”

    Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。

    说明:

    实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。

    老化时间(小时)

    设置为“72”

    消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。

    同步复制

    不开启

    同步落盘

    不开启

    消息时间戳类型

    选择“CreateTime”

    批处理消息最大值

    保持默认值

步骤四:连接实例生产消费消息

  1. 获取实例的连接地址。

    1. 在左侧导航栏单击“基本信息”,进入实例详情页。
    2. 在“连接信息”区域,查看连接地址。
      图2 使用内网通过同一个VPC访问Kafka实例的连接地址

  2. 配置生产消费配置文件。

    1. 登录Linux系统的ECS。
    2. 在ECS的“/etc/hosts”文件中配置host和IP的映射关系,以便客户端能够快速解析实例的Broker。

      其中,IP地址必须为实例连接地址(从1中获取的连接地址),host为每个实例主机的名称(您可以自定义主机的名称,但不能重复)。

      例如:

      10.154.48.120 server01

      10.154.48.121 server02

      10.154.48.122 server03

    3. 下载client.truststore.jks证书。

      在Kafka控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。解压压缩包,获取压缩包中的客户端证书文件:client.truststore.jks。

    4. 在“consumer.properties”和“producer.properties”文件中分别增加如下行(示例以PLAIN机制为例介绍):
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="**********" \
      password="**********";        
      sasl.mechanism=PLAIN
      
      security.protocol=SASL_SSL
      ssl.truststore.location={ssl_truststore_path}
      ssl.truststore.password=dms@kafka
      ssl.endpoint.identification.algorithm=

      参数说明:

      • username和password为创建Kafka实例过程中开启SASL_SSL时填入的用户名和密码。
      • ssl.truststore.location配置为2.c证书的存放路径。
      • ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
      • ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空

  3. 生产消息。

    进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息。

    ./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties

    参数说明如下:

    • 连接地址:从1中获取的连接地址。
    • Topic名称:从4中获取的Topic名称。

    示例如下,“192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093”为Kafka实例连接地址。

    执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。

    [root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093  --topic topic-demo --producer.config ../config/producer.properties
    >Hello
    >DMS
    >Kafka!
    >^C[root@ecs-kafka bin]# 

    如需停止生产使用Ctrl+C命令退出。

  4. 消费消息。

    执行如下命令消费消息。

    ./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --group ${消费组名称} --from-beginning  --consumer.config ../config/consumer.properties

    参数说明如下:

    • 连接地址:从1中获取的连接地址。
    • Topic名称:从4中获取的Topic名称。
    • 消费组名称:根据您的业务需求,设定消费组名称。如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。消费组名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。

    示例如下:

    [root@ecs-kafka bin]#  ./kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties
    Hello
    Kafka!
    DMS
    ^CProcessed a total of 3 messages
    [root@ecs-kafka bin]# 

    如需停止消费使用Ctrl+C命令退出。