数据倾斜调优
数据倾斜问题是分布式架构的重要难题,它破坏了MPP架构中各个节点对等的要求,导致单节点(倾斜节点)所存储或者计算的数据量远大于其他节点,所以会造成以下危害:
- 存储上的倾斜会严重限制系统容量,在系统容量不饱和的情况下,由于单节点倾斜的限制,使得整个系统容量无法继续增长。
- 计算上的倾斜会严重影响系统性能,由于倾斜节点所需要运算的数据量远大于其它节点,导致倾斜节点降低系统整体性能。
- 数据倾斜还严重影响了MPP架构的扩展性。由于在存储或者计算时,会将相同值的数据放到同一节点,因此当倾斜数据(大量数据的值相同)出现之后,即使增加节点,系统瓶颈仍然受限于倾斜节点的容量或者性能。
DWS数据库针对数据倾斜问题给出了完整的解决方案,包括存储倾斜和计算倾斜两大问题,下面分别进行介绍。
存储层数据倾斜
DWS数据库中,数据分布存储在各个DN上,通过分布式执行提高查询的效率。但是,如果数据分布存在倾斜,则会导致分布式执行某些DN成为瓶颈,影响查询性能。这种情况通常是由于分布列选择不合理,可以通过调整分布列的方式解决。
分析过程:
- 登录DWS控制台。在“集群列表”页面,找到需要查看监控的集群。在指定集群所在行的“操作”列,单击“监控面板”。选择“监控 > 节点监控 > 磁盘”,查看磁盘使用率。
各个数据磁盘的利用率,会有不均衡的现象。正常情况下,利用率最高和利用率最低的磁盘空间相差不大,如果磁盘利用率相差超过了5%就要注意是不是有资源倾斜的情况。
- 连接数据库,通过等待视图查看作业的运行情况,发现作业总是等待部分DN或者个别DN。
1
SELECT wait_status, count(*) as cnt FROM pgxc_thread_wait_status WHERE wait_status not like '%cmd%' AND wait_status not like '%none%' and wait_status not like '%quit%' group by 1 order by 2 desc;
- 执行慢语句的explain performance显示,发现各个DN的基表scan的时间和行数不均衡。
1
explain performance select avg(ss_wholesale_cost) from store_sales;
- 基表scan的时间:最快的DN耗时5ms,最慢的DN耗时1173ms。
- 数据分布情况:某些DN有22831616行,其他DN都是0行,数据有严重倾斜。
- 通过倾斜检查接口可以发现数据倾斜。
1
SELECT table_skewness('store_sales');
1
SELECT table_distribution('public','store_sales');
处理方法--如何找到倾斜的表:
- 数据库中表个数少于1W的场景下,可直接使用倾斜视图查询当前库内所有表的数据倾斜情况。
1
SELECT * FROM pgxc_get_table_skewness ORDER BY totalsize DESC;
- 数据库中表个数非常多(至少大于1W)的场景,因PGXC_GET_TABLE_SKEWNESS视图涉及全库查并计算非常全面的倾斜字段,所以可能会花费比较长的时间(小时级),建议参考PGXC_GET_TABLE_SKEWNESS视图定义,执行以下操作:
- 8.1.2及之前集群版本中使用table_distribution()函数自定义输出,减少输出列进行计算优化,例如:
1 2 3 4 5 6
SELECT schemaname,tablename,max(dnsize) AS maxsize, min(dnsize) AS minsize FROM pg_catalog.pg_class c INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace INNER JOIN pg_catalog.table_distribution() s ON s.schemaname = n.nspname AND s.tablename = c.relname INNER JOIN pg_catalog.pgxc_class x ON c.oid = x.pcrelid AND x.pclocatortype = 'H' GROUP BY schemaname,tablename;
- 8.1.3及以上集群版本中支持使用gs_table_distribution()函数,全库查询所有表的数据倾斜情况。全库表查询时,gs_table_distribution()函数优于table_distribution()函数;在大集群大数据量场景下,如果进行全库表查询,建议优先使用gs_table_distribution()函数。
1 2 3 4 5 6
SELECT schemaname,tablename,max(dnsize) AS maxsize, min(dnsize) AS minsize FROM pg_catalog.pg_class c INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace INNER JOIN pg_catalog.gs_table_distribution() s ON s.schemaname = n.nspname AND s.tablename = c.relname INNER JOIN pg_catalog.pgxc_class x ON c.oid = x.pcrelid AND x.pclocatortype = 'H' GROUP BY schemaname,tablename;
使用如下语句可快速查询到大表:
1
SELECT schemaname||'.'||tablename as table, sum(dnsize) as size FROM gs_table_distribution() group by 1 order by 2 desc limit 10;
使用如下语句可快速查询表的倾斜率:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
WITH skew AS ( SELECT schemaname, tablename, pg_catalog.sum(dnsize) AS totalsize, pg_catalog.avg(dnsize) AS avgsize, pg_catalog.max(dnsize) AS maxsize, pg_catalog.min(dnsize) AS minsize, (pg_catalog.max(dnsize) - pg_catalog.min(dnsize)) AS skewsize, pg_catalog.stddev(dnsize) AS skewstddev FROM pg_catalog.pg_class c INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace INNER JOIN pg_catalog.gs_table_distribution() s ON s.schemaname = n.nspname AND s.tablename = c.relname INNER JOIN pg_catalog.pgxc_class x ON c.oid = x.pcrelid AND x.pclocatortype IN('H', 'N') GROUP BY schemaname,tablename ) SELECT schemaname, tablename, totalsize, avgsize::numeric(1000), (maxsize/totalsize)::numeric(4,3) AS maxratio, (minsize/totalsize)::numeric(4,3) AS minratio, skewsize, (skewsize/avgsize)::numeric(4,3) AS skewratio, skewstddev::numeric(1000) FROM skew WHERE totalsize > 0;
- 8.1.2及之前集群版本中使用table_distribution()函数自定义输出,减少输出列进行计算优化,例如:
处理方法--表的分布键的选择方法:
- 如果此列的distinct值比较大,并且没有明显的数据倾斜,也可以把多列定义成分布列。
查看数据是否存在倾斜:
1
SELECT count(*) cnt, column1 FROM table group by column1 order by cnt limit 100;
- 选用经常做JOIN或group by的列,可以减少STREAM运算。
- 不推荐以下分布键选择方式:
- 分布列用默认值(第一列)。
- 分布列用sequence自增生成。
- 分布列用随机数生成(除非任意列,或者任意两列的组合做分布键都是倾斜的,一般不选用这种方法)。
计算层数据倾斜
即使通过修改表的分布键,使得数据存储在各个节点上是均衡的,但是在执行查询的过程中,仍然可能出现数据倾斜的问题。在运算过程中某个算子在DN上输出的结果集出现倾斜,从而导致此算子上层的运算出现计算倾斜。一般来说,这是由于在执行过程中,数据重分布导致的。
在查询执行的过程中,join key、group by key等不是表的分布列,因此需要按照join key、group by key上数据的hash值,让数据在各个DN之间进行重新分布,这个过程对应于计划中的Redistribute算子。当重分布列上的数据存在倾斜时,就会导致运行时的数据倾斜,即重分布后部分节点的数据远大于其他。倾斜节点需要处理更多的数据,导致倾斜节点的计算性能远低于其他节点。
如下例中,s表和t表join,join条件中的s.x和t.x均不是表的分布列,因此需要重分布(REDISTRIBUTE算子)。其中s.x列上存在倾斜值,t.x上不存在倾斜。id=6的stream算子在datanode2节点输出的结果集是其他DN的3倍,从而导致了计算倾斜。
1
|
select * from skew s,test t where s.x = t.x order by s.a limit 1; |
id | operation | A-time ----+-----------------------------------------------------+----------------------- 1 | -> Limit | 52622.382 2 | -> Streaming (type: GATHER) | 52622.374 3 | -> Limit | [30138.494,52598.994] 4 | -> Sort | [30138.486,52598.986] 5 | -> Hash Join (6,8) | [30127.013,41483.275] 6 | -> Streaming(type: REDISTRIBUTE) | [11365.110,22024.845] 7 | -> Seq Scan on public.skew s | [2019.168,2175.369] 8 | -> Hash | [2460.108,2499.850] 9 | -> Streaming(type: REDISTRIBUTE) | [1056.214,1121.887] 10 | -> Seq Scan on public.test t | [310.848,325.569] 6 --Streaming(type: REDISTRIBUTE) datanode1 (rows=5050368) datanode2 (rows=15276032) datanode3 (rows=5174272) datanode4 (rows=5219328)
和存储倾斜相比,计算倾斜更难以提前识别,因此DWS提出了RLBT(Runtime Load Balance Technology)方案,用于解决运行时的计算倾斜问题,该特性由参数skew_option控制。RLBT方案主要分为两个层面,第一步是计算倾斜识别,第二步是计算倾斜解决。下面分别进行介绍。
- 倾斜识别
计算倾斜的识别,即预先识别计算过程中的重分布列是否存在倾斜数据。RLBT方案中给出了三个解决手段,统计信息识别,hint方式指定以及规则识别:
- 统计信息识别
需要用户先执行ANALYZE收集各表的统计信息,然后优化器能够自动利用统计信息对重分布键上的倾斜数据进行提前识别,对于存在倾斜的查询,生成相应的优化计划。在重分布键有多列的情况,只有所有列都属于同一个基表才能利用统计信息进行识别。
统计信息只能给出基表的倾斜情况,当基表某一列存在倾斜,其他列上带有过滤条件,或者经过和其他表的join之后,无法准确判断倾斜列上倾斜数据是否依旧存在。当skew_option为normal时,这里认为倾斜数据依旧存在,仍然会对基表中识别到的倾斜进行优化;当skew_option为lazy时,这里认为倾斜数据已经不再存在,也就不会进行相应的优化。
- hint方式指定
统计信息有着一定的局限性,对于较为复杂的查询,其中间结果难以通过统计信息进行估算和识别倾斜数据。对于这种情况,DWS设计了hint手段,通过用户手动指定的方式,给定倾斜信息。优化器根据用户给定的倾斜信息,来对查询进行优化。详细hint使用语法参见运行倾斜的hint。
- 规则识别
现在BI系统会产生大量带有outer join(left join、right join、full join)的SQL,outer join在匹配失败的情况下会补空产生大量NULL值,如果接下来在补空列上进行join或者group by操作,就会导致NULL值倾斜。当前RLBT技术会自动识别这种场景,并生成相应的NULL值倾斜优化计划。
- 统计信息识别
- 计算倾斜解决
基本思路是将倾斜数据和非倾斜数据进行隔离处理。主要分为以下三种情况:
- join两侧都需要做重分布:
对倾斜侧做PART_REDISTRIBUTE_PART_ROUNDROBIN,其中对倾斜数据做roundrobin,非倾斜数据做redistribute;
对非倾斜侧做PART_REDISTRIBUTE_PART_BROADCAST,其中对倾斜数据做broadcast,非倾斜数据做redistribute;
- join一侧需要重分布,另一侧不需要重分布:
对需要重分布的一侧做PART_REDISTRIBUTE_PART_ROUNDROBIN;
对不需要重分布的一侧做PART_LOCAL_PART_BROADCAST,其中对等于倾斜值的部分做broadcast,其余数据保留在本地。
- 对于有补NULL值的表:
对该表做PART_REDISTRIBUTE_PART_LOCAL,其中将NULL值保留在本地,其余数据做redistribute。
以前面的查询为例,s.x列上存在倾斜数据,倾斜数据的值为0。优化器通过统计信息,识别到了该倾斜数据,生成了倾斜优化计划如下:
id | operation | A-time ----+-------------------------------------------------------------------------+----------------------- 1 | -> Limit | 23642.049 2 | -> Streaming (type: GATHER) | 23642.041 3 | -> Limit | [23310.768,23618.021] 4 | -> Sort | [23310.761,23618.012] 5 | -> Hash Join (6,8) | [20898.341,21115.272] 6 | -> Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN) | [7125.834,7472.111] 7 | -> Seq Scan on public.skew s | [1837.079,1911.025] 8 | -> Hash | [2612.484,2640.572] 9 | -> Streaming(type: PART REDISTRIBUTE PART BROADCAST) | [1193.548,1297.894] 10 | -> Seq Scan on public.test t | [314.343,328.707] 5 --Vector Hash Join (6,8) Hash Cond: s.x = t.x Skew Join Optimized by Statistic 6 --Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN) datanode1 (rows=7635968) datanode2 (rows=7517184) datanode3 (rows=7748608) datanode4 (rows=7818240)
上述执行计划中,可以看到Skew Join Optimized by Statistic的字样,代表该计划为倾斜优化计划,其中Statistic关键字代表该倾斜优化来自于统计信息,除此之外还有Hint和Rule,分别代表倾斜优化来自于hint语句和规则。对比前面的计划可以看到,这里对于非倾斜数据和倾斜数据做了分别处理。对于s表中的非倾斜数据,依旧按照原有的方案,根据数据的hash值进行重分布;而对于倾斜数据(即等于0的数据),则通过轮询发送的方式,均衡地发送到所有节点。通过这样的方式,解决了倾斜数据分布不均衡的问题。
同时,为了保证结果的正确性,需要对t表做相应的处理。对于t表中等于0(s.x表中的倾斜值)的数据做广播,对于其他数据,依旧根据数据的hash值进行重分布。
通过这样的方式,就解决了join操作中,数据倾斜的问题。从上面的结果来看,id=6的stream算子各个DN的输出结果已经非常均衡,同时查询端到端性能提升了1倍。
如果执行计划中Stream算子类型显示为HYBRID,则表示对不同的倾斜数据所应用的stream方式不同,例如以下计划:
EXPLAIN (nodes OFF, costs OFF) SELECT COUNT(*) FROM skew_scol s, skew_scol1 s1 WHERE s.b = s1.c; QUERY PLAN ------------------------------------------------------------------------------------------------------------------------------------------------------------------ id | operation ----+----------------------------------------------------------------------------------------------------------------------------------------------------------- 1 | -> Aggregate 2 | -> Streaming (type: GATHER) 3 | -> Aggregate 4 | -> Hash Join (5,7) 5 | -> Streaming(type: HYBRID) 6 | -> Seq Scan on skew_scol s 7 | -> Hash 8 | -> Streaming(type: HYBRID) 9 | -> Seq Scan on skew_scol1 s1 Predicate Information (identified by plan id) -------------------------------------------------------------------------------------------------------------------------------------------- 4 --Hash Join (5,7) Hash Cond: (s.b = s1.c) Skew Join Optimized by Statistic 5 --Streaming(type: HYBRID) Skew Filter: (b = 1) Skew Filter: (b = 0) 8 --Streaming(type: HYBRID) Skew Filter: (c = 0) Skew Filter: (c = 1)
数据1在skew_scol表上有倾斜,对倾斜数据做roundrobin,非倾斜数据做redistribute。
数据0在skew_scol表上是非倾斜侧,对倾斜数据做broadcast,非倾斜数据做redistribute。
由此可以看到两个stream类型分别是PART REDISTRIBUTE PART ROUNDROBIN和PART REDISTRIBUTE PART BROADCAST,这里标记stream类型为HYBRID类型。
- agg优化
对于agg操作,解决倾斜的思路与join操作不同,这里是通过首先在本DN内按照group by key进行去重操作,然后再进行重分布。因为经过DN内部去重之后,从全局来看,每个值的数量都不会超过DN数,因此不会出现严重的数据倾斜问题。以如下query为例:
1
select c1, c2, c3, c4, c5, c6, c7, c8, c9, count(*) from t group by c1, c2, c3, c4, c5, c6, c7, c8, c9 limit 10;
原执行结果如下:
id | operation | A-time | A-rows ----+--------------------------------------------+------------------------+---------- 1 | -> Streaming (type: GATHER) | 130621.783 | 12 2 | -> GroupAggregate | [85499.711,130432.341] | 12 3 | -> Sort | [85499.509,103145.632] | 36679237 4 | -> Streaming(type: REDISTRIBUTE) | [25668.897,85499.050] | 36679237 5 | -> Seq Scan on public.t | [9835.069,10416.388] | 36679237 4 --Streaming(type: REDISTRIBUTE) datanode1 (rows=36678837) datanode2 (rows=100) datanode3 (rows=100) datanode4 (rows=200)
其中存在大量倾斜数据,导致数据按照group by key进行重分布之后,datanode1的数据量是其他节点的数十万倍。在倾斜优化之后,首先在本DN进行一次group by操作,达到数据去重的效果,然后再进行重分布,可以发现基本没有数据倾斜的问题出现。
id | operation | A-time ----+--------------------------------------------+----------------------- 1 | -> Streaming (type: GATHER) | 10961.337 2 | -> HashAggregate | [10953.014,10953.705] 3 | -> HashAggregate | [10952.957,10953.632] 4 | -> Streaming(type: REDISTRIBUTE) | [10952.859,10953.502] 5 | -> HashAggregate | [10084.280,10947.139] 6 | -> Seq Scan on public.t | [4757.031,5201.168] Predicate Information (identified by plan id) ----------------------------------------------- 3 --HashAggregate Skew Agg Optimized by Statistic 4 --Streaming(type: REDISTRIBUTE) datanode1 (rows=17) datanode2 (rows=8) datanode3 (rows=8) datanode4 (rows=14)
适用范围
- join算子
- 支持nest loop,merge join,hash join等join方式;
- 当倾斜数据处于join的left侧时,支持inner join,left join,semi join,anti join;当倾斜属于位于join的right侧时,支持inner join,right join,right semi join,right anti join。
- 通过统计信息得到的倾斜优化计划,优化器会根据代价判断该计划是否为最优计划。通过hint和规则会强制生成倾斜优化计划。
- agg算子
- array_agg、string_agg、subplan in agg qual这几种场景不支持优化;
- 通过统计信息识别到的倾斜优化计划会受到代价、plan_mode_seed参数、best_agg_plan参数影响,而hint、规则识别到的不会。
- join两侧都需要做重分布: