Updated on 2024-11-29 GMT+08:00

Setting Source Parallelism

Flink SQL allows you to use the source.parallelism parameter to set the parallelism (the number of concurrent instances) of a source operator. Tuning the source parallelism helps you handle data skew and back pressure and improve job performance.

This feature changes the partitioning between the source operator and its downstream operators from Forward to Rebalance. If the source parallelism differs from the parallelism of the downstream operators and out-of-order data is not allowed, enable the DISTRIBUTEBY feature together with this feature. For details, see Using the DISTRIBUTEBY Feature.

The following example sets the source parallelism to 2 and enables the DISTRIBUTEBY feature:

CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name',
  -- Set the number of concurrent source operators.
  'source.parallelism' = '2'
);
CREATE TABLE KafkaSink (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_sink',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka broker instance:Kafka port',
  'value.format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
-- With DISTRIBUTEBY disabled:
-- INSERT INTO KafkaSink SELECT user_id, user_name, age FROM KafkaSource;
-- With DISTRIBUTEBY enabled:
INSERT INTO KafkaSink SELECT /*+ DISTRIBUTEBY('user_id') */ user_id, user_name, age FROM KafkaSource;
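As a sketch of the situation that calls for DISTRIBUTEBY, the statements below make the parallelism mismatch explicit using the KafkaSource and KafkaSink tables defined above. They assume that the downstream operators run at a job default parallelism of 4, set here with the Flink SQL client's SET statement. This is an assumption: on FlinkServer the job parallelism may be configured elsewhere. With the source at 2 and the downstream operators at 4, records are redistributed with Rebalance, which cycles through the downstream subtasks round-robin, so two records with the same user_id can be processed by different subtasks and written out of order. The DISTRIBUTEBY hint on user_id is intended to keep records with the same key on one subtask; check Using the DISTRIBUTEBY Feature for its exact behavior.

```sql
-- Assumption: downstream operators run at the job default parallelism of 4.
-- SET syntax as accepted by the Flink SQL client; FlinkServer may set this elsewhere.
SET 'parallelism.default' = '4';

-- KafkaSource is declared with 'source.parallelism' = '2', so the source (2)
-- and the downstream operators (4) differ and records are rebalanced round-robin.

-- Risky when order matters: records with the same user_id may go to different
-- subtasks and reach KafkaSink in a different order than they were read.
-- INSERT INTO KafkaSink SELECT user_id, user_name, age FROM KafkaSource;

-- Distribute by user_id so that records with the same key stay together.
INSERT INTO KafkaSink SELECT /*+ DISTRIBUTEBY('user_id') */ user_id, user_name, age FROM KafkaSource;
```

If the source and downstream parallelisms are equal, or the order of records does not matter to the consumers of test_sink, the plain INSERT statement without the hint is sufficient.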