DWS输出流(通过JDBC方式)
功能描述
DLI将Flink作业的输出数据输出到数据仓库服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。
数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
前提条件
- 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。
- 请确保已创建DWS数据库表。
- 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
如何建立增强型跨源连接,请参见《数据湖探索用户指南》中增强型跨源连接章节。
如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
语法格式
1 2 3 4 5 6 7 8 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "rds", username = "", password = "", db_url = "", table_name = "" ); |
关键字
参数 |
是否必选 |
说明 |
||
---|---|---|---|---|
type |
是 |
输出通道类型,rds表示输出到关系型数据库或者数据仓库服务中。 |
||
username |
是 |
数据库连接用户名。 |
||
password |
是 |
数据库连接密码。 |
||
db_url |
是 |
数据库连接地址格式为:postgresql://ip:port/database。 |
||
table_name |
是 |
要插入数据的数据库表名。数据库表需事先创建好。 |
||
db_columns |
否 |
支持配置输出流属性和数据库表属性的对应关系,需严格按照输出流的属性顺序配置。 示例:
student_name对应数据库里的name属性,student_age对应数据库里的age属性。
说明:
|
||
primary_key |
否 |
如果想通过主键实时更新表中的数据,需要在创建数据表的时候增加primary_key配置项,如下面例子中的c_timeminute。配置primary_key后,在进行数据写入操作时,如果primary_key存在,则进行更新操作,否则进行插入操作。 示例:
|
注意事项
stream_id所定义的流格式需和数据库中的表格式一致。
示例
将流audi_cheaper_than_30w的数据输出到数据库test的audi_cheaper_than_30w表下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "root", password = "xxxxxx", db_url = "postgresql://192.168.1.1:8000/test", table_name = "audi_cheaper_than_30w" ); insert into audi_cheaper_than_30w select "1","2","3",4; |