Updated on 2024-09-10 GMT+08:00

Suggestions for Spark Concurrently Write Hudi Data

  • In concurrent scenarios, the inter-partition concurrent write mode is recommended, that is, different write tasks write to different partitions.

    Partition-level concurrency is controlled by the following parameter:

    • SQL mode:
      set hoodie.support.partition.lock=true;
    • DataSource API mode:
      df.write
        .format("hudi")
        .options(xxx)
        .option("hoodie.support.partition.lock", "true")
        .mode(xxx)
        .save("/tmp/tablePath")

The preceding parameters must be set for all inter-partition concurrent write tasks.

  • Avoid concurrent writes to the same partition. If concurrent writes to the same partition are required, you must enable the Hudi OCC mode and strictly comply with the concurrency parameter settings. Otherwise, table data may be corrupted.

    OCC concurrency parameter settings:

    • SQL mode:
      // Enable OCC.
      set hoodie.write.concurrency.mode=optimistic_concurrency_control;
      set hoodie.cleaner.policy.failed.writes=LAZY;
      // Configure the ZooKeeper-based lock.
      set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; // Set the ZooKeeper lock provider
      set hoodie.write.lock.zookeeper.url=<zookeeper_url>; // Set the ZooKeeper address
      set hoodie.write.lock.zookeeper.port=<zookeeper_port>; // Set the ZooKeeper port
      set hoodie.write.lock.zookeeper.lock_key=<table_name>; // Set the lock key, typically the table name
      set hoodie.write.lock.zookeeper.base_path=<table_path>; // Set the ZooKeeper base path for the lock
    • DataSource API mode:
      df.write
        .format("hudi")
        .options(xxx)
        .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
        .option("hoodie.cleaner.policy.failed.writes", "LAZY")
        .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
        .option("hoodie.write.lock.zookeeper.url", "<zookeeper_url>")
        .option("hoodie.write.lock.zookeeper.port", "<zookeeper_port>")
        .option("hoodie.write.lock.zookeeper.lock_key", "<table_name>")
        .option("hoodie.write.lock.zookeeper.base_path", "<table_path>")
        .mode(xxx)
        .save("/tmp/tablePath")
  1. The preceding parameters must be set for all concurrent write tasks. OCC does not guarantee that every concurrent write task succeeds: when multiple tasks update the same file, only one of them succeeds.
  2. In concurrent scenarios, the cleaner policy for failed writes must be set to LAZY. As a result, files left behind by failed write tasks are not deleted automatically and must be cleaned up manually.
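
    Leftover files from failed writes can later be removed by triggering a clean manually. A minimal sketch in SQL mode, assuming a Hudi version (0.11 or later) where the run_clean call procedure is available; the table name my_hudi_table is a placeholder:

      call run_clean(table => 'my_hudi_table');

    Run this during a quiet period, after in-flight writers have finished, so the clean does not compete with active write tasks.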