Updated on 2023-07-11 GMT+08:00

Single-Table Concurrency Control

By default, Hudi does not support concurrent write and compaction operations on a single table. When Flink or Spark writes data, or when Spark performs compaction, Hudi first attempts to obtain the lock for the table. (ZooKeeper in the cluster provides the distributed lock service, and this configuration takes effect automatically.) If the lock cannot be obtained, the task exits immediately to prevent the table from being damaged by concurrent operations. If concurrent write is enabled for a single Hudi table, this default locking behavior is automatically disabled.

Hudi Single-Table Concurrent Write Solution

  1. An external service (ZooKeeper or Hive MetaStore) serves as the distributed mutex lock service.
  2. Files can be written concurrently, but commits cannot. Each commit operation is encapsulated in a transaction.
  3. During a commit, the system performs a conflict check. If the list of files modified by the current commit overlaps with the file list of any commit made after the current commit's instant time, the commit fails and the write operation is invalid.
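
The conflict check in step 3 can be sketched as a set intersection. This is a simplified model and not Hudi's implementation: it assumes the timeline is a list of `(instant_time, files)` pairs and that instant times are fixed-width timestamp strings that sort chronologically. The function names are hypothetical.

```python
def conflicting_files(current_instant, current_files, timeline):
    """Return files modified by both the current commit and any later commit.

    timeline: iterable of (instant_time, files) pairs for commits already recorded.
    Only commits whose instant time is after current_instant are considered.
    """
    overlap = set()
    for instant_time, files in timeline:
        if instant_time > current_instant:
            overlap |= set(current_files) & set(files)
    return overlap


def can_commit(current_instant, current_files, timeline):
    """The commit is allowed only when no file overlaps with a later commit."""
    return not conflicting_files(current_instant, current_files, timeline)


# Example with made-up instant times and file names.
timeline = [
    ("20230711090000", {"f1"}),
    ("20230711100500", {"f2", "f3"}),
]
print(can_commit("20230711100000", {"f3", "f4"}, timeline))
print(can_commit("20230711100000", {"f1", "f4"}, timeline))
```

In this model the first call is rejected because `f3` was also modified by a commit after the current instant, while the second is accepted because the commit that touched `f1` predates it.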

Precautions for Using the Concurrency Mechanism

  1. The current Hudi concurrency mechanism cannot guarantee that primary keys in the table remain unique after data is written. You must ensure primary key uniqueness yourself.
  2. For incremental queries, data consumption and checkpoints may be out of order, because multiple concurrent write operations complete at different points in time.
  3. Concurrent write is supported only after this feature is enabled.

How to Use the Concurrency Mechanism

  1. Enable the concurrent write mechanism.

    hoodie.write.concurrency.mode=optimistic_concurrency_control

    hoodie.cleaner.policy.failed.writes=LAZY

  2. Set the concurrent lock mode.

    Hive MetaStore:

    hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider

    hoodie.write.lock.hivemetastore.database=<database_name>

    hoodie.write.lock.hivemetastore.table=<table_name>

    ZooKeeper:

    hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider

    hoodie.write.lock.zookeeper.url=<zookeeper_url>

    hoodie.write.lock.zookeeper.port=<zookeeper_port>

    hoodie.write.lock.zookeeper.lock_key=<table_name>

    hoodie.write.lock.zookeeper.base_path=<table_path>

For details about more parameters, see Configuration Reference.
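
As a minimal sketch, the parameters above can be collected into one options dictionary per lock mode. The property keys and provider class names are the ones listed in this section; the helper function names and all example values (host, port, database, table, path) are placeholders chosen for illustration.

```python
# Options that enable concurrent write, as listed in step 1.
OCC_OPTIONS = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
}


def zookeeper_lock_options(url, port, lock_key, base_path):
    """Concurrent-write options using the ZooKeeper lock provider."""
    return {
        **OCC_OPTIONS,
        "hoodie.write.lock.provider":
            "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
        "hoodie.write.lock.zookeeper.url": url,
        "hoodie.write.lock.zookeeper.port": str(port),
        "hoodie.write.lock.zookeeper.lock_key": lock_key,
        "hoodie.write.lock.zookeeper.base_path": base_path,
    }


def hive_metastore_lock_options(database, table):
    """Concurrent-write options using the Hive MetaStore lock provider."""
    return {
        **OCC_OPTIONS,
        "hoodie.write.lock.provider":
            "org.apache.hudi.hive.HiveMetastoreBasedLockProvider",
        "hoodie.write.lock.hivemetastore.database": database,
        "hoodie.write.lock.hivemetastore.table": table,
    }


# Placeholder values; replace them with those of your cluster and table.
opts = zookeeper_lock_options("zk1.example.com", 2181, "orders", "/tmp/hudi/orders")
for key in sorted(opts):
    print(f"{key}={opts[key]}")
```

With PySpark, such a dictionary could then be passed to a DataFrame writer, for example `df.write.format("hudi").options(**opts)`, together with the other write options the table needs (table name, record key, and so on), which are omitted here.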

If the cleaner policy is set to LAZY, the system can only check whether written files have expired; it cannot check for or clear junk files generated by historical writes. That is, junk files cannot be cleared automatically in concurrent scenarios.