
Real-time Task Access

Real-time jobs are usually implemented with Flink SQL or Spark Streaming. For real-time streaming tasks, compaction plans are generated synchronously within the write job, while compaction itself is executed asynchronously outside it.

  • Configure the Hudi table on the sink end in a Flink SQL job as follows (a usage sketch for this sink follows this list):
    create table denza_hudi_sink (
    $HUDI_SINK_SQL_REPLACEABLE$
    ) PARTITIONED BY (
    years,
    months,
    days
    ) with (
    'connector' = 'hudi', //Specify that data is written to a Hudi table.
    'path'='obs://XXXXXXXXXXXXXXXXXX/', //Specify the path for storing the Hudi table.
    'table.type' = 'MERGE_ON_READ', //Hudi table type
    'hoodie.datasource.write.recordkey.field' = 'id', //Primary key
    'write.precombine.field' = 'vin', //Precombine field
    'write.tasks' = '10', //Flink write parallelism
    'hoodie.datasource.write.keygenerator.type' = 'COMPLEX', //Specify the KeyGenerator; it must match that of the Hudi table created by Spark.
    'hoodie.datasource.write.hive_style_partitioning' = 'true', //Use the Hive-style partition format.
    'read.streaming.enabled' = 'true', //Enable stream reading.
    'read.streaming.check-interval' = '60', //Check interval for stream reads, in seconds.
    'index.type' = 'BUCKET', //Set the Hudi table index type to BUCKET.
    'hoodie.bucket.index.num.buckets' = '10', //Specify the number of buckets.
    'compaction.delta_commits' = '3', //Number of delta commits between generated compaction plans.
    'compaction.async.enabled' = 'false', //Disable the asynchronous execution of the compaction.
    'compaction.schedule.enabled' = 'true', //Generate compaction plans synchronously.
    'clean.async.enabled'='false', //Disable asynchronous clean.
    'hoodie.archive.automatic'='false', //Disable automatic archiving.
    'hoodie.clean.automatic'='false', //Disable automatic cleaning.
    'hive_sync.enable' = 'true', // Automatically synchronize Hive tables.
    'hive_sync.mode' = 'jdbc', //The Hive table synchronization mode is jdbc.
    'hive_sync.jdbc_url'='', //JDBC URL for Hive table synchronization
    'hive_sync.db' = 'hudi_cars_byd', //Hive database to synchronize to
    'hive_sync.table'='byd_hudi_denza_1s_mor', //Name of the Hive table to synchronize to
    'hive_sync.metastore.uris' = 'thrift://XXXXX:9083', //Metastore URI for synchronizing Hive tables
    'hive_sync.support_timestamp' = 'true', //The Hive table supports the timestamp format.
    'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor' //Partition extractor class for Hive table synchronization
    );
  • The following lists the common parameters used by Spark Streaming to write data to a Hudi table. (Their meanings are similar to those of the Flink parameters above and are therefore not commented.) A write sketch that attaches these parameters follows this list.
    hoodie.table.name=
    hoodie.index.type=BUCKET
    hoodie.bucket.index.num.buckets=3
    hoodie.datasource.write.precombine.field=
    hoodie.datasource.write.recordkey.field=
    hoodie.datasource.write.partitionpath.field=
    hoodie.datasource.write.table.type=MERGE_ON_READ
    hoodie.datasource.write.hive_style_partitioning=true
    hoodie.compact.inline=true
    hoodie.schedule.compact.only.inline=true
    hoodie.run.compact.only.inline=false
    hoodie.clean.automatic=false
    hoodie.clean.async=false
    hoodie.archive.async=false
    hoodie.archive.automatic=false
    hoodie.compact.inline.max.delta.commits=50
    hoodie.datasource.hive_sync.enable=true
    hoodie.datasource.hive_sync.partition_fields=
    hoodie.datasource.hive_sync.database=
    hoodie.datasource.hive_sync.table=
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
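
Once the Hudi sink table above is defined, the Flink SQL job is started with an INSERT INTO statement that streams data from a source table into the sink. A minimal usage sketch, assuming a source table named denza_source whose columns match the sink (the source table name is a placeholder, not part of the reference configuration):

    -- denza_source is a hypothetical source table; replace it with your actual source definition.
    insert into denza_hudi_sink
    select * from denza_source;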
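
The Spark Streaming parameters are typically attached as write options of a Spark Structured Streaming query. The following Scala sketch shows one way to do this; it is not the exact job from this guide: the rate source, derived columns, table name, and OBS paths are placeholders chosen only to keep the example self-contained.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    object HudiStreamingWriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("hudi-streaming-write-sketch").getOrCreate()

        // Synthetic source used only to keep the sketch self-contained; in a real job the
        // DataFrame comes from Kafka or another stream and is parsed into these columns.
        val df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()
          .selectExpr(
            "CAST(value AS STRING) AS id",
            "CAST(value AS STRING) AS vin",
            "'2024' AS years", "'08' AS months", "'30' AS days")

        df.writeStream
          .format("hudi")
          // Parameters from the list above; fill in the empty values for your table.
          .option("hoodie.table.name", "byd_hudi_denza_1s_mor")             // placeholder table name
          .option("hoodie.index.type", "BUCKET")
          .option("hoodie.bucket.index.num.buckets", "3")
          .option("hoodie.datasource.write.precombine.field", "vin")
          .option("hoodie.datasource.write.recordkey.field", "id")
          .option("hoodie.datasource.write.partitionpath.field", "years,months,days")
          .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
          .option("hoodie.datasource.write.hive_style_partitioning", "true")
          .option("hoodie.compact.inline", "true")                          // schedule compaction inline...
          .option("hoodie.schedule.compact.only.inline", "true")            // ...generate the plan only...
          .option("hoodie.run.compact.only.inline", "false")                // ...and execute it asynchronously elsewhere
          .option("hoodie.clean.automatic", "false")
          .option("hoodie.clean.async", "false")
          .option("hoodie.archive.automatic", "false")
          .option("hoodie.archive.async", "false")
          .option("hoodie.compact.inline.max.delta.commits", "50")
          .option("checkpointLocation", "obs://XXXXXXXXXXXXXXXXXX/checkpoints/") // placeholder checkpoint path
          .trigger(Trigger.ProcessingTime("60 seconds"))
          .start("obs://XXXXXXXXXXXXXXXXXX/")                               // placeholder Hudi table path
          .awaitTermination()
      }
    }

The hoodie.datasource.hive_sync.* parameters from the list attach in the same way once the Hive database, table, partition fields, and extractor class are filled in.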