文档首页> MapReduce服务 MRS> 快速入门> Kafka流式数据处理集群快速入门
更新时间:2024-06-29 GMT+08:00
分享

Kafka流式数据处理集群快速入门

MapReduce服务(MapReduce Service)提供租户完全可控的企业级大数据集群云服务,轻松运行Hadoop、Spark、HBase、Kafka等大数据组件 。

本入门提供从零开始创建流式分析集群并在Kafka主题中产生和消费消息的操作指导。

Kafka集群提供一个高吞吐量、可扩展性的消息系统。广泛用于日志收集、监控数据聚合等场景,实现高效的流式数据采集、实时数据处理存储等。

视频介绍

使用Kafka客户端创建Topic案例可参考使用Kafka客户端创建Topic,该视频以未开启Kerberos认证的MRS 3.1.0版本集群为例,介绍MRS集群创建成功后,如何在Kafka客户端完成对Topic的创建、查询、删除等操作。

因不同版本操作界面可能存在差异,相关视频供参考,具体以实际环境为准。

购买集群

  1. 登录华为云控制台。
  2. 在服务列表中搜索“MapReduce服务 MRS”,进入MRS服务管理控制台。
  3. 单击“购买集群”,进入“购买集群”页面,选择“快速购买”页签。
  4. 根据实际业务规划情况填写集群配置信息(以下参数仅供参考,可根据实际情况调整)。

    表1 MRS集群配置参数

    参数名称

    参数说明

    取值样例

    区域

    选择区域。

    不同区域的云服务产品之间内网互不相通。请就近选择靠近您业务的区域,可减少网络时延,提高访问速度。

    华北-北京四

    计费模式

    选择待创建的MRS集群的计费模式。

    按需计费

    集群名称

    待创建的MRS集群名称。

    mrs_demo

    集群类型

    待创建的MRS集群类型。

    自定义

    版本类型

    待创建的MRS集群版本类型。

    LTS版

    集群版本

    待创建的MRS集群版本。

    MRS 3.2.0-LTS.1

    组件选择

    选择待创建的MRS集群配套的组件。

    实时分析集群

    可用区

    选择集群工作区域下关联的可用区。

    可用区1

    企业项目

    选择集群所属的企业项目。

    default

    虚拟私有云

    选择需要创建集群的VPC,单击“查看虚拟私有云”进入VPC服务查看已创建的VPC名称和ID。如果没有VPC,需要创建一个新的VPC。

    vpc-default

    子网

    选择需要创建集群的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。如果VPC下未创建子网,请单击“创建子网”进行创建。

    subnet-default

    集群节点

    配置集群节点信息。

    保持默认

    Kerberos认证

    是否启用Kerberos认证。

    不开启

    用户名

    登录集群管理页面及ECS节点的用户名。

    admin/root

    密码

    设置登录集群管理页面及ECS节点用户的密码。

    -

    确认密码

    再次输入设置用户密码。

    -

    通信安全授权

    勾选确认授权。

    -

    图1 购买实时分析集群

  5. 单击“立即购买”,进入任务提交成功页面。
  6. 单击“返回集群列表”,在“现有集群”列表中可以查看到集群创建的状态。

    集群创建需要时间,所创集群的初始状态为“启动中”,创建成功后状态更新为“运行中”,请您耐心等待。

安装MRS集群客户端

MRS集群创建成功后,用户可安装集群客户端用于连接集群内各组件服务,进行作业提交等操作。

