Flink支持Metrics上报Kafka
使用场景
Flink提供了监控指标(Metrics)来帮助分析作业,该特性支持将配置的Metrics上报到Kafka中。
前提条件
- Flink集群与Metrics上报的Kafka网络连通。
- Metrics上报的Kafka Topic需要提前创建好。
约束与限制
本章节适用于MRS 3.6.0-LTS及之后的版本。
当前只支持使用Kerberos方式对接Kafka,不支持用户名密码的方式对接Kafka。
配置步骤
- 配置Flink支持Metrics上报Kafka参数。
通过FlinkServer提交作业,Metrics上报Kafka。
- 在Flink配置界面配置Flink支持Metrics上报Kafka参数
- 登录Manager页面,选择“集群 > 服务 > Flink > 配置 > 全部配置”。
- 选择“FlinkServer(角色) > 自定义”,在“flink.customized.configs”参数中添加表1中的参数,添加完成后单击“保存”。
- 重启受影响的FlinkServer实例。
- 在FlinkServer作业开发界面配置Flink支持Metrics上报Kafka参数
- 选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 单击“作业管理”进入作业管理页面。
- 在指定作业“操作”列单击“开发”进入作业开发界面。
- 在作业开发界面的“自定义参数”项中添加表1中的参数,添加完成后单击“保存”。
通过客户端提交作业,Metrics上报Kafka。
- 登录Manager页面,选择“集群 > 服务 > Flink > 配置 > 全部配置”。
- 选择“FlinkResource(角色) > 自定义”,在“flink.customized.configs”参数中添加表1中的参数,添加完成后单击“保存”,并重启受影响的FlinkResource实例。
- 重新安装客户端或更新客户端配置,具体请参考安装客户端或更新客户端。
表1 Flink支持Metrics上报Kafka参数配置 名称
值
是否必填
描述
metrics.reporter
alarm,kafka
是
新增的reporter需要追加对应名称。alarm用于Flinkserver的告警,防止自定义参数覆盖Flink原有的参数,需同时填写alarm和kafka。
metrics.reporter.kafka.factory.class
com.huawei.flink.metrics.kafka.KafkaReporterFactory
是
Flink Metric指标上报给kafka的实现类,值固定为:
com.huawei.flink.metrics.kafka.KafkaReporterFactory。
metrics.reporter.kafka.interval
30s
是
Flink Metric指标上报kafka的周期,单位:秒。
metrics.reporter.kafka.bootstrapServers
Kafka集群IP1:端口号,Kafka集群IP2:端口号
是
Flink Metric指标上报kafka的broker IP、端口。
metrics.reporter.kafka.metricNames
-
是
允许上报的监控项。如:numRecordsIn,numRecordsOut。
metrics.reporter.kafka.topicName
-
是
上报给Kafka的Topic,需要提前创建好。如:flink-metrics。
metrics.reporter.kafka.enableJobInfo
取值范围:true/false
默认值:false
否
是否在指标上报中添加作业信息,包括作业名、提交用户以及Yarn队列名。填入true时开启。
metrics.reporter.kafka.yarnQueue
-
否
上报指标JSON中添加的Yarn队列名。
metrics.reporter.kafka.jobName
-
否
上报指标JSON中添加的作业名。
metrics.reporter.kafka.properties.kafka配置参数
-
否
如果有其他Kafka参数需要配置,可以通过此参数填写。例如认证参数(安全模式集群必填):
metrics.reporter.kafka.properties.security.protocol=SASL_PLAINTEXT
metrics.reporter.kafka.properties.sasl.kerberos.service.name=kafka
metrics.reporter.kafka.properties.kerberos.domain.name=hadoop.xxxx.COM
- 在Flink配置界面配置Flink支持Metrics上报Kafka参数
- 提交Flink作业,对应的Metrics就会被上报到配置的Kafka Topic中。
数据格式参考如下JSON:
{ "jobName": "kafka_test", "submitUserName": "admintest", "metric": { "tm_id": "container_1761419637450_0006_01_000002", "host": "8-5-128-12", "name": "flink_taskmanager.Status.JVM.Memory.NonHeap_Used", "metricType": "GAUGE", "value": 120661112 }, "reportTime": 1761530081855, "yarnQueue": "default" }
如果配置的Kafka服务出现异常,无法连接时,Flink作业业务不会中断,但是Metrics功能将会受到影响,监控指标无法发送给Kafka。