文档首页/ MapReduce服务 MRS/ 组件操作指南(LTS版)/ 使用Flink/ Flink企业级能力增强/ FlinkSQL Kafka Connector支持offset从group组中恢复,状态从CheckPoint恢复
更新时间:2025-12-10 GMT+08:00
分享

FlinkSQL Kafka Connector支持offset从group组中恢复,状态从CheckPoint恢复

该章节仅适用于MRS 3.6.0-LTS及之后版本。

使用场景

配置Flink作业时,Kafka作业从CheckPoint恢复,支持offset从group组中恢复,状态从CheckPoint中恢复。

使用限制

  • 需要保证CheckPoint恢复前后的Kafka topic name一致。
  • 需要保证Kafka group组的offset和CheckPoint记录的offset一致,否则会导致缺数问题。

使用方法

在创建的Kafka Connector Source流表中,新增设置“restore.mode”的值为“group-offsets”。

SQL示例:

CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.系统域名',
  'restore.mode' = 'group-offsets'
);
CREATE TABLE KafkaSink(`user_id` VARCHAR, `value` BIGINT) WITH ('connector' = 'print');
Insert into
  KafkaSink
select
  user_id,
  sum(age) as `value`
from
  KafkaSource
group by
  user_id;

相关文档