使用Kafka Eagle对接MRS Kafka
应用场景
Kafka Eagle是一款分布式、高可用的Kafka监控软件,提供丰富的Kafka监控指标,例如:Kafka集群的Broker数、Topic数、Consumer数、Topic LogSize Top10、Topic Capacity Top10、Lag挤压、CPU/Memory监控等。
Eagle新版本中改名为EFAK。
方案架构
Kafka是一个分布式的、分区的、多副本的消息发布-订阅系统,它提供了类似于JMS的特性,但在设计上完全不同,它具有消息持久化、高吞吐、分布式、多客户端支持、实时等特性,适用于离线和在线的消息消费,如常规的消息收集、网站活性跟踪、聚合统计系统运营数据(监控数据)、日志收集等大量数据的互联网服务的数据收集场景。
生产者(Producer)将消息发布到Kafka主题(Topic)上,消费者(Consumer)订阅这些主题并消费这些消息。在Kafka集群上一个服务器称为一个Broker。对于每一个主题,Kafka集群保留一个用于缩放、并行化和容错性的分区(Partition)。每个分区是一个有序、不可变的消息序列,并不断追加到提交日志文件。分区的消息每个也被赋值一个称为偏移顺序(Offset)的序列化编号。
步骤一:配置Kafka Eagle对接MRS参数
- 下载Kafka Eagle,此处以EFAK3.0.1版本为例,具体以实际为准。
例如获取到kafka-eagle-bin-3.0.1.tar.gz软件包。
- 登录FusionInsight Manager,选择“集群 > 服务 > Kafka > 配置 > 全部配置”搜索并修改参数“KAFKA_JMX_IP”的值为“${BROKER_IP}”。
图2 修改Kafka参数
- 修改完成后,单击左上方“保存”,在弹出的对话框中单击“确定”保存配置。
- 单击“概览”页签,选择右上方“更多 > 重启服务”重启Kafka服务。
- 以root用户登录集群主节点,将获取到的EFAK安装包kafka-eagle-bin-3.0.1.tar.gz放到集群目录下,例如“/opt”,执行以下命令解压。
cd /opt
tar -xvf kafka-eagle-bin-3.0.1.tar.gz
cd kafka-eagle-bin-3.0.1
tar -xvf efak-web-3.0.1-bin.tar.gz
- 在“opt”目录下新建目录,例如“efak”,并将“efak-web-3.0.1”复制到“/opt/efak”目录下。
cp -r /opt/kafka-eagle-bin-3.0.1/efak-web-3.0.1 /opt/efak/
- 添加环境变量。
新增“export KE_HOME”参数,参数值为efak-web-3.0.1文件所在路径(例如“/opt/efak/efak-web-3.0.1”);在“export PATH”参数值后添加“$KE_HOME/bin”。例如:
export KE_HOME=/opt/efak/efak-web-3.0.1 export PATH=$PATH:$KE_HOME/bin
- 修改“system-config.properties”配置文件。
cd /opt/efak/efak-web-3.0.1/conf/
vi system-config.properties
#设置集群 kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=10.20.90.24:2181 #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 #修改kafka jmx uri的配置 cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/kafka #修改kafka mysql jdbc driver address数据库相关的配置 efak.driver=com.mysql.cj.jdbc.Driver efak.url=jdbc:mysql://IP:Port/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=XXX
- “cluster1.zk.list”的值是Kafka组件参数“metrics.reporter.zookeeper.url”的值,具体可通过登录FusionInsight Manager,选择“集群 > 服务 > Kafka > 配置 > 全部配置”搜索参数“metrics.reporter.zookeeper.url”查看。
- “efak.url”的值为MySQL JDBC连接字符串,具体以实际为准。
- “efak.username”的值为连接数据库使用的用户名称。
- “efak.password”的值为连接数据库使用的用户名称所对应的密码。
步骤二:启动EFAK服务验证对接结果
Kafka Eagle对接常见问题
问题现象:
java.io.IOException cannot be cast to javax.management.remote.JMXConnector
问题原因:
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
解决办法:
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/kafka