更新时间:2023-03-17 GMT+08:00

配置对接Kafka可靠性

配置场景

Spark Streaming对接Kafka时,当Spark Streaming应用重启后,应用根据上一次读取的topic offset作为起始位置和当前topic最新的offset作为结束位置从Kafka上读取数据的。

Kafka服务的topic的leader异常后,若Kafka的leader和follower的offset相差太大,用户重启Kafka服务,Kafka的follower和leader相互切换,则Kafka服务重启后,topic的offset变小。

  • 若Spark Streaming应用一直在运行,由于Kafka上topic的offset变小,会导致读取Kafka数据的起始位置比结束位置大,这样将无法从Kafka读取数据,应用报错。
  • 若在重启Kafka服务前,先停止Spark Streaming应用,等Kafka重启后,再重启Spark Streaming应用使应用从checkpoint恢复。此时,Spark Streaming应用会记录终止前读取到的offset位置,以此为基准读取后面的数据,而Kafka offset变小(例如从10万变成1万),Spark Streaming会等待Kafka leader的offset增长至10万之后才会去消费,导致新发送的offset在1万至10万之间的数据丢失。

针对上述背景,提供配置Streaming对接Kafka更高级别的可靠性。对接Kafka可靠性功能开启后,上述场景处理方式如下。

  • 若Spark Streaming应用在运行应用时Kafka上topic的offset变小,则会将Kafka上topic最新的offset作为读取Kafka数据的起始位置,继续读取后续的数据。
    对于已经生成但未调度处理的任务,若读取的Kafka offset区间大于Kafka上topic的最新offset,则该任务会运行失败。

    若任务失败过多,则会将executor加入黑名单,从而导致后续的任务无法部署运行。此时用户可以通过配置“spark.blacklist.enabled”参数关闭黑名单功能,黑名单功能默认为开启。

  • 若Kafka上topic的offset变小后,Spark Streaming应用进行重启恢复终止前未处理完的任务若读取的Kafka offset区间大于Kafka上topic的最新offset,则该任务直接丢弃,不进行处理。

若Streaming应用中使用了state函数,则不允许开启对接Kafka可靠性功能。

配置描述

在Spark客户端的“spark-defaults.conf”配置文件中进行设置。

表1 参数说明

参数

说明

默认值

spark.streaming.Kafka.reliability

Spark Streaming对接Kafka是否开启可靠性功能:

  • true:开启可靠性功能
  • false:不开启可靠性功能

false