客户端可以安装在集群内的节点上,也可以安装在集群外的节点上。本指南以在Master1节点上安装客户端为例进行介绍。

  1. MRS集群创建成功后,在集群列表中单击MRS集群名称进入集群概览页面。
  2. 单击“集群管理页面 ”后的“前往 Manager”,在弹出的窗口中选择“EIP访问”并配置弹性IP信息。

    首次访问时,可点击“管理弹性公网IP”,在弹性公网IP控制台购买一个弹性公网IP,购买成功后刷新弹性公网IP列表并选择。

  3. 勾选确认信息后,单击“确定”,登录集群的FusionInsight Manager管理界面。

    Manager登录用户名为admin,密码为购买集群时配置的自定义密码。

  4. 在“主页”页签的集群名称后单击,单击“下载客户端”下载集群客户端。

    图2 下载客户端

    在“下载集群客户端”弹窗中,配置以下参数:

    • 选择客户端类型:选择“完整客户端”。
    • 选择平台类型:默认勾选即可,例如“x86_64”。
    • 勾选“仅保存到如下路径”,使用默认保存路径,文件生成后将保存在集群主OMS节点的“/tmp/FusionInsight-Client”目录下。
    图3 下载集群客户端提示框

    单击“确定”后,等待客户端软件生成成功。

  5. 在MRS服务管理控制台的集群列表中,单击集群名称,在集群的“节点管理”页签,单击包含“master1”的节点名称,在ECS详情页面单击右上角的“远程登录”登录Master1节点。

    图4 查看Master1节点

  6. 使用root用户登录Master1节点,密码为购买集群时配置的自定义密码。
  7. 执行以下命令切换到客户端软件包所在目录,并解压客户端软件包:

    cd /tmp/FusionInsight-Client/

    tar -xvf FusionInsight_Cluster_1_Services_Client.tar

    tar -xvf FusionInsight_Cluster_1_Services_ClientConfig.tar

  8. 执行以下命令进入安装包所在目录,安装客户端:

    cd FusionInsight_Cluster_1_Services_ClientConfig

    执行如下命令安装客户端到指定目录(绝对路径),例如安装到“/opt/client”目录,等待客户端安装完成。

    ./install.sh /opt/client

    ...
    ... component client is installed successfully
    ...

    客户端安装目录可以不存在,会自动创建。但如果存在,则必须为空,目录路径不能包含空格。且客户端安装目录路径只能包含大写字母、小写字母、数字以及_字符。

使用Kafka客户端创建Topic

  1. 在集群列表中单击MRS集群名称进入集群概览页面。
  2. 在“概览”页面单击“IAM用户同步”,勾选“全部同步”后单击“同步”,等待IAM用户同步完成。
  3. 选择“组件管理 > ZooKeeper > 实例”,查看ZooKeeper quorumpeer角色实例的IP地址,记录任意一个IP地址。

    图5 查看ZooKeeper角色实例IP

  4. 单击“服务配置”,查看ZooKeeper客户端连接端口“clientPort”参数值。
  5. 单击“服务 ZooKeeper”返回组件列表。

    图6 返回组件列表

  6. 选择“Kafka > 实例”,查看Kafka Broker角色实例的IP地址,记录任意一个IP地址。

    图7 查看Broker角色实例IP

  7. 单击“服务配置”,查看Kafka Broker连接端口“port”参数值。
  8. root用户登录MRS客户端所在节点(Master1节点)。
  9. 执行以下命令切换到客户端安装目录并配置环境变量。

    cd /opt/client

    source bigdata_env

  10. 创建Kafka Topic。

    kafka-topics.sh --create --zookeeper ZooKeeper角色实例IP地址:ZooKeeper客户端连接端口/kafka --partitions 2 --replication-factor 2 --topic Topic名称

    例如执行以下命令:

    kafka-topics.sh --create --zookeeper 192.168.21.234:2181/kafka --partitions 2 --replication-factor 2 --topic Topic1

    Created topic Topic1.

管理Kafka主题中的消息

  1. root用户登录MRS客户端所在节点(Master1节点)。
  2. 执行以下命令切换到客户端安装目录并配置环境变量。

    cd /opt/client

    source bigdata_env

  3. 在Topic1中产生消息。

    kafka-console-producer.sh --broker-list 6查看的Kafka Broker角色实例所在节点IP:6查看的Broker连接端口 --topic Topic名称 --producer.config /opt/hadoopclient/Kafka/kafka/config/producer.properties

    例如执行以下命令:

    kafka-console-producer.sh --broker-list 192.168.21.21:9092 --topic Topic1 --producer.config /opt/client/Kafka/kafka/config/producer.properties

  4. 重新打开一个客户端连接窗口。

    cd /opt/client

    source bigdata_env

  5. 执行命令消费Topic1中的消息。

    kafka-console-consumer.sh --topic Topic名称 --bootstrap-server 6查看的Kafka Broker角色实例所在节点IP:6查看的Broker连接端口 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties

    例如执行以下命令:

    kafka-console-consumer.sh --topic Topic1 --bootstrap-server 192.168.21.21:9092 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties

  6. 3产生消息的命令行中输入指定的内容作为生产者产生的消息,输入完成后按回车发送消息。

    例如:

    >aaa
    >bbb
    >ccc

    如果需要结束产生消息,使用“Ctrl + C”退出任务。

  7. 5的消费消息窗口中,可以观察到消息被成功消费。

    aaa
    bbb
    ccc

分享:

    相关文档

    相关产品