文档首页> 分布式消息服务Kafka版> 快速入门> 快速连接Kafka并生产消费消息
更新时间:2024-07-05 GMT+08:00
分享

快速连接Kafka并生产消费消息

本章节将为您介绍分布式消息服务Kafka版入门的使用流程,以在“华东-上海二”区域创建一个开启密文接入、安全协议为SASL_SSL的Kafka 2.7实例,客户端使用内网通过同一个VPC连接Kafka实例生产消费消息为例,帮助您快速上手Kafka。

图1 Kafka使用流程
  1. 准备工作

    Kafka实例运行于虚拟私有云(Virtual Private Cloud,简称VPC)中,在创建实例前需要确保有可用的虚拟私有云。

    Kafka实例创建后,您需要在弹性云服务器中下载和安装Kafka开源客户端,然后才能进行生产消息和消费消息。

  2. 创建Kafka实例

    在创建实例时,您可以根据需求选择需要的实例规格和数量,并开启密文接入,安全协议设置为“SASL_SSL”。

    连接安全协议为“SASL_SSL”的Kafka实例时,使用SASL认证,数据通过SSL证书进行加密传输,安全性更高。

  3. 创建Topic

    Topic用于存储消息,供生产者生产消息以及消费者订阅消息。

    本文以在控制台创建Topic为例介绍。

  4. 连接实例

    在连接安全协议为“SASL_SSL”的Kafka实例时,需要下载证书,并在客户端配置文件中设置连接信息。

步骤一:准备工作

  1. 注册华为账号并实名认证。

    在创建Kafka实例前,请先注册华为账号并实名认证,具体步骤请参考注册华为账号并开通华为云实名认证介绍

    如果您已有一个华为账号并实名认证,请跳过此步骤。

  2. 为账户充值。

    在创建Kafka实例前,确保账户有足够金额。账户充值的具体步骤,请参考账户充值

  3. 为用户添加Kafka实例的操作权限。

    如果您需要对云上的资源进行精细管理,请使用统一身份认证服务(Identity and Access Management,简称IAM)创建IAM用户及用户组,并授权,以使得IAM用户获得Kafka实例的操作权限。具体操作请参考创建用户并授权使用DMS for Kafka

  4. 创建VPC和子网。

    在创建Kafka实例前,确保已存在可用的VPC和子网。创建VPC和子网的具体步骤,请参考创建虚拟私有云和子网

    创建的VPC与Kafka实例必须在相同的区域。

  5. 创建安全组并添加安全组规则。

    在创建Kafka实例前,确保已存在可用的安全组。创建安全组的具体步骤,请参考创建安全组

    连接Kafka实例前,请添加表1所示安全组规则,其他规则请根据实际需要添加。
    表1 安全组规则

    方向

    协议

    端口

    源地址

    说明

    入方向

    TCP

    9093

    0.0.0.0/0

    使用内网通过同一个VPC访问Kafka实例(密文接入)。

    安全组创建后,系统默认添加入方向“允许安全组内的弹性云服务器彼此通信”规则和出方向“放通全部流量”规则。此时使用内网通过同一个VPC访问Kafka实例,无需添加表1的规则。

  6. 构建生产消费客户端。

    本文以Linux系统的弹性云服务器(Elastic Cloud Server,简称ECS)作为生产消费客户端。在创建Kafka实例前,请先创建开启弹性公网IP的ECS、安装JDK、配置环境变量以及下载Kafka开源客户端。
    1. 登录管理控制台,在左上角单击,选择“计算 > 弹性云服务器”,创建一个ECS实例。

      创建ECS的具体步骤,请参考购买弹性云服务器。如果您已有可用的ECS,可重复使用,不需要再次创建。

    2. 使用root用户登录ECS。
    3. 安装Java JDK,并配置JAVA_HOME与PATH环境变量。
      1. 下载Java JDK。

        ECS默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本

      2. 解压Java JDK。
        tar -zxvf jdk-8u321-linux-x64.tar.gz

        “jdk-8u321-linux-x64.tar.gz”为JDK的版本,请根据实际情况修改。

      3. 打开“.bash_profile”文件。
        vim ~/.bash_profile
      4. 添加如下内容。
        export JAVA_HOME=/opt/java/jdk1.8.0_321 
        export PATH=$JAVA_HOME/bin:$PATH

        “/opt/java/jdk1.8.0_321”为JDK的安装路径,请根据实际情况修改。

      5. 按“Esc”,然后输入以下命令,按“Enter”,保存并退出“.bash_profile”文件。
        :wq
      6. 执行如下命令使修改生效。
        source .bash_profile
      7. 查看Java JDK是否安装成功。
        java -version
        回显信息中包含如下信息,表示Java JDK安装成功。
        java version "1.8.0_321"
    4. 下载开源的Kafka客户端。
      wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
    5. 解压Kafka客户端文件。
      tar -zxf kafka_2.12-2.7.2.tgz

