DWS源表(不推荐使用)
功能描述
DLI将Flink作业从数据仓库服务(DWS)中读取数据。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。
数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
推荐使用DWS服务自研的DWS Connector。
DWS-Connector的使用方法请参考dws-connector-flink。
前提条件
- 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。
- 请确保已创建DWS数据库表。
- 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。
跨源认证简介及操作方法请参考跨源认证简介。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
- with参数中字段只能使用单引号,不能使用双引号。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 13 |
create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' ); |
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
connector类型,需配置为'gaussdb'。 |
url |
是 |
无 |
String |
jdbc连接地址。“url”参数中的ip地址请使用DWS的内网地址。 使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 |
table-name |
是 |
无 |
String |
操作的DWS表名。如果该DWS表在某schema下,则具体可以参考如果该DWS表在某schema下的说明。 |
driver |
否 |
org.postgresql.Driver |
String |
jdbc连接驱动,默认为: org.postgresql.Driver。
|
username |
否 |
无 |
String |
DWS数据库认证用户名,需要和'password'参数一起配置。 |
password |
否 |
无 |
String |
DWS数据库认证密码,需要和'username'参数一起配置。 |
scan.partition.column |
否 |
无 |
String |
用于对输入进行分区的列名。 注意:该参数与scan.partition.lower-bound、scan.partition.upper-bound、 scan.partition.num参数必须同时配置或者同时都不配置。 |
scan.partition.lower-bound |
否 |
无 |
Integer |
第一个分区的最小值。 与scan.partition.column、scan.partition.upper-bound、 scan.partition.num必须同时配置或者同时都不配置。 |
scan.partition.upper-bound |
否 |
无 |
Integer |
最后一个分区的最大值。 与scan.partition.column、scan.partition.lower-bound、 scan.partition.num必须同时配置或者同时都不配置。 |
scan.partition.num |
否 |
无 |
Integer |
分区的个数。 与scan.partition.column、scan.partition.upper-bound、 scan.partition.upper-bound必须同时配置或者同时都不配置。 |
scan.fetch-size |
否 |
0 |
Integer |
每次从数据库拉取数据的行数。默认值为0,表示不限制。 |
示例
该示例是从DWS数据源中读取数据,并写入到Print结果表中,其具体步骤参考如下:
- 在DWS中创建相应的表,表名为dws_order,SQL语句参考如下。
create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR);
在DWS中执行以下SQL语句,向dws_order表中插入数据。insert into public.dws_order (order_id, order_channel, order_time, pay_amount, real_pay, pay_time, user_id, user_name, area_id) values ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00', '2021-03-24 10:02:03', '0001', 'Alice', '330106'), ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00', '2021-03-25 12:03:00', '0002', 'Bob', '330110');
- 参考增强型跨源连接,根据DWS所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置DWS的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据DWS的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将DWS作为数据源,Print作为结果表。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE dwsSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'gaussdb', 'url' = 'jdbc:postgresql://DWSIP:DWSPort/DWSdbName', 'table-name' = 'dws_order', 'driver' = 'org.postgresql.Driver', 'username' = 'DWSUserName', 'password' = 'DWSPassword' ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from dwsSource;
- 按照如下操作查看taskmanager.out文件中的数据结果。
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。
数据结果参考如下:
+I(202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106) +I(202103251202020001,miniAppShop,2021-03-25 12:02:02,60.0,60.0,2021-03-25 12:03:00,0002,Bob,330110)
常见问题
- Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决?
java.io.IOException: unable to open JDBC writer ... Caused by: org.postgresql.util.PSQLException: The connection attempt failed. ... Caused by: java.net.SocketTimeoutException: connect timed out
A:应考虑是跨源没有绑定,或者跨源没有绑定成功。- 参考增强型跨源连接章节,重新配置跨源。参考DLI跨源连接DWS失败进行问题排查。
- Q:如果该DWS表在某schema下,应该如何配置?
A:如下示例是使用schema为dbuser2下的表dws_order。
CREATE TABLE dwsSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'gaussdb', 'url' = 'jdbc:postgresql://DWSIP:DWSPort/DWSdbName', 'table-name' = 'dbuser2.dws_order', 'driver' = 'org.postgresql.Driver', 'username' = 'DWSUserName', 'password' = 'DWSPassword' );