Updated on 2024-05-29 GMT+08:00

Interconnecting FlinkServer with Hudi

This section applies to MRS 3.1.2 or later clusters.


This section describes how to interconnect FlinkServer with Hudi through Flink SQL jobs.


  • The HDFS, Yarn, Flink, and Hudi services have been installed in a cluster.
  • The client that contains the Hudi service has been installed in a directory, for example, /opt/client.
  • Flink 1.12.2 or later and Hudi 0.9.0 or later are required.
  • You have created a user with the FlinkServer Admin Privilege (for example, flink_admin) to access the Flink web UI. For details, see Creating a FlinkServer Role.

Flink Support for Read and Write Operations on Hudi Tables

Table 1 lists the read and write operations supported by Flink on Hudi COW and MOR tables.

Table 1 Flink support for read and write operations on Hudi tables

Flink SQL     | COW table | MOR table
Batch write   |           |
Batch read    |           |
Stream write  |           |
Stream read   |           |
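
Table 1 covers batch operations, but the procedure in this section only shows streaming jobs. As a minimal sketch of a batch write followed by a batch read, the statements below insert a few literal rows into a COW table and then copy the table to a print sink. The table name batch_cow_demo, its path, the sample rows, and the print_sink table are hypothetical and chosen only for illustration. The sketch also assumes that the Hudi defaults for the record key (uuid) and the precombine field (ts) apply in your version.

```sql
-- Hypothetical COW table used only for this sketch.
CREATE TABLE batch_cow_demo(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/batch_cow_demo',
  'table.type' = 'COPY_ON_WRITE'
);

-- Batch write: insert a bounded set of literal rows.
INSERT INTO batch_cow_demo VALUES
  ('id1', 'Alice', 23, 1, 'par1'),
  ('id2', 'Bob', 33, 2, 'par2');

-- Hypothetical sink that prints rows to the TaskManager output.
CREATE TABLE print_sink(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'print'
);

-- Batch read: scan the latest committed snapshot of the Hudi table.
INSERT INTO print_sink SELECT * FROM batch_cow_demo;
```

Run the read as a separate job after the write has committed, because a batch read only returns committed data.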




  1. Log in to Manager as user flink_admin and choose Cluster > Services > Flink. In the Basic Information area, click the link on the right of Flink WebUI to access the Flink web UI.
  2. Create a Flink SQL job by referring to Creating a Job. On the job development page, configure the job parameters as follows and start the job.

    In Basic Parameter, select Enable CheckPoint, set Time Interval(ms) to 60000, and retain the default value for Mode.

    • Enable CheckPoint on the Flink web UI, because data is written to a Hudi table only when the Flink SQL job triggers a checkpoint. Adjust the CheckPoint interval based on service requirements. A relatively long interval is recommended.
    • If the CheckPoint interval is too short, data may not be updated in time and job exceptions may occur. Set the CheckPoint interval at the minute level.
    • Asynchronous compaction is required when a Flink SQL job writes data to an MOR table. For details about the parameters that control the compaction interval, see the Hudi official website at https://hudi.apache.org/docs/configurations.html.
    • The following shows a Flink SQL job writing data to an MOR table in stream mode.
      CREATE TABLE stream_mor(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
      'table.type' = 'MERGE_ON_READ'
      );
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--This parameter is not required for clusters in normal mode. Delete the comma (,) in the previous line.
      'properties.security.protocol' = 'SASL_PLAINTEXT',--This parameter is not required for clusters in normal mode.
      'properties.kerberos.domain.name' = 'hadoop.System domain name'--This parameter is not required for clusters in normal mode.
      );
      insert into stream_mor select * from kafka;
    • The following shows a Flink SQL job writing data to a COW table in stream mode:
      CREATE TABLE stream_write_cow(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_cow'
      );
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--This parameter is not required for clusters in normal mode. Delete the comma (,) in the previous line.
      'properties.security.protocol' = 'SASL_PLAINTEXT',--This parameter is not required for clusters in normal mode.
      'properties.kerberos.domain.name' = 'hadoop.System domain name'--This parameter is not required for clusters in normal mode.
      );
      insert into stream_write_cow select * from kafka;
    • The following shows a Flink SQL job reading an MOR table.
      CREATE TABLE hudi_read_spark_mor(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/default/tb_hudimor',
      'table.type' = 'MERGE_ON_READ'
      );
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--This parameter is not required for clusters in normal mode. Delete the comma (,) in the previous line.
      'properties.security.protocol' = 'SASL_PLAINTEXT',--This parameter is not required for clusters in normal mode.
      'properties.kerberos.domain.name' = 'hadoop.System domain name'--This parameter is not required for clusters in normal mode.
      );
      insert into kafka select * from hudi_read_spark_mor;

    Kafka port number

    • If Authentication Mode of the cluster is Security Mode, use the value of sasl.port (21007 by default).
    • If Authentication Mode of the cluster is Normal Mode, use the value of port (9092 by default). If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:

      Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the page that is displayed, click the Configurations tab and then the All Configurations sub-tab. Search for allow.everyone.if.no.acl.found, set it to true, and click Save.
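
    For a cluster in normal mode, the comments in the Kafka table definitions above amount to dropping the three Kerberos-related properties and the trailing comma after 'format' = 'json'. The following is a sketch of the resulting source table, assuming the default port 9092 and keeping the broker address as a placeholder:

    ```sql
    -- Kafka source table for a cluster in normal mode (no Kerberos properties).
    CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      -- Replace the placeholder with the broker IP address; 9092 is the default normal-mode port.
      'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:9092',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    ```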

  3. If the data written to the Hudi table by the Flink SQL job needs to be read by Spark and Hive, use run_hive_sync_tool.sh to synchronize the data in the Hudi table to Hive. For details about the synchronization method, see Synchronizing Hudi Table Data to Hive.

    Ensure that no new partitions are added before the synchronization. Partitions added after the synchronization cannot be read.
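
The asynchronous compaction mentioned in step 2 for MOR writes is controlled through table options. The following is a sketch only: the option names are taken from the open-source Hudi Flink connector configuration and the values are illustrative, so verify both against the Hudi version shipped with your cluster. The table name stream_mor_compaction and its path are hypothetical.

```sql
-- Hypothetical MOR table with explicit compaction settings.
CREATE TABLE stream_mor_compaction(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor_compaction',
  'table.type' = 'MERGE_ON_READ',
  -- Run compaction asynchronously inside the writing job.
  'compaction.async.enabled' = 'true',
  -- Trigger compaction based on the number of delta commits.
  'compaction.trigger.strategy' = 'num_commits',
  -- Schedule a compaction after every 5 delta commits (illustrative value).
  'compaction.delta_commits' = '5'
);
```

Because Hudi commits data when a checkpoint completes, a 60000 ms CheckPoint interval with these values would schedule a compaction roughly every five minutes while data keeps arriving.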

Synchronizing Metadata from Flink On Hudi to Hive

This section applies to MRS 3.2.0 or later.
  • Synchronizing metadata to Hive in JDBC mode
    CREATE TABLE stream_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts INT,
    `p` VARCHAR(20)
    ) PARTITIONED BY (`p`) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
    'table.type' = 'MERGE_ON_READ',
    'hive_sync.enable' = 'true',
    'hive_sync.table' = 'Name of the table to be synchronized to Hive',
    'hive_sync.db' = 'Name of the database to be synchronized to Hive',
    'hive_sync.metastore.uris' = 'Value of hive.metastore.uris in the hive-site.xml file on the Hive client',
    'hive_sync.jdbc_url' = 'Value of CLIENT_HIVE_URI in the component_env file on the Hive client'
    );
    • hive_sync.jdbc_url: If the value of CLIENT_HIVE_URI contains backslashes (\), delete them.
    • To use the Hive style partitioning, add the following parameters:
      • 'hoodie.datasource.write.hive_style_partitioning' = 'true'
      • 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
    • When Flink on Hudi synchronizes data to Hive, note that Hudi is case sensitive but Hive is case insensitive. Do not use uppercase letters in the field names of Hudi tables. Otherwise, data may fail to be read or written.
  • Synchronizing metadata to Hive in HMS mode
    CREATE TABLE stream_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts INT,
    `p` VARCHAR(20)
    ) PARTITIONED BY (`p`) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
    'table.type' = 'MERGE_ON_READ',
    'hive_sync.enable' = 'true',
    'hive_sync.table' = 'Name of the table to be synchronized to Hive',
    'hive_sync.db' = 'Name of the database to be synchronized to Hive',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'Value of hive.metastore.uris in the hive-site.xml file on the Hive client',
    'properties.hive.metastore.kerberos.principal' = 'Value of hive.metastore.kerberos.principal in the hive-site.xml file on the Hive client'
    );
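
As a sketch of how the Hive style partitioning parameters listed under JDBC mode fit into a complete statement, the following definition appends them to the JDBC-mode options. The placeholder values are kept as they appear in the JDBC-mode example and must be replaced with the values from your Hive client. The combination has not been verified against a specific MRS version.

```sql
-- JDBC-mode Hive synchronization with Hive style partitioning enabled.
CREATE TABLE stream_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'Name of the table to be synchronized to Hive',
  'hive_sync.db' = 'Name of the database to be synchronized to Hive',
  'hive_sync.metastore.uris' = 'Value of hive.metastore.uris in the hive-site.xml file on the Hive client',
  'hive_sync.jdbc_url' = 'Value of CLIENT_HIVE_URI in the component_env file on the Hive client',
  -- The two parameters below enable Hive style partitioning.
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
);
```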