文档首页/ MapReduce服务 MRS/ 快速入门/ 快速创建和使用Kafka流式数据处理集群
更新时间:2024-11-21 GMT+08:00
分享

快速创建和使用Kafka流式数据处理集群

操作场景

本入门提供从零开始创建流式分析集群并在Kafka主题中产生和消费消息的操作指导。

Kafka集群提供一个高吞吐量、可扩展性的消息系统,广泛用于日志收集、监控数据聚合等场景。Kafka可实现高效的流式数据采集、实时数据处理存储等。

操作流程

开始使用如下样例前,请务必按准备工作指导完成必要操作。

  1. 创建MRS集群:创建一个MRS 3.2.0-LTS.1版本的“实时分析集群”。
  2. 安装集群客户端:下载并安装MRS集群客户端。
  3. 使用Kafka客户端创建Topic:在Kafka客户端创建Topic。
  4. 管理Kafka主题中的消息:在Kafka客户端消费已创建的Topic中的消息。
  5. 释放资源:如果您在完成实践后不需要继续使用MRS集群,请及时清理资源以免产生额外扣费。

准备工作

操作视频

使用Kafka客户端创建Topic案例可参考使用Kafka客户端创建Topic,该视频以未开启Kerberos认证的MRS 3.1.0版本集群为例,介绍MRS集群创建成功后,如何在Kafka客户端完成对Topic的创建、查询、删除等操作。

因不同版本操作界面可能存在差异,相关视频供参考,具体以实际环境为准。

步骤一:创建MRS集群

  1. 进入购买MRS集群页面
  2. 在服务列表中搜索“MapReduce服务 MRS”,进入MRS服务管理控制台。
  3. 单击“购买集群”,进入“购买集群”页面,选择“快速购买”页签。
  4. 根据实际业务规划情况填写集群配置信息(本示例为快速创建按需计费的MRS 3.2.0-LTS.1版本集群,如需了解更多参数配置请参考快速创建MRS集群)。

    表1 MRS集群配置参数

    参数

    示例

    参数说明

    计费模式

    按需计费

    选择待创建的集群的计费模式,MRS提供“包年/包月”与“按需计费”两种计费模式。

    按需计费是一种后付费模式,即先使用再付费,按照MRS集群实际使用时长计费。

    区域

    华北-北京四

    选择区域。

    不同区域的云服务产品之间内网互不相通。请就近选择靠近您业务的区域,可减少网络时延,提高访问速度。

    集群名称

    mrs_demo

    待创建的MRS集群名称。

    集群类型

    自定义

    根据业务实际需要选择待创建的MRS集群类型。“自定义”类型集群提供丰富的组件搭配,可自行选择对应版本MRS集群所支持的所有组件。

    版本类型

    LTS版

    待创建的MRS集群版本类型,不同版本所包含的开源组件版本及功能特性可能不同,推荐选择最新版本。

    集群版本

    MRS 3.2.0-LTS.1

    待创建的MRS集群版本。

    组件选择

    实时分析集群

    基于系统预置的集群模板选择要购买的集群组件。

    可用区

    可用区1

    选择集群工作区域下关联的可用区。

    虚拟私有云

    vpc-default

    选择需要创建集群的VPC,单击“查看虚拟私有云”进入VPC服务查看已创建的VPC名称和ID。如果没有VPC,需要创建一个新的VPC。

    子网

    subnet-default

    选择需要创建集群的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。如果VPC下未创建子网,请单击“创建子网”进行创建。

    集群节点

    保持默认

    配置集群节点信息。

    Kerberos认证

    不开启

    是否启用Kerberos认证。

    用户名

    admin/root

    登录集群管理页面及ECS节点的用户名。

    密码

    -

    设置登录集群管理页面及ECS节点用户的密码。

    确认密码

    -

    再次输入设置用户密码。

    企业项目

    default

    选择集群所属的企业项目。

    通信安全授权

    勾选

    勾选确认授权。

    图1 购买实时分析集群

  5. 单击“立即购买”,进入任务提交成功页面。
  6. 单击“返回集群列表”,在“现有集群”列表中可以查看到集群创建的状态。

    集群创建需要时间,所创集群的初始状态为“启动中”,创建成功后状态更新为“运行中”,请您耐心等待。

步骤二:安装集群客户端

MRS集群创建成功后,用户可安装集群客户端用于连接集群内各组件服务,进行作业提交等操作。

