Spark SQL join优化
操作场景
Spark SQL中,当对两个表进行join操作时,利用Broadcast特性(见“使用广播变量”章节),将被广播的表BroadCast到各个节点上,从而转变成非shuffle操作,提高任务执行性能。
这里join操作,只指inner join。
操作步骤
在Spark SQL中进行Join操作时,可以按照以下步骤进行优化。为了方便说明,设表A和表B,且A、B表都有个名为name的列。对A、B表进行join操作。
- 估计表的大小。
也可以在Hive的数据库存储路径下直接查看表的大小。首先在Spark的配置文件“hive-site.xml”中,查看Hive的数据库路径的配置,默认为“/user/hive/warehouse”。
<property> <name>hive.metastore.warehouse.dir</name> <value>${test.warehouse.dir}</value> <description></description> </property>
然后通过hadoop命令查看对应表的大小。如查看表A的大小命令为:
hadoop fs -du -s -h ${test.warehouse.dir}/a
进行广播操作,需要至少有一个表不是空表。
- 配置自动广播的阈值。
Spark中,判断表是否广播的阈值为10485760(即10M)。如果两个表的大小至少有一个小于10M时,可以跳过该步骤。
自动广播阈值的配置参数介绍,见表1。
表1 参数介绍 参数
默认值
描述
spark.sql.autoBroadcastJoinThreshold
10485760
当进行join操作时,配置广播的最大值。
- 当SQL语句中涉及的表中相应字段的大小小于该值时,进行广播。
- 配置为-1时,将不进行广播。
参见https://spark.apache.org/docs/3.1.1/sql-programming-guide.html
配置自动广播阈值的方法:
- 在Spark的配置文件“spark-defaults.conf”中,设置“spark.sql.autoBroadcastJoinThreshold”的值。
spark.sql.autoBroadcastJoinThreshold = <size>
- 利用Hive CLI命令,设置阈值。在运行Join操作时,提前运行下面语句:
SET spark.sql.autoBroadcastJoinThreshold=<size>
- 进行join操作。
- (可选)如下两种场景,需要执行Analyze命令(ANALYZE TABLE tableName COMPUTE STATISTICS noscan;)更新表元数据后进行广播。
- 需要广播的表是分区表,新建表且文件类型为非Parquet文件类型。
- 需要广播的表是分区表,更新表数据后。
参考信息
被广播的表执行超时,导致任务结束。
默认情况下,BroadCastJoin只允许被广播的表计算5分钟,超过5分钟该任务会出现超时异常,而这个时候被广播的表的broadcast任务依然在执行,造成资源浪费。
这种情况下,有两种方式处理:
- 调整“spark.sql.broadcastTimeout”的数值,加大超时的时间限制。
- 降低“spark.sql.autoBroadcastJoinThreshold”的数值,不使用BroadCastJoin的优化。