更新时间:2024-07-24 GMT+08:00

流式写入Hudi表

HoodieDeltaStreamer流式写入

Hudi自带HoodieDeltaStreamer工具支持流式写入,也可以使用SparkStreaming以微批的方式写入。HoodieDeltaStreamer提供以下功能:

  • 支持Kafka,DFS多种数据源接入 。
  • 支持管理检查点、回滚和恢复,保证exactly once语义。
  • 支持自定义转换操作。

示例:

准备配置文件kafka-source.properties

#hudi配置
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=age
hoodie.upsert.shuffle.parallelism=100
#hive config
hoodie.datasource.hive_sync.table=hudimor_deltastreamer_partition
hoodie.datasource.hive_sync.partition_fields=age
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.hive_sync.support_timestamp=true
# Kafka Source topic
hoodie.deltastreamer.source.kafka.topic=hudimor_deltastreamer_partition
#checkpoint
hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/hudimor_deltastreamer_partition
# Kafka props
# The kafka cluster we want to ingest from
bootstrap.servers= xx.xx.xx.xx:xx
auto.offset.reset=earliest
#auto.offset.reset=latest
group.id=hoodie-delta-streamer
offset.rang.limit=10000

指定HoodieDeltaStreamer执行参数(具体参数配置,请查看官网https://hudi.apache.org/ )执行如下命令:

spark-submit --master yarn

--jars /opt/hudi-java-examples-1.0.jar // 指定spark运行时需要的hudi jars路径

--driver-memory 1g

--executor-memory 1g --executor-cores 1 --num-executors 2 --conf spark.kryoserializer.buffer.max=128m

--driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/hudi-examples-0.6.1-SNAPSHOT.jar:/opt/hudi-examples-0.6.1-SNAPSHOT-tests.jar // 指定spark driver需要的hudi jars路径

--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer spark-internal

--props file:///opt/kafka-source.properties // 指定配置文件,注意:使用yarn-cluster模式提交任务时,请指定配置文件路径为HDFS路径。

--target-base-path /tmp/huditest/hudimor1_deltastreamer_partition // 指定hudi表路径

--table-type MERGE_ON_READ // 指定要写入的hudi表类型

--target-table hudimor_deltastreamer_partition // 指定hudi表名

--source-ordering-field name // 指定hudi表预合并列

--source-class org.apache.hudi.utilities.sources.JsonKafkaSource // 指定消费的数据源为JsonKafkaSource, 该参数根据不同数据源指定不同的source类

--schemaprovider-class com.huaweixxx.bigdata.hudi.examples.DataSchemaProviderExample // 指定hudi表所需要的schema

--transformer-class com.huaweixxx.bigdata.hudi.examples.TransformerExample // 指定如何处理数据源拉取来的数据,可根据自身业务需求做定制

--enable-hive-sync // 开启hive同步,同步hudi表到hive

--continuous // 指定流处理模式为连续模式

HoodieMultiTableDeltaStreamer流式写入

HoodieMultiTableDeltaStreamer流式写入仅适用于MRS 3.2.0及之后版本。

HoodieDeltaStreamer支持从多种类型的源表抓取数据写入Hudi目标表,但是HoodieDeltaStreamer只能完成一个源表更新一个目标表。而HoodieMultiTableDeltaStreamer可以完成多个源表更新多个目标表,也可以完成多个源表更新一个目标表。

  • 多个源表写一个目标表(两个kafka source写一个Hudi表):

    主要配置:

    // 指定目标表
    hoodie.deltastreamer.ingestion.tablesToBeIngested=目录名.目标表
    // 指定所有的源表给特定目标表
    hoodie.deltastreamer.source.sourcesBoundTo.目标表=目录名.源表1,目录名.源表2
    // 指定每个源表的配置文件路径
    hoodie.deltastreamer.source.目录名.源表1.configFile=路径1
    hoodie.deltastreamer.source.目录名.源表2.configFile=路径2
    // 指定每个源表的恢复点,source类型不同,恢复点的格式也不同。如kafka soruce格式为"topic名,分区名:offset"
    hoodie.deltastreamer.current.source.checkpoint=topic名,分区名:offset
    // 指定每个源表的关联表(hudi表),如果有多个用逗号隔开
    hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/.....,hdfs://hacluster/.....
    // 指定每个源表的数据在写入hudi前的transform操作,注意需要明确列出需要写入的列,不要使用select *
    // <SRC>代表当前source表,不要替换,这是固定写法
    hoodie.deltastreamer.transformer.sql=select field1,field2,field3,... from <SRC>

    Spark提交命令:

    spark-submit \
    --master yarn \
    --driver-memory 1g \
    --executor-memory 1g \
    --executor-cores 1 \
    --num-executors 5 \
    --conf spark.driver.extraClassPath=/opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/* \
    --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer /opt/client/Hudi/hudi/lib/hudi-utilities_2.12-*.jar \
    --props file:///opt/hudi/testconf/sourceCommon.properties \
    --config-folder file:///opt/hudi/testconf/ \
    --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
    --schemaprovider-class org.apache.hudi.examples.common.HoodieMultiTableDeltaStreamerSchemaProvider \
    --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
    --source-ordering-field col6 \
    --base-path-prefix hdfs://hacluster/tmp/ \
    --table-type COPY_ON_WRITE \
    --target-table KafkaToHudi \
    --enable-hive-sync \
    --allow-fetch-from-multiple-sources \
    --allow-continuous-when-multiple-sources
    1. 当“source”的类型是“kafka source”时,“--schemaprovider-class”指定的schema provider类需要用户自己开发。
    2. “--allow-fetch-from-multiple-sources”表示开启多源表写入。
    3. “--allow-continuous-when-multiple-sources”表示开启多源表持续写入,如果未设置所有源表写入一次后任务就会结束。

    sourceCommon.properties :

    hoodie.deltastreamer.ingestion.tablesToBeIngested=testdb.KafkaToHudi
    hoodie.deltastreamer.source.sourcesBoundTo.KafkaToHudi=source1,source2
    hoodie.deltastreamer.source.default.source1.configFile=file:///opt/hudi/testconf/source1.properties
    hoodie.deltastreamer.source.default.source2.configFile=file:///opt/hudi/testconf/source2.properties
    
    hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
    hoodie.datasource.write.partitionpath.field=col0
    hoodie.datasource.write.recordkey.field=primary_key
    hoodie.datasource.write.precombine.field=col6
    
    hoodie.datasource.hive_sync.table=kafkatohudisync
    hoodie.datasource.hive_sync.partition_fields=col0
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
    
    bootstrap.servers=192.168.34.221:21005,192.168.34.136:21005,192.168.34.175:21005
    auto.offset.reset=latest
    group.id=hoodie-test

    source1.properties:

    hoodie.deltastreamer.current.source.name=source1 // kafka topic的名称有时候可读性很差,所以这里给它取个别名当作source的名称
    hoodie.deltastreamer.source.kafka.topic=s1
    hoodie.deltastreamer.current.source.checkpoint=s1,0:0,1:0 // 任务启动时,该source的恢复点(从0分区的0 offset,1分区的0 offset开始恢复)
    // 指定与source1表进行join的hudi表,如果该hudi表已经同步到hive,则不需要该配置,直接在sql中通过表名来使用
    hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/tmp/huditest/tb_test_cow_par
    // <SRC>代表当前的source表,即source1,固定写法
    hoodie.deltastreamer.transformer.sql=select A.primary_key, A.col0, B.col1, B.col2, A.col3, A.col4, B.col5, B.col6, B.col7 from <SRC> as A join tb_test_cow_par as B on A.primary_key = B.primary_key

    source2.properties

    hoodie.deltastreamer.current.source.name=source2
    hoodie.deltastreamer.source.kafka.topic=s2
    hoodie.deltastreamer.current.source.checkpoint=s2,0:0,1:0
    hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/tmp/huditest/tb_test_cow_par
    hoodie.deltastreamer.transformer.sql=select A.primary_key, A.col0, B.col1, B.col2, A.col3, A.col4, B.col5, B.col6, B.col7 from <SRC> as A join tb_test_cow_par as B on A.primary_key = B.primary_key
  • 多个源表写一个目标表(两个Hudi表source写一个Hudi表):

    Spark提交命令:

    spark-submit \
    --master yarn \
    --driver-memory 1g \
    --executor-memory 1g \
    --executor-cores 1 \
    --num-executors 2 \
    --conf spark.driver.extraClassPath=/opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/* \
    --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer /opt/client/Hudi/hudi/lib/hudi-utilities_2.12-*.jar \
    --props file:///opt/testconf/sourceCommon.properties \
    --config-folder file:///opt/testconf/ \
    --source-class org.apache.hudi.utilities.sources.HoodieIncrSource \ //指定source的类型是Hudi表,作为源表的Hudi表只能是COW类型
    --payload-class org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload \ //指定一个payload, payload决定了新值更新旧值的方式。
    --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \ //指定一个transformer类,源表schema和目标表的schema不一致时,源表的数据需要进行transform才能写入目标表。
    --source-ordering-field col6 \
    --base-path-prefix hdfs://hacluster/tmp/ \ //目标表的存放路径
    --table-type MERGE_ON_READ \ //目标表的类型,可以是COW表也可以是MOR表。
    --target-table tb_test_mor_par_300 \ //指定目标表的表名,多源表更新单表时,目标表的表名必须给出。
    --checkpoint 000 \ //指定一个检查点(commit时间戳),表明从此检查点恢复Delta Streamer,000代表从头开始。
    --enable-hive-sync \
    --allow-fetch-from-multiple-sources \
    --allow-continuous-when-multiple-sources \
    --op UPSERT //指定写操作类型
    • 当“source”的类型是“HoodieIncrSourc”时,不需要指定“--schemaprovider-class”。
    • “--transformer-class”指定SqlQueryBasedTransformer,可以通过SQL来操作数据转换,将源数据结构转换成目标表数据结构。

    file:///opt/testconf/sourceCommon.properties:

    # source的公共属性
    hoodie.deltastreamer.ingestion.tablesToBeIngested=testdb.tb_test_mor_par_300 //指定一个目标表。多源表写单目标表,所以目标表可以作为公共属性。
    hoodie.deltastreamer.source.sourcesBoundTo.tb_test_mor_par_300=testdb.tb_test_mor_par_100,testdb.tb_test_mor_par_200 //指定多个源表。
    hoodie.deltastreamer.source.testdb.tb_test_mor_par_100.configFile=file:///opt/testconf/tb_test_mor_par_100.properties //源表tb_test_mor_par_100的属性文件路径
    hoodie.deltastreamer.source.testdb.tb_test_mor_par_200.configFile=file:///opt/testconf/tb_test_mor_par_200.properties //源表tb_test_mor_par_200的属性文件路径
    
    # 所有source公用的hudi写配置,source独立的配置需要写到自己对应的属性文件中
    hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
    hoodie.datasource.write.partitionpath.field=col0
    hoodie.datasource.write.recordkey.field=primary_key
    hoodie.datasource.write.precombine.field=col6

    file:///opt/testconf/tb_test_mor_par_100.properties

    # 源表tb_test_mor_par_100的配置
    hoodie.deltastreamer.source.hoodieincr.path=hdfs://hacluster/tmp/testdb/tb_test_mor_par_100 //源表的路径
    hoodie.deltastreamer.source.hoodieincr.partition.fields=col0 //源表的分区键
    hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=false
    hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/tmp/testdb/tb_test_mor_par_400 //指定与源表进行关联操作的表
    hoodie.deltastreamer.transformer.sql=select A.primary_key, A.col0, B.col1, B.col2, A.col3, A.col4, B.col5, A.col6, B.col7 from <SRC> as A join tb_test_mor_par_400 as B on A.primary_key = B.primary_key //该配置在transformer类指定为SqlQueryBasedTransformer才会生效
    file:///opt/testconf/tb_test_mor_par_200.properties
    # 源表tb_test_mor_par_200的配置
    hoodie.deltastreamer.source.hoodieincr.path=hdfs://hacluster/tmp/testdb/tb_test_mor_par_200
    hoodie.deltastreamer.source.hoodieincr.partition.fields=col0
    hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=false
    hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/tmp/testdb/tb_test_mor_par_400
    hoodie.deltastreamer.transformer.sql=select A.primary_key, A.col0, B.col1, B.col2, A.col3, A.col4, B.col5, A.col6, B.col7 from <SRC> as A join tb_test_mor_par_400 as B on A.primary_key = B.primary_key //源表数据结构转换为目标表的数据结构。该源表如果需要和Hive进行关联操作,可以直接在SQL中通过表名来进行关联操作;该源表如果需要和Hudi表关联操作,需要先指定Hudi表的路径,然后在SQL中通过表名来进行关联操作。