从零开始使用Kafka
MapReduce服务(MapReduce Service)提供租户完全可控的企业级大数据集群云服务,轻松运行Hadoop、Spark、HBase、Kafka、Storm等大数据组件 。
本指导以不开启Kerberos认证的集群为例提供从零开始在Kafka主题中产生和消费消息的操作指导。
视频介绍
使用Kafka客户端创建Topic案例可参考使用Kafka客户端创建Topic,该视频以未开启Kerberos认证的MRS 3.1.0版本集群为例,介绍MRS集群创建成功后,如何在Kafka客户端完成对Topic的创建、查询、删除等操作。
因不同版本操作界面可能存在差异,相关视频供参考,具体以实际环境为准。
购买集群
- 购买集群
- 登录华为云控制台。
- 选择“大数据 > MapReduce服务”,进入MapReduce服务管理控制台。
- 单击“购买集群”,进入“购买集群”页面。
- 选择“自定义购买”页签。
- 软件配置
- “区域”请根据需要选择。
- “计费模式”选择按需计费。
- “集群名称”填写“mrs_demo”或按命名规范命名。
- 集群类型:选择“流式集群”即可。
- 版本类型:选择“普通版”。
- 集群版本:选择“MRS 3.1.0”。
- 组件选择:勾选所有流式集群组件。
- 单击“下一步”。
- 硬件配置
- “可用区”选择“可用区2”。
- “企业项目”选择“default”。
- “虚拟私有云”和“子网”保持默认不修改,也可单击“查看虚拟私有云”重新创建。
- “安全组”默认选择“自动创建”。
- “弹性公网IP”默认选择“暂不绑定”。
图1 硬件配置
- “集群节点”中Master和Core的实例规格保持默认值。节点数量、数据盘类型及大小保持默认值。不添加Task节点。
图2 集群节点配置
- 单击“下一步”。
- 高级配置
- “Kerberos认证”选择关闭。
- “用户名”默认为admin。
- “密码”和“确认密码”请配置Manager管理员用户的密码。
- “登录方式”选择“密码”,并为root用户输入密码及确认密码。
- “主机名前缀”保持默认暂不配置。
- 勾选“高级配置”并选择“委托”为“MRS_ECS_DEFAULT_AGENCY”。
- 单击“下一步”。
- 确认配置
- “配置”:确认软件配置、硬件配置及高级配置信息。
- 勾选通信安全授权。
- 单击“立即购买”,进入任务提交成功页面。
- 单击“返回集群列表”,在“现有集群”列表中可以查看到集群创建的状态。集群创建需要时间,所创集群的初始状态为“启动中”,创建成功后状态更新为“运行中”,请您耐心等待。
安装Kafka客户端
- 在“集群列表 > 现有集群”列表中,单击名称“mrs_demo”,进入集群信息页面。
- 单击“集群管理页面 ”后的“前往 Manager”,在弹出的窗口中配置弹性IP信息,单击“确定”,输入用户名和密码进入Manager界面。
- 在Manager界面,选择“集群 > 服务 > Kafka > 更多 > 下载客户端”,选择“完整客户端”、对应的平台类型、勾选“仅保存到如下路径”,单击“确定”。得到Kafka客户端软件包,例如:FusionInsight_Cluster_1_Kafka_Client.tar
- 以root用户登录Master1节点。
- 进入安装包所在目录,执行如下命令解压、校验安装包。并解压获取的安装文件。
cd /tmp/FusionInsight-Client
tar -xvf FusionInsight_Cluster_1_Kafka_Client.tar
sha256sum -c FusionInsight_Cluster_1_Kafka_ClientConfig.tar.sha256
tar -xvf FusionInsight_Cluster_1_Kafka_ClientConfig.tar
- 进入安装包所在目录,执行如下命令安装客户端到指定目录(绝对路径),例如安装到“/opt/hadoopclient”目录。
cd /tmp/FusionInsight-Client/FusionInsight_Cluster_1_Kafka_ClientConfig
执行./install.sh /opt/hadoopclient命令,等待客户端安装完成。
- 检查客户端是否安装成功。
cd /opt/hadoopclient
source bigdata_env
输入klist命令查询并确认权限内容。执行成功则说明Kafka客户端安装成功。
登录Master节点(VNC方式)
- 在“集群列表 > 现有集群”列表中,单击名称“mrs_demo”,在“节点管理”页签中找到类型为“Master1”的节点,并单击其名称,跳转至云服务器控制台上的该弹性云服务器详情页面。
图3 远程登录Master1节点
- 单击页面右上角的“远程登录”,远程登录Master节点。登录使用用户名“root”,密码为购买集群时设置的密码。
使用Kafka客户端创建topic
- 配置环境变量。Kafka客户端的安装目录以“/opt/hadoopclient”为例。
source /opt/hadoopclient/bigdata_env
- 在“集群列表 > 现有集群”列表中,单击名称“mrs_demo”,进入集群“概览”页面。在“概览”页面单击“IAM用户同步”后的“同步”等待同步完成。
- 选择“组件管理 > ZooKeeper > 实例”,查看ZooKeeper角色实例的IP地址。记录ZooKeeper角色实例中任意一个的IP地址即可。
图4 ZooKeeper角色实例IP
- 执行如下命令,创建Kafka Topic。
kafka-topics.sh --create --zookeeper <ZooKeeper角色实例所在节点IP:2181/kafka> --partitions 2 --replication-factor 2 --topic <Topic名称>
管理Kafka主题中的消息
- 选择“组件管理 > Kafka > 实例”,查看Kafka角色实例的IP地址。记录Kafka角色实例中任意一个的IP地址即可。
图5 Kafka角色实例IP
- 登录Master节点,执行以下命令在topic test中产生消息。
cd /opt/hadoopclient
source bigdata_env
kafka-console-producer.sh --broker-list <Kafka角色实例所在节点IP:9092> --topic <Topic名称> --producer.config /opt/hadoopclient/Kafka/kafka/config/producer.properties
其中<Topic名称>为使用Kafka客户端创建topic创建的Topic名称。
然后输入指定的内容作为生产者产生的消息,输入完成后按回车发送消息。如果需要结束产生消息,使用“Ctrl + C”退出任务。
- 消费topic test中的消息。
kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka角色实例所在节点IP:9092> --consumer.config /opt/hadoopclient/Kafka/kafka/config/consumer.properties