步骤二:创建Kafka实例

  1. 进入购买Kafka实例页面
  2. 设置实例基础信息,配置详情请参考表2

    表2 设置实例基础信息

    参数

    说明

    计费模式

    选择“按需计费”,即先使用再付费,按照Kafka实例实际使用时长计费,秒级计费,按小时结算。

    区域

    不同区域的云服务产品之间内网互不相通。请就近选择靠近您业务的区域,可减少网络时延,提高访问速度。

    选择“华东-上海二”。

    项目

    每个区域默认对应一个项目,这个项目由系统预置,用来隔离物理区域间的资源(计算资源、存储资源和网络资源)。

    选择“华东-上海二(默认)”。

    可用区

    可用区指在同一区域下,电力、网络隔离的物理区域,可用区之间内网互通,不同可用区之间物理隔离。

    选择“可用区1、可用区2、可用区3”。

    实例名称

    实例名称支持自定义,但需要符合命名规则:长度为4~64个字符,由英文字母开头,只能由英文字母、数字、中划线、下划线组成。

    输入“kafka-test”。

    企业项目

    该参数针对企业用户使用。企业项目是对企业不同项目间资源的分组和管理,属于逻辑隔离。

    选择“default”。

    规格选择模式

    选择“集群版”,创建一个集群Kafka实例。

    版本

    Kafka的版本号。Kafka实例创建后,版本号不支持修改。

    选择“2.7”。

    代理规格

    根据业务需求选择相应的代理规格。

    选择“kafka.2u4g.cluster”。

    代理数量

    根据业务需求选择相应的代理数量。

    选择“3”。

    单个代理存储空间

    根据实际需要选择存储Kafka数据的磁盘类型和磁盘大小。

    实例总存储空间 = 单个代理的存储空间 * 代理数量,Kafka实例创建后,磁盘类型不支持修改。

    选择“超高I/O 100GB”。

    容量阈值策略

    选择“自动删除”,即磁盘达到95%的容量阈值后,可以正常生产和消费消息,但是会删除最早的10%的消息,以保证磁盘容量充足。该场景优先保障业务不中断,数据存在丢失的风险。

    图2 实例基础信息

  3. 设置实例网络环境信息,配置详情请参考表3

    表3 设置实例网络环境信息

    参数

    说明

    虚拟私有云

    虚拟私有云和子网在Kafka实例创建完成后,不支持修改。

    选择准备工作中设置好的虚拟私有云和子网。

    安全组

    选择准备工作中设置好的安全组。

    图3 实例网络环境信息

  4. 设置实例的访问方式,配置详情请参考表4

    表4 设置实例的访问方式

    参数

    子参数

    说明

    内网访问

    明文接入

    选择“不开启”。

    密文接入

    密文接入表示客户端连接Kafka实例时,需要进行SASL认证。

    1. 开启“密文接入”。
    2. “kafka安全协议”选择“SASL_SSL”,设置用户名和密码。用户名输入“test”。密文接入成功开启后,用户名不支持修改。

      用户名需要符合以下命名规则:由英文字母开头,且只能由英文字母、数字、中划线、下划线组成,长度为4~64个字符。

      密码需要符合以下命名规则:

      • 长度为8~32个字符。
      • 至少包含以下字符中的3种:大写字母、小写字母、数字、特殊字符`~!@#$%^&*()-_=+\|[{}];:'",<.>? 和空格,并且不能以-开头。
      • 不能与用户名或倒序的用户名相同。
    3. 开启“SASL PLAIN机制”。

    内网IP地址

    选择“自动分配”,即系统自动分配子网中可用的IP地址。

    公网访问

    -

    选择“不开启”。

    图4 实例的访问方式

  5. 单击“更多配置”,设置更多相关信息,配置详情请参考表5

    表5 更多配置

    参数

    说明

    Smart Connect

    选择“不开启”。

    Kafka自动创建Topic

    选择“不开启”。

    标签

    不设置。

    描述

    不设置。

    图5 更多配置

  6. 单击“立即购买”,进入规格确认页面。
  7. 确认实例信息无误后,提交请求。
  8. 单击“返回Kafka专享版列表”,查看Kafka实例是否创建成功。

    创建实例大约需要3到15分钟,此时实例的“状态”为“创建中”。

    • 当实例的“状态”变为“运行中”时,说明实例创建成功。
    • 当实例的“状态”变为“创建失败”,请删除创建失败的实例,然后重新创建。如果重新创建仍然失败,请联系客服。

      创建失败的实例,不会占用其他资源。

  9. 实例创建成功后,单击实例名称,进入实例详情页。
  10. 在“连接信息”区域,查看并记录连接地址。

    图6 使用内网通过同一个VPC访问Kafka实例的连接地址

步骤三:创建Topic

  1. 在“Kafka专享版”页面,单击Kafka实例的名称,进入实例详情页面。
  2. 在左侧导航栏单击“Topic管理”,进入Topic列表页。
  3. 单击“创建Topic”,弹出“创建Topic”对话框。
  4. 填写Topic名称和配置信息,配置详情请参考表6,单击“确定”,完成创建Topic。

    表6 Topic参数说明

    参数

    说明

    Topic名称

    名称支持自定义,但需要符合命名规则:以英文字母、数字、下划线开头,且只能由英文字母、数字、句点、中划线、下划线组成,长度为3~200个字符。

    名称不能为以下内置Topic:

    • __consumer_offsets
    • __transaction_state
    • __trace
    • __connect-status
    • __connect-configs
    • __connect-offsets

    创建Topic后不能修改名称。

    输入“topic-01”。

    分区数

    如果分区数与消费者数一致,分区数越大消费的并发度越大。

    输入“3”。

    副本数

    Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。

    输入“3”。

    老化时间(小时)

    消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。

    输入“72”。

    同步复制

    选择“不开启”,即Leader副本接收到消息并成功写入到本地日志后,就马上向客户端发送写入成功的消息,不需要等待所有Follower副本复制完成。

    同步落盘

    选择“不开启”,即生产的消息存在内存中,不会立即写入磁盘。

    消息时间戳类型

    选择“CreateTime”,即生产者创建消息的时间。

    批处理消息最大值(字节)

    Kafka允许的最大批处理大小,如果在生产客户端配置文件或代码中启用消息压缩,则表示压缩后的最大批处理大小。

    输入“10485760”。

    描述

    不设置。

    图7 创建Topic

步骤四:连接实例生产消费消息

  1. 配置生产消费配置文件。

    1. 登录Linux系统的ECS。
    2. 下载client.jks证书并上传到ECS的“/root”目录下。

      获取证书的方法如下:在Kafka控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。解压压缩包,获取压缩包中的客户端证书文件:client.jks。

      “/root”为证书存放路径,请根据实际情况修改。

    3. 进入Kafka客户端文件的“/config”目录下。
      cd kafka_2.12-2.7.2/config
    4. 在“consumer.properties”和“producer.properties”文件中分别增加如下行(示例以PLAIN机制为例介绍)。
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="**********" \
      password="**********";        
      sasl.mechanism=PLAIN
      
      security.protocol=SASL_SSL
      ssl.truststore.location={ssl_truststore_path}
      ssl.truststore.password=dms@kafka
      ssl.endpoint.identification.algorithm=

      参数说明:

      • username和password为创建Kafka实例过程中开启密文接入时填入的用户名和密码。
      • ssl.truststore.location配置为1.b证书的存放路径。
      • ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
      • ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空

  2. 进入Kafka客户端文件的“/bin”目录下。

    cd ../bin

  3. 生产消息。

    ./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties

    参数说明如下:

    示例如下,“192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093”为Kafka实例连接地址。

    执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。

    [root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093  --topic topic-01 --producer.config ../config/producer.properties
    >Hello
    >DMS
    >Kafka!
    >^C[root@ecs-kafka bin]# 

    如需停止生产使用Ctrl+C命令退出。

  4. 消费消息。

    ./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --from-beginning  --consumer.config ../config/consumer.properties

    参数说明如下:

    示例如下:

    [root@ecs-kafka bin]#  ./kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-01 --from-beginning --consumer.config ../config/consumer.properties
    Hello
    Kafka!
    DMS
    ^CProcessed a total of 3 messages
    [root@ecs-kafka bin]# 

    如需停止消费使用Ctrl+C命令退出。

相关信息

分享:

    相关文档

    相关产品