Optimizing Statement Pushdown

Updated on 2024-10-14 GMT+08:00

Statement Pushdown

Currently, the GaussDB optimizer can generate one of three types of execution plans for a statement in the distributed framework: a statement pushdown plan, a distributed execution plan, or a distributed execution plan for sending statements.

  • A statement pushdown plan pushes query statements from a CN down to DNs for execution and returns the execution results to the CN.
  • In a distributed execution plan, a CN compiles and optimizes query statements, generates a plan tree, and then sends the plan tree to DNs for execution. After the statements have been executed, execution results will be returned to the CN.
  • A distributed execution plan for sending statements pushes queries that can be pushed down (mostly base table scanning statements) to DNs for execution. Then, the plan obtains the intermediate results and sends them to the CN, on which the remaining queries are to be executed.

The third policy sends a large volume of intermediate results from DNs to the CN for further execution. Statements that cannot be pushed down to DNs therefore turn the CN into a performance bottleneck (in bandwidth, storage, and computing). Avoid query statements to which only the third policy applies.

Statements cannot be pushed down to DNs if they have functions that do not support pushdown or syntax that does not support pushdown. Generally, you can rewrite the execution statements to solve the problem.

Typical Scenarios of Statement Pushdown

To enable statement pushdown in the GaussDB optimizer, set the GUC parameter enable_fast_query_shipping to on. For a pushed-down statement, EXPLAIN generally displays no execution plan operators: if the keyword "Data Node Scan on" appears in the first line of the execution plan (excluding the plan format), the statement has been pushed down to DNs for execution. The following describes statement pushdown and its supported scope in three scenarios.
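As a minimal sketch of this check, the following session turns the parameter on and inspects a plan. It assumes a hash-distributed table t with an integer column c1, like the one used in the examples in this section. The plan you get depends on your data and version, so no output is shown.

```sql
-- Enable statement pushdown (fast query shipping) for the current session.
SET enable_fast_query_shipping = on;

-- Confirm the current value of the parameter.
SHOW enable_fast_query_shipping;

-- Inspect the plan. If the first plan line starts with "Data Node Scan on",
-- the whole statement has been pushed down to DNs.
EXPLAIN SELECT * FROM t WHERE c1 > 1;
```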

1. Pushdown of single-table query statements

In a distributed database, whether a single-table query can be pushed down depends on whether the CN must take part in the calculation rather than simply collect data. If the CN needs to further process the results returned by DNs, the statement cannot be pushed down. Generally, statements that contain aggregate functions, window functions, LIMIT/OFFSET, sorting, or DISTINCT cannot be pushed down.

  • Pushdown: Simple queries can be pushed down without further calculation on the CN.
    openGauss=# explain select * from t where c1 > 1;
                                     QUERY PLAN                                 
    ----------------------------------------------------------------------------
     Data Node Scan on "__REMOTE_FQS_QUERY__"  (cost=0.00..0.00 rows=0 width=0)
       Node/s: All datanodes
    (2 rows)
  • Non-pushdown: For a statement with the limit clause, the CN cannot simply send the statement to DNs and collect the data, because that would be inconsistent with the semantics of the limit clause.
    openGauss=# explain select * from t limit 1;
                                         QUERY PLAN                                      
    -------------------------------------------------------------------------------------
     Limit  (cost=0.00..0.00 rows=1 width=12)
       ->  Data Node Scan on "__REMOTE_LIMIT_QUERY__"  (cost=0.00..0.00 rows=1 width=12)
             Node/s: All datanodes
    (3 rows)
  • Non-pushdown: For a statement with aggregate functions, the CN cannot simply push down the statement. Instead, it needs to further aggregate the results collected from DNs.
    openGauss=# explain select sum(c1), count(*) from t;
                                         QUERY PLAN                                      
    -------------------------------------------------------------------------------------
     Aggregate  (cost=0.10..0.11 rows=1 width=20)
       ->  Data Node Scan on "__REMOTE_GROUP_QUERY__"  (cost=0.00..0.00 rows=20 width=4)
             Node/s: All datanodes
    (3 rows)
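The same check can be applied to the other constructs named above. The statements below are a sketch that assumes table t has integer columns c1 and c2. By the rule described here, each plan is expected to show a CN-side operator above the Data Node Scan node instead of a fully pushed-down plan. The exact output is not reproduced and should be verified in your environment.

```sql
-- DISTINCT: the CN has to deduplicate rows collected from all DNs.
EXPLAIN SELECT DISTINCT c2 FROM t;

-- Sorting: the CN has to merge the rows returned by the DNs into one order.
EXPLAIN SELECT * FROM t ORDER BY c2;

-- Window function: the CN has to evaluate the window over the collected rows.
EXPLAIN SELECT c1, row_number() OVER (ORDER BY c2) FROM t;
```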

2. Pushdown of multi-table query statements

In the multi-table query scenario, whether a statement can be pushed down depends on the join condition and the distribution columns. If the join condition matches the distribution columns of the tables, the statement can be pushed down. Otherwise, it cannot. Generally, queries on replication tables can be pushed down.
  • Create two hash distribution tables.
    openGauss=# create table t(c1 int, c2 int, c3 int)distribute by hash(c1);
    CREATE TABLE
    openGauss=# create table t1(c1 int, c2 int, c3 int)distribute by hash(c1);
    CREATE TABLE
  • Pushdown: The join condition is on the hash distribution columns of both tables.
    openGauss=# explain select * from t1 join t on t.c1 = t1.c1;
                                     QUERY PLAN                                 
    ----------------------------------------------------------------------------
     Data Node Scan on "__REMOTE_FQS_QUERY__"  (cost=0.00..0.00 rows=0 width=0)
       Node/s: All datanodes
    (2 rows)
  • Non-pushdown: The join condition is not on the hash distribution columns of both tables. Here, t1.c2 is not the distribution column of t1.
    openGauss=# explain select * from t1 join t on t.c1 = t1.c2;
                                             QUERY PLAN                                         
    --------------------------------------------------------------------------------------------
     Hash Join  (cost=0.25..0.53 rows=20 width=24)
       Hash Cond: (t1.c2 = t.c1)
       ->  Data Node Scan on t1 "_REMOTE_TABLE_QUERY_"  (cost=0.00..0.00 rows=20 width=12)
             Node/s: All datanodes
       ->  Hash  (cost=0.00..0.00 rows=20 width=12)
             ->  Data Node Scan on t "_REMOTE_TABLE_QUERY_"  (cost=0.00..0.00 rows=20 width=12)
                   Node/s: All datanodes
    (7 rows)
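To try the replication-table case mentioned above, a sketch like the following can be used. The table t_rep is hypothetical and is not part of the original examples. Because every DN stores a full copy of a replication table, a join against it can be evaluated locally on each DN even when the join column is not a distribution column. Check the resulting plan in your environment, as the output is not reproduced here.

```sql
-- Hypothetical replication table: every DN holds all of its rows.
CREATE TABLE t_rep(c1 int, c2 int, c3 int) DISTRIBUTE BY REPLICATION;

-- Join on c2, which is not the distribution column of t.
-- A first plan line starting with "Data Node Scan on" means the join was pushed down.
EXPLAIN SELECT * FROM t JOIN t_rep ON t.c2 = t_rep.c2;
```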

3. Special scenarios

In some special scenarios, statements cannot be pushed down. Examples are statements that contain the WITH RECURSIVE clause and statements that query column-store tables.

Checking Whether the Execution Plan Has Been Pushed Down to DNs

Perform the following procedure to quickly determine whether the execution plan can be pushed down to DNs:

  1. Set the GUC parameter enable_fast_query_shipping to off to use the distributed framework policy for the query optimizer.

    SET enable_fast_query_shipping = off;
    

  2. View the execution plan.

    If the execution plan contains Data Node Scan nodes, the execution plan is a distributed execution plan for sending statements and cannot be pushed down. If the execution plan contains Streaming nodes, the SQL statements can be pushed down to DNs.

    For example:

    select
    count(ss.ss_sold_date_sk order by ss.ss_sold_date_sk)c1 
    from store_sales ss, store_returns sr 
    where 
    sr.sr_customer_sk = ss.ss_customer_sk;
    

    The execution plan is as follows, which indicates that the SQL statement cannot be pushed down.

                                  QUERY PLAN
    --------------------------------------------------------------------------
     Aggregate
       ->  Hash Join
             Hash Cond: (ss.ss_customer_sk = sr.sr_customer_sk)
             ->  Data Node Scan on store_sales "_REMOTE_TABLE_QUERY_"
                   Node/s: All datanodes
             ->  Hash
                   ->  Data Node Scan on store_returns "_REMOTE_TABLE_QUERY_"
                         Node/s: All datanodes
    (8 rows)
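In this example, the aggregate uses ORDER BY, a form that this section lists among the syntax that does not support pushdown. Because the result of count does not depend on the order of its input, one possible rewrite is to drop the ORDER BY inside the aggregate and check the plan again. This is only a sketch: whether the rewritten statement can be pushed down also depends on the distribution columns of store_sales and store_returns, which are not given here.

```sql
-- Use the distributed framework policy so that the plan shows Streaming nodes
-- when the statement can be pushed down.
SET enable_fast_query_shipping = off;

-- Same query without ORDER BY inside count(); the count value is unchanged.
EXPLAIN
SELECT count(ss.ss_sold_date_sk) c1
FROM store_sales ss, store_returns sr
WHERE sr.sr_customer_sk = ss.ss_customer_sk;
```

If the new plan contains Streaming nodes instead of Data Node Scan nodes, the rewritten statement can be pushed down to DNs.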

Syntax That Does Not Support Pushdown

SQL syntax that does not support pushdown is described using the following table definition examples:

openGauss=# CREATE TABLE CUSTOMER1
(
    C_CUSTKEY     BIGINT NOT NULL
  , C_NAME        VARCHAR(25) NOT NULL
  , C_ADDRESS     VARCHAR(40) NOT NULL
  , C_NATIONKEY   INT NOT NULL
  , C_PHONE       CHAR(15) NOT NULL
  , C_ACCTBAL     DECIMAL(15,2)   NOT NULL
  , C_MKTSEGMENT  CHAR(10) NOT NULL
  , C_COMMENT     VARCHAR(117) NOT NULL
)
DISTRIBUTE BY hash(C_CUSTKEY);
openGauss=# CREATE TABLE test_stream(a int, b float); -- float does not support redistribution.
openGauss=# CREATE TABLE sal_emp ( c1 integer[] ) DISTRIBUTE BY replication;
  • A statement containing the RETURNING clause cannot be pushed down.
    openGauss=# explain update customer1 set C_NAME = 'a' returning c_name;
                                   QUERY PLAN                                           
    ------------------------------------------------------------------
     Update on customer1  (cost=0.00..0.00 rows=30 width=187)
       Node/s: All datanodes
       Node expr: c_custkey
       ->  Data Node Scan on customer1 "_REMOTE_TABLE_QUERY_"  (cost=0.00..0.00 rows=30 width=187)
             Node/s: All datanodes
    (5 rows)
    
  • If a SQL statement contains aggregate functions that use ORDER BY, this statement cannot be pushed down.
    openGauss=# explain verbose select count ( c_custkey order by c_custkey) from customer1;
                                   QUERY PLAN
    ------------------------------------------------------------------
     Aggregate  (cost=2.50..2.51 rows=1 width=8)
       Output: count(customer1.c_custkey ORDER BY customer1.c_custkey)
       ->  Data Node Scan on customer1 "_REMOTE_TABLE_QUERY_"  (cost=0.00..0.00 rows=30 width=8)
             Output: customer1.c_custkey
             Node/s: All datanodes
             Remote query: SELECT c_custkey FROM ONLY public.customer1 WHERE true
    (6 rows)
    
  • If a SQL statement contains COUNT(DISTINCT expr) and columns in COUNT(DISTINCT expr) do not support redistribution, this statement cannot be pushed down.
    openGauss=# explain verbose select count(distinct b) from test_stream;
                                   QUERY PLAN
    ------------------------------------------------------------------
     Aggregate  (cost=2.50..2.51 rows=1 width=8)
       Output: count(DISTINCT test_stream.b)
       ->  Data Node Scan on test_stream "_REMOTE_TABLE_QUERY_"  (cost=0.00..0.00 rows=30 width=8)
             Output: test_stream.b
             Node/s: All datanodes
             Remote query: SELECT b FROM ONLY public.test_stream WHERE true
    (6 rows)
    
  • A statement containing DISTINCT ON cannot be pushed down.
    openGauss=# explain verbose select distinct on (c_custkey) c_custkey from customer1 order by c_custkey;
                                   QUERY PLAN
    ------------------------------------------------------------------
     Unique  (cost=49.83..54.83 rows=30 width=8)
       Output: customer1.c_custkey
       ->  Sort  (cost=49.83..52.33 rows=30 width=8)
             Output: customer1.c_custkey
             Sort Key: customer1.c_custkey
             ->  Data Node Scan on customer1 "_REMOTE_TABLE_QUERY_"  (cost=0.00..0.00 rows=30 width=8)
                   Output: customer1.c_custkey
                   Node/s: All datanodes
                   Remote query: SELECT c_custkey FROM ONLY public.customer1 WHERE true
    (9 rows)
    
  • A statement containing array expressions cannot be pushed down.
    openGauss=# explain verbose select array[c_custkey,1] from customer1 order by c_custkey;
                                   QUERY PLAN
    ------------------------------------------------------------------
     Sort  (cost=49.83..52.33 rows=30 width=8)
       Output: (ARRAY[customer1.c_custkey, 1::bigint]), customer1.c_custkey
       Sort Key: customer1.c_custkey
       ->  Data Node Scan on "__REMOTE_SORT_QUERY__"  (cost=0.00..0.00 rows=30 width=8)
             Output: (ARRAY[customer1.c_custkey, 1::bigint]), customer1.c_custkey
             Node/s: All datanodes
             Remote query: SELECT ARRAY[c_custkey, 1::bigint], c_custkey FROM ONLY public.customer1 WHERE true ORDER BY 2
    (7 rows)
    
  • The following table describes the scenarios where a statement containing WITH RECURSIVE cannot be pushed down in the current version, as well as the causes.

    No. / Scenario / Cause of Not Supporting Pushdown

    1. Scenario: The query contains foreign tables.

       Cause: LOG: SQL can't be shipped, reason: RecursiveUnion contains ForeignScan is not shippable (In this table, LOG describes the cause of not supporting pushdown.)

       In the current version, queries containing foreign tables do not support pushdown.

    2. Scenario: Multiple node groups

       Cause: LOG: SQL can't be shipped, reason: With-Recursive under multi-nodegroup scenario is not shippable

       In the current version, pushdown is supported only when all base tables are stored and computed in the same node group.

    3. Scenario: ALL is not used for UNION. In this case, the return result is deduplicated.

       Cause: LOG: SQL can't be shipped, reason: With-Recursive does not contain "ALL" to bind recursive & none-recursive branches

       For example:

    WITH recursive t_result AS (
    SELECT dm,sj_dm,name,1 as level
    FROM test_rec_part
    WHERE sj_dm > 10
    UNION
    SELECT t2.dm,t2.sj_dm,t2.name||' > '||t1.name,t1.level+1
    FROM t_result t1
    JOIN test_rec_part t2 ON t2.sj_dm = t1.dm
    )
    SELECT * FROM t_result t;

    4. Scenario: A base table contains the system catalog.

       Cause: LOG: SQL can't be shipped, reason: With-Recursive contains system table is not shippable

       For example:

    WITH RECURSIVE x(id) AS
    (
    select count(1) from pg_class where oid=1247
    UNION ALL
    SELECT id+1 FROM x WHERE id < 5
    ), y(id) AS
    (
    select count(1) from pg_class where oid=1247
    UNION ALL
    SELECT id+1 FROM x WHERE id < 10
    )
    SELECT y.*, x.* FROM y LEFT JOIN x USING (id) ORDER BY 1;

    5. Scenario: Only VALUES is used for scanning base tables. In this case, the statement can be executed on the CN, and DNs are unnecessary.

       Cause: LOG: SQL can't be shipped, reason: With-Recursive contains only values rte is not shippable

       For example:

    WITH RECURSIVE t(n) AS (
    VALUES (1)
    UNION ALL
    SELECT n+1 FROM t WHERE n < 100
    )
    SELECT sum(n) FROM t;

    6. Scenario: Only the recursion part has correlation conditions of correlated subqueries, and the non-recursion part has no correlation condition.

       Cause: LOG: SQL can't be shipped, reason: With-Recursive recursive term correlated only is not shippable

       For example:

    select  a.ID,a.Name,
    (
    with recursive cte as (
    select ID, PID, NAME from b where b.ID = 1
    union all
    select parent.ID,parent.PID,parent.NAME
    from cte as child join b as parent on child.pid=parent.id
    where child.ID = a.ID
    )
    select NAME from cte limit 1
    ) cName
    from
    (
    select id, name, count(*) as cnt
    from a group by id,name
    ) a order by 1,2;

    7. Scenario: The replicate plan is used for limit in the non-recursion part but the hash plan is used in the recursion part, resulting in conflicts.

       Cause: LOG: SQL can't be shipped, reason: With-Recursive contains conflict distribution in none-recursive(Replicate) recursive(Hash)

       For example:

    WITH recursive t_result AS (
    select * from(
    SELECT dm,sj_dm,name,1 as level
    FROM test_rec_part
    WHERE sj_dm < 10 order by dm limit 6 offset 2)
    UNION all
    SELECT t2.dm,t2.sj_dm,t2.name||' > '||t1.name,t1.level+1
    FROM t_result t1
    JOIN test_rec_part t2 ON t2.sj_dm = t1.dm
    )
    SELECT * FROM t_result t;

    8. Scenario: Multiple layers of WITH RECURSIVE are nested. That is, one WITH RECURSIVE is nested in the recursion part of another.

       Cause: LOG: SQL can't be shipped, reason: Recursive CTE references recursive CTE "cte"

       For example:

    with recursive cte as
    (
    select * from rec_tb4 where id<4
    union all
    select h.id,h.parentID,h.name from
    (
    with recursive cte as
    (
    select * from rec_tb4 where id<4
    union all
    select h.id,h.parentID,h.name from rec_tb4 h inner join cte c on h.id=c.parentID
    )
    SELECT id ,parentID,name from cte order by parentID
    ) h
    inner join cte  c on h.id=c.parentID
    )
    SELECT id ,parentID,name from cte order by parentID,1,2,3;
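For scenario 3 in the table above, a possible workaround is to bind the two branches with UNION ALL. The sketch below reuses the test_rec_part query from that scenario. It is equivalent only if duplicate rows cannot occur or are acceptable, because UNION ALL does not deduplicate the result. If the data contains cycles, UNION ALL may also keep recursing where UNION would stop. Other restrictions in the table may still prevent pushdown, so check the plan.

```sql
-- Scenario 3 rewritten with UNION ALL (assumes duplicates are impossible or acceptable).
EXPLAIN
WITH RECURSIVE t_result AS (
    SELECT dm, sj_dm, name, 1 AS level
    FROM test_rec_part
    WHERE sj_dm > 10
    UNION ALL
    SELECT t2.dm, t2.sj_dm, t2.name || ' > ' || t1.name, t1.level + 1
    FROM t_result t1
    JOIN test_rec_part t2 ON t2.sj_dm = t1.dm
)
SELECT * FROM t_result t;
```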

Functions That Do Not Support Pushdown

The following describes the volatility of functions. In GaussDB, every function has a volatility classification, with the possibilities being:

  • IMMUTABLE

    Indicates that the function always returns the same result if the parameter values are the same.

  • STABLE

    Indicates that the function cannot modify the database and that, within a single table scan, it consistently returns the same result for the same parameter values. Its result may differ across SQL statements.

  • VOLATILE

    Indicates that the function value can change in a single table scan and no optimization is performed.

The volatility of a function can be obtained by querying the provolatile column in pg_proc. The value i indicates immutable, s indicates stable, and v indicates volatile. The valid values of the proshippable column in pg_proc are t, f, and NULL. This column and the provolatile column together determine whether a function can be pushed down.

  • If the provolatile of a function is i, the function can be pushed down regardless of the value of proshippable.
  • If the provolatile of a function is s or v, the function can be pushed down only if the value of proshippable is t.
  • CTEs containing random, exec_hadoop_sql, or exec_on_extension are not pushed down, because pushdown may lead to incorrect results.
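The two columns described above can be read directly from pg_proc. The query below is a small sketch that uses random, one of the functions named in this section, as the example. A function name can match several overloads, so the query may return more than one row.

```sql
-- provolatile: i = immutable, s = stable, v = volatile
-- proshippable: t, f, or NULL
SELECT proname, provolatile, proshippable
FROM pg_proc
WHERE proname = 'random';
```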

When creating a customized function, you can specify the values of provolatile and proshippable. For details, see CREATE FUNCTION.
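As an illustration, the sketch below defines a hypothetical function, func_day_label, which is not part of the original examples. It is declared STABLE and marked with the SHIPPABLE attribute of CREATE FUNCTION, so that proshippable is set to t. Mark a function as SHIPPABLE only if it returns the same result on DNs as on the CN. That is assumed here and should be confirmed for your own functions.

```sql
-- Hypothetical function: formatting a timestamptz depends on session settings,
-- so it is declared STABLE rather than IMMUTABLE.
CREATE FUNCTION func_day_label(timestamptz) RETURNS text
AS 'SELECT to_char($1, ''YYYY-MM-DD'')'
LANGUAGE SQL
STABLE
SHIPPABLE;

-- Check the values recorded in pg_proc for the new function.
SELECT proname, provolatile, proshippable
FROM pg_proc
WHERE proname = 'func_day_label';
```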

In scenarios where a function does not support pushdown, perform one of the following as required:

  • If it is a system function, replace it with a functionally equivalent one.
  • If it is a customized function, check whether its provolatile and proshippable are correctly defined.

Example 1: Customized Functions

Define a customized function that generates fixed output for a certain input as the immutable type.

Take the sales information of TPC Benchmark DS (TPC-DS) as an example. If you want to write a function to calculate the discount data of a product, you can define the function as follows:

CREATE FUNCTION func_percent_2 (NUMERIC, NUMERIC) RETURNS NUMERIC
AS 'SELECT $1 / $2 WHERE $2 > 0.01'
LANGUAGE SQL
VOLATILE;

Run the following statement:

SELECT func_percent_2(ss_sales_price, ss_list_price)
FROM store_sales;

In the resulting execution plan, func_percent_2 is not pushed down, and the calculation on ss_sales_price and ss_list_price is performed on a CN. This consumes a large amount of resources on the CN, and performance deteriorates as a result.

In this example, the function generates the same output when the same input is provided. Therefore, we can modify the function to the following one:

CREATE FUNCTION func_percent_1 (NUMERIC, NUMERIC) RETURNS NUMERIC
AS 'SELECT $1 / $2 WHERE $2 > 0.01'
LANGUAGE SQL
IMMUTABLE;

Run the following statement:

SELECT func_percent_1(ss_sales_price, ss_list_price)
FROM store_sales;

In the resulting execution plan, func_percent_1 is pushed down to DNs for quicker execution. (In TPC-DS 1000X, where three CNs and 18 DNs are used, the query efficiency is improved by over 100 times.)
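The execution plans for the two functions are not reproduced in this section. To compare them yourself, a sketch like the following can be run after both functions have been created. It assumes the TPC-DS table store_sales exists in your database.

```sql
-- Volatility recorded for each function: v for func_percent_2, i for func_percent_1.
SELECT proname, provolatile, proshippable
FROM pg_proc
WHERE proname IN ('func_percent_1', 'func_percent_2');

-- Compare the plans of the VOLATILE and IMMUTABLE versions.
EXPLAIN SELECT func_percent_2(ss_sales_price, ss_list_price) FROM store_sales;
EXPLAIN SELECT func_percent_1(ss_sales_price, ss_list_price) FROM store_sales;
```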

Example 2: Pushing Down the Sorting Operation

For details, see Case: Pushing Down Sort Operations to DNs.
