更新时间:2025-07-01 GMT+08:00
分享

FlinkSQL Kafka表开发规则

Kafka作为sink表时必须指定“topic”配置项

【示例】向Kafka的“test_sink”主题插入一条消息:

CREATE TABLE KafkaSink(
   `user_id` VARCHAR,
   `user_name` VARCHAR,
   `age` INT 
) WITH (
   'connector' = 'kafka',
   'topic' = 'test_sink',
   'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
   'scan.startup.mode' = 'latest-offset',
   'value.format' = 'csv',
   'properties.sasl.kerberos.service.name' = 'kafka',
   'properties.security.protocol' = 'SASL_PLAINTEXT',
   'properties.kerberos.domain.name' = 'hadoop.系统域名' 
);
INSERT INTO KafkaSink (`user_id`, `user_name`, `age`)VALUES ('1', 'John Smith', 35);

Kafka作为source表时必须指定“properties.group.id”配置项

【示例】以“testGroup”为用户组读取主题为“test_sink”的Kafka消息:

CREATE TABLE KafkaSource(
   `user_id` VARCHAR,
   `user_name` VARCHAR,
   `age` INT 
) WITH (
   'connector' = 'kafka',
   'topic' = 'test_sink',
   'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
   'scan.startup.mode' = 'latest-offset',
   'properties.group.id' = 'testGroup',
   'value.format' = 'csv',
   'properties.sasl.kerberos.service.name' = 'kafka',
   'properties.security.protocol' = 'SASL_PLAINTEXT',
   'properties.kerberos.domain.name' = 'hadoop.系统域名' 
);
SELECT * FROM KafkaSource;

不能同时设置“topic-pattern”和“topic”配置项

topic-pattern:主题模式,用于source表,可使用正则表达式的主题名称。

【示例】以下source表将订阅所有以“test-topic-”开头,单个数字结尾的主题消息:

CREATE TABLE payments ( 
    payment_id INT, 
    customer_id INT, 
    payment_date TIMESTAMP(3), 
    payment_amount DECIMAL(10, 2)
) WITH ( 
    'connector' = 'kafka', 
    'topic-pattern' = 'test-topic-[0-9]', 
    'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 
    'format' = 'json'
);
SELECT * FROM payments WHERE payment_amount < 500;
  • Kafka的Broker实例业务IP:服务的实例IP地址可通过登录Manager界面后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
  • Kafka端口号:
    • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
    • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

      登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

  • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。

相关文档