创建RDS表
创建RDS/DWS表用于与输入流连接。
RDS的更多信息,请参见《关系型数据库用户指南》。
流表JOIN语法请参见流表JOIN。
前提条件
- 请务必确保您的账户下已在关系型数据库(RDS)里创建了PostgreSQL或MySQL类型的RDS实例。
如何创建RDS实例,请参见《关系型数据库快速入门》中“购买实例”章节。
- 该场景作业需要运行在DLI的独享队列上,因此要与RDS实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 13 |
CREATE TABLE table_id ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "", password = "", db_url = "", table_name = "" ); |
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,rds表示输出到关系型数据库中。 |
username |
是 |
数据库连接用户名。 |
password |
是 |
数据库连接密码。 |
db_url |
是 |
数据库连接地址,格式为:"{database_type}://ip:port/database" 目前支持两种数据库连接:MySQL和PostgreSQL
|
table_name |
是 |
用于查询数据的数据库表名。 |
db_columns |
否 |
流属性和数据库表的字段对应关系。当sink流中流属性字段和数据库表中的流属性字段不完全匹配时,该参数必配。格式为“dbtable_attr1,dbtable_attr2,dbtable_attr3“。 |
cache_max_num |
否 |
表示最大缓存的查询结果数,默认值为32768。 |
cache_time |
否 |
表示数据库查询结果在内存中缓存的最大时间。单位为毫秒,默认值为10000,当值为0时表示不缓存。 |
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "", channel = "dliinput", encode = "csv", field_delimiter = "," ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "root", password = "******", db_url = "postgresql://192.168.0.0:2000/test1", table_name = "car" ); CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "", channel = "dlioutput", partition_key = "car_owner", encode = "csv", field_delimiter = "," ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info as b on a.car_id = b.car_id; |
将数据库连接地址设置为DWS数据库地址,即可创建DWS维表。DWS数据库版本大于8.1.0后,无法用开源的postgresql驱动连接,需要用gaussdb驱动进行连接。