从PostgreSQL CDC源表读取数据写入到DWS
本指导仅适用于Flink 1.12版本。
场景描述
CDC是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库的增量变动记录,同步到一个或多个数据目的中。CDC在数据同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。
本示例通过创建PostgreSQL CDC源表来监控Postgres的数据变化,并将变化的数据信息插入到DWS数据库中。
前提条件
- 已创建RDS Postgres实例。本示例创建的RDS Postgres数据库版本选择为:11。
具体步骤可参考:RDS PostgreSQL快速入门。
创建的RDS Postgres数据库版本不能低于11。
- 已创建DWS实例。
具体创建DWS集群的操作可以参考创建DWS集群。
整体作业开发流程
步骤1:创建队列:创建DLI作业运行的队列。
步骤2:创建RDS Postgres数据库:创建RDS Postgres的数据库和表。
步骤3:创建DWS数据库和表:创建用于接收数据的DWS数据库和表。
步骤4:创建增强型跨源连接:DLI上创建连接RDS和DWS的跨源连接,打通网络。
步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。
步骤6:发送数据和查询结果:RDS Postgres的表上插入数据,在DWS上查看运行结果。
步骤1:创建队列
- 登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。
- 在队列管理界面,单击界面右上角的“购买队列”。
- 在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。
- 计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。
- 区域和项目:保持默认值即可。
- 名称:填写具体的队列名称。
新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。
队列名称不区分大小写,系统会自动转换为小写。
- 类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。
- AZ策略、CPU架构、规格:保持默认即可。
- 企业项目:当前选择为“default”。
- 高级选项:选择“自定义”。
- 网段:配置队列网段。例如,当前配置为10.0.0.0/16。
队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。
- 其他参数根据需要选择和配置。
- 参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。
步骤2:创建RDS Postgres数据库
- 登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS Postgres实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。
- 输入实例登录的用户名和密码。单击“登录”,即可进入RDS Postgres数据库并进行管理。
- 新建数据库实例testrdsdb。
- 在testrdsdb数据库下,新建名称为test的Schema。
- 单击“SQL操作 > SQL查询 ”,进入SQL查询页面创建RDS Postgres表。
create table test.cdc_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR, primary key(order_id));
在Postgre中执行下列SQL语句。ALTER TABLE test.cdc_order REPLICA IDENTITY FULL;
步骤3:创建DWS数据库和表
- 连接已创建的DWS集群。
- 执行以下命令连接DWS集群的默认数据库“gaussdb”:
gsql -d gaussdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r
- gaussdb:DWS集群默认数据库。
- DWS集群连接地址:请参见获取集群连接地址进行获取。如果通过公网地址连接,请指定为集群“公网访问地址”或“公网访问域名”,如果通过内网地址连接,请指定为集群“内网访问地址”或“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。
- dbadmin:创建集群时设置的默认管理员用户名。
- -W:默认管理员用户的密码。
- 在命令行窗口输入以下命令创建数据库“testdwsdb”。
CREATE DATABASE testdwsdb;
- 执行以下命令,退出gaussdb数据库,连接新创建的数据库“testdwsdb”。
\q gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r
- 执行以下命令创建表。
create schema test; set current_schema= test; drop table if exists dws_order; CREATE TABLE dws_order ( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR );
步骤4:创建增强型跨源连接
- 创建DLI连接RDS的增强型跨源连接
- 在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。
- 在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
- 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。
- 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
- 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
- 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。
- 弹性资源池:选择步骤1:创建队列中已经创建的队列。
- 虚拟私有云:选择RDS的虚拟私有云。
- 子网:选择RDS的子网。
- 其他参数可以根据需要选择配置。
参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。
- 单击“资源管理 > 队列管理 ”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
- 在“测试连通性”界面,根据2中获取的RDS连接信息,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。
- 创建DLI连接DWS的增强型跨源连接
- 在DWS管理控制台,选择“集群管理”,单击已创建的DWS集群名称,进入到DWS的基本信息页面。
- 在“基本信息”的“数据库属性”中获取该实例的“内网IP”、“端口”,“基本信息”页面的“网络”中获取“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
- 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。
- RDS和DWS实例属于同一VPC和子网下?
- 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
- 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
- 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_dws。
- 弹性资源池:选择步骤1:创建队列中已经创建的队列。
- 虚拟私有云:选择DWS的虚拟私有云。
- 子网:选择DWS的子网。
- 其他参数可以根据需要选择配置。
参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。
- 单击“资源管理 > 队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
- 在“测试连通性”界面,根据2中获取的DWS连接信息,地址栏输入“DWS内网IP:DWS端口”,单击“测试”测试DLI到DWS网络是否可达。
步骤5:运行作业
- 在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。
- 在创建队列界面,类型选择“Flink OpenSource SQL”,名称填写为:FlinkCDCPostgreDWS。单击“确定”,跳转到Flink作业编辑界面。
- 在Flink OpenSource SQL作业编辑界面,配置如下参数,其他参数默认即可。
- 所属队列:选择步骤1:创建队列中创建的队列。
- Flink版本:选择1.12。
- 保存作业日志:勾选。
- OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。
- 开启Checkpoint:勾选。
- Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。
本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到Elasticsearch。
请参考Flink OpenSource SQL 1.12创建Postgres CDC源表和Flink OpenSource SQL 1.12创建DWS结果表。
表1 作业运行参数说明 参数
参数说明
所属队列
默认选择“共享队列”,可以按需选择自定义的CCE独享队列,并配置以下参数。
“UDF Jar”:用户自定义UDF文件,在选择UDF Jar之前需要将对应的jar包上传至OBS桶中,并在 中创建程序包,具体操作请参考创建程序包。
用户可以在SQL中调用插入Jar包中的自定义函数。
说明:当子用户在创建作业时,子用户只能选择已经被分配的队列。
当所选择队列的剩余容量不能满足作业需求时,系统会自动扩容,将按照增加的容量计费。当队列空闲时,系统也会自动缩容。
Flink版本
具体参数说明如下:
- 1.10:具体SQL语法参考Flink OpenSource SQL1.10语法。
- 1.12:具体SQL语法参考Flink OpenSource SQL1.12语法。
CU数量
CU数量为DLI的计算单元数量和管理单元数量总和,CU也是DLI的计费单位,1CU=1核4G。
当前配置的CU数量为运行作业时所需的CU数,不能超过其绑定队列的CU数量。
管理单元
管理单元CU数量。
并行数
并行数是指同时运行Flink OpenSource SQL作业的最大任务数。
说明:最大并行数不能大于计算单元(CU数量-管理单元)的4倍。
TaskManager配置
用于设置TaskManager资源参数。
勾选后需配置下列参数:
- “单TM所占CU数”:每个TaskManager占用的资源数量。
- “单TM Slot”:每个TaskManager包含的Slot数量。
OBS桶
选择OBS桶用于保存用户作业日志信息、checkpoint等信息。如果选择的OBS桶是未授权状态,需要单击“OBS授权”。
保存作业日志
设置是否将作业运行时的日志信息保存到OBS。日志信息的保存路径为:“桶名/jobs/logs/作业id开头的目录”。
注意:该参数建议勾选,否则作业运行完成后不会生成运行日志,后续如果作业运行异常则无法获取运行日志进行定位。
勾选后需配置下列参数:
“OBS桶”:选择OBS桶用于保存用户作业日志信息。如果选择的OBS桶是未授权状态,需要单击“OBS授权”。说明:如果同时勾选了“开启Checkpoint”和“保存作业日志”,OBS授权一次即可。
作业异常告警
设置是否将作业异常告警信息,如作业出现运行异常或者欠费情况,以SMN的方式通知用户。
勾选后需配置下列参数:
“SMN主题”:
选择一个自定义的SMN主题。如何自定义SMN主题,请参见《消息通知服务用户指南》中“创建主题”章节。
开启Checkpoint
设置是否开启作业快照,开启后可基于Checkpoint(一致性检查点)恢复作业。
勾选后需配置下列参数:- “Checkpoint间隔”:Checkpoint的时间间隔,单位为秒,输入范围 1~999999,默认值为30s。
- “Checkpoint模式”:支持如下两种模式:
- At least once:事件至少被处理一次。
- Exactly once:事件仅被处理一次。
- “OBS桶”:选择OBS桶用于保存用户Checkpoint。如果选择的OBS桶是未授权状态,需要单击“OBS授权”。
Checkpoint保存路径为:“桶名/jobs/checkpoint/作业id开头的目录”。说明:
如果同时勾选了“开启Checkpoint”和“保存作业日志”,OBS授权一次即可。
异常自动重启
设置是否启动异常自动重启功能,当作业异常时将自动重启并恢复作业。
勾选后需配置下列参数:
- “异常重试最大次数”:配置异常重试最大次数。单位为“次/小时”。
- 无限:无限次重试。
- 有限:自定义重试次数。
- “从Checkpoint恢复”:需要同时勾选“开启Checkpoint”才可配置该参数。
空闲状态保留时长
用于清除GroupBy或Window经过最大保留时间后仍未更新的中间状态,默认设置为1小时。
脏数据策略
选择处理脏数据的策略。支持如下三种策略:“忽略”,“抛出异常”和“保存”。
“脏数据策略”选择“保存”时,配置“脏数据转储地址”。单击地址框选择保存脏数据的OBS路径。
create table PostgreCdcSource( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id STRING, primary key (order_id) not enforced ) with ( 'connector' = 'postgres-cdc', 'hostname' = '192.168.15.153',--IP替换为RDS Postgres的实例IP 'port' = '5432',--端口替换为RDS Postgres的实例端口 'pwd_auth_name'= 'xxxxx', --DLI侧创建的Password类型的跨源认证名称。使用跨源认证则无需在作业中配置账号和密码。 'database-name' = 'testrdsdb',--RDS Postgres实例的数据库名 'schema-name' = 'test',--RDS Postgres数据库下的schema 'table-name' = 'cdc_order'--RDS Postgres数据库下的表名 ); create table dwsSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id STRING, primary key(order_id) not enforced ) with ( 'connector' = 'gaussdb', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://192.168.168.16:8000/testdwsdb', ---192.168.168.16:8000替换为DWS的内网IP和端口,testdwsdb为创建的DWS数据库名 'table-name' = 'test\".\"dws_order', ---test为创建的DWS表的schema,dws_order为对应的DWS表名 'username' = 'xxxxx',--替换为DWS实例的用户名 'password' = 'xxxxx',--替换为DWS实例的用户密码 'write.mode' = 'insert' ); insert into dwsSink select * from PostgreCdcSource where pay_amount > 100;
- 单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。
步骤6:发送数据和查询结果
- 登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS Postgres实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。
- 输入实例登录的用户名和密码。单击“登录”,即可进入RDS Postgres数据库并进行管理。
- 在已创建的数据库的操作列,单击“SQL查询”,输入以下创建表语句,插入测试数据。
insert into test.cdc_order values ('202103241000000001','webShop','2021-03-24 10:00:00','50.00','100.00','2021-03-24 10:02:03','0001','Alice','330106'), ('202103251606060001','appShop','2021-03-24 12:06:06','200.00','180.00','2021-03-24 16:10:06','0002','Jason','330106'), ('202103261000000001','webShop','2021-03-24 14:03:00','300.00','100.00','2021-03-24 10:02:03','0003','Lily','330106'), ('202103271606060001','appShop','2021-03-24 16:36:06','99.00','150.00','2021-03-24 16:10:06','0001','Henry','330106');
- 连接已创建的DWS集群。
- 执行以下命令连接DWS集群的默认数据库“testdwsdb”:
gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r
- 执行以下语句,查询DWS的表数据。
select * from test.dws_order;
查询结果参考如下:order_channel order_channel order_time pay_amount real_pay pay_time user_id user_name area_id 202103251606060001 appShop 2021-03-24 12:06:06 200.0 180.0 2021-03-24 16:10:06 0002 Jason 330106 202103261000000001 webShop 2021-03-24 14:03:00 300.0 100.0 2021-03-24 10:02:03 0003 Lily 330106