Updated on 2025-01-07 GMT+08:00

Real-Time Binlog Consumption by Flink

Precautions

  • Currently, only clusters of version 8.3.0.100 or later support recording binlogs for HStore and HStore-opt tables. V3 is still in the trial commercial use phase and must be evaluated before being used.
  • The binlog function is supported only for HStore and HStore-opt tables in GaussDB(DWS). These tables must have a primary key, and either the enable_binlog or enable_binlog_timestamp parameter must be set to on.
  • The name of the consumed binlog table cannot contain special characters, such as periods (.) and double quotation marks (").
  • If multiple tasks consume binlog data of a single table, ensure that binlogSlotName of each task is unique.
  • For maximum consumption speed, match task concurrency with the number of DNs in your GaussDB(DWS) cluster.
  • If you use the sink capability of dws-connector-flink to write binlog data, pay attention to the following:
    • To ensure the data write sequence on DNs, set connectionSize to 1.
    • If the primary key is updated on the source end, or Flink needs to perform aggregation calculation, set ignoreUpdateBefore to false. Otherwise, keep the default value of ignoreUpdateBefore (true); setting it to false is not recommended in that case.

Real-Time Binlog Consumption by Flink

Use DWS Connector to consume binlogs in real time. For details, see DWS-Connector.

If full data has been synchronized to the target end using other synchronization tools, and only incremental synchronization is required, you can call the following system function to update the synchronization points.

SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name');
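
For example, if the full data of the test_binlog_source table used later in this section has already been synchronized and the Flink mapping table uses binlogSlotName = 'slot', the call would look as follows (the table and slot names are illustrative; replace them with your own):

SELECT * FROM pg_catalog.pgxc_register_full_sync_point('test_binlog_source', 'slot');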

Source Table DDL

The source automatically assigns the appropriate Flink RowKind (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER) to each data row based on the operation type, so table data can be synchronized in a mirrored way, similar to the Change Data Capture (CDC) capability of MySQL and PostgreSQL.

CREATE TABLE test_binlog_source ( 
   a int,
   b int,
   c int,
   primary key(a) NOT ENFORCED
) with (
   'connector' = 'dws',
   'url' = 'jdbc:gaussdb://ip:port/gaussdb',
   'binlog' = 'true',
   'tableName' = 'test_binlog_source',   
   'binlogSlotName' = 'slot',   
   'username'='xxx',   
   'password'='xxx')

Binlog Parameters

The following table describes the parameters involved in binlog consumption.

Table 1 Parameters

| Parameter | Description | Data Type | Default Value |
| --- | --- | --- | --- |
| binlog | Specifies whether to read binlog information. | Boolean | false |
| binlogSlotName | Slot name, which serves as an identifier. Multiple Flink tasks can consume binlog data of the same table at the same time, so the binlogSlotName of each task must be unique. | String | Name of the Flink mapping table |
| binlogBatchReadSize | Number of binlog data rows read per batch during incremental consumption. | Integer | 5000 |
| fullSyncBinlogBatchReadSize | Number of binlog data rows read per batch during full synchronization. | Integer | 50000 |
| binlogReadTimeout | Timeout for incrementally consuming binlog data, in milliseconds. | Integer | 600000 |
| fullSyncBinlogReadTimeout | Timeout for fully consuming binlog data, in milliseconds. | Long | 1800000 |
| binlogSleepTime | Sleep duration when no real-time binlog data is consumed, in milliseconds. After consecutive read failures, the sleep duration is binlogSleepTime x number of consecutive failures, up to binlogMaxSleepTime. The value is reset after data is read successfully. | Long | 500 |
| binlogMaxSleepTime | Maximum sleep duration when no real-time binlog data is consumed, in milliseconds. | Long | 10000 |
| binlogMaxRetryTimes | Maximum number of retries after a binlog data consumption error. | Integer | 1 |
| binlogRetryInterval | Interval between retries after a binlog data consumption error, in milliseconds. The sleep duration during retries is binlogRetryInterval x (1 to binlogMaxRetryTimes) + Random(100). | Long | 100 |
| binlogParallelNum | Number of threads for consuming binlog data. This parameter is valid only when the task concurrency is less than the number of DNs in the GaussDB(DWS) cluster. | Integer | 3 |
| connectionPoolSize | Number of connections in the JDBC connection pool. | Integer | 5 |
| needRedistribution | Specifies whether to be compatible with scale-out redistribution. To ensure compatibility, upgrade the kernel to the corresponding version. If the kernel is an earlier version, set this parameter to false. If it is set to true, the Flink restart-strategy cannot be set to none. | Boolean | true |
| newSystemValue | Specifies whether to use the new system field when reading binlog data. The kernel needs to be upgraded to the corresponding version; if the kernel is an earlier version, set this parameter to false. | Boolean | true |
| checkNodeChangeInterval | Interval for detecting node changes. This parameter is valid only when needRedistribution is set to true. | Long | 10000 |
| connectionSocketTimeout | Timeout for connection processing, in milliseconds. It can be regarded as the timeout for executing SQL statements on the client. The default value 0 means that no timeout is set. | Integer | 0 |
| binlogIgnoreUpdateBefore | Specifies whether to filter out UPDATE_BEFORE records in binlogs and return only primary key information for DELETE records. Supported only in 9.1.0.200 and later versions. | Boolean | false |
| binlogStartTime | Start time from which binlogs are consumed, in the format yyyy-MM-dd hh:mm:ss. enable_binlog_timestamp must be enabled for the table. Supported only in 9.1.0.200 and later versions. | String | N/A |
| binlogSyncPointSize | Size of the synchronization point range for incremental binlog reading. It can be used to control data flushing when the data volume is large. Supported only in 9.1.0.200 and later versions. | Integer | 5000 |
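
All of these options are passed as string key-value pairs in the WITH clause of the Flink source table, together with the mandatory options shown in the DDL above. The following sketch is illustrative only: the tuning values are assumptions to be adapted to your workload, and binlogStartTime additionally requires enable_binlog_timestamp to be enabled on the table and version 9.1.0.200 or later.

CREATE TABLE test_binlog_source (
   a int,
   b int,
   c int,
   primary key(a) NOT ENFORCED
) with (
   'connector' = 'dws',
   'url' = 'jdbc:gaussdb://ip:port/gaussdb',
   'binlog' = 'true',
   'tableName' = 'test_binlog_source',
   'binlogSlotName' = 'slot',
   -- Illustrative tuning values; adjust for your cluster and workload.
   'binlogBatchReadSize' = '10000',
   'binlogParallelNum' = '3',
   -- Assumption: requires enable_binlog_timestamp=on and version 9.1.0.200 or later.
   'binlogStartTime' = '2025-01-01 00:00:00',
   'username' = 'xxx',
   'password' = 'xxx');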

Data Synchronization Example

  • On GaussDB(DWS):

    When creating a binlog table, ensure that enable_hstore_binlog_table is set to true. You can run show enable_hstore_binlog_table to check whether it is enabled.

    -- Source table (generating binlogs)

    CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
    

    -- Target table

    CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on);
    
  • On Flink:

    Run the following commands to perform complete data synchronization:

    -- Create a mapping table for the source table.
    CREATE TABLE test_binlog_source ( 
       a int,
       b int,
       c int,
       primary key(a) NOT ENFORCED
    ) with (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'binlog' = 'true',
       'tableName' = 'test_binlog_source',   
       'binlogSlotName' = 'slot',   
       'username'='xxx',   
       'password'='xxx');
    
    -- Create a mapping table for the target table:
    CREATE TABLE test_binlog_sink (  
       a int,
       b int,
       c int,
       primary key(a) NOT ENFORCED
    ) with (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'tableName' = 'test_binlog_sink',   
       'ignoreUpdateBefore'='false',   
       'connectionSize' = '1',
       'username'='xxx',
       'password'='xxx');
    
    INSERT INTO test_binlog_sink select * from test_binlog_source;
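
    After both mapping tables are created and the INSERT job is running, you can do a quick sanity check (not part of the procedure above): write a few rows into the physical source table on GaussDB(DWS) and query the target table; the rows should appear in test_binlog_sink shortly afterwards.

    INSERT INTO test_binlog_source VALUES (1, 1, 1), (2, 2, 2);
    SELECT * FROM test_binlog_sink;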
    

Example of Using Java Programs

Create a source table and a target table.

-- source
create table binlog_test_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
-- sink
create table binlog_test_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);

Demo program:

public class BinlogDemo {

    // Name of the binlog table
    private static final String BINLOG_TABLE_NAME = "binlog_test_source";

    // Slot name of the binlog table
    private static final String BINLOG_SLOT_NAME = "binlog_test_slot";

    // Name of the table to be written
    private static final String SINK_TABLE_NAME = "binlog_test_sink";

    public static void main(String[] args) throws Exception {
        DwsConfig dwsConfig = buildDwsConfig();
        DwsClient dwsClient = new DwsClient(dwsConfig);

        TableSchema sourceTableSchema = dwsClient.getTableSchema(TableName.valueOf(BINLOG_TABLE_NAME));
        TableSchema sinkTableSchema = dwsClient.getTableSchema(TableName.valueOf(SINK_TABLE_NAME));

        // Columns to be written
        List<String> sinkColumns = sinkTableSchema.getColumnNames();

        // Thread pool
        DwsConnectionPool dwsConnectionPool = new DwsConnectionPool(dwsConfig);
        // Queue for storing data
        BlockingQueue<BinlogRecord> queue = new LinkedBlockingQueue<>();
        // Columns to be synchronized
        List<String> sourceColumnNames = sourceTableSchema.getColumnNames();

        BinlogReader binlogReader = new BinlogReader(dwsConfig, queue, sourceColumnNames, dwsConnectionPool);

        // Start the read task.
        binlogReader.start();
        binlogReader.getRecords();

        while (binlogReader.isStart()) {
            try {
                while (!queue.isEmpty() && !binlogReader.hasException()) {
                    // Read data.
                    BinlogRecord record = queue.poll();
                    if (Objects.isNull(record)) {
                        continue;
                    }
                    BinlogRecordType type = BinlogRecordType.toBinlogRecordType(record.getType());
                    List<Object> columnValues = record.getColumnValues();

                    // Write data.
                    if (BinlogRecordType.INSERT.equals(type) || BinlogRecordType.UPDATE_AFTER.equals(type)) {
                        Operate upsert = dwsClient.write(sinkTableSchema);
                        for (int i = 0; i < sinkColumns.size(); i++) {
                            upsert.setObject(i, columnValues.get(i), false);
                        }
                        upsert.commit();
                    } else if (BinlogRecordType.DELETE.equals(type) || BinlogRecordType.UPDATE_BEFORE.equals(type)) {
                        Operate delete = dwsClient.delete(sinkTableSchema);
                        for (int i = 0; i < sinkColumns.size(); i++) {
                            String field = sinkColumns.get(i);
                            if (!sinkTableSchema.isPrimaryKey(field)) {
                                continue;
                            }
                            delete.setObject(i, columnValues.get(i), false);
                        }
                        delete.commit();
                    }
                }
                binlogReader.checkException();
            } catch (Exception e) {
                throw new DwsClientException(ExceptionCode.GET_BINLOG_ERROR, "get binlog has error", e);
            }
        }
    }

    private static DwsConfig buildDwsConfig() {
        // Initialize configuration information. (Only necessary parameters are listed. For more information about the configuration, see the document.)
        TableConfig tableConfig = new TableConfig().withBinlog(true)
                .withNewSystemValue(true)
                .withNeedRedistribution(false)
                .withBinlogSlotName(BINLOG_SLOT_NAME);
        return DwsConfig.builder()
                .withUrl("Link information")
                .withUsername("Username")
                .withPassword ("Password")
                .withBinlogTableName(BINLOG_TABLE_NAME)
                .withTableConfig(BINLOG_TABLE_NAME, tableConfig)
                .build();
    }
}