更新时间:2022-12-07 GMT+08:00
流表JOIN
流与表进行连接操作,从表中查询并补全流字段。目前支持连接RDS表和DCS服务的Redis表。通过ON条件描述查询的Key,并补全表结构的Value字段。
RDS表的数据定义语句请参见创建RDS表。
Redis表的数据定义语句请参见创建Redis表。
语法格式
1 2 |
FROM tableExpression JOIN tableExpression ON value11 = value21 [ AND value12 = value22] |
语法说明
ON条件中只支持表属性等值查询,当存在二级Key时(Redis值类型为HASH情况下),需要AND表达Key和Hash Key等值查询。
注意事项
无。
示例
将车辆信息输入流与车辆价格表做等值连接后,获取车辆价格信息并填入车辆信息输出流后输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_detail_type STRING ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter = "," ); /** 创建数据维表,用于和输入流连接,实现字段回填 * * 根据实际情况修改以下选项: * value_type:redis的键值对应值类型,支持STRING、HASH、SET、ZSET、LIST,其中HASH类型需要指定hash_key_column作为二层主键,集合类型将用逗号拼接所有查询出来的值 * key_column:维表主键对应的列名 * hash_key_column:当redis的键值对应值类型为HASH时,HASHMAP的KEY对应的列名,当值类型非HASH时,无需指定改配置 * cluster_address:DCS服务redis集群地址 * password:DCS服务redis集群密码 **/ CREATE TABLE car_price_table ( car_brand STRING, car_detail_type STRING, car_price STRING ) WITH ( type = "dcs_redis", value_type = "hash", key_column = "car_brand", hash_key_column = "car_detail_type", cluster_address = "192.168.1.238:6379", password = "xxxxxxxx" ); CREATE SINK STREAM audi_car_owner_info ( car_id STRING, car_owner STRING, car_brand STRING, car_detail_type STRING, car_price STRING ) WITH ( type = "dis", region = "", channel = "dlioutput", partition_key = "car_owner", encode = "csv", field_delimiter = "," ); INSERT INTO audi_car_owner_info SELECT t1.car_id, t1.car_owner, t2.car_brand, t1.car_detail_type, t2.car_price FROM car_infos as t1 join car_price_table as t2 ON t2.car_brand = t1.car_brand and t2.car_detail_type = t1.car_detail_type WHERE t1.car_brand = "audi"; |
父主题: Flink SQL语法参考