更新时间:2022-08-16 GMT+08:00

数据倾斜调优

数据倾斜问题是分布式架构的重要难题,它破坏了MPP架构中各个节点对等的要求,导致单节点(倾斜节点)所存储或者计算的数据量远大于其他节点,所以会造成以下危害:

  • 存储上的倾斜会严重限制系统容量,在系统容量不饱和的情况下,由于单节点倾斜的限制,使得整个系统容量无法继续增长。
  • 计算上的倾斜会严重影响系统性能,由于倾斜节点所需要运算的数据量远大于其它节点,导致倾斜节点降低系统整体性能。
  • 数据倾斜还严重影响了MPP架构的扩展性。由于在存储或者计算时,往往会将相同值的数据放到同一节点,因此当倾斜数据(大量数据的值相同)出现之后,即使我们增加节点,系统瓶颈仍然受限于倾斜节点的容量或者性能。

GaussDB(DWS)数据库针对数据倾斜问题给出了完整的解决方案,包括存储倾斜和计算倾斜两大问题,下面分别进行介绍。

存储层数据倾斜

GaussDB(DWS)数据库中,数据分布存储在各个DN上,通过分布式执行提高查询的效率。但是,如果数据分布存在倾斜,则会导致分布式执行某些DN成为瓶颈,影响查询性能。这种情况通常是由于分布列选择不合理,可以通过调整分布列的方式解决。

例如下例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
explain performance select count(*) from inventory;
5 --CStore Scan on lmz.inventory
         dn_6001_6002 (actual time=0.444..83.127 rows=42000000 loops=1)
         dn_6003_6004 (actual time=0.512..63.554 rows=27000000 loops=1)
         dn_6005_6006 (actual time=0.722..99.033 rows=45000000 loops=1)
         dn_6007_6008 (actual time=0.529..100.379 rows=51000000 loops=1)
         dn_6009_6010 (actual time=0.382..71.341 rows=36000000 loops=1)
         dn_6011_6012 (actual time=0.547..100.274 rows=51000000 loops=1)
         dn_6013_6014 (actual time=0.596..118.289 rows=60000000 loops=1)
         dn_6015_6016 (actual time=1.057..132.346 rows=63000000 loops=1)
         dn_6017_6018 (actual time=0.940..110.310 rows=54000000 loops=1)
         dn_6019_6020 (actual time=0.231..41.198 rows=21000000 loops=1)
         dn_6021_6022 (actual time=0.927..114.538 rows=54000000 loops=1)
         dn_6023_6024 (actual time=0.637..118.385 rows=60000000 loops=1)
         dn_6025_6026 (actual time=0.288..32.240 rows=15000000 loops=1)
         dn_6027_6028 (actual time=0.566..118.096 rows=60000000 loops=1)
         dn_6029_6030 (actual time=0.423..82.913 rows=42000000 loops=1)
         dn_6031_6032 (actual time=0.395..78.103 rows=39000000 loops=1)
         dn_6033_6034 (actual time=0.376..51.052 rows=24000000 loops=1)
         dn_6035_6036 (actual time=0.569..79.463 rows=39000000 loops=1)

在performance信息中,可以看到inventory表各DN的scan行数,发现各DN的行数差距较大,最大的为63000000,最小的只有15000000,差了4倍。这个差距对于数据扫描的性能影响还可以接受,但如果上层有join算子,则影响较大。

通常,数据表在各DN上是hash分布的,因此分布列的选择很重要。通过table_skewness()来查看上述inventory表在各DN的数据分布倾斜,查询结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
select table_skewness('inventory');
              table_skewness              
------------------------------------------
 ("dn_6015_6016        ",63000000,8.046%)
 ("dn_6013_6014        ",60000000,7.663%)
 ("dn_6023_6024        ",60000000,7.663%)
 ("dn_6027_6028        ",60000000,7.663%)
 ("dn_6017_6018        ",54000000,6.897%)
 ("dn_6021_6022        ",54000000,6.897%)
 ("dn_6007_6008        ",51000000,6.513%)
 ("dn_6011_6012        ",51000000,6.513%)
 ("dn_6005_6006        ",45000000,5.747%)
 ("dn_6001_6002        ",42000000,5.364%)
 ("dn_6029_6030        ",42000000,5.364%)
 ("dn_6031_6032        ",39000000,4.981%)
 ("dn_6035_6036        ",39000000,4.981%)
 ("dn_6009_6010        ",36000000,4.598%)
 ("dn_6003_6004        ",27000000,3.448%)
 ("dn_6033_6034        ",24000000,3.065%)
 ("dn_6019_6020        ",21000000,2.682%)
 ("dn_6025_6026        ",15000000,1.916%)
(18 rows)

