更新时间:2024-11-21 GMT+08:00
FlinkSQL Kafka表开发规则
Kafka作为sink表时必须指定“topic”配置项
【示例】向Kafka的“test_sink”主题插入一条消息:
CREATE TABLE KafkaSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); INSERT INTO KafkaSink (`user_id`, `user_name`, `age`)VALUES ('1', 'John Smith', 35);
Kafka作为source表时必须指定“properties.group.id”配置项
【示例】以“testGroup”为用户组读取主题为“test_sink”的Kafka消息:
CREATE TABLE KafkaSource( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'scan.startup.mode' = 'latest-offset', 'properties.group.id' = 'testGroup', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); SELECT * FROM KafkaSource;
不能同时设置“topic-pattern”和“topic”配置项
topic-pattern:主题模式,用于source表,可使用正则表达式的主题名称。
【示例】以下source表将订阅所有以“test-topic-”开头,单个数字结尾的主题消息:
CREATE TABLE payments ( payment_id INT, customer_id INT, payment_date TIMESTAMP(3), payment_amount DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic-pattern' = 'test-topic-[0-9]', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json' ); SELECT * FROM payments WHERE payment_amount < 500;