更新时间:2025-07-03 GMT+08:00
分享

FlinkSQL Redis表开发规则

Flink Redis作业参数规范

Flink Redis作业参数配置规范如下表所示。

表1 Flink Redis作业参数规范

配置项

是否必选

类型

描述

zSetScoreColumn

可选

String

Redis作为维表时,ZSet格式score字段对应的列名。

hashKeyColumn

可选

String

Hash格式,Hash字段对应的列名。

host

必选

String

Redis集群连接IP,为Redis集群的实例IP(业务平面)。

port

必选

String

端口为对应的Redis实例的端口。

Redis实例的端口计算方式为:22400+该实例的ID-1。

实例ID可以通过在FusionInsight Manager中选择“集群 > 服务 > Redis > Redis管理”,单击Redis集群名称查看。

例如Redis集群内角色R1对应的Redis实例的端口为22400+1-1=22400。

separator

可选

String

Redis作为维表时,value中的字段分隔符,示例:“(,)”、“(\u200b)”。

key-ttl-mode

可选

String

Redis数据过期策略:

  • no-ttl:数据不过期。
  • expire-msec: 指定多长时间之后数据过期,以毫秒为单位。
  • expire-at-date:到指定时间数据过期,精确到秒。
  • expire-at-timestamp:到指定时间数据过期,精确到毫秒。

key-ttl

可选

String

配置“key-ttl-mode”参数为非“no-ttl”时需设置该值,该值不需要带单位。

isSSLMode

可选

String

是否开启SSL模式:

  • true:开启SSL模式。
  • false:不开启SSL模式。

keyPrefix

可选

String

Redis key的前缀。

need-kerberos-auth

可选

String

是否开启Kerberos认证:

  • true:开启Kerberos认证(安全模式)。
  • false:不开启Kerberos认证(普通模式)。

service-kerberos-name

可选

String

Redis实例的Kerberos服务名(启用安全模式时需要设置)。

示例:'redis/hadoop.{系统域名}'。

系统域名:登录FusionInsight Manager界面,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。

【示例】

CREATE table redis_sink(
  account varchar,
  costs int,
  PRIMARY KEY(account) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'deploy-mode'='cluster',
  'need-kerberos-auth' = 'true',
  'service-kerberos-name' = 'redis/hadoop.系统域名',
  'login-context-name' = 'Client',
  'host' = 'redis实例的ip',
  'port' = 'redis实例的端口',
  'isSSLMode' = 'true',
  'data-type' = 'string',
  'namespace' = 'redis_table_2',
  'sink.batch.max-size' = '-1',--是否开启批写Redis并设置批写数量,'-1'表示不开启,若开启则需同步开启CheckPoint
  'sink.flush-buffer.timeout' = '1000'--开启批写Redis后可按照指定时间将队列里面的数据刷新到Redis,单位:ms
);
  • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名
  • host、port:分别为Redis集群的其中一个实例IP(业务平面)和端口号。

    Redis实例的端口计算方式为:22400+该实例的ID-1。

    实例ID可以通过在FusionInsight Manager中选择“集群 > 服务 > Redis > Redis管理”,单击Redis集群名称查看。

    例如Redis集群内角色R1对应的Redis实例的端口为22400+1-1=22400。

  • namespace:用于拼接Redis数据库的键,格式为“namespace的值:account的值”。如发送数据的account的值为“A1”,namespace的值为“redis_table_2”,那么此数据在Redis数据库中的键为redis_table_2:A1。
  • sink.batch.max-size:
    • 开启批写Redis并设置批写数量(正整数),单位:条。“-1”表示不开启批写Redis。

      开启该功能可提升大数据场景下性能表现,但不适合对实时性要求过高的场景,建议批写数量不超过30000。

    • 设置该参数需同步开启CheckPoint。
  • sink.flush-buffer.timeout:开启批写Redis后,可按照指定时间将队列里面的数据刷新到Redis。单位:ms。

相关文档