更新时间:2024-12-05 GMT+08:00
分享

Flink实时消费Binlog

注意事项

  • 当前仅8.3.0.100及以上的版本支持HStore和HStore-opt记录Binlog功能,且V3处于试商用阶段,使用前需要进行评估。
  • 目前GaussDB(DWS)只有Hstore表和HStore-opt支持Binlog功能,表需要包含主键且设置enable_binlog=on或者enable_binlog_timestamp=on。
  • 消费的Binlog表名不要带有特殊字符,如.、""等。
  • 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的binlogSlotName唯一。
  • 为了达到最高的消费速度,建议将任务的并发度和DWS集群DN数设置一致。
  • 使用dws-connector-flink的Sink能力来写入Binlog数据的话,需要注意以下几点:
    • 如果需要保证DN内的数据写入顺序则需要设置connectionSize设置为1。
    • 如果源端有更新主键操作或者需要flink进行聚合计算的话,将ignoreUpdateBefore设置为false,否则不建议将 ignoreUpdateBefore设置为false(默认true)。

Flink实时消费Binlog

使用DWS Connector来实时消费Binlog,具体请参见DWS-Connector

如果已使用其他同步工具已经将全量数据同步到了目标端,后续只想进行增量同步。则可以调用以下系统函数来更新同步点。

1
SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name');

源表DDL

Source端会根据操作类型自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能镜像同步表的数据,类似MySQL和Postgres的CDC功能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
CREATE TABLE test_binlog_source ( 
   a int,
   b int,
   c int,
   primary key(a) NOT ENFORCED
) with (
   'connector' = 'dws',
   'url' = 'jdbc:gaussdb://ip:port/gaussdb',
   'binlog' = 'true',
   'tableName' = 'test_binlog_source',   
   'binlogSlotName' = 'slot',   
   'username'='xxx',   
   'password'='xxx')

Binlog相关参数说明

下表仅涉及消费Binlog时的参数。

表1 消费Binlog时的参数

参数

说明

数据类型

默认值

binlog

是否读取Binlog信息

Boolean

false

binlogSlotName

槽位信息,可以理解一个标识。由于可能存在多个Flink任务同时消费同一张表的Binlog信息,所以该场景需要保证每个任务的binlogSlotName不同。

String

Flink映射表的表名

binlogBatchReadSize

批量读取binlog的数据行数

Integer

5000

fullSyncBinlogBatchReadSize

全量读取binlog的数据行数

Integer

50000

binlogReadTimeout

增量消费Binlog数据时超时时间,单位毫秒

Integer

600000

fullSyncBinlogReadTimeout

全量消费Binlog数据时超时时间,单位毫秒

Long

1800000

binlogSleepTime

实时消费不到Binlog数据时休眠时间,单位毫秒。如果连续读取不到Binlog数据,则休眠时间为:binlogSleepTime * 次数,最大为binlogMaxSleepTime。读取到数据后,则重置。

Long

500

binlogMaxSleepTime

实时消费不到Binlog数据时最大休眠时间,单位毫秒。

Long

10000

binlogMaxRetryTimes

消费Binlog数据出错后的重试次数。

Integer

1

binlogRetryInterval

消费binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。单位毫秒。

Long

100

binlogParallelNum

消费Binlog数据时线程数,只有任务并发度小于DWS集群DN数时,该参数才有效,即此时一个并发度会消费多个DN上的数据,所以可以考虑设置该参数。

Integer

3

connectionPoolSize

JDBC连接池连接大小。

Integer

5

needRedistribution

表示是否兼容扩充重分布(需要升级到对应内核版本,如果是低版本则设置为false);如果设置成true的话,flink的restart-strategy不能设置为none。

Boolean

true

newSystemValue

表示读取binlog数据时是否使用新的系统字段(需要升级到对应内核版本,如果是低版本则设置为false)。

Boolean

true

checkNodeChangeInterval

检测节点变化的间隔,只有needRedistribution=true才生效。

Long

10000

connectionSocketTimeout

连接处理超时时间(可以看成客户端执行SQL超时时间),单位毫秒;默认值为0,即不设置超时时间。

Integer

0

binlogIgnoreUpdateBefore

是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上版本支持。

Boolean

false

binlogStartTime

设置从某个时间点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且表需要开启enable_binlog_timestamp

该参数仅9.1.0.200及以上版本支持。

String

binlogSyncPointSize

增量读取binlog同步点区间的大小(增量读取binlog时,如果数据量过大可能涉及下盘,可通过调整该参数控制)。

该参数仅9.1.0.200及以上版本支持。

Integer

5000

