更新时间:2022-09-30 GMT+08:00
分享

Spark SQL join优化

操作场景

Spark SQL中,当对两个表进行join操作时,利用Broadcast特性(见“使用广播变量”章节),将被广播的表BroadCast到各个节点上,从而转变成非shuffle操作,提高任务执行性能。

这里join操作,只指inner join。

操作步骤

在Spark SQL中进行Join操作时,可以按照以下步骤进行优化。为了方便说明,设表A和表B,且A、B表都有个名为name的列。对A、B表进行join操作。

  1. 估计表的大小。

    根据每次加载数据的大小,来估计表大小。

    也可以在Hive的数据库存储路径下直接查看表的大小。首先在Spark的配置文件“hive-site.xml”中,查看Hive的数据库路径的配置,默认为“/user/hive/warehouse”。Spark服务多实例默认数据库路径为“/user/hive/warehouse”,例如“/user/hive1/warehouse”

    <property>
       <name>hive.metastore.warehouse.dir</name>
      <value>${test.warehouse.dir}</value>
      <description></description>
    </property>

    然后通过hadoop命令查看对应表的大小。如查看表A的大小命令为:

    hadoop fs -du -s -h ${test.warehouse.dir}/a

    进行广播操作,需要至少有一个表不是空表。

  2. 配置自动广播的阈值。

    Spark中,判断表是否广播的阈值为10485760(即10M)。如果两个表的大小至少有一个小于10M时,可以跳过该步骤。

    自动广播阈值的配置参数介绍,见表1

    表1 参数介绍

    参数

    默认值

    描述

    spark.sql.autoBroadcastJoinThreshold

    10485760

    当进行join操作时,配置广播的最大值。

    • 当SQL语句中涉及的表中相应字段的大小小于该值时,进行广播。
    • 配置为-1时,将不进行广播。

    参见https://spark.apache.org/docs/3.1.1/sql-programming-guide.html

    配置自动广播阈值的方法:

    • 在Spark的配置文件“spark-defaults.conf”中,设置“spark.sql.autoBroadcastJoinThreshold”的值。
      spark.sql.autoBroadcastJoinThreshold = <size>
    • 利用Hive CLI命令,设置阈值。在运行Join操作时,提前运行下面语句:
      SET spark.sql.autoBroadcastJoinThreshold=<size>;
  3. 进行join操作。
    • 两个表的大小都小于阈值。
      • A表的字节数小于B表,则运行B join A,如
        SELECT A.name FROM B JOIN A ON A.name = B.name;
      • 否则运行A join B。
        SELECT A.name FROM A JOIN B ON A.name = B.name;
    • 一个表大于阈值一个表小于阈值。

      将小表进行BroadCast操作。

    • 两个表的大小都大于阈值。

      比较查询所涉及的字段大小与阈值的大小。

      • 若某表中涉及字段的大小小于阈值,将该表相应数据进行广播。
      • 若两表中涉及字段的大小都大于阈值,则不进行广播。
  4. (可选)如下两种场景,需要执行Analyze命令(ANALYZE TABLE tableName COMPUTE STATISTICS noscan;)更新表元数据后进行广播。
    • 需要广播的表是分区表,新建表且文件类型为非Parquet文件类型。
    • 需要广播的表是分区表,更新表数据后。

参考信息

被广播的表执行超时,导致任务结束。

默认情况下,BroadCastJoin只允许被广播的表计算5分钟,超过5分钟该任务会出现超时异常,而这个时候被广播的表的broadcast任务依然在执行,造成资源浪费。

这种情况下,有两种方式处理:

  • 调整“spark.sql.broadcastTimeout”的数值,加大超时的时间限制。
  • 降低“spark.sql.autoBroadcastJoinThreshold”的数值,不使用BroadCastJoin的优化。
分享:

    相关文档

    相关产品