更新时间:2024-11-29 GMT+08:00

FlinkSQL支持设置Source的并发

FlinkSQL支持通过使用参数“source.parallelism”设置Source算子的并发数,解决下游算子的并发数引起的一些问题,例如下游算子发送数据倾斜、背压、作业性能慢等问题。

该特性会将Source和下游算子的Forward分区改为Rebalance分区,所以当Source算子的并发数和下游算子的并发数(parallelism数)不一致时,且作业不允许数据乱序,需要在启用该特性的同时开启DISTRIBUTEBY特性,可参考FlinkSQL DISTRIBUTEBY

如设置Source并发数为“2”并开启DISTRIBUTEBY特性:

CREATE TABLE KafkaSource (
`user_id` VARCHAR,
`user_name` VARCHAR,
 `age` INT
 ) WITH ( 
 'connector' = 'kafka',  
 'topic' = 'test_source', 
 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',  
 'properties.group.id' = 'testGroup',  
 'scan.startup.mode' = 'latest-offset',  
 'format' = 'csv',  
 'properties.sasl.kerberos.service.name' = 'kafka', 
 'properties.security.protocol' = 'SASL_PLAINTEXT', 
 'properties.kerberos.domain.name' = 'hadoop.系统域名',
 -- 设置Source并发数
 'source.parallelism' = '2'
 ); 
CREATE TABLE KafkaSink( 
  `user_id` VARCHAR, 
  `user_name` VARCHAR,  
 `age` INT
 ) WITH ( 
  'connector' = 'kafka', 
  'topic' = 'test_sink', 
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 
  'value.format' = 'csv', 
  'properties.sasl.kerberos.service.name' = 'kafka',
   'properties.security.protocol' = 'SASL_PLAINTEXT',
   'properties.kerberos.domain.name' = 'hadoop.系统域名'
 ); 
-- Insert into KafkaSink select user_id, user_name, age from KafkaSource;(未开启DISTRIBUTEBY特性-- 开启DISTRIBUTEBY特性
Insert into KafkaSink select/*+ DISTRIBUTEBY('user_id') */ user_id, user_name, age from KafkaSource;