更新时间:2024-02-07 GMT+08:00

Postgres CDC源表

功能描述

Postgres的CDC源表,即Postgres的流式源表,用于依次读取PostgreSQL数据库全量快照数据和变更数据,保证不多读一条也不少读一条数据。即使发生故障,也能采用Exactly Once方式处理。

前提条件

  • PostgreSQL CDC要求Postgre版本为9.6或者10,11,12。
  • 要与实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。

    跨源认证简介及操作方法请参考跨源认证简介

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • PostgreSQL的版本不能低于PostgreSQL 11。
  • 若Postgres表有update等操作,需要在PostgreSQL中执行下列语句。注意:test.cdc_order需要修改为实际的数据库和表。
    ALTER TABLE test.cdc_order REPLICA IDENTITY FULL
  • 使用前请确认当前PostgreSQL是否包含默认的插件,可在PostgreSQL中使用下述语句查询当前插件。
    SELECT name FROM pg_available_extensions;

    若不包含默认插件名“decoderbufs”,则需要在创建PostgreSQL CDC源表中配置参数“decoding.plugin.name”,该参数指定PostgreSQL中已有的插件。

语法格式

create table postgresCdcSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'postgres-cdc',   
  'hostname' = 'PostgresHostname',
  'username' = 'PostgresUsername',
  'password' = 'PostgresPassword',
  'database-name' = 'PostgresDatabaseName',
  'schema-name' = 'PostgresSchemaName',
  'table-name' = 'PostgresTableName'
);

参数说明

表1 参数说明

参数

是否必选

默认值

数据类型

说明

connector

String

connector类型,需配置为'postgres-cdc'。

hostname

String

Postgres数据库的IP地址或者Hostname。

username

String

Postgres数据库用户名。

password

String

Postgres数据库服务的密码。

database-name

String

数据库名称。

schema-name

String

Postgres Schema名称。

Schema名称支持正则表达式以读取多个Schema的数据,例如test(.)*表示以test开头的所有schema。

table-name

String

Postgres表名。

表名支持正则表达式去读取多个表的数据,例如cdc_order(.)*表示以cdc_order开头的所有表。

port

5432

Integer

Postgres数据库服务的端口号。

decoding.plugin.name

decoderbufs

String

根据Postgres服务上安装的插件确定。支持的插件列表如下:

  • decoderbufs(默认值)
  • wal2json
  • wal2json_rds
  • wal2json_streaming
  • wal2json_rds_streaming
  • pgoutput

debezium.*

String

更细粒度控制Debezium客户端的行为。例如'debezium.snapshot.mode' = 'never'。

建议每个表都设置debezium.slot.name参数,以避免出现

“PSQLException: ERROR: replication slot "debezium" is active for PID 974”报错。

pwd_auth_name

String

DLI侧创建的Password类型的跨源认证名称。

使用跨源认证则无需在作业中配置账号和密码。

示例

该示例是利用Postgres-CDC实时读取RDS PostgreSQL中的数据,并写入到Print结果表中,其具体步骤如下(当前示例使用的数据库引擎版本是RDS PostgreSQL 11.11):

  1. 参考增强型跨源连接,根据PostgreSQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
  2. 设置PostgreSQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据PostgreSQL的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 在PostgreSQL中创建数据库flink,并创建名为test的schema。
  4. 在PostgreSQL中flink数据库的test schema下创建表名为cdc_order的表,SQL语句参考如下:
    create table test.cdc_order(
      order_id VARCHAR,
      order_channel VARCHAR,
      order_time VARCHAR,
      pay_amount FLOAT8,
      real_pay FLOAT8,
      pay_time VARCHAR,
      user_id VARCHAR,
      user_name VARCHAR,
      area_id VARCHAR,
      primary key(order_id)
    );
  5. 在PostgreSQL中执行下列SQL语句。如果不执行如下命令,后续Flink作业将会运行报错,具体报错信息详情参见错误信息
    ALTER TABLE test.cdc_order REPLICA IDENTITY FULL
  6. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    create table postgresCdcSource(
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id STRING,
      primary key (order_id) not enforced
    ) with (
      'connector' = 'postgres-cdc',
      'hostname' = 'PostgresHostname',
      'username' = 'PostgresUsername',
      'password' = 'PostgresPassword',
      'database-name' = 'flink',
      'schema-name' = 'test',
      'table-name' = 'cdc_order'
    );
    
    create table printSink(
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id STRING,
      primary key(order_id) not enforced
    ) with (
      'connector' = 'print'
    );
    
    insert into printSink select * from postgresCdcSource;
  7. 在PostgreSQL中执行以下命令:
    insert into test.cdc_order
      (order_id,
      order_channel,
      order_time,
      pay_amount,
      real_pay,
      pay_time,
      user_id,
      user_name,
      area_id) values
      ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00', '2021-03-24 10:02:03', '0001', 'Alice', '330106'),
      ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00', '2021-03-25 12:03:00', '0002', 'Bob', '330110');
    
    update test.cdc_order set order_channel = 'webShop' where order_id = '202103251202020001';
    
    delete from test.cdc_order where order_id = '202103241000000001';
  8. 按照如下方式查看taskmanager.out文件中的数据结果:
    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。

    数据结果参考如下:

    +I(202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106)
    +I(202103251202020001,miniAppShop,2021-03-25 12:02:02,60.0,60.0,2021-03-25 12:03:00,0002,Bob,330110)
    -U(202103251202020001,miniAppShop,2021-03-25 12:02:02,60.0,60.0,2021-03-25 12:03:00,0002,Bob,330110)
    +U(202103251202020001,webShop,2021-03-25 12:02:02,60.0,60.0,2021-03-25 12:03:00,0002,Bob,330110)
    -D(202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106)

常见问题

  • Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    org.postgresql.util.PSQLException: ERROR: logical decoding requires wal_level >= logical
  • A:需要调节PostgreSQL的配置参数wal_level为logical,并重新启动。

    PostgreSQL参数修改完成后,需要重启下RDS PostgreSQL实例,使得参数生效。

  • Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    java.lang.IllegalStateException: The "before" field of UPDATE/DELETE message is null, please check the Postgres table has been set REPLICA IDENTITY to FULL level. You can update the setting by running the command in Postgres 'ALTER TABLE test.cdc_order REPLICA IDENTITY FULL'. 

    A:若运行日志出现类似报错问题,则需要在PostgreSQL中执行报错日志中的语句"ALTER TABLE test.cdc_order REPLICA IDENTITY FULL"。