Updated: 2024-05-20 GMT+08:00

Real-Time Task Ingestion

Real-time jobs are generally implemented with Flink SQL or Spark Streaming. Streaming real-time tasks are usually configured to generate compaction plans synchronously (inline) while leaving plan execution to a separate asynchronous job (see the compaction execution sketch at the end of this section).

  • Hudi table configuration for the sink side of a Flink SQL job is as follows (a usage sketch for this sink table is given at the end of this section):
    create table denza_hudi_sink (
    $HUDI_SINK_SQL_REPLACEABLE$
    ) PARTITIONED BY (
    years,
    months,
    days
    ) with (
    'connector' = 'hudi',                                                      -- the sink is a Hudi table
    'path' = 'obs://XXXXXXXXXXXXXXXXXX/',                                      -- storage path of the Hudi table
    'table.type' = 'MERGE_ON_READ',                                            -- Hudi table type
    'hoodie.datasource.write.recordkey.field' = 'id',                          -- record key (primary key)
    'write.precombine.field' = 'vin',                                          -- precombine field used to merge records with the same key
    'write.tasks' = '10',                                                      -- Flink write parallelism
    'hoodie.datasource.write.keygenerator.type' = 'COMPLEX',                   -- KeyGenerator, consistent with the Hudi table created by Spark
    'hoodie.datasource.write.hive_style_partitioning' = 'true',                -- use Hive-style partition paths
    'read.streaming.enabled' = 'true',                                         -- enable streaming read
    'read.streaming.check-interval' = '60',                                    -- streaming read check interval, in seconds
    'index.type' = 'BUCKET',                                                   -- use the BUCKET index type
    'hoodie.bucket.index.num.buckets' = '10',                                  -- number of buckets
    'compaction.delta_commits' = '3',                                          -- number of delta commits between generated compaction plans
    'compaction.async.enabled' = 'false',                                      -- disable asynchronous compaction execution in the writer
    'compaction.schedule.enabled' = 'true',                                    -- generate compaction plans synchronously
    'clean.async.enabled' = 'false',                                           -- disable asynchronous clean
    'hoodie.archive.automatic' = 'false',                                      -- disable automatic archiving
    'hoodie.clean.automatic' = 'false',                                        -- disable automatic cleaning
    'hive_sync.enable' = 'true',                                               -- automatically sync the table to Hive
    'hive_sync.mode' = 'jdbc',                                                 -- Hive sync mode: jdbc
    'hive_sync.jdbc_url' = '',                                                 -- JDBC URL used for Hive sync
    'hive_sync.db' = 'hudi_cars_byd',                                          -- Hive database to sync to
    'hive_sync.table' = 'byd_hudi_denza_1s_mor',                               -- Hive table name to sync to
    'hive_sync.metastore.uris' = 'thrift://XXXXX:9083',                        -- Hive metastore URI used for sync
    'hive_sync.support_timestamp' = 'true',                                    -- support the timestamp type in the synced Hive table
    'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'  -- partition extractor class for Hive sync
    );
  • Common parameters for writing a Hudi table from Spark Streaming are listed below (their meanings are similar to the Flink parameters above and are therefore not commented again; a query sketch of the Hive tables produced by Hive sync is given at the end of this section):
    hoodie.table.name=
    hoodie.index.type=BUCKET
    hoodie.bucket.index.num.buckets=3
    hoodie.datasource.write.precombine.field=
    hoodie.datasource.write.recordkey.field=
    hoodie.datasource.write.partitionpath.field=
    hoodie.datasource.write.table.type=MERGE_ON_READ
    hoodie.datasource.write.hive_style_partitioning=true
    hoodie.compact.inline=true
    hoodie.schedule.compact.only.inline=true
    hoodie.run.compact.only.inline=false
    hoodie.clean.automatic=false
    hoodie.clean.async=false
    hoodie.archive.async=false
    hoodie.archive.automatic=false
    hoodie.compact.inline.max.delta.commits=50
    hoodie.datasource.hive_sync.enable=true
    hoodie.datasource.hive_sync.partition_fields=
    hoodie.datasource.hive_sync.database=
    hoodie.datasource.hive_sync.table=
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
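
A minimal usage sketch of the Flink SQL sink table defined above, assuming a hypothetical source table denza_source (not part of the original job) whose columns match the sink definition:

    insert into denza_hudi_sink
    select
    id,
    vin,
    -- ... remaining business columns ...
    years,
    months,
    days
    from denza_source;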
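
With the settings above, the streaming writers only schedule compaction plans ('compaction.schedule.enabled' = 'true' with 'compaction.async.enabled' = 'false' in Flink; hoodie.schedule.compact.only.inline=true with hoodie.run.compact.only.inline=false in Spark), so the plans must be executed by a separate job. A hedged sketch of one way to do this, assuming a Hudi version (0.11 or later) with Spark SQL call procedures enabled; the OBS path is the masked table path from the Flink configuration:

    -- execute all pending compaction plans for the table at the given path
    call run_compaction(op => 'run', path => 'obs://XXXXXXXXXXXXXXXXXX/');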
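
After Hive sync, a MERGE_ON_READ table is exposed in Hive as two tables: <table>_ro (read-optimized view) and <table>_rt (real-time view). A query sketch against the tables synced by the configurations above:

    -- read-optimized view: reads compacted base files only
    select count(*) from hudi_cars_byd.byd_hudi_denza_1s_mor_ro;
    -- real-time view: merges base files with the latest log files at query time
    select count(*) from hudi_cars_byd.byd_hudi_denza_1s_mor_rt;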