Spark On Hudi性能调优
优化Spark Shuffle参数提升Hudi写入效率
- 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。
- 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。
- 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。
配置项
集群默认值
调整后
--conf spark.shuffle.readHostLocalDisk
false
true
--conf spark.io.encryption.enabled
true
false
--conf spark.shuffle.service.enabled
false
true
调整Spark调度参数优化OBS场景下Spark调度时延
- 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率
配置项
集群默认值
调整后
--conf spark.locality.wait
3s
0s
--conf spark.locality.wait.process
3s
0s
--conf spark.locality.wait.node
3s
0s
--conf spark.locality.wait.rack
3s
0s
优化shuffle并行度,提升Spark加工效率
所谓的shuffle并发度如下图所示:
集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。
场景 |
配置项 |
集群默认值 |
调整后 |
---|---|---|---|
Jar作业 |
spark.default.parallelism |
200 |
按实际作业可用资源2倍设置 |
SQL作业 |
spark.sql.shuffle.partitions |
200 |
按实际作业可用资源2倍设置 |
hudi入库作业 |
hoodie.upsert.shuffle.parallelism |
200 |
非bucket表使用,按实际作业可用资源2倍设置 |
![](https://support.huaweicloud.com/intl/zh-cn/devg-rule-mrs/public_sys-resources/caution_3.0-zh-cn.png)
动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。
Bucket表,可以开启桶裁剪提升主键点查效率
示例:
业务经常使用主键id作为查询条件,执行点查;比如select xxx where id = idx ... 。
建表时,可以加入如下属性,提升查询效率。默认配置下属性值等于primaryKey,即主键。
hoodie.bucket.index.hash.field=id
初始化Hudi表时,可以使用BulkInsert方式快速写入数据
示例:
set hoodie.combine.before.insert=true; --入库前去重,如果数据没有重复 该参数无需设置。 set hoodie.datasource.write.operation = bulk_insert; --指定写入方式为bulk insert方式。 set hoodie.bulkinsert.shuffle.parallelism = 4; --指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数。 insert into dsrTable select * from srcTabble
开启log列裁剪,提升mor表查询效率
mor表读取的时候涉及到Log和Parquet的合并,性能不是很理想。可以开启log列裁剪减少合并时IO读取开销
SparkSQL执行查询,先执行:
set hoodie.enable.log.column.prune=true;
Spark加工Hudi表时其他参数优化
- 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。
- 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。
配置项
集群默认值
调整后
--conf spark.sql.enableToString
true
false
--conf spark.speculation
false
false