做Binlog流表
Binlog功能仅9.1.0及以上集群版本开始支持。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 |
create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '', 'binlog' = 'true' ); |
参数说明
|
参数 |
说明 |
默认值 |
|---|---|---|
|
connector |
flink框架区分Connector参数,固定为dws。 |
- |
|
url |
数据库连接地址。 |
- |
|
username |
配置连接用户。 |
- |
|
password |
数据库用户密码。 |
- |
|
参数 |
名称 |
类型 |
说明 |
默认值 |
|---|---|---|---|---|
|
connectionSize |
读取线程池大小 |
int |
用于执行操作的线程数量=数据库连接数量,同写入线程大小。 |
1 |
|
binlog |
是否读取Binlog信息 |
boolean |
是否开启Binlog功能。 如果为false,下面的参数均不生效。 |
false |
|
binlogSlotName |
Binlog槽位名称 |
String |
Binlog槽位名称,因为可能存在多个flink任务同时消费同一张表的Binlog信息,所以每个任务需要保证其槽位名称不相同,否则可能发生未知问题。 |
flink表表名 |
|
binlogSyncPointSize |
读取Binlog同步点区间的大小 |
int |
读取Binlog同步点区间的大小,每次从dws数据表获取csn范围在(当前csn, 当前csn + binlogSyncPointSize)内的binlog信息,并在处理完这些数据后进行打点。 如果设置过大,可能会引发查询数据下盘造成性能下降。 |
5000 |
|
binlogBatchReadSize |
增量读取Binlog时一批数据的行数 |
int |
增量同步处理一次数据的数据量。 |
5000 |
|
fullSyncBinlogBatchReadSize |
全量读取Binlog时一批数据的行数 |
int |
全量同步处理一次数据的数据量。 |
50000 |
|
binlogReadTimeout |
增量消费Binlog数据时超时时间(毫秒) |
long |
增量同步Binlog信息的超时时间。 |
600000 |
|
fullSyncBinlogReadTimeout |
全量消费Binlog数据时超时时间(毫秒) |
long |
全量同步Binlog信息的超时时间。 |
1800000 |
|
binlogSleepTime |
实时消费不到Binlog数据时休眠时间(毫秒) |
long |
没有Binlog数据可消费时的睡眠时间,过了这段时间后会重新查看是否有新的Binlog数据。 |
500 |
|
binlogMaxSleepTime |
实时消费不到Binlog数据时最大休眠时间(毫秒) |
long |
没有Binlog数据可消费时的最大睡眠时间。 |
10000 |
|
binlogQueueSize |
Binlog数据队列大小 |
int |
Binlog处理事件队列长度,默认为0(无限长度),在资源紧张时会导致OOM,需要按照资源使用情况进行调整。 |
0 |
|
needRedistribution |
是否兼容扩容重分布 |
boolean |
是否兼容扩容重分布,若为false,当DWS集群处于重分布状态时可能会有未知问题。 |
false |
|
checkNodeChangeInterval |
检测节点变化的时间间隔(毫秒) |
long |
仅在needRedistribution为true时有效,每隔一段时间检查节点的变化情况。 |
10000 |
|
binlogParallelNum |
消费Binlog数据时的线程数 |
int |
设置消费Binlog数据时的线程数,仅在任务并发度小于DWS集群DN数时有效。 |
3 |
|
binlogMaxRetryTimes |
消费Binlog数据出错后的重试次数 |
int |
消费Binlog数据出错后的重试次数。 |
1 |
|
binlogRetryInterval |
消费Binlog数据出错后的重试时间间隔(毫秒) |
long |
消费Binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。 |
100 |
|
binlogStartTime |
设置一个时间点,则会从该时间点已经同步的数据点开始消费Binlog |
String |
设置从某个时间点对应的数据同步点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且对应DWS表需要开启enable_binlog_timestamp参数。 例:2025/05/01 00:00:00时间点对应的binlog_sync_point为50000,那么只同步binlog_sync_point大于50000的数据。 注意:如果作业在停止状态时插入数据,那么在那之后的时间对应的数据同步点会变为空,此时执行同步可能导致该参数失效。 该参数仅9.1.0.200及以上集群版本支持。 |
- |
|
binlogIgnoreUpdateBefore |
是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息 |
bool |
是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上集群版本支持。 |
false |
|
executeDirectOn |
是否开启直连DN模式 |
bool |
是否让SQL直接由DN执行,若为false,则产生的SQL交给CN执行,会产生一些调度上的消耗,影响性能。 |
true |
|
binlogSkipFullSync |
是否跳过全量同步阶段 |
bool |
是否直接跳过全量同步,若为true,则在当前无可用点位时,以当前数据位置作位点开始增量同步。 |
false |
|
binlogCondition |
Binlog表下推条件 |
String |
设置flink表的下推条件,当前仅支持简单的下推条件。 例:当前表中有一个叫count的int字段,现在不想让flink表统计count大于100的数据,则可以按以下写法: 'binlogCondition'='count <= 100' |
- |
示例
- DWS侧:
新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。
-- 源表(产生binlog)
1CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
-- 目标表
1CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on);
- Flink侧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
-- 建立源表的映射表 CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) NOT ENFORCED ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx'); -- 建立目标表的映射表 CREATE TABLE test_binlog_sink ( a int, b int, c int, primary key(a) NOT ENFORCED ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'test_binlog_sink', 'ignoreUpdateBefore'='false', 'connectionSize' = '1', 'username'='xxx', 'password'='xxx'); INSERT INTO test_binlog_sink select * from test_binlog_source;
常见问题
- Q:Binlog作业运行过程中出现异常,could not find slot: xxx for rel: xxx, you need to call pgxc_get_binlog_sync_point(),要怎么处理?
A:检查下DWS的Binlog流表的表级参数以及Flink的Checkpoint间隔,重点关注DWS表中的表级参数binlog_ttl,此参数的作用为:如果同步任务注册的同步点超过这个时间没有进行同步,那么就会将该同步点删除。当不显示设置该参数时,binlog_ttl将使用默认值86400, 单位为秒。如果binlog_ttl设置过短可能会导致上述异常的产生。
最佳实践:配置binlog_ttl要大于Flink侧的Checkpoint间隔。即可避免上述问题产生。