文档首页/ MapReduce服务 MRS/ 最佳实践/ 生态组件对接/ 使用Kafka Eagle对接MRS Kafka
更新时间:2024-12-17 GMT+08:00

使用Kafka Eagle对接MRS Kafka

应用场景

Kafka Eagle是一款分布式、高可用的Kafka监控软件,提供丰富的Kafka监控指标,例如:Kafka集群的Broker数、Topic数、Consumer数、Topic LogSize Top10、Topic Capacity Top10、Lag挤压、CPU/Memory监控等。

Eagle新版本中改名为EFAK。

方案架构

Kafka是一个分布式的、分区的、多副本的消息发布-订阅系统,它提供了类似于JMS的特性,但在设计上完全不同,它具有消息持久化、高吞吐、分布式、多客户端支持、实时等特性,适用于离线和在线的消息消费,如常规的消息收集、网站活性跟踪、聚合统计系统运营数据(监控数据)、日志收集等大量数据的互联网服务的数据收集场景。

生产者(Producer)将消息发布到Kafka主题(Topic)上,消费者(Consumer)订阅这些主题并消费这些消息。在Kafka集群上一个服务器称为一个Broker。对于每一个主题,Kafka集群保留一个用于缩放、并行化和容错性的分区(Partition)。每个分区是一个有序、不可变的消息序列,并不断追加到提交日志文件。分区的消息每个也被赋值一个称为偏移顺序(Offset)的序列化编号。

图1 Kafka结构

约束与限制

  • 创建并购买一个包含Kafka组件的MRS 3.1.0版本集群,集群未开启Kerberos认证,详情可参考创建MRS集群
  • 安装MRS集群客户端,具体请参考安装客户端

步骤一:配置Kafka Eagle对接MRS参数

  1. 下载Kafka Eagle,此处以EFAK3.0.1版本为例,具体以实际为准。

    例如获取到kafka-eagle-bin-3.0.1.tar.gz软件包

  1. 登录FusionInsight Manager,选择“集群 > 服务 > Kafka > 配置 > 全部配置”搜索并修改参数“KAFKA_JMX_IP”的值为“${BROKER_IP}”。
    图2 修改Kafka参数
  2. 修改完成后,单击左上方“保存”,在弹出的对话框中单击“确定”保存配置。
  3. 单击“概览”页签,选择右上方“更多 > 重启服务”重启Kafka服务。
  4. root用户登录集群主节点,将获取到的EFAK安装包kafka-eagle-bin-3.0.1.tar.gz放到集群目录下,例如“/opt”,执行以下命令解压。

    cd /opt

    tar -xvf kafka-eagle-bin-3.0.1.tar.gz

    cd kafka-eagle-bin-3.0.1

    tar -xvf efak-web-3.0.1-bin.tar.gz

  5. 在“opt”目录下新建目录,例如“efak”,并将“efak-web-3.0.1”复制到“/opt/efak”目录下。

    mkdir /opt/efak

    cp -r /opt/kafka-eagle-bin-3.0.1/efak-web-3.0.1 /opt/efak/

  6. 添加环境变量。

    vi /etc/profile

    新增“export KE_HOME”参数,参数值为efak-web-3.0.1文件所在路径(例如“/opt/efak/efak-web-3.0.1”);在“export PATH”参数值后添加“$KE_HOME/bin”。例如:

    export KE_HOME=/opt/efak/efak-web-3.0.1
    export PATH=$PATH:$KE_HOME/bin
  7. 修改“system-config.properties”配置文件。

    cd /opt/efak/efak-web-3.0.1/conf/

    vi system-config.properties

    #设置集群 
    kafka.eagle.zk.cluster.alias=cluster1
    cluster1.zk.list=10.20.90.24:2181
    #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
    #修改kafka jmx uri的配置
    cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/kafka 
    #修改kafka mysql jdbc driver address数据库相关的配置
    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://IP:Port/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=root
    efak.password=XXX
    • “cluster1.zk.list”的值是Kafka组件参数“metrics.reporter.zookeeper.url”的值,具体可通过登录FusionInsight Manager,选择“集群 > 服务 > Kafka > 配置 > 全部配置”搜索参数“metrics.reporter.zookeeper.url”查看。
    • “efak.url”的值为MySQL JDBC连接字符串,具体以实际为准。
    • “efak.username”的值为连接数据库使用的用户名称。
    • “efak.password”的值为连接数据库使用的用户名称所对应的密码。

步骤二:启动EFAK服务验证对接结果

  1. 启动EFAK服务。

    sh /opt/efak/efak-web-3.0.1/bin/ke.sh start

    启动成功显示如下,获取EFAK WebUI登录地址。

  2. 使用获取到的登录地址,访问EFAK WebUI界面。

    访问EFKA WebUI界面默认初始账号密码admin/123456

    登录后可以查看Kafka集群监控页面、Topic监控页面、Consumer监控页面,例如:

    图3 Kafka集群监控
    图4 Topic监控
    图5 Consumer监控

Kafka Eagle对接常见问题

问题现象

无法获取Kafka CPU与内存监控信息日志提示。
java.io.IOException cannot be cast to javax.management.remote.JMXConnector

问题原因

jmx地址配置错误。默认jmx地址为:
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

解决办法

MRS内Kafka jmx名称为kafka,需要设置为:
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/kafka