更新时间:2024-11-15 GMT+08:00

Flink实时消费Binlog

注意事项

  • Binlog功能当前为beta版本,受限商用,如需使用请联系技术支持。
  • 当前仅8.3.0.100及以上版本支持HStore和HStore-opt记录Binlog功能。
  • 目前GaussDB(DWS)只有Hstore表支持Binlog功能,表需要包含主键且设置enable_binlog=on。
  • 消费的Binlog表名不要带有特殊字符,如.、""等。
  • 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的binlogSlotName唯一。
  • 为了达到最高的消费速度,建议将任务的并发度和DWS集群DN数设置一致。
  • 使用dws-connector-flink工具的Sink能力来写入Binlog数据的话,如果需要保证DN内的数据写入顺序则需要设置connectionSize的值为1;如果源端有更新主键操作或需要进行聚合计算的话,需要将ignoreUpdateBefore设置为false,否则不建议设置将ignoreUpdateBefore设置为false

Flink实时消费Binlog

使用DWS Connector来实时消费Binlog,具体请参见DWS-Connector

如果已使用其他同步工具已经将全量数据同步到了目标端,后续只想进行增量同步。则可以调用以下系统函数来更新同步点。

SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name');

源表DDL

Source端会根据操作类型自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能镜像同步表的数据,类似MySQL和Postgres的CDC功能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
CREATE TABLE test_binlog_source ( 
   a int,
   b int,
   c int
) with (
   'connector' = 'dws',
   'url' = 'jdbc:gaussdb://ip:port/gaussdb',
   'binlog' = 'true',
   'tableName' = 'test_binlog_source',   
   'binlogSlotName' = 'slot',   
   'username'='xxx',   
   'password'='xxx')

Binlog相关参数说明

下表仅涉及消费Binlog时的参数。

表1 消费Binlog时的参数

参数

说明

数据类型

默认值

binlog

是否读取Binlog信息

Boolean

false

binlogSlotName

槽位信息,可以理解一个标识。由于可能存在多个Flink任务同时消费同一张表的Binlog信息,所以该场景需要保证每个任务的binlogSlotName不同。

String

Flink映射表的表名

binlogBatchReadSize

批量读取binlog的数据行数

Integer

5000

fullSyncBinlogBatchReadSize

全量读取binlog的数据行数

Integer

50000

binlogReadTimeout

增量消费Binlog数据时超时时间,单位毫秒

Integer

600000

fullSyncBinlogReadTimeout

全量消费Binlog数据时超时时间,单位毫秒

Long

1800000

binlogSleepTime

实时消费不到Binlog数据时休眠时间,单位毫秒。如果连续读取不到Binlog数据,则休眠时间为:binlogSleepTime * 次数,最大为binlogMaxSleepTime。读取到数据后,则重置。

Long

500

binlogMaxSleepTime

实时消费不到Binlog数据时最大休眠时间,单位毫秒。

Long

10000

binlogMaxRetryTimes

消费Binlog数据出错后的重试次数。

Integer

1

binlogRetryInterval

消费binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。单位毫秒。

Long

100

binlogParallelNum

消费Binlog数据时线程数,只有任务并发度小于DWS集群DN数时,该参数才有效,即此时一个并发度会消费多个DN上的数据,所以可以考虑设置该参数。

Integer

3

connectionPoolSize

JDBC连接池连接大小

Integer

5

数据同步示例

  • GaussDB(DWS)侧:

    新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。

    -- 源表(产生binlog)

    1
    CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on, enable_binlog=true);
    

    -- 目标表

    1
    CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on);
    
  • Flink侧:

    执行如下命令进行完整数据同步:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    CREATE TABLE test_binlog_source ( 
       a int,
       b int,
       c int
    ) with (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'binlog' = 'true',
       'tableName' = 'test_binlog_source',   
       'binlogSlotName' = 'slot',   
       'username'='xxx',   
       'password'='xxx');
       
    
    CREATE TABLE test_binlog_sink (  
       a int,
       b int,
       c int) 
    WITH (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'tableName' = 'test_binlog_sink',   
       'ignoreUpdateBefore'='false',   
       'connectionSize' = '1',
       'username'='xxx',
       'password'='xxx');
    
    INSERT INTO test_binlog_sink select * from test_binlog_source;