Flink实时消费Binlog
注意事项
- 当前仅8.3.0.100及以上的版本支持HStore和Hstore_opt记录Binlog功能。
- V3 HStore表不支持Binlog,仅V3 HStore_opt表支持Binlog,且V3处于试商用阶段,使用前需要进行评估。
- 目前DWS只有HStore表和Hstore_opt支持Binlog功能,表需要包含主键且设置enable_binlog=on或者enable_binlog_timestamp=on。
- 消费的Binlog表名不要带有特殊字符,如.、""等。
- 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的binlogSlotName唯一。
- 为了达到最高的消费速度,建议将任务的并发度和DWS集群DN数设置一致。
- 在使用Flink进行全量同步时, 如果期间出现Flink异常导致全量失败,建议更换binlogSlotName重新进行全量同步,否则会有数据一致性风险。
- 使用dws-connector-flink的Sink能力来写入Binlog数据的话,需要注意以下几点:
- 如果需要保证DN内的数据写入顺序则需要将connectionSize设置为1。
- 如果源端有更新主键操作或者需要flink进行聚合计算的话,将ignoreUpdateBefore设置为false,否则不建议将ignoreUpdateBefore设置为false(默认true)。
Flink实时消费Binlog
使用DWS Connector来实时消费Binlog,具体请参见DWS-Connector。
如果已使用其他同步工具已经将全量数据同步到了目标端,后续只想进行增量同步。则可以调用以下系统函数来更新同步点。
1 | SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name'); |
源表DDL
Source端会根据操作类型自动为每行数据设置准确的Flink RowKind类型(INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能镜像同步表的数据,类似MySQL和Postgres的CDC功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 | CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) NOT ENFORCED ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx') |
Binlog相关参数说明
下表仅涉及消费Binlog时的参数。
参数 | 说明 | 数据类型 | 默认值 |
|---|---|---|---|
binlog | 是否读取Binlog信息 | Boolean | false |
binlogSlotName | 槽位信息,可以理解一个标识。由于可能存在多个Flink任务同时消费同一张表的Binlog信息,所以该场景需要保证每个任务的binlogSlotName不同。 如果指定了binlog,但没有指定binlogSlotName时会默认使用Flink映射表的表名,确保增量数据的正确读取。 | String | Flink映射表的表名 |
binlogBatchReadSize | 批量读取binlog的数据行数 | Integer | 5000 |
fullSyncBinlogBatchReadSize | 全量读取binlog的数据行数 | Integer | 50000 |
binlogReadTimeout | 增量消费Binlog数据时超时时间,单位毫秒 | Integer | 600000 |
fullSyncBinlogReadTimeout | 全量消费Binlog数据时超时时间,单位毫秒 | Long | 1800000 |
binlogSleepTime | 实时消费不到Binlog数据时休眠时间,单位毫秒。如果连续读取不到Binlog数据,则休眠时间为:binlogSleepTime * 次数,最大为binlogMaxSleepTime。读取到数据后,则重置。 | Long | 500 |
binlogMaxSleepTime | 实时消费不到Binlog数据时最大休眠时间,单位毫秒。 | Long | 10000 |
binlogMaxRetryTimes | 消费Binlog数据出错后的重试次数。 | Integer | 1 |
binlogRetryInterval | 消费binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。单位毫秒。 | Long | 100 |
binlogParallelNum | 消费Binlog数据时线程数,只有任务并发度小于DWS集群DN数时,该参数才有效,即此时一个并发度会消费多个DN上的数据,所以可以考虑设置该参数。 | Integer | 3 |
connectionPoolSize | JDBC连接池连接大小。 | Integer | 5 |
needRedistribution | 表示是否兼容扩充重分布(需要升级到对应内核版本,如果是低版本则设置为false);如果设置成true的话,flink的restart-strategy不能设置为none。 | Boolean | true |
newSystemValue | 表示读取binlog数据时是否使用新的系统字段(需要升级到对应内核版本,如果是低版本则设置为false)。 | Boolean | true |
checkNodeChangeInterval | 检测节点变化的间隔,只有needRedistribution=true才生效。 | Long | 10000 |
connectionSocketTimeout | 连接处理超时时间(可以看成客户端执行SQL超时时间),单位毫秒;默认值为0,即不设置超时时间。 | Integer | 0 |
binlogIgnoreUpdateBefore | 是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上版本支持。 | Boolean | false |
binlogStartTime | 设置从某个时间点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且表需要开启enable_binlog_timestamp。 该参数仅9.1.0.200及以上版本支持。 | String | 无 |
binlogSyncPointSize | 增量读取binlog同步点区间的大小(增量读取binlog时,如果数据量过大可能涉及下盘,可通过调整该参数控制)。 该参数仅9.1.0.200及以上版本支持。 | Integer | 5000 |
数据同步示例
- DWS侧:

