DWS源表
功能描述
DLI将Flink作业从数据仓库服务(DWS)中读取数据。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。
数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。
前提条件
- 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。
- 请确保已创建DWS数据库表。
- 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
语法格式
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '' ); | 
参数说明
| 参数 | 是否必选 | 说明 | 
|---|---|---|
| connector.type | 是 | connector类型,需配置为'gaussdb' | 
| connector.url | 是 | jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。DWS数据库版本为8.1.0以后的版本时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 | 
| connector.table | 是 | 操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考示例说明。 | 
| connector.driver | 否 | jdbc连接驱动,默认为: org.postgresql.Driver。 DWS数据库版本为8.1.0以后的版本时,连接驱动为:com.huawei.gauss200.jdbc.Driver。 | 
| connector.username | 否 | 数据库认证用户名,需要和'connector.password'一起配置 | 
| connector.password | 否 | 数据库认证密码,需要和'connector.username'一起配置 | 
| connector.read.partition.column | 否 | 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 | 
| connector.read.partition.lower-bound | 否 | 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 | 
| connector.read.partition.upper-bound | 否 | 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 | 
| connector.read.partition.num | 否 | 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 | 
| connector.read.fetch-size | 否 | 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示 | 
示例
- 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。
    
    1 2 3 4 5 6 7 8 9 10 11 12 create table dwsSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx' ); 当DWS表test在名为test_schema的schema下时,可以参考如下样例。 1 2 3 4 5 6 7 8 9 10 11 12 create table dwsSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx', 'connector.table' = 'test_schema\".\"test', 'connector.username' = 'xx', 'connector.password' = 'xx' ); 
- 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。
    当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。create table dwsSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.table' = 'ads_game_sdk_base\".\"test', 'connector.driver' = 'com.huawei.gauss200.jdbc.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.username' = 'xx', 'connector.password' = 'xx' ); 
 
    