
Real-Time Job Ingestion

Real-time jobs are generally implemented with Flink SQL or Spark Streaming. A stream-based real-time ingestion job is typically configured to generate compaction plans synchronously while leaving the execution of those plans to asynchronous jobs outside the ingestion job itself.

  • Sink Hudi table configuration in Flink SQL jobs (a usage sketch follows the DDL):
    create table hudi_sink_table (
      -- table columns...
    ) PARTITIONED BY (
      years,
      months,
      days
    ) with (
      'connector' = 'hudi',                                                      -- Write to a Hudi table.
      'path' = 'obs://bucket/path/hudi_sink_table',                              -- Storage path of the Hudi table
      'table.type' = 'MERGE_ON_READ',                                            -- Hudi table type
      'hoodie.datasource.write.recordkey.field' = 'id',                          -- Primary key
      'write.precombine.field' = 'vin',                                          -- Pre-combine field
      'write.tasks' = '10',                                                      -- Flink write parallelism
      'hoodie.datasource.write.keygenerator.type' = 'COMPLEX',                   -- KeyGenerator type; keep it consistent with the Hudi table created by Spark.
      'hoodie.datasource.write.hive_style_partitioning' = 'true',                -- Use Hive-style partition paths.
      'read.streaming.enabled' = 'true',                                         -- Enable streaming read.
      'read.streaming.check-interval' = '60',                                    -- Interval for checking new commits during streaming read, in seconds
      'index.type' = 'BUCKET',                                                   -- Use the BUCKET index type.
      'hoodie.bucket.index.num.buckets' = '10',                                  -- Number of buckets
      'compaction.delta_commits' = '3',                                          -- Number of delta commits between generated compaction plans
      'compaction.async.enabled' = 'false',                                      -- Disable asynchronous compaction execution.
      'compaction.schedule.enabled' = 'true',                                    -- Enable synchronous compaction scheduling.
      'clean.async.enabled' = 'false',                                           -- Disable asynchronous cleaning.
      'hoodie.archive.automatic' = 'false',                                      -- Disable automatic archiving.
      'hoodie.clean.automatic' = 'false',                                        -- Disable automatic cleaning.
      'hive_sync.enable' = 'true',                                               -- Enable automatic metadata synchronization.
      'hive_sync.mode' = 'jdbc',                                                 -- Synchronize metadata over JDBC.
      'hive_sync.jdbc_url' = '',                                                 -- JDBC URL for metadata synchronization
      'hive_sync.db' = 'default',                                                -- Database for metadata synchronization
      'hive_sync.table' = 'hudi_sink_table',                                     -- Table name for metadata synchronization
      'hive_sync.support_timestamp' = 'true',                                    -- Keep the timestamp type when synchronizing to Hive.
      'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'  -- Partition value extractor class for Hive synchronization
    );
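    The DDL above only defines the sink table. A minimal sketch of how it is typically used in the same Flink SQL job is shown below; the Kafka source table kafka_source_table, its columns, and its connector options are placeholder assumptions, not part of the original configuration, and must be replaced with the actual upstream source:

    -- Hypothetical Kafka source table; columns and connector options are placeholders.
    create table kafka_source_table (
      id STRING,
      vin STRING,
      years STRING,
      months STRING,
      days STRING
      -- other business columns...
    ) with (
      'connector' = 'kafka',
      'topic' = 'source_topic',
      'properties.bootstrap.servers' = 'kafka-broker:9092',
      'properties.group.id' = 'hudi_ingestion',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );

    -- Continuously write the stream into the Hudi sink table defined above
    -- (the select list must match the sink table's columns).
    insert into hudi_sink_table
    select id, vin, years, months, days from kafka_source_table;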
  • Common parameters for writing Hudi tables with Spark Streaming (their meanings are similar to those of the Flink parameters above, so they are not annotated again; a usage sketch follows the list):
    hoodie.table.name=
    hoodie.index.type=BUCKET
    hoodie.bucket.index.num.buckets=3
    hoodie.datasource.write.precombine.field=
    hoodie.datasource.write.recordkey.field=
    hoodie.datasource.write.partitionpath.field=
    hoodie.datasource.write.table.type=MERGE_ON_READ
    hoodie.datasource.write.hive_style_partitioning=true
    hoodie.compact.inline=true
    hoodie.schedule.compact.only.inline=true
    hoodie.run.compact.only.inline=false
    hoodie.clean.automatic=false
    hoodie.clean.async=false
    hoodie.archive.async=false
    hoodie.archive.automatic=false
    hoodie.compact.inline.max.delta.commits=50
    hoodie.datasource.hive_sync.enable=true
    hoodie.datasource.hive_sync.partition_fields=
    hoodie.datasource.hive_sync.database=
    hoodie.datasource.hive_sync.table=
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
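    A minimal Scala sketch of how these parameters are commonly passed to a Spark Structured Streaming write is shown below. The Kafka source, the record schema, the OBS paths, and the concrete field values (which mirror the Flink example above: id, vin, and years/months/days partitions) are assumptions, not part of the original configuration, and must be replaced with the actual job settings:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.types.{StringType, StructType}

    val spark = SparkSession.builder().appName("hudi-streaming-ingestion").getOrCreate()

    // Hypothetical record schema; replace with the actual business schema.
    val schema = new StructType()
      .add("id", StringType)
      .add("vin", StringType)
      .add("years", StringType)
      .add("months", StringType)
      .add("days", StringType)

    // Hypothetical Kafka source; replace the brokers and topic with the actual upstream.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka-broker:9092")
      .option("subscribe", "source_topic")
      .load()
      .select(from_json(col("value").cast("string"), schema).as("r"))
      .select("r.*")

    // The parameters listed above, filled with placeholder values for illustration.
    val hudiOptions = Map(
      "hoodie.table.name" -> "hudi_sink_table",
      "hoodie.index.type" -> "BUCKET",
      "hoodie.bucket.index.num.buckets" -> "3",
      "hoodie.datasource.write.precombine.field" -> "vin",
      "hoodie.datasource.write.recordkey.field" -> "id",
      "hoodie.datasource.write.partitionpath.field" -> "years,months,days",
      "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
      "hoodie.datasource.write.hive_style_partitioning" -> "true",
      "hoodie.compact.inline" -> "true",
      "hoodie.schedule.compact.only.inline" -> "true",
      "hoodie.run.compact.only.inline" -> "false",
      "hoodie.clean.automatic" -> "false",
      "hoodie.clean.async" -> "false",
      "hoodie.archive.async" -> "false",
      "hoodie.archive.automatic" -> "false",
      "hoodie.compact.inline.max.delta.commits" -> "50",
      "hoodie.datasource.hive_sync.enable" -> "true",
      "hoodie.datasource.hive_sync.partition_fields" -> "years,months,days",
      "hoodie.datasource.hive_sync.database" -> "default",
      "hoodie.datasource.hive_sync.table" -> "hudi_sink_table",
      "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor"
    )

    // Stream the data into the Hudi table; the checkpoint and table paths are placeholders.
    df.writeStream
      .format("hudi")
      .options(hudiOptions)
      .outputMode("append")
      .option("checkpointLocation", "obs://bucket/path/checkpoints/hudi_sink_table")
      .trigger(Trigger.ProcessingTime("60 seconds"))
      .start("obs://bucket/path/hudi_sink_table")
      .awaitTermination()

    This mirrors the approach described at the top of this section: the streaming job only schedules compaction plans synchronously, while plan execution, cleaning, and archiving are left to separate asynchronous maintenance jobs.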