Real-Time Job Ingestion
Real-time ingestion jobs are generally implemented with Flink SQL or Spark Streaming. Streaming jobs are typically configured to generate compaction plans synchronously (inline with each write) and to execute those plans asynchronously in a separate offline job, so that compaction does not block the ingestion pipeline.
- Hudi sink table configuration in Flink SQL jobs:
create table hudi_sink_table (
  -- table columns...
) PARTITIONED BY (years, months, days) with (
  'connector' = 'hudi',  -- Specify the Hudi table to be written.
  'path' = 'obs://bucket/path/hudi_sink_table',  -- Storage path of the Hudi table
  'table.type' = 'MERGE_ON_READ',  -- Hudi table type
  'hoodie.datasource.write.recordkey.field' = 'id',  -- Primary key
  'write.precombine.field' = 'vin',  -- Field used for pre-combining
  'write.tasks' = '10',  -- Flink write parallelism
  'hoodie.datasource.write.keygenerator.type' = 'COMPLEX',  -- KeyGenerator, consistent with the type used when the Hudi table was created by Spark
  'hoodie.datasource.write.hive_style_partitioning' = 'true',  -- Use Hive-style partition paths.
  'read.streaming.enabled' = 'true',  -- Enable stream read.
  'read.streaming.check-interval' = '60',  -- Interval, in seconds, for checking new commits in stream read
  'index.type' = 'BUCKET',  -- Use the BUCKET index type.
  'hoodie.bucket.index.num.buckets' = '10',  -- Number of buckets
  'compaction.delta_commits' = '3',  -- Number of delta commits between compaction plans
  'compaction.async.enabled' = 'false',  -- Disable asynchronous compaction execution.
  'compaction.schedule.enabled' = 'true',  -- Generate compaction plans synchronously with writes.
  'clean.async.enabled' = 'false',  -- Disable asynchronous cleaning.
  'hoodie.archive.automatic' = 'false',  -- Disable automatic archiving.
  'hoodie.clean.automatic' = 'false',  -- Disable automatic cleaning.
  'hive_sync.enable' = 'true',  -- Enable automatic metadata synchronization.
  'hive_sync.mode' = 'jdbc',  -- Use JDBC for metadata synchronization.
  'hive_sync.jdbc_url' = '',  -- JDBC URL for metadata synchronization
  'hive_sync.db' = 'default',  -- Database for metadata synchronization
  'hive_sync.table' = 'hudi_sink_table',  -- Table name for metadata synchronization
  'hive_sync.support_timestamp' = 'true',  -- Support the timestamp format when syncing to Hive.
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'  -- Partition extractor class for Hive synchronization
);
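A sink table defined this way is typically fed from a source table declared in the same Flink SQL job. The following is a minimal sketch; kafka_source_table is a hypothetical source table (assumed to be defined elsewhere in the job with columns matching the sink, including the partition columns):

-- kafka_source_table is a hypothetical source table defined elsewhere in the same job.
insert into hudi_sink_table
select * from kafka_source_table;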
- Common parameters for writing Hudi tables with Spark Streaming (their meanings are similar to the Flink parameters above, so they are not annotated again):
hoodie.table.name=
hoodie.index.type=BUCKET
hoodie.bucket.index.num.buckets=3
hoodie.datasource.write.precombine.field=
hoodie.datasource.write.recordkey.field=
hoodie.datasource.write.partitionpath.field=
hoodie.datasource.write.table.type=MERGE_ON_READ
hoodie.datasource.write.hive_style_partitioning=true
hoodie.compact.inline=true
hoodie.schedule.compact.only.inline=true
hoodie.run.compact.only.inline=false
hoodie.clean.automatic=false
hoodie.clean.async=false
hoodie.archive.async=false
hoodie.archive.automatic=false
hoodie.compact.inline.max.delta.commits=50
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.partition_fields=
hoodie.datasource.hive_sync.database=
hoodie.datasource.hive_sync.table=
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
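Because both configurations above only schedule compaction plans inline (and disable asynchronous execution, automatic cleaning, and archiving), the pending plans must be executed by a separate offline job. One way to do this is with Hudi's Spark SQL compaction statements; the sketch below assumes a recent Hudi version that supports this syntax, and the table name is illustrative:

-- Inspect pending compaction plans on the table (syntax depends on the Hudi version).
show compaction on default.hudi_sink_table;
-- Execute the compaction plans that the streaming job scheduled.
run compaction on default.hudi_sink_table;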