RDS输出流
功能描述
DLI将Flink作业的输出数据输出到关系型数据库(RDS)中。目前支持PostgreSQL和MySQL两种数据库。PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。MySQL数据库适用于各种WEB应用、电子商务应用、企业应用、移动应用等场景,减少IT部署和维护成本。
关系型数据库(Relational Database Service,简称RDS)是一种基于云计算平台的在线关系型数据库服务。RDS包括如下几种类型的数据库:MySQL、PostgreSQL和Microsoft SQL Server。
RDS的更多信息,请参见《关系型数据库用户指南》。
前提条件
语法格式
1 2 3 4 5 6 7 8 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH (
type = "rds",
username = "",
password = "",
db_url = "",
table_name = ""
);
|
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,rds表示输出到关系型数据库中。 |
username |
是 |
数据库连接用户名。 |
password |
是 |
数据库连接密码。 |
db_url |
是 |
数据库连接地址,格式为:"{database_type}://ip:port/database" 目前支持两种数据库连接:MySQL和PostgreSQL
|
table_name |
是 |
要插入数据的数据库表名。 |
db_columns |
否 |
支持配置输出流属性和数据库表属性的对应关系,需严格按照输出流的属性顺序配置。 示例: create sink stream a3(student_name string, student_age int) with ( type = "rds", username = "root", password = "xxxxxxxx", db_url = "mysql://192.168.0.102:8635/test1", db_columns = "name,age", table_name = "t1" ); student_name对应数据库里的name属性,student_age对应数据库里的age属性。
说明:
|
primary_key |
否 |
如果想通过主键实时更新表中的数据,需要在创建数据表的时候增加primary_key配置项,如下面例子中的c_timeminute。配置primary_key后,在进行数据写入操作时,如果primary_key存在,则进行更新操作,否则进行插入操作。 示例: CREATE SINK STREAM test(c_timeminute LONG, c_cnt LONG) WITH ( type = "rds", username = "root", password = "xxxxxxxx", db_url = "mysql://192.168.0.12:8635/test", table_name = "test", primary_key = "c_timeminute"); |
operation_field |
否 |
该配置项用于指定数据的处理方式,需要配置为${field_name}的形式,field_name的类型必读为string,field_name所代表的真正内容表示为D或者DELETE时,表示删除数据库中该条记录,其余默认插入数据。 |
注意事项
stream_id所定义的流格式需和数据库中的表格式一致。
示例
将流audi_cheaper_than_30w的数据输出到数据库test的audi_cheaper_than_30w表下。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
CREATE SINK STREAM audi_cheaper_than_30w (
car_id STRING,
car_owner STRING,
car_brand STRING,
car_price INT
)
WITH (
type = "rds",
username = "root",
password = "xxxxxx",
db_url = "mysql://192.168.1.1:8635/test",
table_name = "audi_cheaper_than_30w"
);
|
DWS数据库版本大于8.1.0后,无法用开源的postgresql驱动连接,需要用gaussdb驱动进行连接。