更新时间:2024-09-27 GMT+08:00

Redis源表

功能描述

创建source流从Redis获取数据,作为作业的输入数据。

前提条件

  • 创建该作业前,需要建立DLI和Redis的增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。

    跨源认证简介及操作方法请参考跨源认证简介

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 若需要获取key的值,则可以通过在Flink中设置主键获取,主键字段即对应Redis的key。
  • 若定义主键,则不能够定义复合主键,即主键只能是一个字段,不能是多个字段。
  • schema-syntax取值约束:
    • 当schema-syntax为map或array时,非主键字段最多只能有一个,且需要为相应的map或array类型。
    • 当schema-syntax为fields-scores时,非主键字段个数需要为偶数,且除主键字段外,每两个字段的第二个字段的数据类型需要为doule,该字段的值视为前一个字段的score。其示例如下:
      CREATE TABLE redisSource (
        redisKey string,
        order_id string,
        score1 double,
        order_channel string,
        score2 double,
        order_time string,
        score3 double,
        pay_amount double,
        score4 double,
        real_pay double,
        score5 double,
        pay_time string,
        score6 double,
        user_id string,
        score7 double,
        user_name string,
        score8 double,
        area_id string,
        score9 double,
        primary key (redisKey) not enforced
      ) WITH (
        'connector' = 'redis',
        'host' = 'RedisIP',
        'password' = 'RedisPassword',
        'data-type' = 'sorted-set',
        'deploy-mode' = 'master-replica',
        'schema-syntax' = 'fields-scores'
      );
  • data-type取值约束:
    • 当data-type为set时,Flink中定义的非主键字段的数据类型必须相同。
    • 当data-type为sorted-set并且schema-syntax为fields和array时,只能读取redis的sorted set中的值,而不能读取score。
    • 当data-type为string时,只能有一个非主键字段。
    • 当data-type为sorted-set,且schema-syntax为map时,除主键字段外,只能有一个非主键字段。

      该非主键字段需要为map类型,同时该字段map的value需要为double类型,表示score,该字段的map的key表示redis的set中的值。

    • 当data-type为sorted-set,且schema-syntax为array-scores时,除主键字段外,只能有两个非主键字段,且这两个字段的类型需要为array。
      两个字段其中第一个字段类型是array,表示Redis的set中的值;第二个字段类型为array<double>,表示相应索引的score。其示例如下:
      CREATE TABLE redisSink (
        order_id string,
        arrayField Array<String>,
        arrayScore array<double>,
        primary key (order_id) not enforced
      ) WITH (
        'connector' = 'redis',
        'host' = 'RedisIP',
        'password' = 'RedisPassword',
        'data-type' = 'sorted-set',
        "default-score" = '3',
        'deploy-mode' = 'master-replica',
        'schema-syntax' = 'array-scores'
      );

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
create table dwsSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (',' watermark for rowtime_column_name as watermark-strategy_expression)
  ,PRIMARY KEY (attr_name, ...) NOT ENFORCED
)
with (
  'connector' = 'redis',
  'host' = ''
);

参数说明

表1 参数说明

参数

是否必选

默认值

数据类型

说明

connector

String

connector类型,需配置为'redis'。

host

String

redis连接地址。

port

6379

Integer

redis连接端口。

password

String

redis认证密码。

namespace

String

redis key的namespace

delimiter

:

String

redis的key和namespace之间的分隔符。

data-type

hash

String

redis的数据类型,有下列选项:

  • hash
  • list
  • set
  • sorted-set
  • string

data-type取值约束详见data-type取值约束说明。

schema-syntax

fields

String

redis的schema语义,包含以下值(其具体使用请参考注意事项常见问题):

  • fields:适用于所有数据类型
  • fields-scores:适用于sorted set数据类型
  • array:适用于list、set、sorted set数据类型
  • array-scores:适用于sorted set数据类型
  • map:适用于hash、sorted set数据类型

schema-syntax取值约束详见schema-syntax取值约束说明。

deploy-mode

standalone

String

redis集群的部署模式,支持standalone、master-replica、cluster。默认为standalone。

retry-count

5

Integer

连接redis集群的尝试次数。

connection-timeout-millis

10000

Integer

尝试连接redis集群时的最大超时时间。

commands-timeout-millis

2000

Integer

等待操作完成响应的最大时间。

rebalancing-timeout-millis

15000

Integer

redis集群失败时的休眠时间。

scan-keys-count

1000

Integer

每次扫描时读取的数量。

default-score

0

Double

当data-type设置为“sorted-set”时的默认score。

deserialize-error-policy

fail-job

Enum

数据解析失败时的处理方式。枚举类型,包含以下值:

  • fail-job:作业失败
  • skip-row:跳过当前数据
  • null-field:设置当前数据为null

skip-null-values

true

Boolean

是否跳过null。

pwd_auth_name

String

DLI侧创建的Password类型的跨源认证名称。

使用跨源认证则无需在作业中配置账号和密码。

示例

