Updated on 2026-04-10 GMT+08:00

Writing Data into Hidden Partitioned Tables with Spark and Flink

  1. Both Spark and Flink must use the bucket index (hoodie.index.type = BUCKET on Spark, index.type = BUCKET on Flink).
  2. The value of hoodie.hidden.partitioning.rule must be identical on Spark and Flink.
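
Before starting a Flink job against a table that Spark created, it may help to read back the properties stored on the Spark side and compare them with the Flink WITH clause. The statements below are a minimal sketch in Spark SQL. They assume the example table default.flink_stream_mor3 from the Examples section and assume that the Hudi properties set at creation time are visible through SHOW TBLPROPERTIES in your environment.

```sql
-- Assumption: the table was created with the TBLPROPERTIES shown in the Examples section.
-- Read back the hidden partitioning rule; the Flink option of the same name must match it exactly.
SHOW TBLPROPERTIES default.flink_stream_mor3 ('hoodie.hidden.partitioning.rule');

-- Read back the index type; it is expected to be BUCKET for this scenario.
SHOW TBLPROPERTIES default.flink_stream_mor3 ('hoodie.index.type');
```

If a returned value differs from what the Flink table definition declares, align the two before writing.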

Examples

  • Create a table with Spark.
    CREATE TABLE default.flink_stream_mor3 (
      uuid STRING,
      name STRING,
      age INT,
      ts TIMESTAMP,
      ts6 TIMESTAMP,
      p STRING)
    USING hudi
    LOCATION 'hdfs://hacluster/tmp/hudi/flink_stream_mor3'
    TBLPROPERTIES (
      'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)',
      'hoodie.hidden.partitioning.enabled' = 'true',
      'hoodie.index.type' = 'BUCKET',
      'hoodie.bucket.index.num.buckets' = '2',
      'preCombineField' = 'uuid',
      'primaryKey' = 'uuid',
      'type' = 'mor');
    
  • Write existing (historical) data into the table with Spark SQL.
    insert into flink_stream_mor3 select uuid,name,age,ts,ts6,p from tmp_table;
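  • Write data with a Flink streaming job. In this example, kafka_sink is the Kafka source table and hudi_source is the Hudi table that the job writes to.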
    create table kafka_sink(
      uuid varchar(20),
      name varchar(10),
      age int,
      ts timestamp(3),
      ts6 timestamp(6),
      p varchar(20)
    ) with (
      'connector' = 'kafka',
      'topic' = 'input2',
      'properties.bootstrap.servers' = 'XXX',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.HADOOP.COM',
      'format' = 'json'
    );
    create table hudi_source(
      uuid varchar(20),
      name varchar(10),
      age int,
      ts timestamp(3),
      ts6 timestamp(6),
      p varchar(20)
    ) with (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/flink_stream_mor3',
      'table.type' = 'MERGE_ON_READ',
      'hoodie.table.name' = 'flink_stream_mor3',
      'hoodie.datasource.write.recordkey.field' = 'uuid',
      'hoodie.datasource.query.type' = 'snapshot',
      'write.precombine.field' = 'uuid',
      'write.tasks' = '4',
      'write.index_bootstrap.tasks' = '4',
      'write.bucket_assign.tasks' = '4',
      'hoodie.datasource.write.hive_style_partitioning' = 'true',
      'read.streaming.enabled' = 'true',
      'read.streaming.check-interval' = '5',
      'compaction.delta_commits' = '3',
      'compaction.async.enabled' = 'false',
      'compaction.schedule.enabled' = 'true',
      'hoodie.hidden.partitioning.enabled' = 'true',
      'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)',
      'index.type' = 'BUCKET',
      'hoodie.bucket.index.num.buckets' = '2',
      'hive_sync.enable' = 'true',
      'hive_sync.mode' = 'jdbc',
      'hive_sync.jdbc_url' = 'XXXX',
      'hive_sync.table' = 'flink_stream_mor3',
      'hive_sync.metastore.uris' = 'XXXX',
      'hive_sync.support_timestamp' = 'true',
      'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
      'clean.async.enabled' = 'false'
    );
    insert into
      hudi_source
    select
      *
    from
      kafka_sink;
  • Stop the Flink streaming job, and then use Spark SQL to insert data.
    insert into flink_stream_mor3 select uuid,name,age,ts,ts6,p from flink_stream_mor3 where uuid = '1';
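
To check the result of the last step, the affected rows can be read back in Spark SQL. This is a minimal sketch and not part of the original procedure. It assumes that a record with uuid '1' was written earlier by Spark or Flink, and it treats uuid as a string because the table declares it as STRING.

```sql
-- Assumption: a record with uuid '1' exists in the example table.
-- Read the record back after the Spark SQL insert.
SELECT uuid, name, age, ts, ts6, p
FROM default.flink_stream_mor3
WHERE uuid = '1';

-- List record keys that occur more than once; how many rows a key keeps
-- depends on the insert mode configured in your environment.
SELECT uuid, count(*) AS cnt
FROM default.flink_stream_mor3
GROUP BY uuid
HAVING count(*) > 1;
```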