Spark SQL join优化
操作场景
Spark SQL中,当对两个表进行join操作时,利用Broadcast特性(请参见使用广播变量),将小表BroadCast到各个节点上,从而转变成非shuffle操作,提高任务执行性能。
这里join操作,只指inner join。
操作步骤
在Spark SQL中进行Join操作时,可以按照以下步骤进行优化。为了方便说明,设表A和表B,且A、B表都有个名为name的列。对A、B表进行join操作。
- 估计表的大小。
也可以在Hive的数据库存储路径下直接查看表的大小。首先在Spark的配置文件“hive-site.xml”中,查看Hive的数据库路径的配置,默认为“/user/hive/warehouse”。
<property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> </property>
然后通过hadoop命令查看对应表的大小。如查看表A的大小命令为:
hadoop fs -du -s -h ${test.warehouse.dir}/a
进行广播操作,对表有要求:
- 至少有一个表不是空表;
- 表不能是“external table”;
- 表的储存方式需为textfile(默认是textfile文件格式),如
create table A( name string ) stored as textfile; 或: create table A( name string );
- 配置自动广播的阈值。
Spark中,判断表是否广播的阈值为10485760(即10M)。如果两个表的大小至少有一个小于10M时,可以跳过该步骤。
自动广播阈值的配置参数介绍,见表1。
表1 参数介绍 参数
默认值
描述
spark.sql.autoBroadcastJoinThreshold
10485760
当进行join操作时,配置广播的最大值;当表的字节数小于该值时便进行广播。当配置为-1时,将不进行广播。
参见https://spark.apache.org/docs/latest/sql-programming-guide.html
配置自动广播阈值的方法:
- 在Spark的配置文件“spark-defaults.conf”中,设置“spark.sql.autoBroadcastJoinThreshold”的值。其中,<size>根据场景而定,但要求该值至少比其中一个表大。
spark.sql.autoBroadcastJoinThreshold = <size>
- 利用Hive CLI命令,设置阈值。在运行Join操作时,提前运行下面语句
SET spark.sql.autoBroadcastJoinThreshold=<size>
其中,<size>根据场景而定,但要求该值至少比其中一个表大。
- 在Spark的配置文件“spark-defaults.conf”中,设置“spark.sql.autoBroadcastJoinThreshold”的值。其中,<size>根据场景而定,但要求该值至少比其中一个表大。
- 进行join操作。
如果A表和B表都小于阈值,且A表的字节数小于B表时,则运行B join A,如
SELECT A.name FROM B JOIN A ON A.name = B.name;
否则运行A join B。
SELECT A.name FROM A JOIN B ON A.name = B.name;