文档首页/
MapReduce服务 MRS/
组件操作指南(安卡拉区域)/
使用Flink/
FlinkSQL特性增强/
FlinkSQL Kafka和upsert-kafka Connector支持限流读
更新时间:2024-11-29 GMT+08:00
FlinkSQL Kafka和upsert-kafka Connector支持限流读
使用场景
使用FlinkSQL的Kafka、upsert-kafka Connector消费数据时需要限流。
使用方法
在创建的Source流表中添加“subtask.scan.records-per-second.limit”参数,该参数表示每秒消费Kafka单分区记录数,因此Source端整体限流速率为:min( source parallelism * subtask.scan.records-per-second.limit,kafka partitions num * subtask.scan.records-per-second.limit)
SQL示例如下:
CREATE TABLE KafkaSource ( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'subtask.scan.records-per-second.limit' = '1000', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE TABLE KafkaSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); Insert into KafkaSink select * from KafkaSource;
父主题: FlinkSQL特性增强