更新时间:2024-11-29 GMT+08:00

FlinkSQL Kafka和upsert-kafka Connector支持限流读

使用场景

使用FlinkSQL的Kafka、upsert-kafka Connector消费数据时需要限流。

使用方法

在创建的Source流表中添加“subtask.scan.records-per-second.limit”参数,该参数表示每秒消费Kafka单分区记录数,因此Source端整体限流速率为:min( source parallelism * subtask.scan.records-per-second.limit,kafka partitions num * subtask.scan.records-per-second.limit)

SQL示例如下:

CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'subtask.scan.records-per-second.limit' = '1000',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.系统域名'
);
CREATE TABLE KafkaSink(
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_sink',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.系统域名'
);
Insert into
  KafkaSink
select
  *
from
  KafkaSource;