文档首页/ MapReduce服务 MRS/ 组件操作指南(LTS版)/ 使用Flink/ Flink企业级能力增强/ FlinkSQL Kafka Connector支持消费drs-json格式数据
更新时间:2024-12-11 GMT+08:00

FlinkSQL Kafka Connector支持消费drs-json格式数据

本章节适用于MRS 3.3.0及以后版本。

使用场景

FlinkSQL需要消费Kafka中drs-json格式(一种CDC消息格式)的数据。

使用方法

在创建的Kafka Connector Source流表中,设置 'format' = 'drs-json'。

SQL示例如下:

CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'drs-json',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.系统域名'
);
CREATE TABLE printSink(
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'print'
);
Insert into
  printSink
select
  *
from
  KafkaSource;