数据同步示例

  • GaussDB(DWS)侧:

    新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。

    -- 源表(产生binlog)

    1
    CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
    

    -- 目标表

    1
    CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on);
    
  • Flink侧:

    执行如下命令进行完整数据同步:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    -- 建立源表的映射表
    CREATE TABLE test_binlog_source ( 
       a int,
       b int,
       c int,
       primary key(a) NOT ENFORCED
    ) with (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'binlog' = 'true',
       'tableName' = 'test_binlog_source',   
       'binlogSlotName' = 'slot',   
       'username'='xxx',   
       'password'='xxx');
       
    -- 建立目标表的映射表
    CREATE TABLE test_binlog_sink (  
       a int,
       b int,
       c int,
       primary key(a) NOT ENFORCED
    ) with (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'tableName' = 'test_binlog_sink',   
       'ignoreUpdateBefore'='false',   
       'connectionSize' = '1',
       'username'='xxx',
       'password'='xxx');
    
    INSERT INTO test_binlog_sink select * from test_binlog_source;
    

使用java程序示例

新建源表和目标表:

1
2
3
4
-- source
create table binlog_test_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
-- sink
create table binlog_test_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);

demo程序:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class BinlogDemo {

    // binlog表的表名
    private static final String BINLOG_TABLE_NAME = "binlog_test_source";

    // binlog表的槽位名
    private static final String BINLOG_SLOT_NAME = "binlog_test_slot";

    // 写入表的表名
    private static final String SINK_TABLE_NAME = "binlog_test_sink";

    public static void main(String[] args) throws Exception {
        DwsConfig dwsConfig = buildDwsConfig();
        DwsClient dwsClient = new DwsClient(dwsConfig);

        TableSchema sourceTableSchema = dwsClient.getTableSchema(TableName.valueOf(BINLOG_TABLE_NAME));
        TableSchema sinkTableSchema = dwsClient.getTableSchema(TableName.valueOf(SINK_TABLE_NAME));

        // 需要写入哪些列
        List<String> sinkColumns = sinkTableSchema.getColumnNames();

        // 线程池
        DwsConnectionPool dwsConnectionPool = new DwsConnectionPool(dwsConfig);
        // 存放数据的队列
        BlockingQueue<BinlogRecord> queue = new LinkedBlockingQueue<>();
        // 需要同步哪些列
        List<String> sourceColumnNames = sourceTableSchema.getColumnNames();

        BinlogReader binlogReader = new BinlogReader(dwsConfig, queue, sourceColumnNames, dwsConnectionPool);

        // 启动读取任务
        binlogReader.start();
        binlogReader.getRecords();

        while (binlogReader.isStart()) {
            try {
                while (!queue.isEmpty() && !binlogReader.hasException()) {
                    // 读取数据
                    BinlogRecord record = queue.poll();
                    if (Objects.isNull(record)) {
                        continue;
                    }
                    BinlogRecordType type = BinlogRecordType.toBinlogRecordType(record.getType());
                    List<Object> columnValues = record.getColumnValues();

                    // 写入数据
                    if (BinlogRecordType.INSERT.equals(type) || BinlogRecordType.UPDATE_AFTER.equals(type)) {
                        Operate upsert = dwsClient.write(sinkTableSchema);
                        for (int i = 0; i < sinkColumns.size(); i++) {
                            upsert.setObject(i, columnValues.get(i), false);
                        }
                        upsert.commit();
                    } else if (BinlogRecordType.DELETE.equals(type) || BinlogRecordType.UPDATE_BEFORE.equals(type)) {
                        Operate delete = dwsClient.delete(sinkTableSchema);
                        for (int i = 0; i < sinkColumns.size(); i++) {
                            String field = sinkColumns.get(i);
                            if (!sinkTableSchema.isPrimaryKey(field)) {
                                continue;
                            }
                            delete.setObject(i, columnValues.get(i), false);
                        }
                        delete.commit();
                    }
                }
                binlogReader.checkException();
            } catch (Exception e) {
                throw new DwsClientException(ExceptionCode.GET_BINLOG_ERROR, "get binlog  has error", e);
            }
        }
    }

    private static DwsConfig buildDwsConfig() {
        // 初始化一些配置信息(只列举一些必要的配置,具体配置信息请参考文档)
        TableConfig tableConfig = new TableConfig().withBinlog(true)
                .withNewSystemValue(true)
                .withNeedRedistribution(false)
                .withBinlogSlotName(BINLOG_SLOT_NAME);
        return DwsConfig.builder()
                .withUrl("链接信息")
                .withUsername("用户名")
                .withPassword("密码")
                .withBinlogTableName(BINLOG_TABLE_NAME)
                .withTableConfig(BINLOG_TABLE_NAME, tableConfig)
                .build();
    }
}

相关文档