Mixed Spark and Flink Writes to a Table with Hidden Partitioning
Updated: 2025-12-08 GMT+08:00
- In mixed-write scenarios, both the Spark side and the Flink side must use the bucket index.
- In mixed-write scenarios, the hoodie.hidden.partitioning.rule configured on the Spark side and on the Flink side must be identical.
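The two constraints above can be sketched as paired settings that must agree across both engines (a minimal sketch; the rule string shown is the one used in the examples in this section, and the Spark/Flink property names follow those same examples):

```sql
-- Spark side (TBLPROPERTIES): bucket index plus the hidden-partitioning rule.
--   'hoodie.index.type' = 'BUCKET'
--   'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)'

-- Flink side (WITH options): the bucket index and the byte-identical rule string.
--   'index.type' = 'BUCKET'
--   'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)'
```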
Examples
- Create the table in Spark:
CREATE TABLE default.flink_stream_mor3 (
  uuid STRING,
  name STRING,
  age INT,
  ts TIMESTAMP,
  ts6 TIMESTAMP,
  p STRING)
USING hudi
LOCATION 'hdfs://hacluster/tmp/hudi/flink_stream_mor3'
TBLPROPERTIES (
  'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)',
  'hoodie.hidden.partitioning.enabled' = 'true',
  'hoodie.index.type' = 'BUCKET',
  'hoodie.bucket.index.num.buckets' = '2',
  'preCombineField' = 'uuid',
  'primaryKey' = 'uuid',
  'type' = 'mor');
- Write the existing (historical) data with Spark:
insert into flink_stream_mor3 select uuid,name,age,ts,ts6,p from tmp_table
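The source table tmp_table is not defined in this section; a hypothetical definition with a matching schema, used here only for illustration, could look like:

```sql
-- Hypothetical staging table for the historical data; the name tmp_table
-- comes from the example above, everything else is an assumption.
CREATE TABLE tmp_table (
  uuid STRING,
  name STRING,
  age INT,
  ts TIMESTAMP,
  ts6 TIMESTAMP,
  p STRING)
USING parquet;

-- Sample row so the backfill insert above has something to copy.
INSERT INTO tmp_table VALUES
  ('1', 'alice', 30,
   timestamp'2025-01-15 10:00:00',
   timestamp'2025-01-15 10:00:00.000001',
   'p1');
```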
- Stream data into the table with Flink:
create table kafka_sink(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  ts6 timestamp(6),
  p varchar(20)
) with (
  'connector' = 'kafka',
  'topic' = 'input2',
  'properties.bootstrap.servers' = 'XXX',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.HADOOP.COM',
  'format' = 'json'
);

create table hudi_source(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  ts6 timestamp(6),
  p varchar(20)
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/flink_stream_mor3',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.table.name' = 'flink_stream_mor3',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'hoodie.datasource.query.type' = 'snapshot',
  'write.precombine.field' = 'uuid',
  'write.tasks' = '4',
  'write.index_bootstrap.tasks' = '4',
  'write.bucket_assign.tasks' = '4',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '5',
  'compaction.delta_commits' = '3',
  'compaction.async.enabled' = 'false',
  'compaction.schedule.enabled' = 'true',
  'hoodie.hidden.partitioning.enabled' = 'true',
  'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)',
  'index.type' = 'BUCKET',
  'hoodie.bucket.index.num.buckets' = '2',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'jdbc',
  'hive_sync.jdbc_url' = 'XXXX',
  'hive_sync.table' = 'flink_stream_mor3',
  'hive_sync.metastore.uris' = 'XXXX',
  'hive_sync.support_timestamp' = 'true',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
  'clean.async.enabled' = 'false'
);

insert into hudi_source select * from kafka_sink;
- Stop the Flink streaming write and backfill data with Spark SQL:
insert into flink_stream_mor3 select uuid,name,age,ts,ts6,p from flink_stream_mor3 where uuid = '1';
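With hidden partitioning, queries filter directly on the source columns of the rule (ts, age) rather than on explicit partition columns. A hedged check after the backfill (the predicate values are illustrative assumptions):

```sql
-- Filters on ts and age can be mapped to the hidden
-- year(ts) / date(ts, MM) / bucket(age, 3) partitions for pruning.
SELECT uuid, name, age, ts
FROM default.flink_stream_mor3
WHERE ts >= timestamp'2025-01-01 00:00:00'
  AND age = 30;
```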
Parent topic: Hidden Partition