
Development Suggestions

Filter Data Before Aggregate and Join to Reduce Data to Be Calculated

Filter data before the shuffle phase to reduce network I/O and improve query efficiency.

For example, filtering data in a subquery before joining tables is more effective than filtering it in the ON or WHERE clause, because the execution sequence changes to filtering before shuffling.

[Example] Move the predicate condition A.userid > 10 into the subquery to reduce the volume of shuffled data.

  • SQL statements before optimization:
    select... from A
    join B
    on A.key = B.key
    where A.userid > 10
        and B.userid < 10
        and A.dt='20120417'
        and B.dt='20120417';
  • SQL statement after optimization:
    select ... from (
        select ... from A where dt='20120417' and userid > 10
    )a
    join (
        select ... from B where dt='20120417' and userid < 10
    )b
    on a.key = b.key;

Exercise Caution When Using the Regular Expression Function REGEXP

Regular expressions are time-consuming and can cost about a hundred times more than basic arithmetic operations such as addition, subtraction, multiplication, and division. In addition, a regular expression may enter an infinite loop in some extreme cases, blocking the job. You are advised to use LIKE instead of regular expressions where possible. Typical regular expression functions include:

  • REGEXP
  • REGEXP_EXTRACT
  • REGEXP_REPLACE

[Example]

  • The following statement uses a regular expression:
    SELECT
     *
    FROM
     table
    WHERE username NOT REGEXP 'test|ceshi|tester'
  • The following statement uses LIKE for fuzzy matching:
    SELECT
     *
    FROM
     table
    WHERE username NOT LIKE '%test%'
     AND username NOT LIKE '%ceshi%'
     AND username NOT LIKE '%tester%'
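
When the pattern is a fixed string rather than a true regular expression, built-in string functions can often replace REGEXP_REPLACE and REGEXP_EXTRACT as well. The following is a minimal sketch; the field url and its 'a|b|c' value format are hypothetical and used only for illustration:

    -- REPLACE substitutes a literal substring, avoiding REGEXP_REPLACE
    SELECT REPLACE(url, '|', ',') AS url_csv
    FROM T;

    -- SPLIT_INDEX extracts the n-th segment (zero-based) split by a literal
    -- delimiter, avoiding REGEXP_EXTRACT
    SELECT SPLIT_INDEX(url, '|', 1) AS second_part
    FROM T;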

Do Not Use Long Expressions When You Nest UDFs

If the expression used to nest UDFs is too long, the code generated after Flink optimization may exceed 64 KB, causing a compilation error. It is recommended that you nest a maximum of six UDFs.

[Example] Nest UDFs.

SELECT 
    SUM(get_order_total(order_id))
FROM orders WHERE customer_id = (
    SELECT customer_id FROM customers WHERE customer_name = get_customer_name('John Doe')
)
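
If deep nesting cannot be avoided in a single expression, you can keep each generated expression short by materializing intermediate results in a subquery. The following is a minimal sketch; the scalar UDFs f1 to f4 and the column col are hypothetical:

    -- Instead of f4(f3(f2(f1(col)))), split the nesting into stages
    SELECT f4(f3(mid_val)) AS result
    FROM (
        SELECT f2(f1(col)) AS mid_val
        FROM orders
    ) t;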

Replace CASE WHEN in Aggregate Functions with FILTER

FILTER is a standard SQL clause for data filtering. Used as a modifier on an aggregate function, it limits the values passed to the aggregation and improves performance.

[Example] In the following example, CASE WHEN is used to collect UV statistics from different dimensions, for example, Android UV, iPhone UV, web UV, and total UV.

  • Before modification
    SELECT
    day,
    COUNT(DISTINCT user_id) AS total_uv,
    COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
    COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
    FROM T
    GROUP BY day
  • After modification
    SELECT
    day,
    COUNT(DISTINCT user_id) AS total_uv,
    COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
    COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
    FROM T
    GROUP BY day

The Flink SQL optimizer can identify different filter arguments on the same distinct key. In the example, all three COUNT DISTINCT aggregations are on the user_id column, so Flink can use one shared state instance instead of three, reducing state accesses and state size. For some workloads, this significantly improves performance.

Split Distinct Aggregation to Eliminate Data Skew

Two-phase aggregation can eliminate typical data skew, but its performance for distinct aggregations is poor. Even if two-phase aggregation is enabled, distinct keys cannot be combined to eliminate duplicate values, and the accumulator still contains all original records.

A distinct aggregation (COUNT(DISTINCT col)) can instead be split into two levels:

For the first aggregation, shuffle by the group key and an additional bucket key. The bucket key is calculated as HASH_CODE(distinct_key) % BUCKET_NUM. The default value of BUCKET_NUM is 1024 and can be changed using the table.optimizer.distinct-agg.split.bucket-num option.

For the second aggregation, shuffle by the original group key and use SUM to aggregate the COUNT DISTINCT values from the different buckets. Because each distinct key is computed in only one bucket, the conversion is equivalent. The bucket key acts as an additional group key that shares the load of hotspots in the group key, making the job scalable and avoiding data skew and hotspotting in aggregations.

[Example]

  • Add the following configurations in the resource file:
    table.optimizer.distinct-agg.split.enabled: true
    table.optimizer.distinct-agg.split.bucket-num: 1024
  • Query the number of unique users who have logged in today:
    SELECT day, COUNT(DISTINCT user_id)
    FROM T
    GROUP BY day
  • With these options enabled, the optimizer automatically rewrites the query as follows:
    SELECT day, SUM(cnt)
    FROM (
        SELECT day, COUNT(DISTINCT user_id) as cnt
        FROM T
        GROUP BY day, MOD(HASH_CODE(user_id), 1024)
    )
    GROUP BY day
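
If your submission environment supports Flink's SET statement (for example, the Flink SQL client), the same optimizer options can also be set in SQL instead of in the resource file. A minimal sketch:

    SET 'table.optimizer.distinct-agg.split.enabled' = 'true';
    SET 'table.optimizer.distinct-agg.split.bucket-num' = '1024';

    -- With the options enabled, the optimizer performs the bucket split
    -- automatically; the query itself does not need to be rewritten by hand.
    SELECT day, COUNT(DISTINCT user_id)
    FROM T
    GROUP BY day;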

Set the Join Field as the Primary Key When Joining Streams

When the join field is not the primary key, Flink uses a hash-based shuffle and the original sequence of the data is not preserved. In addition, multiple replicas of records with the same join key are stored in the state backend, causing a Cartesian product to be generated during the join.

For example, table A has the fields id and field1, and table B has the fields id and field2. Tables A and B are joined on id. Table A has the historical record (1, a1), and table B has the historical record (1, b1). When table A changes from (1, a1) to (1, a2) and table B changes from (1, b1) to (1, b2), the join result is as follows and the data sequence cannot be preserved:

1, a1, b1
1, a2, b1
1, a1, b2
1, a2, b2
  • SQL statement before optimization:
    create table t1 (
      id int,
      field1 string
    ) with(
      ......
    );
    create table t2 (
      id int,
      field2 string
    ) with(
      ......
    );
    select t1.id, t1.field1, t2.field2
    from t1 
    left join t2 on t1.id = t2.id;
  • SQL statement after optimization:
    create table t1 (
      id int,
      field1 string,
      primary key (id) not enforced
    ) with(
      ......
    );
    create table t2 (
      id int,
      field2 string,
      primary key (id) not enforced
    ) with(
      ......
    );
    select t1.id, t1.field1, t2.field2
    from t1 
    left join t2 on t1.id = t2.id;

Specify All Fields of the Composite Primary Key in the SELECT Clause When the Join Key Is a Composite Primary Key

If not all fields of the composite primary key are used in the SELECT clause, the join operator discards part of the primary key and the join spec degrades to NoUniqueKey.

  • SQL statements before optimization:
    create table table1(
      uuid varchar(20),
      name varchar(10),
      age int,
      ts timestamp,
      primary key (uuid) not enforced
    ) with (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    );
    create table table2(
      uuid varchar(20),
      name varchar(10),
      age int,
      ts timestamp,
      primary key (uuid, name) not enforced
    ) with (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    );
    create table print(
      uuid varchar(20),
      name varchar(10),
      age int,
      ts timestamp
    ) with ('connector' = 'print');
    insert into
      print
    select
      t1.uuid,
      t1.name,
      t2.age,
      t2.ts
    from
      table1 t1
      join table2 t2 on t1.uuid = t2.uuid;
    Figure 1 NoUniqueKey join spec
  • SQL statements after optimization:
    create table table1(
      uuid varchar(20),
      name varchar(10),
      age int,
      ts timestamp,
      primary key (uuid) not enforced
    ) with (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    );
    create table table2(
      uuid varchar(20),
      name varchar(10),
      age int,
      ts timestamp,
      primary key (uuid, name) not enforced
    ) with (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    );
    create table print(
      uuid varchar(20),
      name varchar(10),
      name1 varchar(10),
      age int,
      ts timestamp
    ) with ('connector' = 'print');
    insert into
      print
    select
      t1.uuid,
      t1.name,
      t2.name as name1,
      t2.age,
      t2.ts
    from
      table1 t1
      join table2 t2 on t1.uuid = t2.uuid;
    Figure 2 Optimized SQL

Use the Snowflake Schema When the Join Key Changes in a Left Join of Tables

Data disorder occurs when the join key of a left join changes. You are advised to join the right tables into a view first and then join the view with the left table.

The change of the join key group_id causes the "-D" and "+I" messages to become out of order. Although the degree of parallelism is the same for the user_id-based hashing, the "+I" message arrives first and the "-D" message arrives later. As a result, records in the wide table are incorrectly overwritten.

  • SQL statement before optimization:
    select... 
    from t1
    left join t2 on t2.user_id = t1.user_id 
    left join t10 on t10.user_id = t1.user_id 
    left join t11 on t11.group_id = t10.group_id
    left join t12 on t12.user_id = t1.user_id 
  • SQL statement after optimization:
    create view tmp_view as(
    select
    ..
    from t10
    left join t11 on t11.group_id = t10.group_id
    );
    select... 
    from t1
    left join t2 on t2.user_id = t1.user_id 
    left join tmp_view on tmp_view.user_id = t1.user_id 
    left join t12 on t12.user_id = t1.user_id 

Use Lookup Join After All Dual-stream Joins for Table Left Join

If a lookup join is performed before the dual-stream joins, data disorder occurs when a left join with LATERAL TABLE is executed downstream.

Figure 3 Left join of tables

After a lookup join, the primary key of the left stream cannot be inferred, so all historical data of the left stream is stored in the state. When right stream data arrives, each matching left stream record is retrieved from the latest state, withdrawn, and then associated with the corresponding source data in the LATERAL TABLE. This causes the data to become out of order.

To avoid incorrect data, perform a lookup join after a dual-stream join, as multiple consecutive "-D" messages can cause the last record to be incorrect.

Figure 4 Consecutive "-D" messages
  • SQL statement before optimization:
    select... 
    from t1
    left join t2 FOR SYSTEM_TIME AS OF t1.proctime AS t21 on t21.id = t1.id
    left join t3 on t3.id = t1.id
    left join LATERAL TABLE(udtf()) AS t4(res1, res2, res3, res4) on true
  • SQL statement after optimization:
    select... 
    from t1
    left join t3 on t3.id = t1.id
    left join t2 FOR SYSTEM_TIME AS OF t1.proctime AS t21 on t21.id = t1.id
    left join LATERAL TABLE(udtf()) AS t4(res1, res2, res3, res4) on true

Specify the Precision When Using the Char Type or Use the String Data Type

cast(id as char) retains only the first character during data type conversion, producing incorrect data. If the converted field is the primary key field, a large amount of data will be lost.

You are not advised to use table.exec.legacy-cast-behaviour=ENABLED to handle the conversion error.

In versions earlier than Flink 1.15, you can set table.exec.legacy-cast-behaviour to enabled to retain the legacy type conversion behavior. However, in Flink 1.15 and later versions, this flag is disabled by default. In particular, enabling it will:

  • Disable trimming/padding for casting to CHAR/VARCHAR/BINARY/VARBINARY
  • CAST never fails but returns NULL, behaving as TRY_CAST but without inferring the correct type
  • Formatting of some casting to CHAR/VARCHAR/STRING produces slightly different results.

The use of this flag is discouraged. New projects are strongly advised to keep it disabled and use the new casting behavior. This flag will be removed in a future Flink version.

  • SQL statement before optimization:
    select
    cast(id as char) as id,
    ... 
    from t1
  • SQL statement after optimization:
    select
    cast(id as string) as id,
    ... 
    from t1
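
If the CHAR type must be kept, specify a precision large enough for the longest value instead of relying on the default length of 1. The following is a minimal sketch; the precision 11 is a hypothetical upper bound for the id values:

    -- char(11) keeps up to 11 characters; values longer than the specified
    -- precision are still truncated
    select
    cast(id as char(11)) as id
    from t1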

Filter Out the Data to Be Withdrawn When Multiple Flink Jobs or INSERT INTO Statements Write Data to the Same GaussDB(for MySQL) Database

When multiple Flink jobs write data to the same MySQL table, one job sends retraction data (-D and -U) that deletes the entire row and then inserts the updated data, causing the changes made by other jobs to be lost.

  • SQL statement before optimization:
    create table source-A(
    id,
    user_id
    )with(
    'connector' = 'kafka'
    );
    create table source-B(
    id,
    org_id
    )with(
    'connector' = 'kafka'
    );
    create table sink-A(
    id,
    user_id
    )with(
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://****',
    'table-name' = 'sink-table'
    );
    create table sink-B(
    id,
    org_id
    )with(
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://****',
    'table-name' = 'sink-table'
    );
    insert into sink-A select id,user_id from source-A;
    insert into sink-B select id,org_id  from source-B;
  • SQL statement after optimization:
    create table source-A(
    id,
    user_id
    )with(
    'connector' = 'kafka'
    );
    create table source-B(
    id,
    org_id
    )with(
    'connector' = 'kafka'
    );
    create table sink-A(
    id,
    user_id
    )with(
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://****',
    'table-name' = 'sink-table',
    'filter.record.enabled' = 'true'
    );
    create table sink-B(
    id,
    org_id
    )with(
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://****',
    'table-name' = 'sink-table',
    'filter.record.enabled' = 'true'
    );
    insert into sink-A select id,user_id from source-A;
    insert into sink-B select id,org_id  from source-B;