更新时间:2025-09-10 GMT+08:00
分享

FlinkSQL Kafka表开发建议

Kafka作为source表时应设置限流

本章节适用于MRS 3.3.0及以后版本。

防止上限超过流量峰值,导致作业异常带来不稳定因素。因此建议设置限流,限流上限应该为业务上线压测的峰值。

【示例】Kafka作为source表时设置限流:

#如下参数作用在每个并行度
'scan.records-per-second.limit' = '1000'
#真实的限流流量如下
min( parallelism * scan.records-per-second.limit,partitions num * scan.records-per-second.limit)

为保证数据准确性将同key数据写入Kafka的同一个分区

Flink写Kafka使用fixed策略,并在写入之前根据key进行Hash。

【示例】Flink写Kafka使用fixed策略:

CREATE TABLE kafka (
 f_sequence INT,
 f_sequence1 INT,
 f_sequence2 INT,
 f_sequence3 INT 
 ) WITH ( 
 'connector' = 'kafka',
 'topic' = 'yxtest123',
 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
 'properties.group.id' = 'testGroup1',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json',
 'sink.partitioner'='fixed'
 ); 

 insert into kafka select /*+ DISTRIBUTEBY('f_sequence','f_sequence1') */ * from datagen;

为提升Kafka消费速度可将Kafka Source并行度与Topic分区数保持一致

当Kafka Source并行度大于Topic分区数时,多余的并行度不能消费数据。

连接MRS集群中启用了Kerberos认证(安全模式)的Kafka

Flink作业WITH属性里需带有以下相关配置:

#Kafka Kerberos 服务名
'properties.sasl.kerberos.service.name' = 'kafka',
#Kafka对外发布端口使用的通信协议
'properties.security.protocol' = 'SASL_PLAINTEXT',  
#系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名
'properties.kerberos.domain.name' = 'hadoop.系统域名'

【示例】连接主题为“test_kafka”的已启用Kerberos认证的Kafka:

CREATE TABLE kafka (
 f_sequence INT,
 f_sequence1 INT,
 f_sequence2 INT,
 f_sequence3 INT 
 ) WITH ( 
'connector' = 'kafka',
'topic' = 'test_kafka',
'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'csv',
'properties.sasl.kerberos.service.name' = 'kafka', --Kafka Kerberos服务名
'properties.security.protocol' = 'SASL_PLAINTEXT', --Kafka对外发布端口使用的通信协议
'properties.kerberos.domain.name' = 'hadoop.系统域名' 
 ); 
  • Kafka的Broker实例业务IP:服务的实例IP地址可通过登录Manager界面后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
  • Kafka端口号:
    • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
    • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

      登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

  • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。

相关文档