做Binlog流表
Binlog功能仅9.1.0及以上集群版本开始支持。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 | create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '', 'binlog' = 'true' ); |
参数说明
| 参数 | 说明 | 默认值 |
|---|---|---|
| connector | flink框架区分Connector参数,固定为dws。 | - |
| url | 数据库连接地址。 | - |
| username | 配置连接用户。 | - |
| password | 数据库用户密码。 | - |
| 参数 | 名称 | 类型 | 说明 | 默认值 |
|---|---|---|---|---|
| connectionSize | 读取线程池大小 | int | 用于执行操作的线程数量=数据库连接数量,同写入线程大小。 | 1 |
| binlog | 是否读取Binlog信息 | boolean | 是否开启Binlog功能。 如果为false,下面的参数均不生效。 | false |
| binlogSlotName | Binlog槽位名称 | String | Binlog槽位名称,因为可能存在多个flink任务同时消费同一张表的Binlog信息,所以每个任务需要保证其槽位名称不相同,否则可能发生未知问题。 | flink表表名 |
| binlogSyncPointSize | 读取Binlog同步点区间的大小 | int | 读取Binlog同步点区间的大小,每次从dws数据表获取csn范围在(当前csn, 当前csn + binlogSyncPointSize)内的binlog信息,并在处理完这些数据后进行打点。 如果设置过大,可能会引发查询数据下盘造成性能下降。 | 5000 |
| binlogBatchReadSize | 增量读取Binlog时一批数据的行数 | int | 增量同步处理一次数据的数据量。 | 5000 |
| fullSyncBinlogBatchReadSize | 全量读取Binlog时一批数据的行数 | int | 全量同步处理一次数据的数据量。 | 50000 |
| binlogReadTimeout | 增量消费Binlog数据时超时时间(毫秒) | long | 增量同步Binlog信息的超时时间。 | 600000 |
| fullSyncBinlogReadTimeout | 全量消费Binlog数据时超时时间(毫秒) | long | 全量同步Binlog信息的超时时间。 | 1800000 |
| binlogSleepTime | 实时消费不到Binlog数据时休眠时间(毫秒) | long | 没有Binlog数据可消费时的睡眠时间,过了这段时间后会重新查看是否有新的Binlog数据。 | 500 |
| binlogMaxSleepTime | 实时消费不到Binlog数据时最大休眠时间(毫秒) | long | 没有Binlog数据可消费时的最大睡眠时间。 | 10000 |
| binlogQueueSize | Binlog数据队列大小 | int | Binlog处理事件队列长度,默认为0(无限长度),在资源紧张时会导致OOM,需要按照资源使用情况进行调整。 | 0 |
| needRedistribution | 是否兼容扩容重分布 | boolean | 是否兼容扩容重分布,若为false,当DWS集群处于重分布状态时可能会有未知问题。 | false |
| checkNodeChangeInterval | 检测节点变化的时间间隔(毫秒) | long | 仅在needRedistribution为true时有效,每隔一段时间检查节点的变化情况。 | 10000 |
| binlogParallelNum | 消费Binlog数据时的线程数 | int | 设置消费Binlog数据时的线程数,仅在任务并发度小于DWS集群DN数时有效。 | 3 |
| binlogMaxRetryTimes | 消费Binlog数据出错后的重试次数 | int | 消费Binlog数据出错后的重试次数。 | 1 |
| binlogRetryInterval | 消费Binlog数据出错后的重试时间间隔(毫秒) | long | 消费Binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。 | 100 |
| binlogStartTime | 设置一个时间点,则会从该时间点已经同步的数据点开始消费Binlog | String | 设置从某个时间点对应的数据同步点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且对应DWS表需要开启enable_binlog_timestamp参数。 例:2025/05/01 00:00:00时间点对应的binlog_sync_point为50000,那么只同步binlog_sync_point大于50000的数据。 注意:如果作业在停止状态时插入数据,那么在那之后的时间对应的数据同步点会变为空,此时执行同步可能导致该参数失效。 该参数仅9.1.0.200及以上集群版本支持。 | - |
| binlogIgnoreUpdateBefore | 是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息 | bool | 是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上集群版本支持。 | false |
| executeDirectOn | 是否开启直连DN模式 | bool | 是否让SQL直接由DN执行,若为false,则产生的SQL交给CN执行,会产生一些调度上的消耗,影响性能。 | true |
| binlogSkipFullSync | 是否跳过全量同步阶段 | bool | 是否直接跳过全量同步,若为true,则在当前无可用点位时,以当前数据位置作位点开始增量同步。 | false |
| binlogCondition | Binlog表下推条件 | String | 设置flink表的下推条件,当前仅支持简单的下推条件。 例:当前表中有一个叫count的int字段,现在不想让flink表统计count大于100的数据,则可以按以下写法: 'binlogCondition'='count <= 100' | - |
示例
- DWS侧:
新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。
-- 源表(产生binlog)
1CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
-- 目标表
1CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on);
- Flink侧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
-- 建立源表的映射表 CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) NOT ENFORCED ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx'); -- 建立目标表的映射表 CREATE TABLE test_binlog_sink ( a int, b int, c int, primary key(a) NOT ENFORCED ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'test_binlog_sink', 'ignoreUpdateBefore'='false', 'connectionSize' = '1', 'username'='xxx', 'password'='xxx'); INSERT INTO test_binlog_sink select * from test_binlog_source;
常见问题
- Q:Binlog作业运行过程中出现异常,could not find slot: xxx for rel: xxx, you need to call pgxc_get_binlog_sync_point(),要怎么处理?
A:检查下DWS的Binlog流表的表级参数以及Flink的Checkpoint间隔,重点关注DWS表中的表级参数binlog_ttl,此参数的作用为:如果同步任务注册的同步点超过这个时间没有进行同步,那么就会将该同步点删除。当不显示设置该参数时,binlog_ttl将使用默认值86400, 单位为秒。如果binlog_ttl设置过短可能会导致上述异常的产生。
最佳实践:配置binlog_ttl要大于Flink侧的Checkpoint间隔。即可避免上述问题产生。