更新时间:2026-04-16 GMT+08:00
分享

做Binlog流表

Binlog功能仅9.1.0及以上集群版本开始支持。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
create table dwsSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
)
with (
  'connector' = 'dws',
  'url' = '',
  'tableName' = '',
  'username' = '',
  'password' = '',
  'binlog' = 'true'
);

参数说明

表1 数据库配置

参数

说明

默认值

connector

flink框架区分Connector参数,固定为dws。

-

url

数据库连接地址。

-

username

配置连接用户。

-

password

数据库用户密码。

-

表2 连接配置参数

参数

名称

类型

说明

默认值

connectionSize

读取线程池大小

int

用于执行操作的线程数量=数据库连接数量,同写入线程大小。

1

binlog

是否读取Binlog信息

boolean

是否开启Binlog功能。

如果为false,下面的参数均不生效。

false

binlogSlotName

Binlog槽位名称

String

Binlog槽位名称,因为可能存在多个flink任务同时消费同一张表的Binlog信息,所以每个任务需要保证其槽位名称不相同,否则可能发生未知问题。

flink表表名

binlogSyncPointSize

读取Binlog同步点区间的大小

int

读取Binlog同步点区间的大小,每次从dws数据表获取csn范围在(当前csn, 当前csn + binlogSyncPointSize)内的binlog信息,并在处理完这些数据后进行打点。

如果设置过大,可能会引发查询数据下盘造成性能下降。

5000

binlogBatchReadSize

增量读取Binlog时一批数据的行数

int

增量同步处理一次数据的数据量。

5000

fullSyncBinlogBatchReadSize

全量读取Binlog时一批数据的行数

int

全量同步处理一次数据的数据量。

50000

binlogReadTimeout

增量消费Binlog数据时超时时间(毫秒)

long

增量同步Binlog信息的超时时间。

600000

fullSyncBinlogReadTimeout

全量消费Binlog数据时超时时间(毫秒)

long

全量同步Binlog信息的超时时间。

1800000

binlogSleepTime

实时消费不到Binlog数据时休眠时间(毫秒)

long

没有Binlog数据可消费时的睡眠时间,过了这段时间后会重新查看是否有新的Binlog数据。

500

binlogMaxSleepTime

实时消费不到Binlog数据时最大休眠时间(毫秒)

long

没有Binlog数据可消费时的最大睡眠时间。

10000

binlogQueueSize

Binlog数据队列大小

int

Binlog处理事件队列长度,默认为0(无限长度),在资源紧张时会导致OOM,需要按照资源使用情况进行调整。

0

needRedistribution

是否兼容扩容重分布

boolean

是否兼容扩容重分布,若为false,当DWS集群处于重分布状态时可能会有未知问题。

false

checkNodeChangeInterval

检测节点变化的时间间隔(毫秒)

long

仅在needRedistribution为true时有效,每隔一段时间检查节点的变化情况。

10000

binlogParallelNum

消费Binlog数据时的线程数

int

设置消费Binlog数据时的线程数,仅在任务并发度小于DWS集群DN数时有效。

3

binlogMaxRetryTimes

消费Binlog数据出错后的重试次数

int

消费Binlog数据出错后的重试次数。

1

binlogRetryInterval

消费Binlog数据出错后的重试时间间隔(毫秒)

long

消费Binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。

100

binlogStartTime

设置一个时间点,则会从该时间点已经同步的数据点开始消费Binlog

String

设置从某个时间点对应的数据同步点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且对应DWS表需要开启enable_binlog_timestamp参数。

例:2025/05/01 00:00:00时间点对应的binlog_sync_point为50000,那么只同步binlog_sync_point大于50000的数据。

注意:如果作业在停止状态时插入数据,那么在那之后的时间对应的数据同步点会变为空,此时执行同步可能导致该参数失效。

该参数仅9.1.0.200及以上集群版本支持。

-

binlogIgnoreUpdateBefore

是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息

bool

是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上集群版本支持。

false

executeDirectOn

是否开启直连DN模式

bool

是否让SQL直接由DN执行,若为false,则产生的SQL交给CN执行,会产生一些调度上的消耗,影响性能。

true

binlogSkipFullSync

是否跳过全量同步阶段

bool

是否直接跳过全量同步,若为true,则在当前无可用点位时,以当前数据位置作位点开始增量同步。

false

binlogCondition

Binlog表下推条件

String

设置flink表的下推条件,当前仅支持简单的下推条件。

例:当前表中有一个叫count的int字段,现在不想让flink表统计count大于100的数据,则可以按以下写法:

'binlogCondition'='count <= 100'

-

示例

  • DWS侧:

    新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。

    -- 源表(产生binlog)

    1
    CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
    

    -- 目标表

    1
    CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on);
    
  • Flink侧:

    执行如下命令进行完整数据同步:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    -- 建立源表的映射表
    CREATE TABLE test_binlog_source ( 
       a int,
       b int,
       c int,
       primary key(a) NOT ENFORCED
    ) with (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'binlog' = 'true',
       'tableName' = 'test_binlog_source',   
       'binlogSlotName' = 'slot',   
       'username'='xxx',   
       'password'='xxx');
       
    -- 建立目标表的映射表
    CREATE TABLE test_binlog_sink (  
       a int,
       b int,
       c int,
       primary key(a) NOT ENFORCED
    ) with (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'tableName' = 'test_binlog_sink',   
       'ignoreUpdateBefore'='false',   
       'connectionSize' = '1',
       'username'='xxx',
       'password'='xxx');
    
    INSERT INTO test_binlog_sink select * from test_binlog_source;
    

常见问题

  • Q:Binlog作业运行过程中出现异常,could not find slot: xxx for rel: xxx, you need to call pgxc_get_binlog_sync_point(),要怎么处理?

    A:检查下DWS的Binlog流表的表级参数以及Flink的Checkpoint间隔,重点关注DWS表中的表级参数binlog_ttl,此参数的作用为:如果同步任务注册的同步点超过这个时间没有进行同步,那么就会将该同步点删除。当不显示设置该参数时,binlog_ttl将使用默认值86400, 单位为秒。如果binlog_ttl设置过短可能会导致上述异常的产生。

    最佳实践:配置binlog_ttl要大于Flink侧的Checkpoint间隔。即可避免上述问题产生。

相关文档