配置对接Kafka可靠性
配置场景
Spark Streaming对接Kafka时,当Spark Streaming应用重启后,应用根据上一次读取的topic offset作为起始位置和当前topic最新的offset作为结束位置从Kafka上读取数据的。
Kafka服务的topic的leader异常后,若Kafka的leader和follower的offset相差太大,用户重启Kafka服务,Kafka的follower和leader相互切换,则Kafka服务重启后,topic的offset变小。
- 若Spark Streaming应用一直在运行,由于Kafka上topic的offset变小,会导致读取Kafka数据的起始位置比结束位置大,这样将无法从Kafka读取数据,应用报错。
- 若在重启Kafka服务前,先停止Spark Streaming应用,等Kafka重启后,再重启Spark Streaming应用使应用从checkpoint恢复。此时,Spark Streaming应用会记录终止前读取到的offset位置,以此为基准读取后面的数据,而Kafka offset变小(例如从10万变成1万),Spark Streaming会等待Kafka leader的offset增长至10万之后才会去消费,导致新发送的offset在1万至10万之间的数据丢失。
针对上述背景,提供配置Streaming对接Kafka更高级别的可靠性。对接Kafka可靠性功能开启后,上述场景处理方式如下。
- 若Spark Streaming应用在运行应用时Kafka上topic的offset变小,则会将Kafka上topic最新的offset作为读取Kafka数据的起始位置,继续读取后续的数据。
- 若Kafka上topic的offset变小后,Spark Streaming应用进行重启恢复终止前未处理完的任务若读取的Kafka offset区间大于Kafka上topic的最新offset,则该任务直接丢弃,不进行处理。
若Streaming应用中使用了state函数,则不允许开启对接Kafka可靠性功能。
配置描述
在Spark客户端的“spark-defaults.conf”配置文件中进行设置。
参数 |
说明 |
默认值 |
---|---|---|
spark.streaming.Kafka.reliability |
Spark Streaming对接Kafka是否开启可靠性功能:
|
false |