FlinkSQL Kafka Connector支持offset从group组中恢复,状态从CheckPoint恢复
该章节仅适用于MRS 3.6.0-LTS及之后版本。
使用场景
配置Flink作业时,Kafka作业从CheckPoint恢复,支持offset从group组中恢复,状态从CheckPoint中恢复。
使用限制
- 需要保证CheckPoint恢复前后的Kafka topic name一致。
- 需要保证Kafka group组的offset和CheckPoint记录的offset一致,否则会导致缺数问题。
使用方法
在创建的Kafka Connector Source流表中,新增设置“restore.mode”的值为“group-offsets”。
SQL示例:
CREATE TABLE KafkaSource (
`user_id` VARCHAR,
`user_name` VARCHAR,
`age` INT
) WITH (
'connector' = 'kafka',
'topic' = 'test_source',
'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.系统域名',
'restore.mode' = 'group-offsets'
);
CREATE TABLE KafkaSink(`user_id` VARCHAR, `value` BIGINT) WITH ('connector' = 'print');
Insert into
KafkaSink
select
user_id,
sum(age) as `value`
from
KafkaSource
group by
user_id;