Updated on 2023-11-03 GMT+08:00

JOIN Between Stream Data and Table Data

The JOIN operation looks up data in a table and writes the query result to the sink stream. Currently, only RDS tables and DCS Redis tables are supported. The ON clause specifies the key used to look up the table; the matching Value field is then written to the sink stream.

For details about the data definition statements of RDS tables, see Creating an RDS Table.

For details about the data definition statements of Redis tables, see Creating a Redis Table.

Syntax

FROM tableExpression JOIN tableExpression
  ON value11 = value21 [ AND value12 = value22]

Syntax Description

The ON clause supports only equality conditions on table attributes. If level-2 keys exist (that is, the Redis value type is HASH), use the AND keyword to add a second equality condition, so that one condition matches the Key and the other matches the Hash Key.
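For contrast with the HASH case, the following is a minimal sketch of a join against a dimension table that has no level-2 key, so a single equality condition in ON is enough. The table car_brand_price_table and the sink stream brand_price_sink are hypothetical names used only for illustration, the source stream car_infos and the sink stream are assumed to be declared elsewhere, and the lowercase value_type = "string" setting is an assumption based on the STRING value type listed for Redis tables.

```sql
/** Hypothetical dimension table: the Redis value type is STRING,
  * so only the key column is needed and hash_key_column is not set.
  **/
CREATE TABLE car_brand_price_table (
  car_brand STRING,
  car_price STRING
)
WITH (
  type = "dcs_redis",
  value_type = "string",
  key_column = "car_brand",
  cluster_address = "192.168.1.238:6379",
  password = "xxxxxxxx"
);

/** One equality condition on the key column; no AND clause is required.
  * brand_price_sink is an assumed sink stream with matching columns.
  **/
INSERT INTO brand_price_sink
SELECT t1.car_id, t1.car_brand, t2.car_price
FROM car_infos AS t1 JOIN car_brand_price_table AS t2
ON t2.car_brand = t1.car_brand;
```

With value_type set to HASH, the same statement would need a second condition joined by AND that matches the column named in hash_key_column.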

Precautions

None

Example

Perform an equi-join between the vehicle information source stream and the vehicle price table to obtain the vehicle price, and write the result to the vehicle information sink stream.

CREATE SOURCE STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_detail_type STRING
)
WITH (
  type = "dis",
  region = "",
  channel = "dliinput",
  partition_count = "1",
  encode = "csv",
  field_delimiter = ","
);

/** Create a dimension table to join with the source stream and backfill the missing field.
  *
  * Adjust the following options to match your environment:
  * value_type: indicates the value type stored under the Redis key. The value can be STRING, HASH, SET, ZSET, or LIST. For the HASH type, you need to specify hash_key_column as the level-2 key. For the SET type, you need to concatenate all queried values using commas (,).
  * key_column: indicates the column name corresponding to the primary key of the dimension table.
  * hash_key_column: indicates the column name corresponding to the hash key when value_type is HASH. If value_type is not HASH, you do not need to set this option.
  * cluster_address: indicates the DCS Redis cluster address.
  * password: indicates the DCS Redis cluster password.
  **/
CREATE TABLE car_price_table (
  car_brand STRING,
  car_detail_type STRING,
  car_price STRING
)
WITH (
  type = "dcs_redis",
  value_type = "hash",
  key_column = "car_brand",
  hash_key_column = "car_detail_type",
  cluster_address = "192.168.1.238:6379",
  password = "xxxxxxxx"
);

CREATE SINK STREAM audi_car_owner_info (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_detail_type STRING,
  car_price STRING
)
WITH (
  type = "dis",
  region = "",
  channel = "dlioutput",
  partition_key = "car_owner",
  encode = "csv",
  field_delimiter = ","
);

INSERT INTO audi_car_owner_info
SELECT t1.car_id, t1.car_owner, t2.car_brand, t1.car_detail_type, t2.car_price
FROM car_infos AS t1 JOIN car_price_table AS t2
ON t2.car_brand = t1.car_brand AND t2.car_detail_type = t1.car_detail_type
WHERE t1.car_brand = "audi";