更新时间:2024-05-07 GMT+08:00





-- Fixture for the rewrite_rule examples: recreate an isolated schema and the
-- test tables from scratch so the examples start from a known state.
--
-- The example result sets and verbose plans show t1/t2 with exactly the
-- columns c1, c2, c3 (e.g. "Output: t1.c1, t1.c2, t1.c3" for a full Seq Scan),
-- and the plans report "Distribute Key: t1.c1" / "Distribute Key: t2.c1".
-- The tables are therefore declared with those three columns and an explicit
-- hash distribution on c1, which also avoids the default-distribution NOTICE.
--
-- NOTE(review): some example plans scan indexes on t2 (t2idx, t2_c1_idx), and
-- the uniquecheck example relies on sample rows in t1/t2; neither the indexes
-- nor the rows are created by this script — confirm how they should be set up.
-- NOTE(review): table t is not referenced by the examples in this section.
DROP SCHEMA IF EXISTS rewrite_rule_guc_test CASCADE;
CREATE SCHEMA rewrite_rule_guc_test;
SET current_schema = rewrite_rule_guc_test;
CREATE TABLE t (c1 INT, c2 INT, c3 INT, c4 INT);
CREATE TABLE t1 (c1 INT, c2 INT, c3 INT) DISTRIBUTE BY HASH (c1);
CREATE TABLE t2 (c1 INT, c2 INT, c3 INT) DISTRIBUTE BY HASH (c1);




gaussdb=# set rewrite_rule='none'; 
gaussdb=# explain (verbose on, costs off)  select group_concat(tt.c1, tt.c2) from (select t1.c1,t2.c2 from t1,t2 where t1.c1=t2.c2) tt(c1,c2);
                                 QUERY PLAN
 Aggregate
   Output: group_concat(t1.c1, t2.c2 SEPARATOR ',')
   ->  Hash Join
         Output: t1.c1, t2.c2
         Hash Cond: (t1.c1 = t2.c2)
         ->  Data Node Scan on t1 "_REMOTE_TABLE_QUERY_"
               Output: t1.c1
               Node/s: All datanodes
               Remote query: SELECT c1 FROM ONLY public.t1 WHERE true
         ->  Hash
               Output: t2.c2
               ->  Data Node Scan on t2 "_REMOTE_TABLE_QUERY_"
                     Output: t2.c2
                     Node/s: All datanodes
                     Remote query: SELECT c2 FROM ONLY public.t2 WHERE true


  1. 首先下发select c1 from t1 where true语句到DN读取全部t1表的数据。
  2. 然后下发select c2 from t2 where true语句到DN读取全部t2表的数据。
  3. 获取需要的数据之后,在CN上做HASH JOIN。
  4. 最后结果参与group_concat运算并返回最终结果。

该计划性能较差:两张表的全部数据都要通过网络传输到CN,再在CN上单点执行HASH JOIN,无法充分利用集群的分布式计算资源。


gaussdb=#  set rewrite_rule='partialpush'; 
gaussdb=#  explain (verbose on, costs off) select two_sum(tt.c1, tt.c2) from (select t1.c1,t2.c2 from t1,t2 where t1.c1=t2.c2) tt(c1,c2);
                       QUERY PLAN
 Subquery Scan on tt
   Output: two_sum(tt.c1, tt.c2)
   ->  Streaming (type: GATHER)  --Gather以下计划在DN分布式执行
         Output: t1.c1, t2.c2
         Node/s: All datanodes
         ->  Nested Loop
               Output: t1.c1, t2.c2
               Join Filter: (t1.c1 = t2.c2)
               ->  Seq Scan on public.t1
                     Output: t1.c1, t1.c2, t1.c3
                     Distribute Key: t1.c1
               ->  Materialize
                     Output: t2.c2
                     ->  Streaming(type: REDISTRIBUTE)
                           Output: t2.c2
                           Distribute Key: t2.c2
                           Spawn on: All datanodes
                           Consumer Nodes: All datanodes
                           ->  Seq Scan on public.t2
                                 Output: t2.c2
                                 Distribute Key: t2.c1
(21 rows)



gaussdb=#  set rewrite_rule='none'; 
gaussdb=#  explain (verbose on, costs off) select c1,(select avg(c2) from t2 where t2.c2=t1.c2) from t1 where t1.c1<100 order by t1.c2;
                              QUERY PLAN
 Streaming (type: GATHER)
   Output: t1.c1, ((SubPlan 1)), t1.c2
   Merge Sort Key: t1.c2
   Node/s: All datanodes
   ->  Sort
         Output: t1.c1, ((SubPlan 1)), t1.c2
         Sort Key: t1.c2
         ->  Seq Scan on public.t1
               Output: t1.c1, (SubPlan 1), t1.c2
               Distribute Key: t1.c1
               Filter: (t1.c1 < 100)
               SubPlan 1
                 ->  Aggregate
                       Output: avg(t2.c2)
                       ->  Result
                             Output: t2.c2
                             Filter: (t2.c2 = t1.c2)
                             ->  Materialize
                                   Output: t2.c2
                                   ->  Streaming(type: BROADCAST)
                                         Output: t2.c2
                                         Spawn on: All datanodes
                                         Consumer Nodes: All datanodes
                                         ->  Seq Scan on public.t2
                                               Output: t2.c2
                                               Distribute Key: t2.c1
(26 rows)

由于目标列中的相关子查询(select avg(c2) from t2 where t2.c2=t1.c2)无法被提升,每扫描t1的一行数据都会触发一次子查询执行,效率低下。打开intargetlist参数后,优化器会将该子查询提升为JOIN,从而提升查询性能:

gaussdb=#  set rewrite_rule='intargetlist';
gaussdb=#  explain (verbose on, costs off) select c1,(select avg(c2) from t2 where t2.c2=t1.c2) from t1 where t1.c1<100 order by t1.c2;
                          QUERY PLAN
 Streaming (type: GATHER)
   Output: t1.c1, (avg(t2.c2)), t1.c2
   Merge Sort Key: t1.c2
   Node/s: All datanodes
   ->  Sort
         Output: t1.c1, (avg(t2.c2)), t1.c2
         Sort Key: t1.c2
         ->  Hash Right Join
               Output: t1.c1, (avg(t2.c2)), t1.c2
               Hash Cond: (t2.c2 = t1.c2)
               ->  Streaming(type: BROADCAST)
                     Output: (avg(t2.c2)), t2.c2
                     Spawn on: All datanodes
                     Consumer Nodes: All datanodes
                     ->  HashAggregate
                           Output: avg(t2.c2), t2.c2
                           Group By Key: t2.c2
                           ->  Streaming(type: REDISTRIBUTE)
                                 Output: t2.c2
                                 Distribute Key: t2.c2
                                 Spawn on: All datanodes
                                 Consumer Nodes: All datanodes
                                 ->  Seq Scan on public.t2
                                       Output: t2.c2
                                       Distribute Key: t2.c1
               ->  Hash
                     Output: t1.c1, t1.c2
                     ->  Seq Scan on public.t1
                           Output: t1.c1, t1.c2
                           Distribute Key: t1.c1
                           Filter: (t1.c1 < 100)
(31 rows)



-- Correlated scalar subquery: for each t1 row, look up the t2.c1 whose t2.c2
-- equals t1.c1; the subquery must return at most one row per outer row.
SELECT t1.c1
  FROM t1
 WHERE t1.c1 = (SELECT t2.c1
                  FROM t2
                 WHERE t2.c2 = t1.c1);


-- Illustrative pseudo-SQL only: "(unique check)" is not valid SQL syntax and
-- this statement cannot be executed as written. It marks that each t2.c1 group
-- of the subquery tt may produce at most one row; the check is what keeps this
-- join form semantically equivalent to the scalar-subquery form.
select t1.c1 from t1 join (select t2.c1 from t2 where t2.c1 is not null group by t2.c1(unique check)) tt(c1) on tt.c1=t1.c1;

需注意,上述SQL中的unique check表示需要对t2.c1进行唯一性检查,它并非合法的SQL语法,该SQL无法直接执行。为了保证语义等价,子查询tt对于每个t2.c1分组最多只能输出一行。打开uniquecheck查询重写参数后,该子查询可以被提升且保持语义等价;如果运行时某个分组输出了多于一行数据,就会报错。

gaussdb=#  set rewrite_rule='uniquecheck';
gaussdb=#  explain verbose select t1.c1 from t1 where t1.c1 = (select t2.c1 from t2 where t1.c1=t2.c1) ;
                               QUERY PLAN
 Streaming (type: GATHER)
   Output: t1.c1
   Node/s: All datanodes
   ->  Nested Loop
         Output: t1.c1
         Join Filter: (t1.c1 = subquery."?column?")
         ->  Seq Scan on public.t1
               Output: t1.c1, t1.c2, t1.c3
               Distribute Key: t1.c1
         ->  Materialize
               Output: subquery."?column?", subquery.c1
               ->  Subquery Scan on subquery
                     Output: subquery."?column?", subquery.c1
                     ->  HashAggregate
                           Output: t2.c1, t2.c1
                           Group By Key: t2.c1
                           Filter: (t2.c1 IS NOT NULL)
                           Unique Check Required   --如果在运行时输出了多于一行的数据,就会报错。
                           ->  Index Only Scan using t2idx on public.t2
                                 Output: t2.c1
                                 Distribute Key: t2.c1
(21 rows)

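注意:由于group by t2.c1的unique check发生在过滤条件tt.c1=t1.c1之前,原本不报错的查询在重写之后可能会报错。举例:在下面的数据中,t2里c1为11~20的值各有两行,但它们与t1中的任何行都不匹配。原查询只会针对t1.c1为1~10的行执行子查询,每次只返回一行,因此不会报错;而重写后会先对整张t2按c1分组并做唯一性检查,遇到重复值即报错: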


gaussdb=#  select * from t1 order by c2;
 c1 | c2 | c3
  1 |  1 |  1
  2 |  2 |  2
  3 |  3 |  3
  4 |  4 |  4
  5 |  5 |  5
  6 |  6 |  6
  7 |  7 |  7
  8 |  8 |  8
  9 |  9 |  9
 10 | 10 | 10
(10 rows)

gaussdb=#  select * from t2 order by c1;
 c1 | c2 | c3
  1 |  1 |  1
  2 |  2 |  2
  3 |  3 |  3
  4 |  4 |  4
  5 |  5 |  5
  6 |  6 |  6
  7 |  7 |  7
  8 |  8 |  8
  9 |  9 |  9
 10 | 10 | 10
 11 | 11 | 11
 11 | 11 | 11
 12 | 12 | 12
 12 | 12 | 12
 13 | 13 | 13
 13 | 13 | 13
 14 | 14 | 14
 14 | 14 | 14
 15 | 15 | 15
 15 | 15 | 15
 16 | 16 | 16
 16 | 16 | 16
 17 | 17 | 17
 17 | 17 | 17
 18 | 18 | 18
 18 | 18 | 18
 19 | 19 | 19
 19 | 19 | 19
 20 | 20 | 20
 20 | 20 | 20
(30 rows)


gaussdb=#   select t1.c1 from t1 where t1.c1 = (select t2.c1 from t2 where t1.c1=t2.c2) ;
(10 rows)

gaussdb=#  set rewrite_rule='uniquecheck';
gaussdb=#   select t1.c1 from t1 where t1.c1 = (select t2.c1 from t2 where t1.c1=t2.c2) ;
ERROR:  more than one row returned by a subquery used as an expression



  • predpushnormal:尝试下推谓词到子查询中,需要利用STREAM算子,如BROADCAST来实现分布式计划。
  • predpushforce:尝试下推谓词到子查询中,尽量利用参数化路径的索引扫描。
  • predpush:利用代价在predpushnormal和predpushforce中选择一个最优的分布式计划,但是会增加优化时间。


gaussdb=# set enable_fast_query_shipping=off; -- 关闭fqs优化
gaussdb=# show rewrite_rule;
(1 row)

gaussdb=# explain (costs off) select * from t1, (select sum(c2), c1 from t2 group by c1) st2 where st2.c1 = t1.c1;
              QUERY PLAN
 Streaming (type: GATHER)
   Node/s: All datanodes
   ->  Nested Loop
         Join Filter: (t1.c1 = t2.c1)
         ->  HashAggregate
               Group By Key: t2.c1
               ->  Seq Scan on t2
         ->  Seq Scan on t1
(8 rows)

gaussdb=# set rewrite_rule='predpushnormal';
gaussdb=# explain (costs off) select * from t1, (select sum(c2), c1 from t2 group by c1) st2 where st2.c1 = t1.c1;
                 QUERY PLAN
 Streaming (type: GATHER)
   Node/s: All datanodes
   ->  Nested Loop
         ->  Seq Scan on t1
         ->  HashAggregate
               Group By Key: t2.c1
               ->  Result
                     Filter: (t1.c1 = t2.c1)
                     ->  Seq Scan on t2
(9 rows)


gaussdb=# set rewrite_rule='predpushforce';

gaussdb=# explain (costs off) select * from t1, (select sum(c2), c1 from t2 group by c1) st2 where st2.c1 = t1.c1;
                     QUERY PLAN
 Streaming (type: GATHER)
   Node/s: All datanodes
   ->  Nested Loop
         ->  Seq Scan on t1
         ->  HashAggregate
               Group By Key: t2.c1
               ->  Index Scan using t2_c1_idx on t2
                     Index Cond: (t1.c1 = c1)
(8 rows)

--结合predpush hint一起使用,可以看到使用了参数化路径。

gaussdb=# set rewrite_rule = 'predpush';
gaussdb=# explain (costs off) select * from t1, (select sum(c2), c1 from t2 group by c1) st2 where st2.c1 = t1.c1;
                     QUERY PLAN
 Streaming (type: GATHER)
   Node/s: All datanodes
   ->  Nested Loop
         ->  Seq Scan on t1
         ->  HashAggregate
               Group By Key: t2.c1
               ->  Result
                     Filter: (t1.c1 = t2.c1)
                     ->  Seq Scan on t2
(9 rows)



gaussdb=# create table t_rep(a int) distribute by replication;
gaussdb=# create table t_dis(a int);
NOTICE:  The 'DISTRIBUTE BY' clause is not specified. Using 'a' as the distribution column by default.
HINT:  Please use 'DISTRIBUTE BY' clause to specify suitable data distribution column.
gaussdb=# set rewrite_rule = '';
gaussdb=# explain (costs off) select * from t_dis where a = any(select a from t_rep) or a > 100;
                          QUERY PLAN
 Streaming (type: GATHER)
   Node/s: All datanodes
   ->  Hash Left Join
         Hash Cond: (t_dis.a = subquery.a)
         Filter: ((subquery.a IS NOT NULL) OR (t_dis.a > 100))
         ->  Seq Scan on t_dis
         ->  Hash
               ->  Subquery Scan on subquery
                     Filter: (Hash By subquery.a)
                     ->  HashAggregate
                           Group By Key: t_rep.a
                           ->  Seq Scan on t_rep
(12 rows)


gaussdb=# set rewrite_rule = disablerep;
gaussdb=# explain (costs off) select * from t_dis where a = any(select a from t_rep) or a > 100;
                    QUERY PLAN
 Streaming (type: GATHER)
   Node/s: All datanodes
   ->  Seq Scan on t_dis
         Filter: ((hashed SubPlan 1) OR (a > 100))
         SubPlan 1
           ->  Seq Scan on t_rep
(6 rows