Updated on 2025-08-22 GMT+08:00

Limiting Read Rate for Flink SQL Kafka and Upsert Kafka Connector

This section applies to MRS 3.3.0 or later.

Scenarios

Use this feature when you need to limit the rate at which the Kafka or Upsert Kafka connector consumes data from Kafka.

How to Use

Add the subtask.scan.records-per-second.limit parameter to the WITH clause of the source table. This parameter specifies the maximum number of Kafka records that each source subtask consumes per second. The overall read rate of the source is therefore min(source parallelism * subtask.scan.records-per-second.limit, number of Kafka partitions * subtask.scan.records-per-second.limit), which equals min(source parallelism, number of Kafka partitions) * subtask.scan.records-per-second.limit.
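As a worked illustration of the formula, let P be the source parallelism, N the number of Kafka partitions, and L the value of subtask.scan.records-per-second.limit. The values below (P = 4 or 8, N = 6, L = 1000) are hypothetical. In the second case, presumably two subtasks receive no partition and stay idle, so adding parallelism beyond the partition count does not raise the cap.

```latex
\begin{aligned}
R_{\text{total}} &= \min(P \cdot L,\ N \cdot L) = \min(P, N) \cdot L \\
P = 4,\ N = 6,\ L = 1000:\quad R_{\text{total}} &= \min(4, 6) \cdot 1000 = 4000 \text{ records/s} \\
P = 8,\ N = 6,\ L = 1000:\quad R_{\text{total}} &= \min(8, 6) \cdot 1000 = 6000 \text{ records/s}
\end{aligned}
```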

The following is a SQL example:

CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'subtask.scan.records-per-second.limit' = '1000', -- Each source subtask consumes at most 1000 records per second.
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
CREATE TABLE KafkaSink(
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_sink',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
Insert into
  KafkaSink
select
  *
from
  KafkaSource;
  • Kafka Broker instance IP address and port number:
    • To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and query the instance IP address on the instance list page.
    • If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the Broker port number is the value of sasl.port. The default value is 21007.
    • If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the Broker port number is the value of port. The default value is 9092. If you use port 9092, set allow.everyone.if.no.acl.found to true as follows:

      Log in to FusionInsight Manager and choose Cluster > Services > Kafka. Click Configurations and then All Configurations, search for allow.everyone.if.no.acl.found, set it to true, and click Save.

  • System domain name: You can log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
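The example above uses the Kafka connector. A source table for the Upsert Kafka connector could be declared along the following lines. This is a sketch that assumes subtask.scan.records-per-second.limit is accepted by the upsert-kafka connector in the same way, as the section title indicates. The table name UpsertKafkaSource and the topic test_upsert_source are hypothetical. In open-source Flink, upsert-kafka requires a PRIMARY KEY together with key.format and value.format, and it does not accept scan.startup.mode. Fill in the placeholders as described in the notes above.

```sql
CREATE TABLE UpsertKafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT,
  PRIMARY KEY (`user_id`) NOT ENFORCED -- Records with the same key are treated as updates.
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test_upsert_source', -- Hypothetical topic name.
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'key.format' = 'csv',
  'value.format' = 'csv',
  -- Assumption: same per-subtask semantics as for the Kafka connector.
  'subtask.scan.records-per-second.limit' = '1000',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
```

An upsert source produces a changelog stream that contains updates and deletes. Write it to a sink that can consume updates, such as another upsert-kafka table, rather than to the append-only KafkaSink table above.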