更新时间:2024-12-11 GMT+08:00

Spark并发写Hudi建议

  • 涉及到并发场景,推荐采用分区间并发写的方式:即不同的写入任务写不同的分区

    分区并发参数控制:

    • SQL方式:
      set hoodie.support.partition.lock=true;
    • DataSource Api方式:
      df.write
      .format("hudi")
      .options(xxx)
      .option("hoodie.support.partition.lock", "true")
      .mode(xxx)
      .save("/tmp/tablePath")

    所有参与分区间并发写入的任务,都必须配置上述参数。

  • 不建议同分区内并发写,这种并发写入需要开启Hudi OCC方式并发写入,必须严格遵守并发参数配置,否则会出现表数据损坏的问题。

    并发OCC参数控制:

    • SQL方式:
      --开启OCC。
      set hoodie.write.concurrency.mode=optimistic_concurrency_control;
      set hoodie.cleaner.policy.failed.writes=LAZY;
      
      --开启并发ZooKeeper锁。
      set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; --设置使用ZooKeeper锁。
      set hoodie.write.lock.zookeeper.url=<zookeeper_url>;  --设置使用ZooKeeper地址。
      set hoodie.write.lock.zookeeper.port=<zookeeper_port>; --设置使用ZooKeeper端口。
      set hoodie.write.lock.zookeeper.lock_key=<table_name>;  --设置锁名称。
      set hoodie.write.lock.zookeeper.base_path=<table_path>; --设置zk锁路径。
    • DataSource Api方式:
      df.write
      .format("hudi")
      .options(xxx)
      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
      .option("hoodie.write.lock.zookeeper.url", "zookeeper_url")
      .option("hoodie.write.lock.zookeeper.port", "zookeeper_port")
      .option("hoodie.write.lock.zookeeper.lock_key", "table_name")
      .option("hoodie.write.lock.zookeeper.base_path", "table_path")
      .mode(xxx)
      .save("/tmp/tablePath")
      
    1. 所有参与并发写入的任务,都必须配置上述参数。OCC不会保证所有参与并发写入的任务都执行成功;当出现多个写任务更新同一个文件时,只有一个任务可以成功,其余失败。
    2. 并发场景下,需要设置cleaner policy为Lazy,因此无法自动清理垃圾文件。