更新时间:2022-12-07 GMT+08:00

流表JOIN

流与表进行连接操作,从表中查询并补全流字段。目前支持连接RDS表和DCS服务的Redis表。通过ON条件描述查询的Key,并补全表结构的Value字段。

RDS表的数据定义语句请参见创建RDS表

Redis表的数据定义语句请参见创建Redis表

语法格式

1
2
FROM tableExpression JOIN tableExpression
  ON value11 = value21 [ AND value12 = value22]

语法说明

ON条件中只支持表属性等值查询,当存在二级Key时(Redis值类型为HASH情况下),需要AND表达Key和Hash Key等值查询。

注意事项

无。

示例

将车辆信息输入流与车辆价格表做等值连接后,获取车辆价格信息并填入车辆信息输出流后输出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
CREATE SOURCE STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_detail_type STRING
)
WITH (
  type = "dis",
  region = "",
  channel = "dliinput",
  partition_count = "1",
  encode = "csv",
  field_delimiter = ","
);

/** 创建数据维表,用于和输入流连接,实现字段回填
  *
  * 根据实际情况修改以下选项:
  * value_type:redis的键值对应值类型,支持STRING、HASH、SET、ZSET、LIST,其中HASH类型需要指定hash_key_column作为二层主键,集合类型将用逗号拼接所有查询出来的值
  * key_column:维表主键对应的列名
  * hash_key_column:当redis的键值对应值类型为HASH时,HASHMAP的KEY对应的列名,当值类型非HASH时,无需指定改配置
  * cluster_address:DCS服务redis集群地址
  * password:DCS服务redis集群密码
  **/
CREATE TABLE car_price_table (
  car_brand STRING,
  car_detail_type STRING,
  car_price STRING
)
WITH (
  type = "dcs_redis",
  value_type = "hash",
  key_column = "car_brand",
  hash_key_column = "car_detail_type",
  cluster_address = "192.168.1.238:6379",
  password = "xxxxxxxx"
);

CREATE SINK STREAM audi_car_owner_info (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_detail_type STRING,
  car_price STRING
)
WITH (
  type = "dis",
  region = "",
  channel = "dlioutput",
  partition_key = "car_owner",
  encode = "csv",
  field_delimiter = ","
);

INSERT INTO audi_car_owner_info
SELECT t1.car_id, t1.car_owner, t2.car_brand, t1.car_detail_type, t2.car_price
FROM car_infos as t1 join car_price_table as t2
ON t2.car_brand = t1.car_brand and t2.car_detail_type = t1.car_detail_type
WHERE t1.car_brand = "audi";