更新时间:2026-06-30 GMT+08:00
分享

使用SQL hints配置不同状态的TTL

操作场景

在Flink Table API和SQL作业中,常规连接(Regular Join)和分组聚合(Group Aggregation)会产生状态数据,而默认的全局TTL设置可能无法满足不同查询中不同状态的保留需求。例如,某些连接的左表数据变化快需要短TTL,而右表数据变化慢需要长TTL。为此,Flink支持通过STATE_TTL提示,以更灵活的方式直接在查询中为常规连接和分组聚合指定自定义TTL值。

约束与限制

本章节适用于MRS 3.6.0-LTS及之后版本。

使用SQL hints配置不同状态的TTL

Table API和SQL用户可以使用STATE_TTL提示,以更灵活的方式直接在查询中为常规连接和分组聚合指定自定义TTL值。

  • 为join算子设置ttl SQL示例:
    CREATE TABLE source1 (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'source1',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv'
    );
    CREATE TABLE source2 (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'source2',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv'
    );
    CREATE TABLE print (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` INT
    ) WITH ('connector' = 'print');
    INSERT INTO
      print
    SELECT
      /*+ STATE_TTL('s1'= '1d', 's2' = '20d') */
      s1.user_id,
      s1.user_name,
      s2.age
    FROM
      source1 s1
      left join source2 s2 ON s1.user_id = s2.user_id;
    • Kafka Broker实例业务IP地址:

      服务的实例IP地址可通过登录MRS集群Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。

      登录集群Manager具体操作,请参考访问MRS集群Manager

    • Kafka端口号
      • 集群的“认证模式”为“安全模式”时为“sasl.port”的值,默认为“21007”。
      • 集群的“认证模式”为“普通模式”时为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录MRS集群Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

  • 为aggregation算子设置ttl SQL示例:
    CREATE TABLE Orders (o_orderkey VARCHAR, o_totalprice INT) WITH (
      'connector' = 'kafka',
      'topic' = 'Orders',
      'properties.bootstrap.servers' = '192.168.67.13:21005',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv'
    );
    CREATE TABLE print (o_orderkey VARCHAR, sum_pri INT) WITH ('connector' = 'print');
    INSERT INTO
      print
    SELECT
      /*+ STATE_TTL('o' = '1d') */
      o_orderkey,
      SUM(o_totalprice) AS revenue
    FROM
      Orders AS o
    GROUP BY
      o_orderkey;

相关文档