更新时间:2026-02-11 GMT+08:00
分享

Flink支持Metrics上报Kafka

使用场景

Flink提供了监控指标(Metrics)来帮助分析作业,该特性支持将配置的Metrics上报到Kafka中。

前提条件

  • Flink集群与Metrics上报的Kafka网络连通。
  • Metrics上报的Kafka Topic需要提前创建好。

约束与限制

本章节适用于MRS 3.6.0-LTS及之后的版本。

当前只支持使用Kerberos方式对接Kafka,不支持用户名密码的方式对接Kafka。

配置步骤

  1. 配置Flink支持Metrics上报Kafka参数。

    通过FlinkServer提交作业,Metrics上报Kafka。

    • 在Flink配置界面配置Flink支持Metrics上报Kafka参数
      1. 登录Manager页面,选择“集群 > 服务 > Flink > 配置 > 全部配置”。
      2. 选择“FlinkServer(角色) > 自定义”,在“flink.customized.configs”参数中添加表1中的参数,添加完成后单击“保存”。
      3. 重启受影响的FlinkServer实例。
    • 在FlinkServer作业开发界面配置Flink支持Metrics上报Kafka参数
      1. 选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
      2. 单击“作业管理”进入作业管理页面。
      3. 在指定作业“操作”列单击“开发”进入作业开发界面。
      4. 在作业开发界面的“自定义参数”项中添加表1中的参数,添加完成后单击“保存”。

    通过客户端提交作业,Metrics上报Kafka。

    1. 登录Manager页面,选择“集群 > 服务 > Flink > 配置 > 全部配置”。
    2. 选择“FlinkResource(角色) > 自定义”,在“flink.customized.configs”参数中添加表1中的参数,添加完成后单击“保存”,并重启受影响的FlinkResource实例。
    3. 重新安装客户端或更新客户端配置,具体请参考安装客户端更新客户端
    表1 Flink支持Metrics上报Kafka参数配置

    名称

    是否必填

    描述

    metrics.reporter

    alarm,kafka

    新增的reporter需要追加对应名称。alarm用于Flinkserver的告警,防止自定义参数覆盖Flink原有的参数,需同时填写alarm和kafka。

    metrics.reporter.kafka.factory.class

    com.huawei.flink.metrics.kafka.KafkaReporterFactory

    Flink Metric指标上报给kafka的实现类,值固定为:

    com.huawei.flink.metrics.kafka.KafkaReporterFactory。

    metrics.reporter.kafka.interval

    30s

    Flink Metric指标上报kafka的周期,单位:秒。

    metrics.reporter.kafka.bootstrapServers

    Kafka集群IP1:端口号,Kafka集群IP2:端口号

    Flink Metric指标上报kafka的broker IP、端口。

    metrics.reporter.kafka.metricNames

    -

    允许上报的监控项。如:numRecordsIn,numRecordsOut。

    metrics.reporter.kafka.topicName

    -

    上报给Kafka的Topic,需要提前创建好。如:flink-metrics。

    metrics.reporter.kafka.enableJobInfo

    取值范围:true/false

    默认值:false

    是否在指标上报中添加作业信息,包括作业名、提交用户以及Yarn队列名。填入true时开启。

    metrics.reporter.kafka.yarnQueue

    -

    上报指标JSON中添加的Yarn队列名。

    metrics.reporter.kafka.jobName

    -

    上报指标JSON中添加的作业名。

    metrics.reporter.kafka.properties.kafka配置参数

    -

    如果有其他Kafka参数需要配置,可以通过此参数填写。例如认证参数(安全模式集群必填):

    metrics.reporter.kafka.properties.security.protocol=SASL_PLAINTEXT

    metrics.reporter.kafka.properties.sasl.kerberos.service.name=kafka

    metrics.reporter.kafka.properties.kerberos.domain.name=hadoop.xxxx.COM

  2. 提交Flink作业,对应的Metrics就会被上报到配置的Kafka Topic中。

    数据格式参考如下JSON:

    {
    	"jobName": "kafka_test",
    	"submitUserName": "admintest",
    	"metric": {
    		"tm_id": "container_1761419637450_0006_01_000002",
    		"host": "8-5-128-12",
    		"name": "flink_taskmanager.Status.JVM.Memory.NonHeap_Used",
    		"metricType": "GAUGE",
    	    "value": 120661112
    	},
    	"reportTime": 1761530081855,
    	"yarnQueue": "default"
    }

    如果配置的Kafka服务出现异常,无法连接时,Flink作业业务不会中断,但是Metrics功能将会受到影响,监控指标无法发送给Kafka。

相关文档