更新时间:2024-05-20 GMT+08:00
分享

Flink SQL逻辑开发建议

在aggregate和join等操作前将数据过滤来减少计算的数据量

提前过滤可以减少在shuffle阶段前的数据量,减少网络IO,从而提升查询效率。

比如在表join前先过滤数据比在ON和WHERE时过滤可以有效较少join数据量。因为执行顺序从发生shuffle再filter变成了先发生filter再shuffle。

【示例】优化后将谓词条件A.userid>10提前到了子查询语句中,减少了shuffle的数据量:

  • 优化前SQL:
    select... from A
    join B
    on A.key = B.key
    where A.userid > 10
        and B.userid < 10
        and A.dt='20120417'
        and B.dt='20120417';
  • 优化后SQL:
    select ... from (
        select ... from A where dt='201200417' and userid > 10
    )a
    join (
        select ... from B where dt='201200417' and userid < 10
    )b
    on a.key = b.key;

慎用正则表达式函数REGEXP

正则表达式是非常耗时的操作,对比加减乘除通常有百倍的性能开销,而且正则表达式在某些极端情况下可能会进入无限循环,导致作业阻塞。推荐首先使用LIKE。正则函数包括:

  • REGEXP
  • REGEXP_EXTRACT
  • REGEXP_REPLACE

【示例】

  • 使用正则表达式:
    SELECT
     *
    FROM
     table
    WHERE username NOT REGEXP "test|ceshi|tester'
  • 使用like模糊查询:
    SELECT
     *
    FROM
     table
    WHERE username NOT LIKE '%test%'
     AND username NOT LIKE '%ceshi%'
     AND username NOT LIKE '%tester%'

UDF嵌套不可过长

多个UDF嵌套时表达式长度很长,Flink优化生成的代码超过64KB导致编译错误。建议UDF嵌套不超过6个。

【示例】UDF嵌套:

SELECT 
    SUM(get_order_total(order_id))
FROM orders WHERE customer_id = (
    SELECT customer_id FROM customers WHERE customer_name = get_customer_name('John Doe')
)

聚合函数中case when语法改写成filter语法

在聚合函数中,FILTER是更符合SQL标准用于过滤的语法,并且能获得更多的性能提升。FILTER是用于聚合函数的修饰符,用于限制聚合中使用的值。

【示例】在某些场景下需要从不同维度来统计UV,如Android中的UV,iPhone中的UV,Web中的UV和总UV,这时可能会使用如下CASE WHEN语法。

  • 修改前:
    SELECT
    day,
    COUNT(DISTINCT user_id) AS total_uv,
    COUNT(DISTINCT CASE WHEN flag IN (android', "iphone'") THEN user_id ELSE NULL END) AS app_uv,
    COUNT(DISTINCT CASE WHEN flag IN(wap', 'other') THEN user_id ELSE NULL END) AS web_uv
    FROM T
    GROUP BY day
  • 修改后:
    SELECT
    day,
    COUNT(DISTINCT user_id) AS total_uv,
    COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
    COUNT(DISTINCT user_id) FILTER(WHERE flag IN ('wap', 'other'))AS web_uv
    FROM T
    GROUP BY day

Flink SQL优化器可以识别相同的distinct key上的不同过滤器参数。例如示例中三个COUNT DISTINCT都在user_id列上。Flink可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小,在某些工作负载下可以获得显著的性能提升。

拆分distinct聚合优化聚合中数据倾斜

通过两阶段聚合能消除常规的数据倾斜,但是处理distinct聚合时性能并不好。因为即使启动了两阶段聚合,distinct key也不能combine消除重复值,累加器中仍然包含所有的原始记录。

可以将不同的聚合(例如 COUNT(DISTINCT col))分为两个级别:

第一次聚合由group key和额外的bucket key进行shuffle。bucket key是使用HASH_CODE(distinct_key) % BUCKET_NUM计算的,BUCKET_NUM默认为1024,可以通过table.optimizer.distinct-agg.split.bucket-num选项进行配置。

第二次聚合是由原始group key进行shuffle,并使用SUM聚合来自不同buckets的COUNT DISTINCT值。由于相同的distinct key将仅在同一bucket中计算,因此转换是等效的。bucket key充当附加group key的角色,以分担group key中热点的负担。bucket key使Job具有可伸缩性来解决不同聚合中的数据倾斜/热点。

【示例】

  • 资源文件配置:
    table.optimizer.distinct-agg.split.enabled: true
    table.optimizer.distinct-agg.split.bucket-num: 1024
  • 查询今天有多少唯一用户登录:
    SELECT day, COUNT(DISTINCT user_id)
    FROM T
    GROUP BY day
  • 自动改写查询:
    SELECT day, SUM(cnt)
    FROM(
        SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
        )
    GROUP BY day
分享:

    相关文档

    相关产品