更新时间:2022-12-07 GMT+08:00

DWS输出流(通过JDBC方式)

功能描述

DLI将Flink作业的输出数据输出到数据仓库服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。

数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见。

前提条件

  • 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。

    如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。

  • 请确保已创建DWS数据库表。
  • 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

语法格式

1
2
3
4
5
6
7
8
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "rds",
    username = "",
    password = "",
    db_url = "",
    table_name = ""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,rds表示输出到关系型数据库或者数据仓库服务中。

username

数据库连接用户名。

password

数据库连接密码。

db_url

数据库连接地址格式为:postgresql://ip:port/database。

table_name

要插入数据的数据库表名。数据库表需事先创建好。

db_columns

支持配置输出流属性和数据库表属性的对应关系,需严格按照输出流的属性顺序配置。

示例:

1
2
3
4
5
6
7
8
9
create sink stream a3(student_name  string, student_age int) 
  with (
    type = "rds",
    username = "root",
    password = "xxxxxxxx",
    db_url = "postgresql://192.168.0.102:8000/test1",
    db_columns = "name,age",
    table_name = "t1"
  );

student_name对应数据库里的name属性,student_age对应数据库里的age属性。

说明:
  • 当不配置db_columns时,若输出流属性个数小于数据库表属性个数,并且数据库多出的属性都是nullable或者有默认值时,这种情况也允许。

primary_key

如果想通过主键实时更新表中的数据,需要在创建数据表的时候增加primary_key配置项,如下面例子中的c_timeminute。配置primary_key后,在进行数据写入操作时,如果primary_key存在,则进行更新操作,否则进行插入操作。

示例:

1
2
3
4
5
6
7
8
9
CREATE SINK STREAM test(c_timeminute LONG, c_cnt LONG)
  WITH (
    type = "rds",
    username = "root",
    password = "xxxxxxxx",
    db_url = "postgresql://192.168.0.12:8000/test",
    table_name = "test",
    primary_key = "c_timeminute"
  );

注意事项

stream_id所定义的流格式需和数据库中的表格式一致。

示例

将流audi_cheaper_than_30w的数据输出到数据库test的audi_cheaper_than_30w表下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
CREATE SINK STREAM audi_cheaper_than_30w (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
  WITH (
    type = "rds",
    username = "root",
    password = "xxxxxx",
    db_url = "postgresql://192.168.1.1:8000/test",
    table_name = "audi_cheaper_than_30w"
  ); 

insert into audi_cheaper_than_30w select "1","2","3",4;