文档首页/ 数据湖探索 DLI/ 常见问题/ Flink作业类/ Flink SQL作业类/ Kafka Sink配置发送失败重试机制
更新时间:2024-11-08 GMT+08:00

Kafka Sink配置发送失败重试机制

问题描述

用户执行Flink Opensource SQL, 采用Flink 1.10版本。Flink Sink写Kafka报错后作业失败:

Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

问题原因

由于CPU使用率过高,导致网络闪断。

解决方案

在SQL语句中配置发送失败重试:connector.properties.retries=5

create table kafka_sink(
     car_type string
    , car_name string
    , primary key (union_id) not enforced
) with (
    "connector.type" = "upsert-kafka",
    "connector.version" = "0.11",
    "connector.properties.bootstrap.servers" = "xxxx:9092",
    "connector.topic" = "kafka_car_topic ",
    "connector.sink.ignore-retraction" = "true",
    "connector.properties.retries" = "5",
    "format.type" = "json"
);