流式写入Hudi表
HoodieDeltaStreamer流式写入
Hudi自带HoodieDeltaStreamer工具支持流式写入,也可以使用SparkStreaming以微批的方式写入。HoodieDeltaStreamer提供以下功能:
- 支持Kafka,DFS多种数据源接入 。
- 支持管理检查点、回滚和恢复,保证exactly once语义。
- 支持自定义转换操作。
示例:
准备配置文件kafka-source.properties
#hudi配置 hoodie.datasource.write.recordkey.field=id hoodie.datasource.write.partitionpath.field=age hoodie.upsert.shuffle.parallelism=100 #hive config hoodie.datasource.hive_sync.table=hudimor_deltastreamer_partition hoodie.datasource.hive_sync.partition_fields=age hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor hoodie.datasource.hive_sync.use_jdbc=false hoodie.datasource.hive_sync.support_timestamp=true # Kafka Source topic hoodie.deltastreamer.source.kafka.topic=hudimor_deltastreamer_partition #checkpoint hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/hudimor_deltastreamer_partition # Kafka props # The kafka cluster we want to ingest from bootstrap.servers= xx.xx.xx.xx:xx auto.offset.reset=earliest #auto.offset.reset=latest group.id=hoodie-delta-streamer offset.rang.limit=10000
指定HoodieDeltaStreamer执行参数(具体参数配置,请查看官网https://hudi.apache.org/ )执行如下命令:
spark-submit --master yarn
--jars /opt/hudi-java-examples-1.0.jar // 指定spark运行时需要的hudi jars路径
--driver-memory 1g
--executor-memory 1g --executor-cores 1 --num-executors 2 --conf spark.kryoserializer.buffer.max=128m
--driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/hudi-examples-0.6.1-SNAPSHOT.jar:/opt/hudi-examples-0.6.1-SNAPSHOT-tests.jar // 指定spark driver需要的hudi jars路径
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer spark-internal
--props file:///opt/kafka-source.properties // 指定配置文件,注意:使用yarn-cluster模式提交任务时,请指定配置文件路径为HDFS路径。
--target-base-path /tmp/huditest/hudimor1_deltastreamer_partition // 指定hudi表路径
--table-type MERGE_ON_READ // 指定要写入的hudi表类型
--target-table hudimor_deltastreamer_partition // 指定hudi表名
--source-ordering-field name // 指定hudi表预合并列
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource // 指定消费的数据源为JsonKafkaSource, 该参数根据不同数据源指定不同的source类
--schemaprovider-class com.huaweixxx.bigdata.hudi.examples.DataSchemaProviderExample // 指定hudi表所需要的schema
--transformer-class com.huaweixxx.bigdata.hudi.examples.TransformerExample // 指定如何处理数据源拉取来的数据,可根据自身业务需求做定制
--enable-hive-sync // 开启hive同步,同步hudi表到hive
--continuous // 指定流处理模式为连续模式
HoodieMultiTableDeltaStreamer流式写入
HoodieMultiTableDeltaStreamer流式写入仅适用于MRS 3.2.0及之后版本。
HoodieDeltaStreamer支持从多种类型的源表抓取数据写入Hudi目标表,但是HoodieDeltaStreamer只能完成一个源表更新一个目标表。而HoodieMultiTableDeltaStreamer可以完成多个源表更新多个目标表,也可以完成多个源表更新一个目标表。
- 多个源表写一个目标表(两个kafka source写一个Hudi表):
主要配置:
// 指定目标表 hoodie.deltastreamer.ingestion.tablesToBeIngested=目录名.目标表 // 指定所有的源表给特定目标表 hoodie.deltastreamer.source.sourcesBoundTo.目标表=目录名.源表1,目录名.源表2 // 指定每个源表的配置文件路径 hoodie.deltastreamer.source.目录名.源表1.configFile=路径1 hoodie.deltastreamer.source.目录名.源表2.configFile=路径2 // 指定每个源表的恢复点,source类型不同,恢复点的格式也不同。如kafka soruce格式为"topic名,分区名:offset" hoodie.deltastreamer.current.source.checkpoint=topic名,分区名:offset // 指定每个源表的关联表(hudi表),如果有多个用逗号隔开 hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/.....,hdfs://hacluster/..... // 指定每个源表的数据在写入hudi前的transform操作,注意需要明确列出需要写入的列,不要使用select * // <SRC>代表当前source表,不要替换,这是固定写法 hoodie.deltastreamer.transformer.sql=select field1,field2,field3,... from <SRC>
Spark提交命令:
spark-submit \ --master yarn \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 1 \ --num-executors 5 \ --conf spark.driver.extraClassPath=/opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/* \ --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer /opt/client/Hudi/hudi/lib/hudi-utilities_2.12-*.jar \ --props file:///opt/hudi/testconf/sourceCommon.properties \ --config-folder file:///opt/hudi/testconf/ \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --schemaprovider-class org.apache.hudi.examples.common.HoodieMultiTableDeltaStreamerSchemaProvider \ --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \ --source-ordering-field col6 \ --base-path-prefix hdfs://hacluster/tmp/ \ --table-type COPY_ON_WRITE \ --target-table KafkaToHudi \ --enable-hive-sync \ --allow-fetch-from-multiple-sources \ --allow-continuous-when-multiple-sources
- 当“source”的类型是“kafka source”时,“--schemaprovider-class”指定的schema provider类需要用户自己开发。
- “--allow-fetch-from-multiple-sources”表示开启多源表写入。
- “--allow-continuous-when-multiple-sources”表示开启多源表持续写入,如果未设置所有源表写入一次后任务就会结束。
sourceCommon.properties :
hoodie.deltastreamer.ingestion.tablesToBeIngested=testdb.KafkaToHudi hoodie.deltastreamer.source.sourcesBoundTo.KafkaToHudi=source1,source2 hoodie.deltastreamer.source.default.source1.configFile=file:///opt/hudi/testconf/source1.properties hoodie.deltastreamer.source.default.source2.configFile=file:///opt/hudi/testconf/source2.properties hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator hoodie.datasource.write.partitionpath.field=col0 hoodie.datasource.write.recordkey.field=primary_key hoodie.datasource.write.precombine.field=col6 hoodie.datasource.hive_sync.table=kafkatohudisync hoodie.datasource.hive_sync.partition_fields=col0 hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor bootstrap.servers=192.168.34.221:21005,192.168.34.136:21005,192.168.34.175:21005 auto.offset.reset=latest group.id=hoodie-test
source1.properties:
hoodie.deltastreamer.current.source.name=source1 // kafka topic的名称有时候可读性很差,所以这里给它取个别名当作source的名称 hoodie.deltastreamer.source.kafka.topic=s1 hoodie.deltastreamer.current.source.checkpoint=s1,0:0,1:0 // 任务启动时,该source的恢复点(从0分区的0 offset,1分区的0 offset开始恢复) // 指定与source1表进行join的hudi表,如果该hudi表已经同步到hive,则不需要该配置,直接在sql中通过表名来使用 hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/tmp/huditest/tb_test_cow_par // <SRC>代表当前的source表,即source1,固定写法 hoodie.deltastreamer.transformer.sql=select A.primary_key, A.col0, B.col1, B.col2, A.col3, A.col4, B.col5, B.col6, B.col7 from <SRC> as A join tb_test_cow_par as B on A.primary_key = B.primary_key
source2.properties
hoodie.deltastreamer.current.source.name=source2 hoodie.deltastreamer.source.kafka.topic=s2 hoodie.deltastreamer.current.source.checkpoint=s2,0:0,1:0 hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/tmp/huditest/tb_test_cow_par hoodie.deltastreamer.transformer.sql=select A.primary_key, A.col0, B.col1, B.col2, A.col3, A.col4, B.col5, B.col6, B.col7 from <SRC> as A join tb_test_cow_par as B on A.primary_key = B.primary_key
- 多个源表写一个目标表(两个Hudi表source写一个Hudi表):
Spark提交命令:
spark-submit \ --master yarn \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 1 \ --num-executors 2 \ --conf spark.driver.extraClassPath=/opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/* \ --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer /opt/client/Hudi/hudi/lib/hudi-utilities_2.12-*.jar \ --props file:///opt/testconf/sourceCommon.properties \ --config-folder file:///opt/testconf/ \ --source-class org.apache.hudi.utilities.sources.HoodieIncrSource \ //指定source的类型是Hudi表,作为源表的Hudi表只能是COW类型 --payload-class org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload \ //指定一个payload, payload决定了新值更新旧值的方式。 --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \ //指定一个transformer类,源表schema和目标表的schema不一致时,源表的数据需要进行transform才能写入目标表。 --source-ordering-field col6 \ --base-path-prefix hdfs://hacluster/tmp/ \ //目标表的存放路径 --table-type MERGE_ON_READ \ //目标表的类型,可以是COW表也可以是MOR表。 --target-table tb_test_mor_par_300 \ //指定目标表的表名,多源表更新单表时,目标表的表名必须给出。 --checkpoint 000 \ //指定一个检查点(commit时间戳),表明从此检查点恢复Delta Streamer,000代表从头开始。 --enable-hive-sync \ --allow-fetch-from-multiple-sources \ --allow-continuous-when-multiple-sources \ --op UPSERT //指定写操作类型
- 当“source”的类型是“HoodieIncrSourc”时,不需要指定“--schemaprovider-class”。
- “--transformer-class”指定SqlQueryBasedTransformer,可以通过SQL来操作数据转换,将源数据结构转换成目标表数据结构。
file:///opt/testconf/sourceCommon.properties:
# source的公共属性 hoodie.deltastreamer.ingestion.tablesToBeIngested=testdb.tb_test_mor_par_300 //指定一个目标表。多源表写单目标表,所以目标表可以作为公共属性。 hoodie.deltastreamer.source.sourcesBoundTo.tb_test_mor_par_300=testdb.tb_test_mor_par_100,testdb.tb_test_mor_par_200 //指定多个源表。 hoodie.deltastreamer.source.testdb.tb_test_mor_par_100.configFile=file:///opt/testconf/tb_test_mor_par_100.properties //源表tb_test_mor_par_100的属性文件路径 hoodie.deltastreamer.source.testdb.tb_test_mor_par_200.configFile=file:///opt/testconf/tb_test_mor_par_200.properties //源表tb_test_mor_par_200的属性文件路径 # 所有source公用的hudi写配置,source独立的配置需要写到自己对应的属性文件中 hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator hoodie.datasource.write.partitionpath.field=col0 hoodie.datasource.write.recordkey.field=primary_key hoodie.datasource.write.precombine.field=col6
file:///opt/testconf/tb_test_mor_par_100.properties
# 源表tb_test_mor_par_100的配置 hoodie.deltastreamer.source.hoodieincr.path=hdfs://hacluster/tmp/testdb/tb_test_mor_par_100 //源表的路径 hoodie.deltastreamer.source.hoodieincr.partition.fields=col0 //源表的分区键 hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=false hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/tmp/testdb/tb_test_mor_par_400 //指定与源表进行关联操作的表 hoodie.deltastreamer.transformer.sql=select A.primary_key, A.col0, B.col1, B.col2, A.col3, A.col4, B.col5, A.col6, B.col7 from <SRC> as A join tb_test_mor_par_400 as B on A.primary_key = B.primary_key //该配置在transformer类指定为SqlQueryBasedTransformer才会生效
file:///opt/testconf/tb_test_mor_par_200.properties# 源表tb_test_mor_par_200的配置 hoodie.deltastreamer.source.hoodieincr.path=hdfs://hacluster/tmp/testdb/tb_test_mor_par_200 hoodie.deltastreamer.source.hoodieincr.partition.fields=col0 hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=false hoodie.deltastreamer.source.associated.tables=hdfs://hacluster/tmp/testdb/tb_test_mor_par_400 hoodie.deltastreamer.transformer.sql=select A.primary_key, A.col0, B.col1, B.col2, A.col3, A.col4, B.col5, A.col6, B.col7 from <SRC> as A join tb_test_mor_par_400 as B on A.primary_key = B.primary_key //源表数据结构转换为目标表的数据结构。该源表如果需要和Hive进行关联操作,可以直接在SQL中通过表名来进行关联操作;该源表如果需要和Hudi表关联操作,需要先指定Hudi表的路径,然后在SQL中通过表名来进行关联操作。