通过查询建表定义,可以发现,目前该表是以inv_date_sk作为分布列的,导致存在倾斜。通过查看各列的数据分布情况,改为inv_item_sk作为分布列,则倾斜情况分布如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
select table_skewness('inventory');
              table_skewness              
------------------------------------------
 ("dn_6001_6002        ",43934200,5.611%)
 ("dn_6007_6008        ",43829420,5.598%)
 ("dn_6003_6004        ",43781960,5.592%)
 ("dn_6031_6032        ",43773880,5.591%)
 ("dn_6033_6034        ",43763280,5.589%)
 ("dn_6011_6012        ",43683600,5.579%)
 ("dn_6013_6014        ",43551660,5.562%)
 ("dn_6027_6028        ",43546340,5.561%)
 ("dn_6009_6010        ",43508700,5.557%)
 ("dn_6023_6024        ",43484540,5.554%)
 ("dn_6019_6020        ",43466800,5.551%)
 ("dn_6021_6022        ",43458500,5.550%)
 ("dn_6017_6018        ",43448040,5.549%)
 ("dn_6015_6016        ",43247700,5.523%)
 ("dn_6005_6006        ",43200240,5.517%)
 ("dn_6029_6030        ",43181360,5.515%)
 ("dn_6025_6026        ",43179700,5.515%)
 ("dn_6035_6036        ",42960080,5.487%)
(18 rows)

数据分布倾斜的问题得到解决。

除了table_skewness()视图外,当前版本还提供了table_distribution函数和PGXC_GET_TABLE_SKEWNESS视图,可以更加高效的查询各表的数据倾斜情况。

计算层数据倾斜

即使通过修改表的分布键,使得数据存储在各个节点上是均衡的,但是在执行查询的过程中,仍然可能出现数据倾斜的问题。在运算过程中某个算子在DN上输出的结果集出现倾斜,从而导致此算子上层的运算出现计算倾斜。一般来说,这是由于在执行过程中,数据重分布导致的。

在查询执行的过程中,join key、group by key等往往不是表的分布列,因此需要按照join key、group by key上数据的hash值,让数据在各个DN之间进行重新分布,这个过程对应于计划中的Redistribute算子。当重分布列上的数据存在倾斜时,就会导致运行时的数据倾斜,即重分布后部分节点的数据远远大于其他。倾斜节点需要处理更多的数据,导致倾斜节点的计算性能远远低于其他节点。

如下例中,s表和t表join,join条件中的s.x和t.x均不是表的分布列,因此需要重分布(REDISTRIBUTE算子)。其中s.x列上存在倾斜值,t.x上不存在倾斜。id=6的stream算子在datanode2节点输出的结果集是其他DN的3倍,从而导致了计算倾斜。

1
select * from skew s,test t where s.x = t.x order by s.a limit 1;
 id |                      operation                      |        A-time         
----+-----------------------------------------------------+-----------------------
  1 | ->  Limit                                           | 52622.382             
  2 |    ->  Streaming (type: GATHER)                     | 52622.374             
  3 |       ->  Limit                                     | [30138.494,52598.994] 
  4 |          ->  Sort                                   | [30138.486,52598.986] 
  5 |             ->  Hash Join (6,8)                     | [30127.013,41483.275] 
  6 |                ->  Streaming(type: REDISTRIBUTE)    | [11365.110,22024.845] 
  7 |                   ->  Seq Scan on public.skew s     | [2019.168,2175.369]   
  8 |                ->  Hash                             | [2460.108,2499.850]   
  9 |                   ->  Streaming(type: REDISTRIBUTE) | [1056.214,1121.887]   
 10 |                      ->  Seq Scan on public.test t  | [310.848,325.569]     
 
6 --Streaming(type: REDISTRIBUTE)
         datanode1 (rows=5050368)
         datanode2 (rows=15276032)
         datanode3 (rows=5174272)
         datanode4 (rows=5219328)

