流式写入Hudi表
HoodieDeltaStreamer流式写入
Hudi自带HoodieDeltaStreamer工具支持流式写入,也可以使用SparkStreaming以微批的方式写入。HoodieDeltaStreamer提供以下功能:
- 支持Kafka,DFS多种数据源接入 。
- 支持管理检查点、回滚和恢复,保证exactly once语义。
- 支持自定义转换操作。
示例:
准备配置文件kafka-source.properties
#hudi配置 hoodie.datasource.write.recordkey.field=id hoodie.datasource.write.partitionpath.field=age hoodie.upsert.shuffle.parallelism=100 #hive config hoodie.datasource.hive_sync.table=hudimor_deltastreamer_partition hoodie.datasource.hive_sync.partition_fields=age hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor hoodie.datasource.hive_sync.use_jdbc=false hoodie.datasource.hive_sync.support_timestamp=true # Kafka Source topic hoodie.deltastreamer.source.kafka.topic=hudimor_deltastreamer_partition #checkpoint hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/hudimor_deltastreamer_partition # Kafka props # The kafka cluster we want to ingest from bootstrap.servers= xx.xx.xx.xx:xx auto.offset.reset=earliest #auto.offset.reset=latest group.id=hoodie-delta-streamer offset.rang.limit=10000
指定HoodieDeltaStreamer执行参数(具体参数配置,请查看官网https://hudi.apache.org/ )执行如下命令:
spark-submit --master yarn
--jars /opt/hudi-java-examples-1.0.jar // 指定spark运行时需要的hudi jars路径
--driver-memory 1g
--executor-memory 1g --executor-cores 1 --num-executors 2 --conf spark.kryoserializer.buffer.max=128m
--driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/hudi-examples-0.6.1-SNAPSHOT.jar:/opt/hudi-examples-0.6.1-SNAPSHOT-tests.jar // 指定spark driver需要的hudi jars路径
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer spark-internal
--props file:///opt/kafka-source.properties // 指定配置文件,注意:使用yarn-cluster模式提交任务时,请指定配置文件路径为HDFS路径。
--target-base-path /tmp/huditest/hudimor1_deltastreamer_partition // 指定hudi表路径
--table-type MERGE_ON_READ // 指定要写入的hudi表类型
--target-table hudimor_deltastreamer_partition // 指定hudi表名
--source-ordering-field name // 指定hudi表预合并列
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource // 指定消费的数据源 为JsonKafkaSource, 该参数根据不同数据源指定不同的source类
--schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample // 指定hudi表所需要的schema
--transformer-class com.huawei.bigdata.hudi.examples.TransformerExample // 指定如何处理数据源拉取来的数据,可根据自身业务需求做定制
--enable-hive-sync // 开启hive同步,同步hudi表到hive
--continuous // 指定流处理模式为连续模式