更新时间:2024-12-25 GMT+08:00
分享

实时任务接入

实时作业一般由Flink Sql或Sparkstreaming来完成,流式实时任务通常配置同步生成compaction计划,异步执行计划。

  • Flink SQL作业中sink端Hudi表相关配置如下:
    create table hudi_sink_table (
      // table columns... 
    ) PARTITIONED BY (
      years,
      months,
      days
    ) with (
      'connector' = 'hudi',                                                      //指定写入的是Hudi表
      'path' = 'obs://bucket/path/hudi_sink_table',                              //指定Hudi表的存储路径
      'table.type' = 'MERGE_ON_READ',                                            //Hudi表类型
      'hoodie.datasource.write.recordkey.field' = 'id',                          //主键
      'write.precombine.field' = 'vin',                                          //合并字段
      'write.tasks' = '10',                                                      //flink写入并行度
      'hoodie.datasource.write.keygenerator.type' = 'COMPLEX',                   //指定KeyGenerator,与Spark创建的Hudi表类型一致
      'hoodie.datasource.write.hive_style_partitioning' = 'true',                //使用hive支持的分区格式
      'read.streaming.enabled' = 'true',                                         //开启流读
      'read.streaming.check-interval' = '60',                                    //checkpoint间隔,单位为秒
      'index.type'='BUCKET',                                            //指定Hudi表索引类型为BUCKET
      'hoodie.bucket.index.num.buckets'='10',                            //指定bucket桶数
      'compaction.delta_commits' = '3',                                          //compaction生成的commit间隔
      'compaction.async.enabled' = 'false',                                      //compaction异步执行关闭
      'compaction.schedule.enabled' = 'true',                                    //compaction同步生成计划
      'clean.async.enabled' = 'false',                                           //异步clean关闭
      'hoodie.archive.automatic' = 'false',                                     //自动archive关闭
      'hoodie.clean.automatic' = 'false',                                        //自动clean关闭
      'hive_sync.enable' = 'true',                                               //自动同步元数据
      'hive_sync.mode' = 'jdbc',                                                 //同步元数据方式为jdbc
      'hive_sync.jdbc_url' = '',                                                 //同步元数据的jdbc url
      'hive_sync.db' = 'default',                                          //同步元数据的database
      'hive_sync.table' = 'hudi_sink_table',                               //同步元数据的tablename
      'hive_sync.support_timestamp' = 'true',                                    //同步hive表支持timestamp格式
      'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'           //同步hive表的extractor类
    );
  • Spark streaming写入Hudi表常用的参数如下(参数意义与上面flink类似,不再做注释):
    hoodie.table.name=
    hoodie.index.type=BUCKET
    hoodie.bucket.index.num.buckets=3
    hoodie.datasource.write.precombine.field=
    hoodie.datasource.write.recordkey.field=
    hoodie.datasource.write.partitionpath.field=
    hoodie.datasource.write.table.type= MERGE_ON_READ
    hoodie.datasource.write.hive_style_partitioning=true
    hoodie.compact.inline=true
    hoodie.schedule.compact.only.inline=true
    hoodie.run.compact.only.inline=false
    hoodie.clean.automatic=false
    hoodie.clean.async=false
    hoodie.archive.async=false
    hoodie.archive.automatic=false
    hoodie.compact.inline.max.delta.commits=50
    hoodie.datasource.hive_sync.enable=true
    hoodie.datasource.hive_sync.partition_fields=
    hoodie.datasource.hive_sync.database=
    hoodie.datasource.hive_sync.table=
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor

相关文档