Updated on 2022-06-01 GMT+08:00

Spark CBO Tuning

Scenario

An SQL query compiler is responsible for converting SQL statements into execution plans, while an optimizer instructs the SQL query compiler to select the most efficient execution plan. Traditional databases (for example, Oracle) support two types of optimizers: Rule-Based Optimization (RBO) and Cost-Based Optimization (CBO).
  • RBO

    RBO rules are derived from experience. Execution plans generated under RBO rules are not affected by table contents or data distribution.

  • CBO

    Rules of CBO are determined by data distribution and organization. The cost of each execution plan is evaluated and the plan with the lowest cost is selected.

Currently, all Spark optimizers are RBO-based and have dozens of optimization rules, for example, predicate pushdown, constant folding, and column pruning. These rules are valid but insensitive to data. When data distribution in a table changes, RBO is not aware of the changes, so the execution plan it generates may no longer be optimal. In comparison, CBO evaluates SQL statements based on actual data distribution. It generates a group of candidate execution plans and selects the one with the lowest cost to improve performance.

Join algorithm selection is a major improvement in CBO compared with RBO. For example, when two tables are joined, the result set of a large table may fall below the broadcast threshold after a filter operation is performed. Without CBO, this change cannot be detected and the SortMergeJoin algorithm is used, which involves a large number of shuffle operations and deteriorates performance. With CBO, the change is detected and the BroadcastHashJoin algorithm is used to broadcast the small table to every node. This involves no shuffle operation and greatly improves performance.
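
The join selection described above can be illustrated with a minimal Python sketch. It is not Spark's planner code: the choose_join function, the filter_selectivity parameter, and the 10 MB threshold are assumptions made for illustration only.

```python
# Hypothetical sketch of join selection; not Spark's actual planner logic.
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024  # assumed broadcast threshold (10 MB)


def choose_join(raw_size_bytes, filter_selectivity, use_cbo):
    """Return the join algorithm a planner might pick for the filtered table."""
    if use_cbo:
        # With statistics, the post-filter size can be estimated.
        estimated_size = raw_size_bytes * filter_selectivity
    else:
        # Without statistics, only the raw table size is visible.
        estimated_size = raw_size_bytes
    if estimated_size <= BROADCAST_THRESHOLD_BYTES:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


# A 1 GB table whose filter keeps about 0.5% of the rows (roughly 5 MB).
one_gb = 1024 ** 3
print(choose_join(one_gb, 0.005, use_cbo=False))  # SortMergeJoin
print(choose_join(one_gb, 0.005, use_cbo=True))   # BroadcastHashJoin
```

In this sketch the only difference between the two calls is whether the size estimate accounts for the filter, which is the information that statistics make available to CBO.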

Procedure

Based on table and column statistics, Spark CBO calculates the sizes of intermediate result sets generated by operators and then selects the optimal execution plan according to the calculation result.
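
As a rough illustration of how statistics can yield the size of an intermediate result set, the following Python sketch estimates the output of a range filter. It assumes values are uniformly distributed between the column minimum and maximum, which is a common simplification and not necessarily what Spark CBO does. The ColumnStats class, the estimate_range_filter_rows function, and the sample numbers are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ColumnStats:
    """Hypothetical container for the column statistics used below."""
    min_value: float
    max_value: float
    null_count: int


def estimate_range_filter_rows(row_count, stats, upper_bound):
    """Estimate rows matching `column < upper_bound`, assuming uniform values."""
    non_null_rows = row_count - stats.null_count
    if upper_bound <= stats.min_value:
        return 0
    if upper_bound > stats.max_value:
        return non_null_rows
    # Here min_value < upper_bound <= max_value, so the span is positive.
    selectivity = (upper_bound - stats.min_value) / (stats.max_value - stats.min_value)
    return round(non_null_rows * selectivity)


# Assumed example: 1,000,000 rows, age between 0 and 100, filter `age < 10`.
age_stats = ColumnStats(min_value=0, max_value=100, null_count=0)
estimated_rows = estimate_range_filter_rows(1_000_000, age_stats, 10)
estimated_bytes = estimated_rows * 50  # assumed average row size of 50 bytes
print(estimated_rows, estimated_bytes)  # 100000 5000000
```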

  1. Configure parameters using either of the following methods:
    • Add the spark.sql.cbo configuration item to the spark-defaults.conf configuration file and set it to true. The default value is false.
    • Run the set spark.sql.cbo=true SQL statement on the client.
  2. Run commands to obtain the statistics.

    Perform this step once before running any SQL statements. If data in a table is added, updated, or deleted, you must run the commands again to obtain the latest statistics and data distribution information. Otherwise, CBO may optimize based on stale statistics.

    • For tables, run the COMPUTE STATS FOR TABLE src command to obtain the table statistics, including the number of records, number of files, and physical storage capacity.
    • For columns:
      • Run the COMPUTE STATS FOR TABLE src ON COLUMNS command to obtain the statistics of all columns.
      • Run the COMPUTE STATS FOR TABLE src ON COLUMNS name,age command to obtain the statistics of the name and age fields.

        Four types of column statistics are supported: number, date, time, and character string. The number, date, and time statistics consist of the maximum value, minimum value, number of distinct values, number of null values, and histogram (equi-width or equi-height histogram). The character string statistics consist of the maximum value, minimum value, maximum length, average length, number of distinct values, number of null values, and histogram (equi-width histogram only).

  3. Optimize CBO.
    • Automatic optimization: The system determines whether the input SQL statements can be optimized and automatically selects the optimization algorithm.
    • Manual optimization: You can run the DESC FORMATTED src command to view the statistics and then manually optimize the SQL statements based on data distribution.
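
To make the column statistics collected in step 2 more concrete, the following Python sketch computes the statistics listed for a numeric column, including both histogram types. It only illustrates the definitions and is not how the COMPUTE STATS command is implemented. The numeric_column_stats function, the bucket count, and the sample data are hypothetical.

```python
def numeric_column_stats(values, buckets=4):
    """Compute basic statistics for a numeric column; None represents NULL."""
    non_null = sorted(v for v in values if v is not None)
    if not non_null:
        raise ValueError("column has no non-null values")
    lowest, highest = non_null[0], non_null[-1]

    # Equi-width histogram: every bucket covers the same value range,
    # so the row count per bucket is recorded.
    width = (highest - lowest) / buckets
    equi_width = [0] * buckets
    for v in non_null:
        index = min(int((v - lowest) / width), buckets - 1) if width else 0
        equi_width[index] += 1

    # Equi-height histogram: every bucket holds about the same number of rows,
    # so the bucket boundaries are recorded instead.
    step = len(non_null) / buckets
    boundaries = [non_null[min(int(i * step), len(non_null) - 1)] for i in range(buckets)]
    boundaries.append(highest)

    return {
        "min": lowest,
        "max": highest,
        "distinct": len(set(non_null)),
        "nulls": len(values) - len(non_null),
        "equi_width_counts": equi_width,
        "equi_height_bounds": boundaries,
    }


# Assumed sample data for an age column with two NULL values.
ages = [18, 21, 21, 25, None, 30, 34, 40, 58, None]
stats = numeric_column_stats(ages)
print(stats)
```

With the skewed sample above, the equi-width counts are uneven while the equi-height boundaries are unevenly spaced, which is the distribution information that manual optimization would rely on.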