更新时间:2024-07-19 GMT+08:00

流式写入

HoodieDeltaStreamer流式写入

Hudi自带HoodieDeltaStreamer工具支持流式写入,也可以使用SparkStreaming以微批的方式写入。HoodieDeltaStreamer提供以下功能:

  • 支持Kafka,DFS多种数据源接入 。
  • 支持管理检查点、回滚和恢复,保证exactly once语义。
  • 支持自定义转换操作。

示例:

准备配置文件kafka-source.properties

#hudi配置
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=age
hoodie.upsert.shuffle.parallelism=100
#hive config
hoodie.datasource.hive_sync.table=hudimor_deltastreamer_partition
hoodie.datasource.hive_sync.partition_fields=age
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.hive_sync.support_timestamp=true
# Kafka Source topic
hoodie.deltastreamer.source.kafka.topic=hudimor_deltastreamer_partition
#checkpoint
hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/hudimor_deltastreamer_partition
# Kafka props
# The kafka cluster we want to ingest from
bootstrap.servers= xx.xx.xx.xx:xx
auto.offset.reset=earliest
#auto.offset.reset=latest
group.id=hoodie-delta-streamer
offset.rang.limit=10000

指定HoodieDeltaStreamer执行参数执行如下命令:

spark-submit --master yarn

--jars /opt/hudi-java-examples-1.0.jar // 指定spark运行时需要的hudi jars路径

--driver-memory 1g

--executor-memory 1g --executor-cores 1 --num-executors 2 --conf spark.kryoserializer.buffer.max=128m

--driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/hudi-examples-0.6.1-SNAPSHOT.jar:/opt/hudi-examples-0.6.1-SNAPSHOT-tests.jar // 指定spark driver需要的hudi jars路径

--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer spark-internal

--props file:///opt/kafka-source.properties // 指定配置文件,注意:使用yarn-cluster模式提交任务时,请指定配置文件路径为HDFS路径。

--target-base-path /tmp/huditest/hudimor1_deltastreamer_partition // 指定hudi表路径

--table-type MERGE_ON_READ // 指定要写入的hudi表类型

--target-table hudimor_deltastreamer_partition // 指定hudi表名

--source-ordering-field name // 指定hudi表预合并列

--source-class org.apache.hudi.utilities.sources.JsonKafkaSource // 指定消费的数据源 为JsonKafkaSource, 该参数根据不同数据源指定不同的source类

--schemaprovider-class com..bigdata.hudi.examples.DataSchemaProviderExample // 指定hudi表所需要的schema

--transformer-class com..bigdata.hudi.examples.TransformerExample // 指定如何处理数据源拉取来的数据,可根据自身业务需求做定制

--enable-hive-sync // 开启hive同步,同步hudi表到hive

--continuous // 指定流处理模式为连续模式