Updated on 2024-09-10 GMT+08:00

Spark On Hudi Performance Optimization

Optimizing Spark Shuffle Parameters to Improve Hudi Write Efficiency

  • Set spark.shuffle.readHostLocalDisk to true so that shuffle data stored on the same host is read directly from the local disk, reducing network transmission overhead.
  • Set spark.io.encryption.enabled to false so that shuffle data is not encrypted when it is written to disk, improving shuffle efficiency.
  • Set spark.shuffle.service.enabled to true to enable the shuffle service and improve the stability of shuffle tasks.

| Configuration Item | Cluster Default Value | After Adjustment |
|---|---|---|
| --conf spark.shuffle.readHostLocalDisk | false | true |
| --conf spark.io.encryption.enabled | true | false |
| --conf spark.shuffle.service.enabled | false | true |
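
As a minimal sketch, the adjusted shuffle settings listed above could be turned into `--conf` arguments for a spark-submit command line. The helper name `to_conf_args` is hypothetical; only the three property names and their adjusted values come from this section.

```python
# Adjusted shuffle settings taken from the table in this section.
SHUFFLE_SETTINGS = {
    "spark.shuffle.readHostLocalDisk": "true",
    "spark.io.encryption.enabled": "false",
    "spark.shuffle.service.enabled": "true",
}


def to_conf_args(settings):
    """Return a flat list of arguments: --conf key=value --conf key=value ..."""
    args = []
    for key, value in settings.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


shuffle_args = to_conf_args(SHUFFLE_SETTINGS)
# Print the arguments as they would appear on the command line.
print(" ".join(shuffle_args))
```

The printed string can be appended to an existing spark-submit command. Keeping the settings in one dictionary makes it easy to compare them against the cluster defaults.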

Adjusting Spark Scheduling Parameters to Optimize the Spark Scheduling Delay in OBS Scenarios

  • When data is stored in OBS, you can disable the Spark locality wait (set it to 0s) so that tasks are scheduled immediately, improving Spark scheduling efficiency.

| Configuration Item | Cluster Default Value | After Adjustment |
|---|---|---|
| --conf spark.locality.wait | 3s | 0s |
| --conf spark.locality.wait.process | 3s | 0s |
| --conf spark.locality.wait.node | 3s | 0s |
| --conf spark.locality.wait.rack | 3s | 0s |

Optimizing Shuffle Parallelism to Improve Spark Processing Efficiency

The following figure shows the shuffle concurrency.

The cluster default parallelism is 200, and it can be set separately for each job. If a job has a bottleneck stage (one with a long execution time) and the number of cores allocated to the job is greater than the number of concurrent tasks in that stage, the parallelism is insufficient. Optimize the configuration as follows:

| Scenario | Configuration Item | Cluster Default Value | After Adjustment |
|---|---|---|---|
| Jar job | spark.default.parallelism | 200 | Set this parameter to twice the resources actually available to the job. |
| SQL job | spark.sql.shuffle.partitions | 200 | Set this parameter to twice the resources actually available to the job. |
| Hudi ingestion job | hoodie.upsert.shuffle.parallelism | 200 | Used by non-bucket tables. Set this parameter to twice the resources actually available to the job. |

When spark.dynamicAllocation.enabled is set to true, estimate the available resources based on spark.dynamicAllocation.maxExecutors.
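
The sizing rule above can be sketched as a small calculation. This sketch assumes that "available resources" means the total number of executor cores (executors multiplied by spark.executor.cores). The function name `recommended_parallelism` and the sample job settings are hypothetical.

```python
def recommended_parallelism(conf):
    """Return twice the number of executor cores the job can use.

    Assumption: "available resources" is read as total executor cores.
    """
    cores_per_executor = int(conf.get("spark.executor.cores", "1"))
    if conf.get("spark.dynamicAllocation.enabled", "false").lower() == "true":
        # With dynamic allocation, size against the upper bound of executors.
        executors = int(conf["spark.dynamicAllocation.maxExecutors"])
    else:
        executors = int(conf["spark.executor.instances"])
    return 2 * executors * cores_per_executor


# Hypothetical job settings, for illustration only.
static_job = {"spark.executor.instances": "10", "spark.executor.cores": "4"}
dynamic_job = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.maxExecutors": "25",
    "spark.executor.cores": "4",
}

print(recommended_parallelism(static_job))   # 2 * 10 * 4 = 80
print(recommended_parallelism(dynamic_job))  # 2 * 25 * 4 = 200
```

The result would be used as the value of spark.default.parallelism, spark.sql.shuffle.partitions, or hoodie.upsert.shuffle.parallelism, depending on the job type.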

Enabling Bucket Pruning on Bucket Tables to Improve Primary Key Point Query Efficiency

Example:

The service runs point queries that use the primary key id as the query condition, for example, select xxx where id = idx...

When creating the table, you can add the following property to improve query efficiency. By default, the property takes the value of primaryKey, that is, the primary key.

hoodie.bucket.index.hash.field=id
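
To illustrate why hashing on the query field helps point queries, here is a conceptual sketch of bucket pruning. It is not Hudi's implementation: the hash function (`zlib.crc32`), the bucket count, and the in-memory layout are stand-ins chosen for illustration.

```python
import zlib

NUM_BUCKETS = 4  # hypothetical bucket count


def bucket_of(record_id):
    # Stand-in hash for illustration only; Hudi uses its own hash function.
    return zlib.crc32(str(record_id).encode("utf-8")) % NUM_BUCKETS


# Write path: each record is placed in the bucket chosen by hashing its id.
buckets = {b: [] for b in range(NUM_BUCKETS)}
for record_id in range(1000):
    buckets[bucket_of(record_id)].append({"id": record_id, "value": f"v{record_id}"})


def point_query(record_id):
    """Scan only the single bucket that can contain record_id."""
    target = bucket_of(record_id)
    hits = [r for r in buckets[target] if r["id"] == record_id]
    return target, hits


target_bucket, rows = point_query(42)
scanned = len(buckets[target_bucket])
print(f"scanned {scanned} of 1000 records, found {rows}")
```

Because the query condition is on the same field that was hashed at write time, the other buckets can be skipped without being read.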

Using BulkInsert to Quickly Write Data When Initializing a Hudi Table

Example:

-- Deduplicate the data before it is written. Not needed if the data contains no duplicates.
set hoodie.combine.before.insert=true;
-- Use the bulk insert mode.
set hoodie.datasource.write.operation = bulk_insert;
-- Degree of parallelism for bulk_insert writes, which equals the number of Parquet files saved in a partition after the write.
set hoodie.bulkinsert.shuffle.parallelism = 4;
insert into dsrTable select * from srcTable;
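
The effect of hoodie.combine.before.insert can be pictured as collapsing records that share a key before they are written. The sketch below is conceptual only. The field names `id` and `ts` are hypothetical, and the rule of keeping the record with the largest `ts` is an assumption made for illustration, not a description of how Hudi selects the surviving record.

```python
def combine_before_insert(records, key_field="id", ordering_field="ts"):
    """Keep one record per key (assumed rule: the largest ordering value wins)."""
    latest = {}
    for record in records:
        key = record[key_field]
        if key not in latest or record[ordering_field] > latest[key][ordering_field]:
            latest[key] = record
    return list(latest.values())


# Hypothetical source rows containing one duplicated key.
source_rows = [
    {"id": 1, "ts": 100, "name": "a"},
    {"id": 2, "ts": 100, "name": "b"},
    {"id": 1, "ts": 200, "name": "a2"},  # same key as the first row, newer ts
]

deduplicated = combine_before_insert(source_rows)
print(deduplicated)
```

If the source data is already unique per key, this step does no useful work, which is why the setting can be left off in that case.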

Enabling Log Column Pruning to Improve MOR Table Query Efficiency

When a MOR table is read, log files and Parquet files must be merged, which degrades read performance. Enabling log column pruning reduces the I/O read overhead during the merge.

When querying with Spark SQL, run the following command:

set hoodie.enable.log.column.prune=true;

Optimizing Other Parameters When Spark Processes Hudi Tables

  • Set spark.sql.enableToString to false to reduce memory usage when Spark parses complex SQL statements and to improve parsing efficiency.
  • Set spark.speculation to false to disable speculative execution. Speculative execution consumes extra CPU resources and is not supported by Hudi. If it is enabled, files may be corrupted.

| Configuration Item | Cluster Default Value | After Adjustment |
|---|---|---|
| --conf spark.sql.enableToString | true | false |
| --conf spark.speculation | false | false |