配置Flink作业日志写入Kafka
场景介绍
log4j提供Kafka Appender,允许日志直接写入Kafka,Flink可以通过log4j,将JobManager和TaskManager日志写入Kafka中。
约束与限制
- 写入Kafka的日志不支持回滚,防止磁盘被写满,需要配置Kafka数据过期策略。
- log4j 2.17.2版本开源不支持该功能。
- 仅支持Kafka非安全端口。
- 本章节仅适用于MRS 3.6.0-LTS及之后版本。
配置步骤
- 登录MRS集群Manager界面。
- 选择“集群 > 服务 > Flink > 配置 > 全部配置”。
- 配置Flink作业日志写入Kafka。
- 通过FlinkServer对接提交作业。
选择“FlinkServer(角色) > 自定义”,在“flink.log4j.customized.configs”参数中添加配置,配置内容可参考如下配置。添加完成后单击“保存”,并重启受影响的FlinkServer实例。
配置内容:
appender.kafka.layout.additionalField1.key = logdir appender.kafka.layout.compact = true appender.kafka.layout.additionalField1.value = ${sys:log.file} appender.kafka.layout.complete = false appender.kafka.ignoreExceptions = false appender.kafka.topic = flink_logs_D appender.kafka.name = Kafka appender.kafka.layout.type = JSONLayout appender.kafka.layout.additionalField1.type = KeyValuePair appender.kafka.syncSend = true appender.kafka.type = Kafka appender.kafka.bootstrap.type = Property appender.kafka.bootstrap.name = bootstrap.servers appender.kafka.bootstrap.value = Kafka的Broker实例业务IP:Kafka端口号 appender.kafka.layout.value= net.logstash.log4j.JSONEventLayoutV1 rootLogger.appenderRef.kafka.ref= Kafka
Kafka Broker实例IP地址及端口号说明:- 服务的实例IP地址可通过登录Manager界面后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。如果是安全模式集群,则需要配置“allow.everyone.if.no.acl.found”参数为“true”,具体操作如下:
登录Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为“true”,保存配置即可。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。
- 通过FlinkResource对接提交作业。
- 选择“FlinkResource(角色) > 自定义”,在“flink.log4j.customized.configs”参数中添加配置,配置内容可参考通过FlinkServer对接提交作业。的“配置内容”,添加完成后单击“保存”,并重启受影响的FlinkResource实例。
- 重新安装客户端或更新已有客户端配置,具体请参考安装客户端或更新客户端。
- 修改Flink客户端配置。
- 已安装集群客户端,例如安装目录为“/opt/hadoopclient”。
- 以客户端安装用户,登录安装客户端的节点。
- 在“/opt/hadoopclient/Flink/flink/conf/log4j.properties”文件中新增Kafka Appender的配置并保存。
Kafka Appender的配置内容可参考通过FlinkServer对接提交作业。的“配置内容”。
因功能借助log4j appender实现,如果Kafka服务出现异常,无法连接时,Flink任务会受影响,业务中断。
- 通过FlinkServer对接提交作业。