Updated on 2025-04-15 GMT+08:00

Suggestions for Concurrent Writes to Hudi

  • For concurrent writes, inter-partition concurrency is recommended. That is, each write task writes data to a different partition.

    Partition concurrency parameters:

    • SQL
      set hoodie.support.partition.lock=true;
    • DataSource API
      df.write
      .format("hudi")
      .options(xxx)
      .option("hoodie.support.partition.lock", "true")
      .mode(xxx)
      .save("/tmp/tablePath")

    All tasks involved in inter-partition concurrent writes must be configured with the above parameter.
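
A minimal sketch of a pre-flight check for the rule above, assuming you know in advance which partitions each task will write. The helper `find_partition_overlaps`, the task names, and the partition values are hypothetical and are not part of Hudi. Only the option key `hoodie.support.partition.lock` comes from this section.

```python
from itertools import combinations

# Option named in this section; every concurrent writer must set it.
PARTITION_LOCK_OPTIONS = {"hoodie.support.partition.lock": "true"}


def find_partition_overlaps(task_partitions):
    """Return (task_a, task_b, shared_partitions) for every pair of tasks
    that would write to at least one common partition."""
    overlaps = []
    # Sort by task name so the result order is deterministic.
    for (a, parts_a), (b, parts_b) in combinations(sorted(task_partitions.items()), 2):
        shared = set(parts_a) & set(parts_b)
        if shared:
            overlaps.append((a, b, sorted(shared)))
    return overlaps


# Hypothetical plan: task name -> partitions it will write.
plan = {
    "job_a": {"dt=2025-04-14"},
    "job_b": {"dt=2025-04-15"},
    "job_c": {"dt=2025-04-15", "dt=2025-04-16"},
}
conflicts = find_partition_overlaps(plan)
print(conflicts)
```

An empty result means the plan is a pure inter-partition workload. Any reported pair writes to the same partition and falls under the OCC guidance instead.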

  • Concurrent writes within the same partition are not recommended. If they are required, enable Hudi optimistic concurrency control (OCC) and strictly follow the concurrency parameter settings; otherwise, data may be corrupted.

    OCC concurrency parameters:

    • SQL
      -- Enable OCC.
      set hoodie.write.concurrency.mode=optimistic_concurrency_control;
      set hoodie.cleaner.policy.failed.writes=LAZY;

      -- Enable the ZooKeeper-based concurrency lock.
      set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; -- Use the ZooKeeper lock provider.
      set hoodie.write.lock.zookeeper.url=<zookeeper_url>; -- Set the ZooKeeper address.
      set hoodie.write.lock.zookeeper.port=<zookeeper_port>; -- Set the ZooKeeper port.
      set hoodie.write.lock.zookeeper.lock_key=<table_name>; -- Set the lock name.
      set hoodie.write.lock.zookeeper.base_path=<table_path>; -- Set the ZooKeeper lock path.
    • DataSource API
      df.write
      .format("hudi")
      .options(xxx)
      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
      .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
      .option("hoodie.write.lock.zookeeper.url", "<zookeeper_url>")
      .option("hoodie.write.lock.zookeeper.port", "<zookeeper_port>")
      .option("hoodie.write.lock.zookeeper.lock_key", "<table_name>")
      .option("hoodie.write.lock.zookeeper.base_path", "<table_path>")
      .mode(xxx)
      .save("/tmp/tablePath")
      
    1. All tasks participating in concurrent writes must be configured with the above parameters. OCC does not guarantee that every concurrent write task succeeds. When multiple write tasks update the same file, only one task can succeed and the rest fail.
    2. In concurrent write scenarios, the cleaner policy must be set to LAZY, so junk files cannot be cleaned up automatically.
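
Because every writer must carry the same OCC settings and a conflicting writer can fail, it can help to build the options in one place and wrap the write in a retry. The following is a sketch under stated assumptions, not a Hudi API: `occ_options` and `write_with_retry` are hypothetical helpers, the retry treats any exception as retryable (a real job would likely inspect the error and retry only write conflicts), and it assumes the write is safe to repeat.

```python
import time

# Lock provider class named in the SQL settings of this section.
ZK_LOCK_PROVIDER = "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider"


def occ_options(zk_url, zk_port, table_name, table_path):
    """Build the OCC option set listed in this section (hypothetical helper)."""
    return {
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": ZK_LOCK_PROVIDER,
        "hoodie.write.lock.zookeeper.url": zk_url,
        "hoodie.write.lock.zookeeper.port": str(zk_port),
        "hoodie.write.lock.zookeeper.lock_key": table_name,
        "hoodie.write.lock.zookeeper.base_path": table_path,
    }


def write_with_retry(write_fn, max_attempts=3, backoff_seconds=0.0):
    """Call write_fn until it succeeds or max_attempts is reached.

    Assumption: any exception is treated as retryable, and write_fn
    is safe to run again after a failed attempt.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return write_fn()
        except Exception as err:
            last_error = err
            if attempt < max_attempts:
                # Linear backoff between attempts.
                time.sleep(backoff_seconds * attempt)
    raise last_error


# Usage with PySpark (not executed here; df is an existing DataFrame and
# all argument values are placeholders):
# opts = occ_options("<zookeeper_url>", "<zookeeper_port>", "<table_name>", "<table_path>")
# write_with_retry(
#     lambda: df.write.format("hudi").options(**opts).mode("append").save("/tmp/tablePath")
# )
```

Keeping the options in a single function makes it harder for one writer to omit a setting, which is the failure mode the first note warns about.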