Flink实时消费Binlog
注意事项
- 当前仅8.3.0.100及以上的版本支持HStore和HStore-opt记录Binlog功能,且处于试商用阶段,使用前需要进行评估。
- 目前GaussDB(DWS)只有Hstore表支持Binlog功能,表需要包含主键且设置enable_binlog=on。
- 消费的Binlog表名不要带有特殊字符,如.、""等。
- 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的binlogSlotName唯一。
- 为了达到最高的消费速度,建议将任务的并发度和DWS集群DN数设置一致。
- 使用dws-connector-flink的Sink能力来写入Binlog数据的话,需要注意以下几点:
- 如果需要保证DN内的数据写入顺序则需要设置connectionSize设置为1。
- 如果源端有更新主键操作或者需要flink进行聚合计算的话,将ignoreUpdateBefore设置为false,否则不建议将 ignoreUpdateBefore设置为false(默认true)。
Flink实时消费Binlog
使用DWS Connector来实时消费Binlog,具体请参见DWS-Connector。
如果已使用其他同步工具已经将全量数据同步到了目标端,后续只想进行增量同步。则可以调用以下系统函数来更新同步点。
1
|
SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name'); |
源表DDL
Source端会根据操作类型自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能镜像同步表的数据,类似MySQL和Postgres的CDC功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx') |
Binlog相关参数说明
下表仅涉及消费Binlog时的参数。
参数 |
说明 |
数据类型 |
默认值 |
---|---|---|---|
binlog |
是否读取Binlog信息 |
Boolean |
false |
binlogSlotName |
槽位信息,可以理解一个标识。由于可能存在多个Flink任务同时消费同一张表的Binlog信息,所以该场景需要保证每个任务的binlogSlotName不同。 |
String |
Flink映射表的表名 |
binlogBatchReadSize |
批量读取binlog的数据行数 |
Integer |
5000 |
fullSyncBinlogBatchReadSize |
全量读取binlog的数据行数 |
Integer |
50000 |
binlogReadTimeout |
增量消费Binlog数据时超时时间,单位毫秒 |
Integer |
600000 |
fullSyncBinlogReadTimeout |
全量消费Binlog数据时超时时间,单位毫秒 |
Long |
1800000 |
binlogSleepTime |
实时消费不到Binlog数据时休眠时间,单位毫秒。如果连续读取不到Binlog数据,则休眠时间为:binlogSleepTime * 次数,最大为binlogMaxSleepTime。读取到数据后,则重置。 |
Long |
500 |
binlogMaxSleepTime |
实时消费不到Binlog数据时最大休眠时间,单位毫秒。 |
Long |
10000 |
binlogMaxRetryTimes |
消费Binlog数据出错后的重试次数。 |
Integer |
1 |
binlogRetryInterval |
消费binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。单位毫秒。 |
Long |
100 |
binlogParallelNum |
消费Binlog数据时线程数,只有任务并发度小于DWS集群DN数时,该参数才有效,即此时一个并发度会消费多个DN上的数据,所以可以考虑设置该参数。 |
Integer |
3 |
connectionPoolSize |
JDBC连接池连接大小。 |
Integer |
5 |
needRedistribution |
表示是否兼容扩充重分布(需要升级到对应内核版本,如果是低版本则设置为false);如果设置成true的话,flink的restart-strategy不能设置为none。 |
Boolean |
true |
newSystemValue |
表示读取binlog数据时是否使用新的系统字段(需要升级到对应内核版本,如果是低版本则设置为false)。 |
Boolean |
true |
checkNodeChangeInterval |
检测节点变化的间隔,只有needRedistribution=true才生效。 |
Long |
10000 |
connectionSocketTimeout |
连接处理超时时间(可以看成客户端执行SQL超时时间),单位毫秒;默认值为0,即不设置超时时间。 |
Integer |
0 |
binlogIgnoreUpdateBefor |
是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上版本支持。 |
Boolean |
false |
binlogStartTime |
设置从某个时间点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且表需要开启enable_binlog_timestamp。 该参数仅9.1.0.200及以上版本支持。 |
String |
无 |
binlogSyncPointSize |
增量读取binlog同步点区间的大小(增量读取binlog时,如果数据量过大可能涉及下盘,可通过调整该参数控制)。 该参数仅9.1.0.200及以上版本支持。 |
Integer |
5000 |
数据同步示例
- GaussDB(DWS)侧:
新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。
-- 源表(产生binlog)
1
CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on, enable_binlog=true);
-- 目标表
1
CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on);
- Flink侧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
-- 建立源表的映射表 CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx'); -- 建立目标表的映射表 CREATE TABLE test_binlog_sink ( a int, b int, c int, primary key(a) ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'test_binlog_sink', 'ignoreUpdateBefore'='false', 'connectionSize' = '1', 'username'='xxx', 'password'='xxx'); INSERT INTO test_binlog_sink select * from test_binlog_source;
使用java程序示例
新建源表和目标表:
1 2 3 4 |
-- source create table binlog_test_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore=on, enable_binlog=true); -- sink create table binlog_test_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore=on, enable_binlog=true); |
demo程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
public class BinlogDemo { // binlog表的表名 private static final String BINLOG_TABLE_NAME = "binlog_test_source"; // binlog表的槽位名 private static final String BINLOG_SLOT_NAME = "binlog_test_slot"; // 写入表的表名 private static final String SINK_TABLE_NAME = "binlog_test_sink"; public static void main(String[] args) throws Exception { DwsConfig dwsConfig = buildDwsConfig(); DwsClient dwsClient = new DwsClient(dwsConfig); TableSchema sourceTableSchema = dwsClient.getTableSchema(TableName.valueOf(BINLOG_TABLE_NAME)); TableSchema sinkTableSchema = dwsClient.getTableSchema(TableName.valueOf(SINK_TABLE_NAME)); // 需要写入哪些列 List<String> sinkColumns = sinkTableSchema.getColumnNames(); // 线程池 DwsConnectionPool dwsConnectionPool = new DwsConnectionPool(dwsConfig); // 存放数据的队列 BlockingQueue<BinlogRecord> queue = new LinkedBlockingQueue<>(); // 需要同步哪些列 List<String> sourceColumnNames = sourceTableSchema.getColumnNames(); BinlogReader binlogReader = new BinlogReader(dwsConfig, queue, sourceColumnNames, dwsConnectionPool); // 启动读取任务 binlogReader.start(); binlogReader.getRecords(); while (binlogReader.isStart()) { try { while (!queue.isEmpty() && !binlogReader.hasException()) { // 读取数据 BinlogRecord record = queue.poll(); if (Objects.isNull(record)) { continue; } BinlogRecordType type = BinlogRecordType.toBinlogRecordType(record.getType()); List<Object> columnValues = record.getColumnValues(); // 写入数据 if (BinlogRecordType.INSERT.equals(type) || BinlogRecordType.UPDATE_AFTER.equals(type)) { Operate upsert = dwsClient.write(sinkTableSchema); for (int i = 0; i < sinkColumns.size(); i++) { upsert.setObject(i, columnValues.get(i), false); } upsert.commit(); } else if (BinlogRecordType.DELETE.equals(type) || BinlogRecordType.UPDATE_BEFORE.equals(type)) { Operate delete = dwsClient.delete(sinkTableSchema); for (int i = 0; i < sinkColumns.size(); i++) { String field = sinkColumns.get(i); if (!sinkTableSchema.isPrimaryKey(field)) { continue; } delete.setObject(i, columnValues.get(i), false); } delete.commit(); } } binlogReader.checkException(); } catch (Exception e) { throw new DwsClientException(ExceptionCode.GET_BINLOG_ERROR, "get binlog has error", e); } } } private static DwsConfig buildDwsConfig() { // 初始化一些配置信息(只列举一些必要的配置,具体配置信息请参考文档) TableConfig tableConfig = new TableConfig().withBinlog(true) .withNewSystemValue(true) .withNeedRedistribution(false) .withBinlogSlotName(BINLOG_SLOT_NAME); return DwsConfig.builder() .withUrl("链接信息") .withUsername("用户名") .withPassword("密码") .withBinlogTableName(BINLOG_TABLE_NAME) .withTableConfig(BINLOG_TABLE_NAME, tableConfig) .build(); } } |