Real-Time Job Ingestion
Updated on 2025-04-15 GMT+08:00
Real-time jobs are generally implemented with Flink SQL or Spark Streaming. Streaming ingestion jobs are typically configured to generate compaction plans synchronously in the writer and to execute those plans asynchronously in a separate job.
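The asynchronous execution step runs outside the streaming writer, typically as a separately scheduled batch job that picks up the pending compaction plans. Below is a minimal sketch of such a job, assuming a Spark deployment whose Hudi bundle provides the Spark SQL extensions and the run_compaction call procedure; the application name and the table identifier hudi_cars_byd.byd_hudi_denza_1s_mor reuse names from the example configuration below and are illustrative.
import org.apache.spark.sql.SparkSession

// Sketch only: assumes the Hudi Spark SQL extensions and the run_compaction
// call procedure are available in the Hudi bundle on the classpath.
object ExecutePendingCompactions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-async-compaction")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .enableHiveSupport()
      .getOrCreate()

    // Execute every compaction plan that the streaming writer has already scheduled.
    // The table identifier is illustrative and matches the Hive-synchronized table
    // in the example configuration below.
    spark.sql(
      "call run_compaction(op => 'run', table => 'hudi_cars_byd.byd_hudi_denza_1s_mor')"
    ).show(false)

    spark.stop()
  }
}
Submitting this job on a fixed schedule keeps compaction roughly in step with the delta commits produced by the streaming writer.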
- Hudi sink table configuration in a Flink SQL job:
create table denza_hudi_sink (
  $HUDI_SINK_SQL_REPLACEABLE$
) PARTITIONED BY (years, months, days) with (
  'connector' = 'hudi',  -- Write to a Hudi table.
  'path' = 'obs://XXXXXXXXXXXXXXXXXX/',  -- Storage path of the Hudi table.
  'table.type' = 'MERGE_ON_READ',  -- Hudi table type.
  'hoodie.datasource.write.recordkey.field' = 'id',  -- Primary key.
  'write.precombine.field' = 'vin',  -- Pre-combine field.
  'write.tasks' = '10',  -- Flink write parallelism.
  'hoodie.datasource.write.keygenerator.type' = 'COMPLEX',  -- KeyGenerator type, consistent with that of the Hudi table created by Spark.
  'hoodie.datasource.write.hive_style_partitioning' = 'true',  -- Use Hive-style partition paths.
  'read.streaming.enabled' = 'true',  -- Enable streaming read.
  'read.streaming.check-interval' = '60',  -- Interval for checking new commits in streaming read, in seconds.
  'index.type' = 'BUCKET',  -- Use the BUCKET index type.
  'hoodie.bucket.index.num.buckets' = '10',  -- Number of buckets.
  'compaction.delta_commits' = '3',  -- Schedule a compaction plan every 3 delta commits.
  'compaction.async.enabled' = 'false',  -- Disable asynchronous compaction execution in the writer.
  'compaction.schedule.enabled' = 'true',  -- Enable synchronous compaction plan scheduling.
  'clean.async.enabled' = 'false',  -- Disable asynchronous cleaning.
  'hoodie.archive.automatic' = 'false',  -- Disable automatic archiving.
  'hoodie.clean.automatic' = 'false',  -- Disable automatic cleaning.
  'hive_sync.enable' = 'true',  -- Automatically synchronize the table to Hive.
  'hive_sync.mode' = 'jdbc',  -- Synchronize to Hive over JDBC.
  'hive_sync.jdbc_url' = '',  -- JDBC URL for Hive synchronization.
  'hive_sync.db' = 'hudi_cars_byd',  -- Hive database to synchronize to.
  'hive_sync.table' = 'byd_hudi_denza_1s_mor',  -- Name of the synchronized Hive table.
  'hive_sync.metastore.uris' = 'thrift://XXXXX:9083',  -- Hive Metastore URI for synchronization.
  'hive_sync.support_timestamp' = 'true',  -- Support the timestamp type during Hive synchronization.
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'  -- Partition value extractor class for Hive synchronization.
);
- Common parameters for writing to Hudi tables with Spark Streaming (their meanings are similar to those of the Flink options above, so they are not annotated again):
hoodie.table.name=
hoodie.index.type=BUCKET
hoodie.bucket.index.num.buckets=3
hoodie.datasource.write.precombine.field=
hoodie.datasource.write.recordkey.field=
hoodie.datasource.write.partitionpath.field=
hoodie.datasource.write.table.type=MERGE_ON_READ
hoodie.datasource.write.hive_style_partitioning=true
hoodie.compact.inline=true
hoodie.schedule.compact.only.inline=true
hoodie.run.compact.only.inline=false
hoodie.clean.automatic=false
hoodie.clean.async=false
hoodie.archive.async=false
hoodie.archive.automatic=false
hoodie.compact.inline.max.delta.commits=50
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.partition_fields=
hoodie.datasource.hive_sync.database=
hoodie.datasource.hive_sync.table=
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
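The following is a minimal Scala sketch of a Structured Streaming write that applies the options above. The rate source, derived columns, OBS paths, and checkpoint location are illustrative assumptions; the record key, precombine field, and partition fields follow the Flink example, and the Hive synchronization options are omitted for brevity.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object HudiStreamingIngestion {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-streaming-ingestion")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Illustrative source: the built-in rate source. Replace it with the real
    // stream (for example, Kafka) and the parsing logic for your records.
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "100")
      .load()
      .selectExpr(
        "CAST(value AS STRING) AS id",   // record key
        "CAST(value AS STRING) AS vin",  // precombine field
        "'2025' AS years",               // partition columns (illustrative values)
        "'04' AS months",
        "'15' AS days")

    val query = input.writeStream
      .format("hudi")
      .option("hoodie.table.name", "byd_hudi_denza_1s_mor")
      .option("hoodie.index.type", "BUCKET")
      .option("hoodie.bucket.index.num.buckets", "3")
      .option("hoodie.datasource.write.precombine.field", "vin")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "years,months,days")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.hive_style_partitioning", "true")
      // Schedule compaction plans synchronously; execute them in a separate job.
      .option("hoodie.compact.inline", "true")
      .option("hoodie.schedule.compact.only.inline", "true")
      .option("hoodie.run.compact.only.inline", "false")
      .option("hoodie.compact.inline.max.delta.commits", "50")
      // Leave cleaning and archiving to separately scheduled maintenance jobs.
      .option("hoodie.clean.automatic", "false")
      .option("hoodie.clean.async", "false")
      .option("hoodie.archive.automatic", "false")
      .option("hoodie.archive.async", "false")
      .option("checkpointLocation", "obs://XXXXXXXXXXXXXXXXXX/checkpoints/byd_hudi_denza_1s_mor")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("60 seconds"))
      .start("obs://XXXXXXXXXXXXXXXXXX/byd_hudi_denza_1s_mor")

    query.awaitTermination()
  }
}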
Parent topic: Bucket Tuning