Setting Source Parallelism
Flink SQL allows you to use the source.parallelism parameter to set the parallelism of source operators, that is, the number of concurrent source subtasks. Tuning this value helps relieve data skew and back pressure and can improve job performance.
Setting this parameter changes the partitioning between the source and its downstream operators from Forward to Rebalance. If the source parallelism differs from the parallelism of the downstream operators and out-of-order data is not allowed, enable the DISTRIBUTEBY feature together with this feature. For details, see Using the DISTRIBUTEBY Feature.
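To see why ordering can be affected, the following toy model in Python may help. It is only a sketch and not Flink code: it assumes that Rebalance behaves like a plain round-robin and that DISTRIBUTEBY routes records by a hash of the chosen field (here zlib.crc32 stands in for whatever hash the engine actually uses). The function names and sample records are hypothetical.

```python
import zlib
from collections import defaultdict


def rebalance(records, downstream):
    """Toy round-robin: record i goes to subtask i % downstream, ignoring the key."""
    placement = defaultdict(set)
    for i, (key, _value) in enumerate(records):
        placement[key].add(i % downstream)
    return dict(placement)


def distribute_by_key(records, downstream):
    """Toy hash partitioning: the target subtask depends only on the key."""
    placement = defaultdict(set)
    for key, _value in records:
        placement[key].add(zlib.crc32(key.encode("utf-8")) % downstream)
    return dict(placement)


# Six updates for two users, sent to 3 downstream subtasks.
records = [("u1", 1), ("u2", 1), ("u1", 2), ("u2", 2), ("u1", 3), ("u2", 3)]

rr = rebalance(records, downstream=3)
hashed = distribute_by_key(records, downstream=3)

print("round-robin placement:", rr)
print("hash-by-key placement:", hashed)
```

In this model, round-robin spreads the updates of a single user over several subtasks, so nothing guarantees they are processed in their original order. Hashing on the key keeps every update for a user on one subtask, which is the property that matters when out-of-order data is not allowed.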
The following example sets the number of concurrent source operators to 2 and enables the DISTRIBUTEBY feature:
CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name',
  -- Set the number of concurrent source operators.
  'source.parallelism' = '2'
);

CREATE TABLE KafkaSink (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_sink',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka broker instance:Kafka port',
  'value.format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);

-- DISTRIBUTEBY disabled:
-- INSERT INTO KafkaSink SELECT user_id, user_name, age FROM KafkaSource;

-- Enable DISTRIBUTEBY.
INSERT INTO KafkaSink
SELECT /*+ DISTRIBUTEBY('user_id') */ user_id, user_name, age
FROM KafkaSource;