Updated on 2024-11-29 GMT+08:00

Consuming Data in drs-json Format with FlinkSQL Kafka Connector

Scenarios

Use this feature when a FlinkSQL job needs to consume Kafka data that is in drs-json format (a CDC message format).

How to Use

When creating the source table for the Kafka connector, set the format option to drs-json.

The following is a SQL example:

CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'drs-json',
  -- SASL/Kerberos authentication settings for the Kafka client
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
CREATE TABLE printSink(
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'print'
);
INSERT INTO printSink
SELECT * FROM KafkaSource;
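
As a variation on the example above, the query can name the columns explicitly and filter rows before writing them to the sink. The following is a minimal sketch that reuses the KafkaSource and printSink tables defined above; the filter condition and the threshold value 18 are hypothetical and only illustrate the pattern.

```sql
-- Sketch only: reuses KafkaSource and printSink from the example above.
-- The age threshold (18) is a hypothetical value chosen for illustration.
INSERT INTO printSink
SELECT
  user_id,
  user_name,
  age
FROM KafkaSource
WHERE age >= 18;
```

Listing the columns instead of using * keeps the statement valid if columns are later added to the source table, because the sink schema must match the selected columns.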