文档首页/ 分布式消息服务Kafka版/ 快速入门/ 快速连接Kafka并生产消费消息
更新时间:2024-10-22 GMT+08:00
分享

快速连接Kafka并生产消费消息

本章节将为您介绍分布式消息服务Kafka版入门的使用流程,以创建一个开启密文接入、安全协议为SASL_SSL的Kafka实例,客户端使用内网通过同一个VPC连接Kafka实例生产消费消息为例,帮助您快速上手Kafka。

图1 Kafka使用流程
  1. 准备工作

    Kafka实例运行于虚拟私有云(Virtual Private Cloud,简称VPC)中,在创建实例前需要确保有可用的虚拟私有云。

    Kafka实例创建后,您需要在弹性云服务器中下载和安装Kafka开源客户端,然后才能进行生产消息和消费消息。

  2. 创建Kafka实例

    在创建实例时,您可以根据需求选择需要的实例规格和数量,并开启密文接入,安全协议设置为“SASL_SSL”。

    连接安全协议为“SASL_SSL”的Kafka实例时,使用SASL认证,数据通过SSL证书进行加密传输,安全性更高。

  3. 创建Topic

    Topic用于存储消息,供生产者生产消息以及消费者订阅消息。

    本文以在控制台创建Topic为例介绍。

  4. 连接实例

    在连接安全协议为“SASL_SSL”的Kafka实例时,需要下载证书,并在客户端配置文件中设置连接信息。

步骤一:准备工作

  1. 注册华为账号并实名认证。

    在创建Kafka实例前,请先注册华为账号并实名认证,具体步骤请参考注册华为账号并开通华为云实名认证介绍

    如果您已有一个华为账号并实名认证,请跳过此步骤。

  2. 为账户充值。

    在创建Kafka实例前,确保账户有足够金额。账户充值的具体步骤,请参考账户充值

  3. 为用户添加Kafka实例的操作权限。

    如果您需要对云上的资源进行精细管理,请使用统一身份认证服务(Identity and Access Management,简称IAM)创建IAM用户及用户组,并授权,以使得IAM用户获得Kafka实例的操作权限。具体操作请参考创建用户并授权使用DMS for Kafka

  4. 创建VPC和子网。

    在创建Kafka实例前,确保已存在可用的VPC和子网。创建VPC和子网的具体步骤,请参考创建虚拟私有云和子网

    创建的VPC与Kafka实例必须在相同的区域。

  5. 创建安全组并添加安全组规则。

    在创建Kafka实例前,确保已存在可用的安全组。创建安全组的具体步骤,请参考创建安全组

    连接Kafka实例前,请添加表1所示安全组规则,其他规则请根据实际需要添加。
    表1 安全组规则

    方向

    协议

    端口

    源地址

    说明

    入方向

    TCP

    9093

    0.0.0.0/0

    使用内网通过同一个VPC访问Kafka实例(密文接入)。

    安全组创建后,系统默认添加入方向“允许安全组内的弹性云服务器彼此通信”规则和出方向“放通全部流量”规则。此时使用内网通过同一个VPC访问Kafka实例,无需添加表1的规则。

  6. 构建生产消费客户端。

    本文以Linux系统的弹性云服务器(Elastic Cloud Server,简称ECS)作为生产消费客户端。在创建Kafka实例前,请先创建开启弹性公网IP的ECS、安装JDK、配置环境变量以及下载Kafka开源客户端。
    1. 登录管理控制台,在左上角单击,选择“计算 > 弹性云服务器”,创建一个ECS实例。

      创建ECS的具体步骤,请参考购买弹性云服务器。如果您已有可用的ECS,可重复使用,不需要再次创建。

    2. 使用root用户登录ECS。
    3. 安装Java JDK,并配置JAVA_HOME与PATH环境变量。
      1. 下载Java JDK。

        ECS默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本

      2. 解压Java JDK。
        tar -zxvf jdk-8u321-linux-x64.tar.gz

        “jdk-8u321-linux-x64.tar.gz”为JDK的版本,请根据实际情况修改。

      3. 打开“.bash_profile”文件。
        vim ~/.bash_profile
      4. 添加如下内容。
        export JAVA_HOME=/root/jdk1.8.0_321 
        export PATH=$JAVA_HOME/bin:$PATH

        “/root/jdk1.8.0_321”为JDK的安装路径,请根据实际情况修改。

      5. 按“Esc”,然后输入以下命令,按“Enter”,保存并退出“.bash_profile”文件。
        :wq
      6. 执行如下命令使修改生效。
        source .bash_profile
      7. 查看Java JDK是否安装成功。
        java -version
        回显信息中包含如下信息,表示Java JDK安装成功。
        java version "1.8.0_321"
    4. 下载开源的Kafka客户端。
      wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
    5. 解压Kafka客户端文件。
      tar -zxf kafka_2.12-2.7.2.tgz

