更新时间:2024-05-16 GMT+08:00
分享

Spark On Hudi性能调优

优化Spark Shuffle参数提升Hudi写入效率

  • 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。
  • 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。
  • 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。

    配置项

    集群默认值

    调整后

    --conf spark.shuffle.readHostLocalDisk

    false

    true

    --conf spark.io.encryption.enabled

    true

    false

    --conf spark.shuffle.service.enabled

    false

    true

调整Spark调度参数优化OBS场景下Spark调度时延

  • 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率

    配置项

    集群默认值

    调整后

    --conf spark.locality.wait

    3s

    0s

    --conf spark.locality.wait.process

    3s

    0s

    --conf spark.locality.wait.node

    3s

    0s

    --conf spark.locality.wait.rack

    3s

    0s

优化shuffle并行度,提升Spark加工效率

所谓的shuffle并发度如下图所示:

集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。

场景

配置项

集群默认值

调整后

Jar作业

spark.default.parallelism

200

按实际作业可用资源2倍设置

SQL作业

spark.sql.shuffle.partitions

200

按实际作业可用资源2倍设置

hudi入库作业

hoodie.upsert.shuffle.parallelism

200

非bucket表使用,按实际作业可用资源2倍设置

动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估

多表join开启自适应参数,自动处理join过程数据倾斜

问题现象

  1. 少数Task处理的数据量远远超过其余Task。

  2. 该Stage的DAG图是一个join阶段。

    处理方法可以考虑如下操作:

    配置项

    集群默认值

    调整后

    spark.sql.adaptive.enabled

    false

    true

    spark.sql.adaptive.advisoryPartitionSizeInBytes

    64MB

    配置为对应stage总数据量大小 / stage并行度

    spark.sql.optimizer.dynamicPartitionPruning.enabled

    (仅在Spark 2.4.5下需要设置该值,原因是因为此版本中两个特性存在冲突)

    true

    false

Bucket表,可以开启桶裁剪提升主键点查效率

示例:

业务经常使用主键id作为查询条件,执行点查;比如select xxx where id = idx ... 。

建表时,可以加入如下属性,提升查询效率。默认配置下属性值等于primaryKey,即主键。

hoodie.bucket.index.hash.field=id

初始化Hudi表时,可以使用BulkInsert方式快速写入数据

示例:

set hoodie.combine.before.insert=true;                // 入库前去重,如果数据没有重复 该参数无需设置
set hoodie.datasource.write.operation = bulk_insert;  // 指定写入方式为bulk insert方式。
set hoodie.bulkinsert.shuffle.parallelism = 4;        // 指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数
insert into dsrTable select * from srcTabble

开启log列裁剪,提升mor表查询效率

mor表读取的时候涉及到Log和Parquet的合并,性能不是很理想。可以开启log列裁剪减少合并时IO读取开销

SparkSQL执行查询,先执行:

set hoodie.enable.log.column.prune=true;

Spark加工Hudi表时其他参数优化

  • 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。
  • 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。

    配置项

    集群默认值

    调整后

    --conf spark.sql.enableToString

    true

    false

    --conf spark.speculation

    false

    false

分享:

    相关文档

    相关产品