该示例是从DCS Redis数据源中读取数据,并写入Print到结果表中,其具体步骤如下:

  1. 参考增强型跨源连接,根据redis所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
  2. 设置Redis的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据redis的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 在Redis客户端中执行如下命令,向不同的key中插入数据,以hash形式存储:
    HMSET redisSource order_id 202103241000000001 order_channel webShop order_time "2021-03-24 10:00:00" pay_amount 100.00 real_pay 100.00 pay_time "2021-03-24 10:02:03" user_id 0001 user_name Alice area_id 330106
    
    HMSET redisSource1 order_id 202103241606060001 order_channel appShop order_time "2021-03-24 16:06:06" pay_amount 200.00 real_pay 180.00 pay_time "2021-03-24 16:10:06" user_id 0001 user_name Alice area_id 330106
    
    HMSET redisSource2 order_id 202103251202020001 order_channel miniAppShop order_time "2021-03-25 12:02:02" pay_amount 60.00 real_pay 60.00 pay_time "2021-03-25 12:03:00" user_id 0002 user_name Bob area_id 330110
  4. 创建flink opensource sql作业,输入以下作业脚本读取Redis中hash格式的数据。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE redisSource (
      redisKey string,
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      primary key (redisKey) not enforced  --获取redis中key的值
    ) WITH (
      'connector' = 'redis',
      'host' = 'RedisIP',
      'password' = 'RedisPassword',
      'data-type' = 'hash',
      'deploy-mode' = 'master-replica'
    );
    
    CREATE TABLE printSink (
      redisKey string,
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );
    
    insert into printSink select * from redisSource;
  5. 按照如下方式查看taskmanager.out文件中的数据结果:
    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。

    数据结果参考如下:

    +I(redisSource1,202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106)
    +I(redisSource,202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106)
    +I(redisSource2,202103251202020001,miniAppShop,2021-03-25 12:02:02,60.0,60.0,2021-03-25 12:03:00,0002,Bob,330110)

常见问题

  • Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: RealLine:36;Usage of 'set' data-type and 'fields' schema syntax in source Redis connector with multiple non-key column types. As 'set' in Redis is not sorted, it's not possible to map 'set's values to table schema with different types.

    A:data-type为set类型时,flink中非主键字段的数据类型不相同,导致如上报错。data-type为set类型时,Flink中定义的非主键字段的数据类型必须相同。

  • Q:当使用data-type为hash时,那么schema-syntax为fields和map有什么区别?

    A:当schema-syntax为fields时,会将Redis的key中hash值赋给flink中同名相应字段;当schema-syntax为map时,会将Redis的每个hash中的hashkey和hashvalue放入一个map中,该map即为flink中相应字段的值,即这个map中包含Redis中某个key的所有hashkey和hashvalue。

    • 对于fields而言:
      1. 向Redis中插入如下数据
        HMSET redisSource order_id 202103241000000001 order_channel webShop order_time "2021-03-24 10:00:00" pay_amount 100.00 real_pay 100.00 pay_time "2021-03-24 10:02:03" user_id 0001 user_name Alice area_id 330106
      2. 当使用schema-syntax为fields时,作业脚本参考如下:
        CREATE TABLE redisSource (
          redisKey string,
          order_id string,
          order_channel string,
          order_time string,
          pay_amount double,
          real_pay double,
          pay_time string,
          user_id string,
          user_name string,
          area_id string,
          primary key (redisKey) not enforced
        ) WITH (
          'connector' = 'redis',
          'host' = 'RedisIP',
          'password' = 'RedisPassword',
          'data-type' = 'hash',
          'deploy-mode' = 'master-replica'
        );
        
        CREATE TABLE printSink (
          redisKey string,
          order_id string,
          order_channel string,
          order_time string,
          pay_amount double,
          real_pay double,
          pay_time string,
          user_id string,
          user_name string,
          area_id string
        ) WITH (
          'connector' = 'print'
        );
        
        insert into printSink select * from redisSource;
      3. 作业运行结果如下:
        +I(redisSource,202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106)
    • 对于map而言:
      1. 向Redis中插入如下数据:
        HMSET redisSource order_id 202103241000000001 order_channel webShop order_time "2021-03-24 10:00:00" pay_amount 100.00 real_pay 100.00 pay_time "2021-03-24 10:02:03" user_id 0001 user_name Alice area_id 330106
      2. 当使用schema-syntax为map时,其作业脚本参考如下:
        CREATE TABLE redisSource (
          redisKey string,
          order_result map<string, string>,
          primary key (redisKey) not enforced
        ) WITH (
          'connector' = 'redis',
          'host' = 'RedisIP',
          'password' = 'RedisPassword',
          'data-type' = 'hash',
          'deploy-mode' = 'master-replica',
          'schema-syntax' = 'map'
        );
        
        CREATE TABLE printSink (
          redisKey string,
          order_result map<string, string>
        ) WITH (
          'connector' = 'print'
        );
        
        insert into printSink select * from redisSource;
      3. 作业运行结果如下:
        +I(redisSource,{user_id=0001, user_name=Alice, pay_amount=100.00, real_pay=100.00, order_time=2021-03-24 10:00:00, area_id=330106, order_id=202103241000000001, order_channel=webShop, pay_time=2021-03-24 10:02:03})