Redis维表
功能描述
创建Redis表作为维表用于与输入流连接,从而生成相应的宽表。
前提条件
- 要建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。
跨源认证简介及操作方法请参考跨源认证简介。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 若需要获取key的值,则可以通过在flink中设置主键获取,主键字段即对应redis的key。
- 若定义主键,则不能够定义复合主键,即主键只能是一个字段,不能是多个字段。
- schema-syntax取值约束:
- 当schema-syntax为map或array时,非主键字段最多只能只有一个,且需要为相应的map或array类型。
- 当schema-syntax为fields-scores时,非主键字段个数需要为偶数,且除主键字段外,每两个字段的第二个字段的类型需要为doule,会将该字段的值视为前一个字段的score,其示例如下:
CREATE TABLE redisSource ( redisKey string, order_id string, score1 double, order_channel string, score2 double, order_time string, score3 double, pay_amount double, score4 double, real_pay double, score5 double, pay_time string, score6 double, user_id string, score7 double, user_name string, score8 double, area_id string, score9 double, primary key (redisKey) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'sorted-set', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'fields-scores' );
- data-type取值约束:
- 当data-type为set时,flink中定义的非主键字段的类型必须相同。
- 当data-type为sorted-set且schema-syntax为fields和array时,只能读取redis的sorted set中的值,而不能读取score。
- 当data-type为string时,只能有一个非主键字段。
- 当data-type为sorted-set,且schema-syntax为map时,除主键字段外,只能有一个非主键字段,且需要为map类型,同时该字段的map的value需要为double类型,表示score,该字段的map的key表示redis的set中的值。
- 当data-type为sorted-set,且schema-syntax为array-scores时,除主键字段外,只能有两个非主键字段,且这两个字段的类型需要为array。
两个字段其中第一个字段类型是array表示Redis的set中的值,第二个字段类型为array<double>,表示相应索引的score。其示例如下:
CREATE TABLE redisSink ( order_id string, arrayField Array<String>, arrayScore array<double>, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'sorted-set', "default-score" = '3', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'array-scores' );
语法格式
create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ,PRIMARY KEY (attr_name, ...) NOT ENFORCED ) with ( 'connector' = 'redis', 'host' = '' );
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
connector类型,需配置为'redis'。 |
host |
是 |
无 |
String |
redis连接地址。 |
port |
否 |
6379 |
Integer |
redis连接端口。 |
password |
否 |
无 |
String |
redis认证密码。 |
namespace |
否 |
无 |
String |
redis key的namespace |
delimiter |
否 |
: |
String |
redis的key和namespace之间的分隔符。 |
data-type |
否 |
hash |
String |
redis的数据类型,有下列选项
data-type取值约束详见data-type取值约束说明。 |
schema-syntax |
否 |
fields |
String |
redis的schema语义,包含以下值:
schema-syntax取值约束详见schema-syntax取值约束说明。 |
deploy-mode |
否 |
standalone |
String |
redis集群的部署模式,支持standalone、master-replica、cluster,默认standalone。 |
retry-count |
是 |
5 |
Integer |
设置每个连接请求的队列大小。如果超过队列大小,则命令调用将导致RedisException。将requestQueueSize设置为较低的值将导致在过载期间或连接处于断开状态时更早出现异常。更高的值意味着达到边界需要更长的时间,但可能会有更多的请求排队,并使用更多的堆空间。默认请设置为2147483647。 |
connection-timeout-millis |
否 |
10000 |
Integer |
尝试连接redis集群时的最大超时时间。 |
commands-timeout-millis |
否 |
2000 |
Integer |
等待操作完成响应的最大时间。 |
rebalancing-timeout-millis |
否 |
15000 |
Integer |
redis集群失败时的休眠时间。 |
scan-keys-count |
否 |
1000 |
Integer |
每次扫描时读取的数量。 |
default-score |
否 |
0 |
Double |
当data-type设置为“sorted-set”数据类型的默认score。 |
deserialize-error-policy |
否 |
fail-job |
Enum |
数据解析失败时的处理方式。 枚举类型,包含以下值:
|
skip-null-values |
否 |
true |
Boolean |
是否跳过null。 |
lookup.async |
否 |
false |
Boolean |
作为redis维表时,是否使用异步 I/O。 |
pwd_auth_name |
否 |
无 |
String |
DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置和账号密码。 |
示例
从Kafka源表中读取数据,将Redis表作为维表,并将二者生成的宽表信息写入Kafka结果表中,其具体步骤如下:
- 参考增强型跨源连接,根据Redis和Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置Redis和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Redis的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 登录Redis客户端,通过如下命令向Redis发送如下数据:
HMSET 330102 area_province_name a1 area_province_name b1 area_county_name c1 area_street_name d1 region_name e1 HMSET 330106 area_province_name a1 area_province_name b1 area_county_name c2 area_street_name d2 region_name e1 HMSET 330108 area_province_name a1 area_province_name b1 area_county_name c3 area_street_name d3 region_name e1 HMSET 330110 area_province_name a1 area_province_name b1 area_county_name c4 area_street_name d4 region_name e1
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。该作业脚本将Kafka为数据源,Redis作为维表,数据写入到Kafka结果表中。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string, primary key (area_id) not enforced -- redis的key ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'hash', 'deploy-mode' = 'master-replica' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'kafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
- 连接Kafka集群,向Kafka的source topic中插入如下测试数据:
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
- 连接Kafka集群,在Kafka的sink topic读取数据,结果数据参考如下:
{"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
常见问题
若在windows环境中向redis中写入中文时,会导致写入数据异常,请避免此情况。