新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。
-- 源表(产生binlog)
1CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
-- 目标表
1CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on);
- Flink侧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
-- 建立源表的映射表 CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) NOT ENFORCED ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx'); -- 建立目标表的映射表 CREATE TABLE test_binlog_sink ( a int, b int, c int, primary key(a) NOT ENFORCED ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'test_binlog_sink', 'ignoreUpdateBefore'='false', 'connectionSize' = '1', 'username'='xxx', 'password'='xxx'); INSERT INTO test_binlog_sink select * from test_binlog_source;
使用java程序示例
新建源表和目标表:
1 2 3 4 | -- source create table binlog_test_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true); -- sink create table binlog_test_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true); |
demo程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | public class BinlogDemo { // binlog表的表名 private static final String BINLOG_TABLE_NAME = "binlog_test_source"; // binlog表的槽位名 private static final String BINLOG_SLOT_NAME = "binlog_test_slot"; // 写入表的表名 private static final String SINK_TABLE_NAME = "binlog_test_sink"; public static void main(String[] args) throws Exception { DwsConfig dwsConfig = buildDwsConfig(); DwsClient dwsClient = new DwsClient(dwsConfig); TableSchema sourceTableSchema = dwsClient.getTableSchema(TableName.valueOf(BINLOG_TABLE_NAME)); TableSchema sinkTableSchema = dwsClient.getTableSchema(TableName.valueOf(SINK_TABLE_NAME)); // 需要写入哪些列 List<String> sinkColumns = sinkTableSchema.getColumnNames(); // 线程池 DwsConnectionPool dwsConnectionPool = new DwsConnectionPool(dwsConfig); // 存放数据的队列 BlockingQueue<BinlogRecord> queue = new LinkedBlockingQueue<>(); // 需要同步哪些列 List<String> sourceColumnNames = sourceTableSchema.getColumnNames(); BinlogReader binlogReader = new BinlogReader(dwsConfig, queue, sourceColumnNames, dwsConnectionPool); // 启动读取任务 binlogReader.start(); binlogReader.getRecords(); while (binlogReader.isStart()) { try { while (!queue.isEmpty() && !binlogReader.hasException()) { // 读取数据 BinlogRecord record = queue.poll(); if (Objects.isNull(record)) { continue; } BinlogRecordType type = BinlogRecordType.toBinlogRecordType(record.getType()); List<Object> columnValues = record.getColumnValues(); // 写入数据 if (BinlogRecordType.INSERT.equals(type) || BinlogRecordType.UPDATE_AFTER.equals(type)) { Operate upsert = dwsClient.write(sinkTableSchema); for (int i = 0; i < sinkColumns.size(); i++) { upsert.setObject(i, columnValues.get(i), false); } upsert.commit(); } else if (BinlogRecordType.DELETE.equals(type) || BinlogRecordType.UPDATE_BEFORE.equals(type)) { Operate delete = dwsClient.delete(sinkTableSchema); for (int i = 0; i < sinkColumns.size(); i++) { String field = sinkColumns.get(i); if (!sinkTableSchema.isPrimaryKey(field)) { continue; } delete.setObject(i, columnValues.get(i), false); } delete.commit(); } } binlogReader.checkException(); } catch (Exception e) { throw new DwsClientException(ExceptionCode.GET_BINLOG_ERROR, "get binlog has error", e); } } } private static DwsConfig buildDwsConfig() { // 初始化一些配置信息(只列举一些必要的配置,具体配置信息请参考本节文档) TableConfig tableConfig = new TableConfig().withBinlog(true) .withNewSystemValue(true) .withNeedRedistribution(false) .withBinlogSlotName(BINLOG_SLOT_NAME); return DwsConfig.builder() .withUrl("链接信息") .withUsername("用户名") .withPassword("密码") .withBinlogTableName(BINLOG_TABLE_NAME) .withTableConfig(BINLOG_TABLE_NAME, tableConfig) .build(); } } |

