更新时间:2025-12-10 GMT+08:00
分享

配置Flink作业日志写入Kafka

场景介绍

log4j提供Kafka Appender,允许日志直接写入Kafka,Flink可以通过log4j,将JobManager和TaskManager日志写入Kafka中。

约束与限制

  • 写入Kafka的日志不支持回滚,防止磁盘被写满,需要配置Kafka数据过期策略。
  • log4j 2.17.2版本开源不支持该功能。
  • 仅支持Kafka非安全端口。
  • 本章节仅适用于MRS 3.6.0-LTS及之后版本。

配置步骤

  1. 登录MRS集群Manager界面。
  2. 选择“集群 > 服务 > Flink > 配置 > 全部配置”。
  3. 配置Flink作业日志写入Kafka。

    • 通过FlinkServer对接提交作业。

      选择“FlinkServer(角色) > 自定义”,在“flink.log4j.customized.configs”参数中添加配置,配置内容可参考如下配置。添加完成后单击“保存”,并重启受影响的FlinkServer实例。

      配置内容:

      appender.kafka.layout.additionalField1.key = logdir
      appender.kafka.layout.compact = true
      appender.kafka.layout.additionalField1.value = ${sys:log.file}
      appender.kafka.layout.complete = false
      appender.kafka.ignoreExceptions = false
      appender.kafka.topic = flink_logs_D
      appender.kafka.name = Kafka
      appender.kafka.layout.type = JSONLayout
      appender.kafka.layout.additionalField1.type = KeyValuePair
      appender.kafka.syncSend = true
      appender.kafka.type = Kafka
      appender.kafka.bootstrap.type = Property
      appender.kafka.bootstrap.name = bootstrap.servers
      appender.kafka.bootstrap.value = Kafka的Broker实例业务IP:Kafka端口号
      appender.kafka.layout.value= net.logstash.log4j.JSONEventLayoutV1
      rootLogger.appenderRef.kafka.ref= Kafka
      Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录Manager界面后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。如果是安全模式集群,则需要配置“allow.everyone.if.no.acl.found”参数为“true”,具体操作如下:

        登录Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为“true”,保存配置即可。

      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。
    • 通过FlinkResource对接提交作业。
      1. 选择“FlinkResource(角色) > 自定义”,在“flink.log4j.customized.configs”参数中添加配置,配置内容可参考通过FlinkServer对接提交作业。的“配置内容”,添加完成后单击“保存”,并重启受影响的FlinkResource实例。
      2. 重新安装客户端或更新已有客户端配置,具体请参考安装客户端更新客户端
    • 修改Flink客户端配置。
      1. 已安装集群客户端,例如安装目录为“/opt/hadoopclient”。
      2. 以客户端安装用户,登录安装客户端的节点。
      3. 在“/opt/hadoopclient/Flink/flink/conf/log4j.properties”文件中新增Kafka Appender的配置并保存。

        Kafka Appender的配置内容可参考通过FlinkServer对接提交作业。的“配置内容”。

        因功能借助log4j appender实现,如果Kafka服务出现异常,无法连接时,Flink任务会受影响,业务中断。

相关文档