Updated on 2025-10-24 GMT+08:00

Using a DLI Flink Job to Synchronize MRS Kafka Data to a CloudTable ClickHouse Cluster in Real Time

This section describes the best practices of real-time data synchronization. You can use DLI Flink jobs to synchronize data generated by MRS Kafka jobs to ClickHouse in real time.

Constraints

  • Kerberos authentication is not enabled for the MRS cluster.
  • To ensure network connectivity, the security group, region, VPC, and subnet of the MRS cluster must be the same as those of the CloudTable cluster.
  • Add the network segment of the elastic resource pool associated with the DLI queue to the inbound rules of both the MRS and CloudTable security groups so that a data source connection can be established. For details, see Creating an Enhanced Datasource Connection.
  • Network connectivity must be available between DLI and both its upstream (MRS Kafka) and downstream (CloudTable ClickHouse) services. For details, see Testing Address Connectivity.

Preparations

Step 1: Create a CloudTable ClickHouse Cluster

  1. Log in to the CloudTable console and create a ClickHouse cluster.
  2. Download the client and verification file.
  3. Create an ECS.
  4. Install and verify the client.
  5. Create a database named flink.
    create database flink;

    Switch to the flink database.

    use flink;
  6. Create the flink.order table.
    create table flink.order(
      order_id String,
      order_channel String,
      order_time String,
      pay_amount Float64,
      real_pay Float64,
      pay_time String,
      user_id String,
      user_name String,
      area_id String
    ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')
    ORDER BY order_id;
  7. Check whether the table is successfully created:
    select * from flink.order;
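
An empty SELECT result only shows that the table exists. Its structure can also be checked explicitly. The following is a minimal sketch, assuming it is run in the same ClickHouse client session used above. The backticks around order are a defensive quoting choice that does not appear in the original statements; they are added here because ORDER is also a SQL keyword.

```sql
-- List the column names and types of the target table.
DESCRIBE TABLE flink.`order`;

-- Print the full DDL, including the table engine and the ORDER BY key.
SHOW CREATE TABLE flink.`order`;
```

The nine columns reported here should match the columns declared in the Flink tables in the later steps, because the synchronization job inserts with select *.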

Step 2: Create a Flink Job in the MRS Cluster to Generate Data

  1. Create an MRS cluster.
  2. Log in to Manager and choose Cluster > Flink > Dashboard.
  3. Click the link on the right of Flink WebUI to access the Flink web UI.
  4. On the Flink web UI, create a Flink task to generate data.
    1. Click Create Job on the Job Management page. The Create Job page is displayed.
    2. Set parameters and click OK to create a Flink SQL job. To edit the SQL statements of the job, click Develop in the Operation column and add the following statements on the SQL page:

      ip:port: IP address and port number of the Kafka Broker

      • To obtain the IP address, log in to Manager of the cluster and choose Cluster > Kafka > Instance. On the displayed page, view the Management IP Address of Broker.
      • To obtain the port number, click Configurations. On the configuration page that is displayed, search for port and obtain the port number, which is the PLAINTEXT protocol port that the Broker service listens on.
      • Specify multiple IP address and port pairs, separated by commas, in the properties.bootstrap.servers parameter so that the job does not fail when a single Broker is unreachable because of network instability or other reasons.

      SQL statement examples

      CREATE TABLE IF NOT EXISTS `lineorder_ck` (
      `order_id` string,
      `order_channel` string,
      `order_time` string,
      `pay_amount` double,
      `real_pay` double,
      `pay_time` string,
      `user_id` string,
      `user_name` string,
      `area_id` string
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_flink',
      'properties.bootstrap.servers' = 'ip:port',
      'value.format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka'
      );
      CREATE TABLE lineorder_datagen (
      `order_id` string,
      `order_channel` string,
      `order_time` string,
      `pay_amount` double,
      `real_pay` double,
      `pay_time` string,
      `user_id` string,
      `user_name` string,
      `area_id` string
      ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1000'
      );
      INSERT INTO
      lineorder_ck
      SELECT
      *
      FROM
      lineorder_datagen;
    3. Return to the Job Management page and click Start in the Operation column. If the job status is Running, the job is successfully executed.
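
As a sketch of the multi-Broker recommendation in this step, the Kafka table could be declared with a comma-separated Broker list. The placeholders ip1:port, ip2:port, and ip3:port are hypothetical and stand for the Management IP Address and PLAINTEXT port of three different Brokers. All other options are copied from the example above.

```sql
-- Kafka table from Step 2, with several Brokers listed.
-- ip1:port, ip2:port, ip3:port are placeholders, not real addresses.
CREATE TABLE IF NOT EXISTS `lineorder_ck` (
  `order_id` string,
  `order_channel` string,
  `order_time` string,
  `pay_amount` double,
  `real_pay` double,
  `pay_time` string,
  `user_id` string,
  `user_name` string,
  `area_id` string
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink',
  -- Comma-separated list; the client can bootstrap from any reachable entry.
  'properties.bootstrap.servers' = 'ip1:port,ip2:port,ip3:port',
  'value.format' = 'json',
  'properties.sasl.kerberos.service.name' = 'kafka'
);
```

The same list format applies to the properties.bootstrap.servers option of the Kafka source table in the DLI job.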

Step 3: Create a DLI Flink Job to Synchronize Data

  1. Create elastic resource pools and queues. For details, see Creating an Elastic Resource Pool and Creating Queues Within It.
  2. Create a data source connection. For details, see Creating an Enhanced Datasource Connection.
  3. Test the connectivity between DLI and the upstream MRS Kafka and between DLI and the downstream CloudTable ClickHouse.
    1. After the elastic resource pool and queue are created, choose Resources > Queue Management. On the displayed page, test the address connectivity. For details, see Testing Address Connectivity.
    2. To obtain the upstream IP address and port number, go to Manager of the cluster and choose Cluster > Kafka > Instance. On the displayed page, view the Management IP Address of Broker. Click Configurations. On the configuration page that is displayed, search for port and obtain the port number, which is the PLAINTEXT protocol port that the Broker service listens on.
    3. To obtain the downstream IP address and port number, go to the Details page of the CloudTable ClickHouse cluster.
  4. Create a Flink job. For details, see Submitting a Flink Job Using DLI.
  5. Select the Flink job created in step 4, click Edit in the Operation column, and add SQL statements for data synchronization.
    create table orders (
    order_id string,
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'test_flink',
    'properties.bootstrap.servers' = 'ip:port',
    'properties.group.id' = 'testGroup_1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
    );
    create table clickhouseSink(
    order_id string,
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string
    ) with (
    'connector' = 'clickhouse',
    'url' = 'jdbc:clickhouse://ip:port/flink',
    'username' = 'admin',
    'password' = '****',
    'table-name' = 'order',
    'sink.buffer-flush.max-rows' = '10',
    'sink.buffer-flush.interval' = '3s'
    );
    insert into clickhouseSink select * from orders;
  6. Click Format, and then click Save.

    Formatting the SQL code is required because copying and pasting may introduce null characters that cause the job to fail.

  7. On the DLI management console, choose Job Management > Flink Jobs.
  8. Click Start in the Operation column to start the job created in step 4. If the job status is Running, the job is successfully executed.
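
The source table in this step uses 'scan.startup.mode' = 'latest-offset', so the job reads only messages that arrive in the topic after the job starts. If messages already in the topic also need to be synchronized, a variant such as the following might be used. This is a sketch that assumes the DLI Kafka connector accepts 'earliest-offset' in the same way as the open-source Flink Kafka connector. Only the scan.startup.mode line differs from the original definition.

```sql
-- Variant of the Kafka source table that starts from the oldest retained message.
-- Assumption: 'earliest-offset' is supported as in the open-source Flink Kafka connector.
create table orders (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink',
  'properties.bootstrap.servers' = 'ip:port',
  'properties.group.id' = 'testGroup_1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Rereading a topic from the beginning can write rows to ClickHouse that were already synchronized, because a MergeTree table does not enforce uniqueness on its ORDER BY key.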

Step 4: Verify the Result

  1. After the MRS Flink and DLI jobs are successfully executed, return to the command execution window of the ClickHouse cluster.
  2. View databases.
    show databases;
  3. Switch to the flink database.
    use flink;
  4. View tables.
    show tables;
  5. View the synchronized data.
    select * from order limit 10;
    Figure 2 Viewing synchronization data
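
To confirm that synchronization is ongoing rather than a one-off load, the row count can be sampled more than once. The following is a minimal sketch, assuming both jobs are still running and the client session is connected to the ClickHouse cluster. The backticks around order are a defensive quoting choice that does not appear in the original statements.

```sql
-- Run this statement twice, a few seconds apart.
-- While both jobs are running, the second result is expected to be larger.
SELECT count() FROM flink.`order`;

-- Inspect a few rows with explicit columns instead of SELECT *.
SELECT order_id, order_channel, pay_amount, real_pay
FROM flink.`order`
LIMIT 5;
```

With the sink settings used in Step 3 ('sink.buffer-flush.max-rows' = '10' and 'sink.buffer-flush.interval' = '3s'), new rows may take a few seconds to become visible.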