文档首页/
MapReduce服务 MRS/
组件操作指南(LTS版)/
使用Flink/
Flink企业级能力增强/
FlinkSQL Kafka Connector支持消费drs-json格式数据
更新时间:2024-12-11 GMT+08:00
FlinkSQL Kafka Connector支持消费drs-json格式数据
本章节适用于MRS 3.3.0及以后版本。
使用场景
FlinkSQL需要消费Kafka中drs-json格式(一种CDC消息格式)的数据。
使用方法
在创建的Kafka Connector Source流表中,设置 'format' = 'drs-json'。
SQL示例如下:
CREATE TABLE KafkaSource ( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'drs-json', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE TABLE printSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'print' ); Insert into printSink select * from KafkaSource;
父主题: Flink企业级能力增强