和存储倾斜相比,计算倾斜更难以提前识别,因此GaussDB提出了RLBT(Runtime Load Balance Technology)方案,用以解决运行时的计算倾斜问题,该特性由参数skew_option控制。RLBT方案主要分为两个层面,第一步是计算倾斜识别,第二步是计算倾斜解决。下面分别进行介绍。

  1. 倾斜识别

    计算倾斜的识别,即预先识别计算过程中的重分布列是否存在倾斜数据。RLBT方案中给出了三个解决手段,统计信息识别,hint方式指定以及规则识别:

    • 统计信息识别

      需要用户先执行analyze收集各表的统计信息,然后优化器能够自动利用统计信息对重分布键上的倾斜数据进行提前识别,对于存在倾斜的查询,生成相应的优化计划。在重分布键有多列的情况,只有所有列都属于同一个基表才能利用统计信息进行识别。

      统计信息只能给出基表的倾斜情况,当基表某一列存在倾斜,其他列上带有过滤条件,或者经过和其他表的join之后,我们无法准确判断倾斜列上倾斜数据是否依旧存在。当skew_option为normal时,这里认为倾斜数据依旧存在,仍然会对基表中识别到的倾斜进行优化;当skew_option为lazy时,这里认为倾斜数据已经不再存在,也就不会进行相应的优化。

    • hint方式指定

      统计信息有着一定的局限性,对于较为复杂的查询,其中间结果难以通过统计信息进行估算和识别倾斜数据。对于这种情况,我们设计了hint手段,通过用户手动指定的方式,给定倾斜信息。优化器根据用户给定的倾斜信息,来对查询进行优化。详细hint使用语法参见运行倾斜的hint

    • 规则识别

      现在BI系统往往会产生大量带有outer join(left join、right join、full join)的SQL,outer join在匹配失败的情况下会补空产生大量NULL值,如果接下来在补空列上进行join或者group by操作,就会导致NULL值倾斜。当前RLBT技术会自动识别这种场景,并生成相应的NULL值倾斜优化计划。

  2. 计算倾斜解决
    在解决倾斜时,目前针对最常见的join和agg算子进行了优化。
    • join优化

    基本思路是将倾斜数据和非倾斜数据进行隔离处理。主要分为以下三种情况:

    1. join两侧都需要做重分布:

      对倾斜侧做PART_REDISTRIBUTE_PART_ROUNDROBIN,其中对倾斜数据做roundrobin,非倾斜数据做redistribute;

      对非倾斜侧做PART_REDISTRIBUTE_PART_BROADCAST,其中对倾斜数据做broadcast,非倾斜数据做redistribute;

    2. join一侧需要重分布,另一侧不需要重分布:

      对需要重分布的一侧做PART_REDISTRIBUTE_PART_ROUNDROBIN;

      对不需要重分布的一侧做PART_LOCAL_PART_BROADCAST,其中对等于倾斜值的部分做broadcast,其余数据保留在本地。

    3. 对于有补NULL值的表:

      对该表做PART_REDISTERIBUTE_PART_LOCAL,其中将NULL值保留在本地,其余数据做redistribute。

    以前面的查询为例,s.x列上存在倾斜数据,倾斜数据的值为0。优化器通过统计信息,识别到了该倾斜数据,生成了倾斜优化计划如下:

     id |                                operation                                |        A-time         
    ----+-------------------------------------------------------------------------+-----------------------
      1 | ->  Limit                                                               | 23642.049             
      2 |    ->  Streaming (type: GATHER)                                         | 23642.041             
      3 |       ->  Limit                                                         | [23310.768,23618.021] 
      4 |          ->  Sort                                                       | [23310.761,23618.012] 
      5 |             ->  Hash Join (6,8)                                         | [20898.341,21115.272] 
      6 |                ->  Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN)   | [7125.834,7472.111]   
      7 |                   ->  Seq Scan on public.skew s                         | [1837.079,1911.025]   
      8 |                ->  Hash                                                 | [2612.484,2640.572]   
      9 |                   ->  Streaming(type: PART REDISTRIBUTE PART BROADCAST) | [1193.548,1297.894]   
     10 |                      ->  Seq Scan on public.test t                      | [314.343,328.707]     
    
       5 --Vector Hash Join (6,8)
             Hash Cond: s.x = t.x
             Skew Join Optimizated by Statistic
       6 --Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN)
             datanode1 (rows=7635968)
             datanode2 (rows=7517184)
             datanode3 (rows=7748608)
             datanode4 (rows=7818240)

    上述执行计划中,可以看到Skew Join Optimizated by Statistic的字样,代表该计划为倾斜优化计划,其中Statistic关键字代表该倾斜优化来自于统计信息,除此之外还有Hint和Rule,分别代表倾斜优化来自于hint语句和规则。对比前面的计划可以看到,这里对于非倾斜数据和倾斜数据做了分别处理。对于s表中的非倾斜数据,依旧按照原有的方案,根据数据的hash值进行重分布;而对于倾斜数据(即等于0的数据),则通过轮询发送的方式,均衡地发送到所有节点。通过这样的方式,解决了倾斜数据分布不均衡的问题。

    同时,为了保证结果的正确性,需要对t表做相应的处理。对于t表中等于0(s.x表中的倾斜值)的数据做广播,对于其他数据,依旧根据数据的hash值进行重分布。

    通过这样的方式,就解决了join操作中,数据倾斜的问题。从上面的结果来看,id=6的stream算子各个DN的输出结果已经非常均衡,同时查询端到端性能提升了1倍。

    如果执行计划中Stream算子类型显示为HYBRID,则表示对不同的倾斜数据所应用的stream方式不同,例如以下计划:

    EXPLAIN (nodes OFF, costs OFF) SELECT COUNT(*) FROM skew_scol s, skew_scol1 s1 WHERE s.b = s1.c;
    QUERY PLAN
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------
    id |                                                                         operation
    ----+-----------------------------------------------------------------------------------------------------------------------------------------------------------
    1 | ->  Aggregate
    2 |    ->  Streaming (type: GATHER)
    3 |       ->  Aggregate
    4 |          ->  Hash Join (5,7)
    5 |             ->  Streaming(type: HYBRID)
    6 |                ->  Seq Scan on skew_scol s
    7 |             ->  Hash
    8 |                ->  Streaming(type: HYBRID)
    9 |                   ->  Seq Scan on skew_scol1 s1
    
    Predicate Information (identified by plan id)
    --------------------------------------------------------------------------------------------------------------------------------------------
    4 --Hash Join (5,7)
    Hash Cond: (s.b = s1.c)
    Skew Join Optimized by Statistic
    5 --Streaming(type: HYBRID)
    Skew Filter: (b = 1)
    Skew Filter: (b = 0)
    8 --Streaming(type: HYBRID)
    Skew Filter: (c = 0)
    Skew Filter: (c = 1)

    数据1在skew_scol表上有倾斜,对倾斜数据做roundrobin,非倾斜数据做redistribute。

    数据0在skew_scol表上是非倾斜侧,对倾斜数据做broadcast,非倾斜数据做redistribute。

    由此可以看到两个stream类型分别是PART REDISTRIBUTE PART ROUNDROBIN和PART REDISTRIBUTE PART BROADCAST,这里标记stream类型为HYBRID类型。

    • agg优化

    对于agg操作,解决倾斜的思路与join操作不同,这里是通过首先在本DN内按照group by key进行去重操作,然后再进行重分布。因为经过DN内部去重之后,从全局来看,每个值的数量都不会超过DN数,因此不会出现严重的数据倾斜问题。以如下query为例:

    1
    select c1, c2, c3, c4, c5, c6, c7, c8, c9, count(*) from t group by c1, c2, c3, c4, c5, c6, c7, c8, c9 limit 10;
    

    原执行结果如下:

     id |                 operation                  |         A-time         |  A-rows  
    ----+--------------------------------------------+------------------------+----------
      1 | ->  Streaming (type: GATHER)               | 130621.783             |       12 
      2 |    ->  GroupAggregate                      | [85499.711,130432.341] |       12 
      3 |       ->  Sort                             | [85499.509,103145.632] | 36679237 
      4 |          ->  Streaming(type: REDISTRIBUTE) | [25668.897,85499.050]  | 36679237 
      5 |             ->  Seq Scan on public.t       | [9835.069,10416.388]   | 36679237 
    
       4 --Streaming(type: REDISTRIBUTE)
             datanode1 (rows=36678837)
             datanode2 (rows=100)
             datanode3 (rows=100)
             datanode4 (rows=200)

    其中存在大量倾斜数据,导致数据按照group by key进行重分布之后,datanode1的数据量是其他节点的数十万倍。在倾斜优化之后,首先在本DN进行一次group by操作,达到数据去重的效果,然后再进行重分布,可以发现基本没有数据倾斜的问题出现。

     id |                 operation                  |        A-time          
    ----+--------------------------------------------+-----------------------
      1 | ->  Streaming (type: GATHER)               | 10961.337             
      2 |    ->  HashAggregate                       | [10953.014,10953.705] 
      3 |       ->  HashAggregate                    | [10952.957,10953.632] 
      4 |          ->  Streaming(type: REDISTRIBUTE) | [10952.859,10953.502] 
      5 |             ->  HashAggregate              | [10084.280,10947.139] 
      6 |                ->  Seq Scan on public.t    | [4757.031,5201.168]   
    
     Predicate Information (identified by plan id) 
    -----------------------------------------------
       3 --HashAggregate
             Skew Agg Optimized by Statistic
    
       4 --Streaming(type: REDISTRIBUTE)
             datanode1 (rows=17)
             datanode2 (rows=8)
             datanode3 (rows=8)
             datanode4 (rows=14)

    适用范围

    • join算子
      • 支持nest loop,merge join,hash join等join方式;
      • 当倾斜数据处于join的left侧时,支持inner join,left join,semi join,anti join;当倾斜属于位于join的right侧时,支持inner join,right join,right semi join,right anti join。
      • 通过统计信息得到的倾斜优化计划,优化器会根据代价判断该计划是否为最优计划。通过hint和规则会强制生成倾斜优化计划。
    • agg算子
      • array_agg、string_agg、subplan in agg qual这几种场景不支持优化;
      • 通过统计信息识别到的倾斜优化计划会受到代价、plan_mode_seed参数、best_agg_plan参数影响,而hint、规则识别到的不会。