客户端可以安装在集群内的节点上,也可以安装在集群外的节点上。本指南以在Master1节点上安装客户端为例进行介绍。

  1. MRS集群创建成功后,在集群列表中单击MRS集群名称进入集群概览页面。
  2. 单击“集群管理页面 ”后的“前往 Manager”,在弹出的窗口中选择“EIP访问”并配置弹性IP信息。

    首次访问时,可点击“管理弹性公网IP”,在弹性公网IP控制台购买一个弹性公网IP,购买成功后刷新弹性公网IP列表并选择。

  3. 勾选确认信息后,单击“确定”,登录集群的FusionInsight Manager管理界面。

    Manager登录用户名为admin,密码为购买集群时配置的用于登录Manager管理界面的“admin”用户的自定义密码。

  4. 在“主页”页签的集群名称后单击,单击“下载客户端”下载集群客户端。

    图2 下载客户端

    在“下载集群客户端”弹窗中,配置以下参数:

    • 选择客户端类型:选择“完整客户端”。
    • 选择平台类型:默认勾选即可,例如“x86_64”。
    • 勾选“仅保存到如下路径”,使用默认保存路径,文件生成后将保存在集群主OMS节点的“/tmp/FusionInsight-Client”目录下。
    图3 下载集群客户端提示框

    单击“确定”后,等待客户端软件生成成功。

  5. 在MRS服务管理控制台的集群列表中,单击集群名称,在集群的“节点管理”页签,单击包含“master1”的节点名称,在ECS详情页面单击右上角的“远程登录”登录Master1节点。

    图4 查看Master1节点

  6. 使用root用户登录Master1节点,密码为购买集群时配置的用于登录集群节点的“root”用户自定义密码。
  7. 执行以下命令切换到客户端软件包所在目录,并解压客户端软件包:

    cd /tmp/FusionInsight-Client/

    tar -xvf FusionInsight_Cluster_1_Services_Client.tar

    tar -xvf FusionInsight_Cluster_1_Services_ClientConfig.tar

  8. 执行以下命令进入安装包所在目录,安装客户端:

    cd FusionInsight_Cluster_1_Services_ClientConfig

    执行如下命令安装客户端到指定目录(绝对路径),例如安装到“/opt/client”目录,等待客户端安装完成。

    ./install.sh /opt/client

    ...
    ... component client is installed successfully
    ...

    客户端安装目录可以不存在,会自动创建。但如果存在,则必须为空,目录路径不能包含空格。且客户端安装目录路径只能包含大写字母、小写字母、数字以及_字符。

步骤三:使用Kafka客户端创建Topic

  1. 在集群列表中单击MRS集群名称进入集群概览页面。
  2. 在“概览”页面单击“IAM用户同步”,勾选“全部同步”后单击“同步”,等待IAM用户同步完成。
  3. 选择“组件管理 > ZooKeeper > 实例”,查看ZooKeeper quorumpeer角色实例的IP地址,记录任意一个IP地址。

    图5 查看ZooKeeper角色实例IP

  4. 单击“服务配置”,查看ZooKeeper客户端连接端口“clientPort”参数值。
  5. 单击“服务 ZooKeeper”返回组件列表。

    图6 返回组件列表

  6. 选择“Kafka > 实例”,查看Kafka Broker角色实例的IP地址,记录任意一个IP地址。

    图7 查看Broker角色实例IP

  7. 单击“服务配置”,查看Kafka Broker连接端口“port”参数值。
  8. root用户登录MRS客户端所在节点(Master1节点)。
  9. 执行以下命令切换到客户端安装目录并配置环境变量。

    cd /opt/client

    source bigdata_env

  10. 创建Kafka Topic。

    kafka-topics.sh --create --zookeeper ZooKeeper角色实例IP地址:ZooKeeper客户端连接端口/kafka --partitions 2 --replication-factor 2 --topic Topic名称

    例如执行以下命令:

    kafka-topics.sh --create --zookeeper 192.168.21.234:2181/kafka --partitions 2 --replication-factor 2 --topic Topic1

    执行结果输出以下信息表示创建Topic成功:

    Created topic Topic1.

步骤四:管理Kafka主题中的消息

  1. root用户登录MRS客户端所在节点(Master1节点)。
  2. 执行以下命令切换到客户端安装目录并配置环境变量。

    cd /opt/client

    source bigdata_env

  3. 在Topic1中产生消息。

    kafka-console-producer.sh --broker-list Kafka Broker角色实例所在节点IP:Broker连接端口 --topic Topic名称 --producer.config /opt/hadoopclient/Kafka/kafka/config/producer.properties

    Kafka Broker角色实例所在节点IP地址及端口信息可参考步骤三:使用Kafka客户端创建Topic67获取。

    例如执行以下命令:

    kafka-console-producer.sh --broker-list 192.168.21.21:9092 --topic Topic1 --producer.config /opt/client/Kafka/kafka/config/producer.properties

  4. 重新打开一个客户端连接窗口。

    cd /opt/client

    source bigdata_env

  5. 执行命令消费Topic1中的消息。

    kafka-console-consumer.sh --topic Topic名称 --bootstrap-server Kafka Broker角色实例所在节点IP:Broker连接端口 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties

    例如执行以下命令:

    kafka-console-consumer.sh --topic Topic1 --bootstrap-server 192.168.21.21:9092 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties

  6. 3产生消息的命令行中输入指定的内容作为生产者产生的消息,输入完成后按回车发送消息。

    例如:

    >aaa
    >bbb
    >ccc

    如果需要结束产生消息,使用“Ctrl + C”退出任务。

  7. 5的消费消息窗口中,可以观察到消息被成功消费。

    aaa
    bbb
    ccc

后续操作:释放资源

如果您无需继续使用该MRS集群,请及时释放资源,避免产生额外的费用。详细操作步骤请参见删除MRS集群

相关信息

更多Kafka权限管理、Topic管理及消费、高可用配置及数据数据均衡等操作请参见使用Kafka

相关文档