Flink实时消费Binlog
注意事项
- Binlog功能当前为beta版本,受限商用,如需使用请联系技术支持。
- 当前仅8.3.0.100及以上版本支持HStore和HStore-opt记录Binlog功能。
- 目前GaussDB(DWS)只有Hstore表支持Binlog功能,表需要包含主键且设置enable_binlog=on。
- 消费的Binlog表名不要带有特殊字符,如.、""等。
- 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的binlogSlotName唯一。
- 为了达到最高的消费速度,建议将任务的并发度和DWS集群DN数设置一致。
- 使用dws-connector-flink工具的Sink能力来写入Binlog数据的话,如果需要保证DN内的数据写入顺序则需要设置connectionSize的值为1;如果源端有更新主键操作或需要进行聚合计算的话,需要将ignoreUpdateBefore设置为false,否则不建议设置将ignoreUpdateBefore设置为false。
Flink实时消费Binlog
使用DWS Connector来实时消费Binlog,具体请参见DWS-Connector。
如果已使用其他同步工具已经将全量数据同步到了目标端,后续只想进行增量同步。则可以调用以下系统函数来更新同步点。
SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name');
源表DDL
Source端会根据操作类型自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能镜像同步表的数据,类似MySQL和Postgres的CDC功能。
1 2 3 4 5 6 7 8 9 10 11 12 |
CREATE TABLE test_binlog_source ( a int, b int, c int ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx') |
Binlog相关参数说明
下表仅涉及消费Binlog时的参数。
参数 |
说明 |
数据类型 |
默认值 |
---|---|---|---|
binlog |
是否读取Binlog信息 |
Boolean |
false |
binlogSlotName |
槽位信息,可以理解一个标识。由于可能存在多个Flink任务同时消费同一张表的Binlog信息,所以该场景需要保证每个任务的binlogSlotName不同。 |
String |
Flink映射表的表名 |
binlogBatchReadSize |
批量读取binlog的数据行数 |
Integer |
5000 |
fullSyncBinlogBatchReadSize |
全量读取binlog的数据行数 |
Integer |
50000 |
binlogReadTimeout |
增量消费Binlog数据时超时时间,单位毫秒 |
Integer |
600000 |
fullSyncBinlogReadTimeout |
全量消费Binlog数据时超时时间,单位毫秒 |
Long |
1800000 |
binlogSleepTime |
实时消费不到Binlog数据时休眠时间,单位毫秒。如果连续读取不到Binlog数据,则休眠时间为:binlogSleepTime * 次数,最大为binlogMaxSleepTime。读取到数据后,则重置。 |
Long |
500 |
binlogMaxSleepTime |
实时消费不到Binlog数据时最大休眠时间,单位毫秒。 |
Long |
10000 |
binlogMaxRetryTimes |
消费Binlog数据出错后的重试次数。 |
Integer |
1 |
binlogRetryInterval |
消费binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。单位毫秒。 |
Long |
100 |
binlogParallelNum |
消费Binlog数据时线程数,只有任务并发度小于DWS集群DN数时,该参数才有效,即此时一个并发度会消费多个DN上的数据,所以可以考虑设置该参数。 |
Integer |
3 |
connectionPoolSize |
JDBC连接池连接大小 |
Integer |
5 |
数据同步示例
- GaussDB(DWS)侧:
新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。
-- 源表(产生binlog)
1
CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on, enable_binlog=true);
-- 目标表
1
CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on);
- Flink侧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
CREATE TABLE test_binlog_source ( a int, b int, c int ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx'); CREATE TABLE test_binlog_sink ( a int, b int, c int) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'test_binlog_sink', 'ignoreUpdateBefore'='false', 'connectionSize' = '1', 'username'='xxx', 'password'='xxx'); INSERT INTO test_binlog_sink select * from test_binlog_source;