更新时间:2022-07-19 GMT+08:00

Spark SQL join优化

操作场景

Spark SQL中,当对两个表进行join操作时,利用Broadcast特性(请参见使用广播变量),将小表BroadCast到各个节点上,从而转变成非shuffle操作,提高任务执行性能。

这里join操作,只指inner join。

操作步骤

在Spark SQL中进行Join操作时,可以按照以下步骤进行优化。为了方便说明,设表A和表B,且A、B表都有个名为name的列。对A、B表进行join操作。

  1. 估计表的大小。

    根据每次加载数据的大小,来估计表大小。

    也可以在Hive的数据库存储路径下直接查看表的大小。首先在Spark的配置文件“hive-site.xml”中,查看Hive的数据库路径的配置,默认为“/user/hive/warehouse”

    <property>
       <name>hive.metastore.warehouse.dir</name>
      <value>/user/hive/warehouse</value>  
    </property>

    然后通过hadoop命令查看对应表的大小。如查看表A的大小命令为:

    hadoop fs -du -s -h ${test.warehouse.dir}/a

    进行广播操作,对表有要求:

    1. 至少有一个表不是空表;
    2. 表不能是“external table”;
    3. 表的储存方式需为textfile(默认是textfile文件格式),如
      create table A( name string ) stored as textfile;
      或:
      create table A( name string );
  2. 配置自动广播的阈值。

    Spark中,判断表是否广播的阈值为10485760(即10M)。如果两个表的大小至少有一个小于10M时,可以跳过该步骤。

    自动广播阈值的配置参数介绍,见表1

    表1 参数介绍

    参数

    默认值

    描述

    spark.sql.autoBroadcastJoinThreshold

    10485760

    当进行join操作时,配置广播的最大值;当表的字节数小于该值时便进行广播。当配置为-1时,将不进行广播。

    参见https://spark.apache.org/docs/latest/sql-programming-guide.html

    配置自动广播阈值的方法:

    • 在Spark的配置文件“spark-defaults.conf”中,设置“spark.sql.autoBroadcastJoinThreshold”的值。其中,<size>根据场景而定,但要求该值至少比其中一个表大。
      spark.sql.autoBroadcastJoinThreshold = <size>
    • 利用Hive CLI命令,设置阈值。在运行Join操作时,提前运行下面语句
      SET spark.sql.autoBroadcastJoinThreshold=<size>

      其中,<size>根据场景而定,但要求该值至少比其中一个表大。

  3. 进行join操作。

    这时join的两个table,至少有个表是小于阈值的。

    如果A表和B表都小于阈值,且A表的字节数小于B表时,则运行B join A,如

    SELECT A.name FROM B JOIN A ON A.name = B.name;

    否则运行A join B。

    SELECT A.name FROM A JOIN B ON A.name = B.name;