步骤二:创建Kafka实例

  1. 进入购买Kafka实例页面
  2. 在“快速购买”页签,设置实例基础配置,如图2所示,配置详情请参考表2

    表2 设置实例基础配置

    参数

    说明

    计费模式

    选择“按需计费”,即先使用再付费,按照Kafka实例实际使用时长计费,秒级计费,按小时结算。

    区域

    不同区域的云服务产品之间内网互不相通。请就近选择靠近您业务的区域,可减少网络时延,提高访问速度。

    选择“华北-北京四”。

    可用区

    可用区指在同一区域下,电力、网络隔离的物理区域,可用区之间内网互通,不同可用区之间物理隔离。

    选择“可用区1、可用区2、可用区3”。

    图2 设置实例基础配置

  3. 设置实例规格和存储空间,如图3所示,配置详情请参考表3

    表3 设置实例规格和存储空间

    参数

    说明

    套餐规格

    选择“综合推荐”中的“入门版”。入门版面向入门级用户,提供兼容开源协议的托管Kafka服务,适用于性能诉求不高、时延容忍度较高的成本敏感业务或测试环境等。

    单个代理存储空间

    根据实际需要选择存储Kafka数据的磁盘类型和磁盘大小。

    实例总存储空间 = 单个代理的存储空间 * 代理数量,Kafka实例创建后,磁盘类型不支持修改。

    磁盘类型选择“超高I/O”,磁盘大小设置为“100GB”。

    图3 设置实例规格和存储空间

  4. 设置实例网络环境信息,如图4所示,配置详情请参考表4

    表4 设置实例网络环境信息

    参数

    说明

    虚拟私有云

    虚拟私有云在Kafka实例创建完成后,不支持修改。

    选择准备工作中设置好的虚拟私有云。

    子网

    子网在Kafka实例创建完成后,不支持修改。

    选择准备工作中设置好的子网。

    安全组

    选择准备工作中设置好的安全组。

    图4 设置实例网络环境信息

  5. 设置实例的访问方式,如图5所示,配置详情请参考表5

    表5 设置实例的访问方式

    参数

    子参数

    说明

    内网访问

    接入方式

    选择“密文接入”,即客户端连接Kafka实例时,需要进行SASL认证。

    kafka安全协议

    选择“SASL_SSL”,即采用SASL方式进行认证,数据通过SSL证书进行加密传输,安全性更高。

    内网IP地址

    选择“自动分配”,即系统自动分配子网中可用的IP地址。

    SSL用户名

    用户名输入“test”,密文接入成功开启后,用户名不支持修改。

    用户名需要符合以下命名规则:由英文字母开头,且只能由英文字母、数字、中划线、下划线组成,长度为4~64个字符。

    密码

    设置连接实例的密码,密码需要符合以下命名规则:

    • 长度为8~32个字符。
    • 至少包含以下字符中的3种:大写字母、小写字母、数字、特殊字符`~!@#$%^&*()-_=+\|[{}];:'",<.>? 和空格,并且不能以-开头。
    • 不能与用户名或倒序的用户名相同。

    SASL PLAIN机制

    勾选“开启SASL PLAIN机制”。密文接入成功开启后,SASL PLAIN机制不支持修改。

    开启SASL PLAIN机制后,同时支持SCRAM-SHA-512机制和PLAIN机制,根据实际情况选择其中任意一种配置连接。

    公网访问

    -

    选择“不开启”。

    图5 设置实例的访问方式

  6. 设置实例高级配置,如图6所示,配置详情请参考表6,其他参数保持默认设置。

    表6 设置实例高级配置

    参数

    说明

    实例名称

    实例名称支持自定义,但需要符合命名规则:长度为4~64个字符,由英文字母开头,只能由英文字母、数字、中划线、下划线组成。

    输入“kafka-test”。

    企业项目

    该参数针对企业用户使用。企业项目是对企业不同项目间资源的分组和管理,属于逻辑隔离。

    选择“default”。

    容量阈值策略

    选择“自动删除”,即磁盘达到95%的容量阈值后,可以正常生产和消费消息,但是会删除最早的10%的消息,以保证磁盘容量充足。该场景优先保障业务不中断,数据存在丢失的风险。

    图6 设置实例高级配置

  7. 单击“确认订单”,进入规格确认页面。
  8. 确认实例信息无误后,提交请求。
  9. 单击“返回Kafka专享版列表”,查看Kafka实例是否创建成功。

    创建实例大约需要3到15分钟,此时实例的“状态”为“创建中”。

    • 当实例的“状态”变为“运行中”时,说明实例创建成功。
    • 当实例的“状态”变为“创建失败”,请删除创建失败的实例,重新创建。如果重新创建仍然失败,请联系客服。

      创建失败的实例,不会占用其他资源。

  10. 实例创建成功后,单击实例名称,进入实例详情页。
  11. 在“连接信息”区域,查看并记录连接地址。

    图7 使用内网通过同一个VPC访问Kafka实例的连接地址

步骤三:创建Topic

  1. 在“Kafka专享版”页面,单击Kafka实例的名称,进入实例详情页面。
  2. 在左侧导航栏单击“Topic管理”,进入Topic列表页。
  3. 单击“创建Topic”,弹出“创建Topic”对话框。
  4. 填写Topic名称和配置信息,配置详情请参考表7,单击“确定”,完成创建Topic。

    表7 Topic参数说明

    参数

    说明

    Topic名称

    名称支持自定义,但需要符合命名规则:以英文字母、数字、下划线开头,且只能由英文字母、数字、句点、中划线、下划线组成,长度为3~200个字符。

    名称不能为以下内置Topic:

    • __consumer_offsets
    • __transaction_state
    • __trace
    • __connect-status
    • __connect-configs
    • __connect-offsets

    创建Topic后不能修改名称。

    输入“topic-01”。

    分区数

    如果分区数与消费者数一致,分区数越大消费的并发度越大。

    输入“3”。

    副本数

    Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。

    输入“3”。

    老化时间(小时)

    消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。

    输入“72”。

    同步复制

    选择“不开启”,即Leader副本接收到消息并成功写入到本地日志后,就马上向客户端发送写入成功的消息,不需要等待所有Follower副本复制完成。

    同步落盘

    选择“不开启”,即生产的消息存在内存中,不会立即写入磁盘。

    消息时间戳类型

    选择“CreateTime”,即生产者创建消息的时间。

    批处理消息最大值(字节)

    Kafka允许的最大批处理大小,如果在生产客户端配置文件或代码中启用消息压缩,则表示压缩后的最大批处理大小。

    输入“10485760”。

    描述

    不设置。

    图8 创建Topic

步骤四:连接实例生产消费消息

  1. 配置生产消费配置文件。

    1. 登录Linux系统的ECS。
    2. 下载client.jks证书并上传到ECS的“/root”目录下。

      获取证书的方法如下:在Kafka控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。解压压缩包,获取压缩包中的客户端证书文件:client.jks。

      “/root”为证书存放路径,请根据实际情况修改。

    3. 进入Kafka客户端文件的“/config”目录下。
      cd kafka_2.12-2.7.2/config
    4. 在“consumer.properties”和“producer.properties”文件中分别增加如下行(示例以PLAIN机制为例介绍)。
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="**********" \
      password="**********";        
      sasl.mechanism=PLAIN
      
      security.protocol=SASL_SSL
      ssl.truststore.location={ssl_truststore_path}
      ssl.truststore.password=dms@kafka
      ssl.endpoint.identification.algorithm=

      参数说明:

      • username和password为创建Kafka实例过程中开启密文接入时填入的用户名和密码。
      • ssl.truststore.location配置为1.b证书的存放路径。
      • ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
      • ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空

  2. 进入Kafka客户端文件的“/bin”目录下。

    cd ../bin

  3. 生产消息。

    ./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties

    参数说明如下:

    示例如下,“192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093”为Kafka实例连接地址。

    执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。

    [root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093  --topic topic-01 --producer.config ../config/producer.properties
    >Hello
    >DMS
    >Kafka!
    >^C[root@ecs-kafka bin]# 

    如需停止生产使用Ctrl+C命令退出。

  4. 消费消息。

    ./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --from-beginning  --consumer.config ../config/consumer.properties

    参数说明如下:

    示例如下:

    [root@ecs-kafka bin]#  ./kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-01 --from-beginning --consumer.config ../config/consumer.properties
    Hello
    Kafka!
    DMS
    ^CProcessed a total of 3 messages
    [root@ecs-kafka bin]# 

    如需停止消费使用Ctrl+C命令退出。

相关信息

相关文档