Updated on 2025-08-22 GMT+08:00

Consuming Data in drs-json Format with Flink SQL Kafka Connector

This section applies to MRS 3.3.0 or later.

Scenarios

Use this feature when Flink SQL needs to consume Kafka data in drs-json format, a change data capture (CDC) message format.

How to Use

When creating the source table with the Kafka connector, set format to drs-json in the WITH clause to specify the data format.

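The following SQL example reads drs-json data from the Kafka topic test_source and writes it to a print sink. The SASL and Kerberos properties apply to a cluster in security mode: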

CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'drs-json',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
CREATE TABLE printSink(
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'print'
);
INSERT INTO printSink
SELECT * FROM KafkaSource;
  • Obtain the IP address and port number of the Kafka Broker instance as follows:
    • To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and check the IP address of the Broker instance in the instance list.
    • If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the Broker port number is the value of sasl.port. The default value is 21007.
    • If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the Broker port number is the value of port. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:

      Log in to FusionInsight Manager and choose Cluster > Services > Kafka. Click Configurations and then All Configurations, search for allow.everyone.if.no.acl.found, set it to true, and click Save.

  • System domain name: To obtain it, log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
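
For a cluster in normal mode (Kerberos authentication disabled), the source table can presumably be declared without the three security-related properties. The following is a minimal sketch under that assumption, not a verified configuration. The table name KafkaSourceNormal is hypothetical, and the port assumes the default value 9092 of the port parameter described above.

```sql
-- Sketch for a cluster in normal mode (Kerberos disabled).
-- Assumption: the SASL/Kerberos properties are not needed in this mode.
CREATE TABLE KafkaSourceNormal (  -- hypothetical table name
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  -- Replace the placeholder with the Broker instance IP address;
  -- 9092 is the default value of the port parameter.
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'drs-json'
  -- properties.sasl.kerberos.service.name, properties.security.protocol, and
  -- properties.kerberos.domain.name are omitted here (assumed unnecessary without Kerberos).
);
```

The printSink table and the INSERT statement from the example above can be reused by selecting from KafkaSourceNormal instead of KafkaSource.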