
Mixed Spark and Flink Writes to Implicitly Partitioned Tables

  1. In a mixed-write scenario, both the Spark side and the Flink side must use the bucket index.
  2. In a mixed-write scenario, the hoodie.hidden.partitioning.rule configured on the Spark side and on the Flink side must be identical (see the excerpt after this list).
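
The following excerpt pairs the settings from the full example below; the index type, the hidden partitioning rule, and the bucket count must match verbatim across the two engines:

    -- Spark side (TBLPROPERTIES)
    'hoodie.index.type' = 'BUCKET',
    'hoodie.bucket.index.num.buckets' = '2',
    'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)'

    -- Flink side (WITH clause)
    'index.type' = 'BUCKET',
    'hoodie.bucket.index.num.buckets' = '2',
    'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)'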

Example

  • Create the table with Spark:
    CREATE TABLE default.flink_stream_mor3 (
      uuid STRING,
      name STRING,
      age INT,
      ts TIMESTAMP,
      ts6 TIMESTAMP,
      p STRING)
    USING hudi
    LOCATION 'hdfs://hacluster/tmp/hudi/flink_stream_mor3'
    TBLPROPERTIES (
      -- Implicit partitioning: year of ts, month of ts, and a 3-way bucket on age
      'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)',
      'hoodie.hidden.partitioning.enabled' = 'true',
      -- The bucket index is mandatory for mixed Spark/Flink writes
      'hoodie.index.type' = 'BUCKET',
      'hoodie.bucket.index.num.buckets' = '2',
      'preCombineField' = 'uuid',
      'primaryKey' = 'uuid',
      'type' = 'mor');
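    After creation, the table settings can be verified before wiring up the Flink job (standard Spark SQL; the output layout varies by version):
    -- Confirm that the hidden partitioning and index settings took effect
    SHOW TBLPROPERTIES default.flink_stream_mor3;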
    
  • Write the existing data with Spark:
    insert into flink_stream_mor3 select uuid, name, age, ts, ts6, p from tmp_table;
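    Here tmp_table stands for whatever table already holds the historical rows. A minimal sketch of such a source table (its name, schema, and sample row are assumptions for illustration):
    -- Hypothetical staging table with the same schema as the Hudi table
    CREATE TABLE default.tmp_table (
      uuid STRING, name STRING, age INT,
      ts TIMESTAMP, ts6 TIMESTAMP, p STRING)
    USING parquet;
    INSERT INTO tmp_table VALUES
      ('1', 'alice', 30, TIMESTAMP '2024-06-01 10:00:00', TIMESTAMP '2024-06-01 10:00:00.000001', 'p1');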
  • Stream data into the table with Flink:
    -- Kafka source table; despite its name, kafka_sink is read from in this example
    create table kafka_sink(
      uuid varchar(20),
      name varchar(10),
      age int,
      ts timestamp(3),
      ts6 timestamp(6),
      p varchar(20)
    ) with (
      'connector' = 'kafka',
      'topic' = 'input2',
      'properties.bootstrap.servers' = 'XXX',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.HADOOP.COM',
      'format' = 'json'
    );
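    Records on the input2 topic must match this schema. A sample JSON payload (the values are illustrative; the timestamp strings assume Flink's default SQL timestamp format for the json format):
    {"uuid": "1", "name": "alice", "age": 30, "ts": "2024-06-01 10:00:00.123", "ts6": "2024-06-01 10:00:00.123456", "p": "p1"}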
    -- Hudi sink table pointing at the same HDFS path as the Spark-created table
    create table hudi_source(
      uuid varchar(20),
      name varchar(10),
      age int,
      ts timestamp(3),
      ts6 timestamp(6),
      p varchar(20)
    ) with (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/flink_stream_mor3',
      'table.type' = 'MERGE_ON_READ',
      'hoodie.table.name' = 'flink_stream_mor3',
      'hoodie.datasource.write.recordkey.field' = 'uuid',
      'hoodie.datasource.query.type' = 'snapshot',
      'write.precombine.field' = 'uuid',
      'write.tasks' = '4',
      'write.index_bootstrap.tasks' = '4',
      'write.bucket_assign.tasks' = '4',
      'hoodie.datasource.write.hive_style_partitioning' = 'true',
      'read.streaming.enabled' = 'true',
      'read.streaming.check-interval' = '5',
      'compaction.delta_commits' = '3',
      -- Only schedule compaction plans here; execution happens outside this job
      'compaction.async.enabled' = 'false',
      'compaction.schedule.enabled' = 'true',
      -- Must match the Spark table's hidden partitioning and bucket settings
      'hoodie.hidden.partitioning.enabled' = 'true',
      'hoodie.hidden.partitioning.rule' = 'year(ts), date(ts, MM), bucket(age, 3)',
      'index.type' = 'BUCKET',
      'hoodie.bucket.index.num.buckets' = '2',
      'hive_sync.enable' = 'true',
      'hive_sync.mode' = 'jdbc',
      'hive_sync.jdbc_url' = 'XXXX',
      'hive_sync.table' = 'flink_stream_mor3',
      'hive_sync.metastore.uris' = 'XXXX',
      'hive_sync.support_timestamp' = 'true',
      'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
      'clean.async.enabled' = 'false'
    );
    insert into hudi_source select * from kafka_sink;
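    While the job runs, the table can be spot-checked from Spark; snapshot reads of the MOR table should reflect both the batch and the streaming writes. A minimal sketch:
    -- Verify that batch and streaming rows land in the same table
    SELECT count(*) FROM default.flink_stream_mor3;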
  • Stop the Flink streaming write and backfill data with Spark SQL:
    insert into flink_stream_mor3 select uuid, name, age, ts, ts6, p from flink_stream_mor3 where uuid = '1';
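    Because the Flink job only schedules compaction plans (compaction.async.enabled = 'false'), pending plans must be executed separately. A minimal sketch using Hudi's Spark SQL compaction command (availability depends on the Hudi version):
    -- Execute pending compaction plans scheduled by the Flink job
    RUN COMPACTION ON default.flink_stream_mor3;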
