Updated on 2024-11-05 GMT+08:00

Real-Time Binlog Consumption by Flink

Precautions

  • Binlog is currently in beta and is only available for commercial use on a restricted basis. To use Binlog, contact technical support.
  • Binlog recording for HStore and HStore-opt tables is supported only in cluster versions 8.3.0.100 and later.
  • In GaussDB(DWS), Binlog is supported only for HStore tables. The table must have a primary key, and its enable_binlog option must be set to true.
  • The name of a table whose Binlog is consumed cannot contain special characters, such as periods (.) or double quotation marks (").
  • If multiple tasks consume the Binlog data of the same table, ensure that binlogSlotName of each task is unique.
  • For maximum consumption speed, match task concurrency with the number of Data Nodes (DNs) in your data warehouse cluster.
  • When using dws-connector-flink to write consumed Binlog data, set connectionSize to 1 to preserve the order of data writes across DNs. If primary keys are updated or aggregation calculations are performed, also set ignoreUpdateBefore to false. This setting is not recommended in other scenarios.
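The concurrency recommendation above can be applied in two steps: count the DNs on the GaussDB(DWS) side, then set the Flink job parallelism to that number. This is a minimal sketch that assumes the DNs are listed in pg_catalog.pgxc_node with node_type = 'D' and that the job is submitted from the Flink SQL client, where 'parallelism.default' is a standard Flink configuration key. The value 3 is only a placeholder.

```sql
-- On GaussDB(DWS): count the DNs in the cluster.
-- Assumption: each DN appears in pgxc_node with node_type = 'D'.
SELECT count(*) AS dn_count
FROM pg_catalog.pgxc_node
WHERE node_type = 'D';

-- In the Flink SQL client: set the job parallelism to the dn_count
-- returned above (3 is a placeholder value).
SET 'parallelism.default' = '3';
```

If the parallelism has to stay below the DN count, Table 1 describes binlogParallelNum, which controls the number of consuming threads in that case.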

Real-Time Binlog Consumption by Flink

Use the DWS Connector to consume Binlogs in real time. For details, see DWS-Connector.

If the full data has already been synchronized to the target using another synchronization tool and only incremental synchronization is required, call the following system function to update the synchronization point:

SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name');
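For example, if the full data of test_binlog_source has already been copied to the target and the Flink task will consume it with binlogSlotName 'slot' (the names used in the examples on this page), the call might look as follows. This sketch rests on two assumptions: the slot name passed here is the same value the Flink task uses as binlogSlotName, and the call runs after the full copy finishes but before the Flink job starts.

```sql
-- Register the full-sync point for table test_binlog_source and slot 'slot'.
-- Assumption: 'slot' matches the binlogSlotName of the Flink task that will
-- consume the incremental Binlog data afterwards.
SELECT * FROM pg_catalog.pgxc_register_full_sync_point('test_binlog_source', 'slot');
```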

Source Table DDL

The source automatically assigns the appropriate Flink RowKind (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER) to each data row based on the operation type. This allows table data to be synchronized as a mirror, similar to the Change Data Capture (CDC) feature in MySQL and PostgreSQL.

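-- Flink mapping table that reads the Binlog of the DWS table test_binlog_source
CREATE TABLE test_binlog_source (
   a int,
   b int,
   c int
) WITH (
   'connector' = 'dws',
   'url' = 'jdbc:gaussdb://ip:port/gaussdb',
   'binlog' = 'true',                     -- enable Binlog reading
   'tableName' = 'test_binlog_source',    -- DWS table whose Binlog is consumed
   'binlogSlotName' = 'slot',             -- must be unique for each consuming task
   'username' = 'xxx',
   'password' = 'xxx');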
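To see the RowKind that the source assigns to each row, you can route the source into Flink's built-in print connector, which writes every row to the TaskManager's standard output prefixed with +I (INSERT), -U (UPDATE_BEFORE), +U (UPDATE_AFTER), or -D (DELETE). A minimal sketch; the table name binlog_print is hypothetical.

```sql
-- Hypothetical debugging sink that prints the change stream with RowKind prefixes
CREATE TABLE binlog_print (
   a int,
   b int,
   c int
) WITH (
   'connector' = 'print'
);

INSERT INTO binlog_print SELECT * FROM test_binlog_source;
```

Because each consuming task needs its own binlogSlotName, run this job instead of the synchronization job, or give its source table a different slot name.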

Binlog Parameters

The following table describes the parameters involved in Binlog consumption.

Table 1 Parameters for consuming Binlogs

| Parameter | Description | Data Type | Default Value |
|---|---|---|---|
| binlog | Enables reading of Binlog information. | Boolean | false |
| binlogSlotName | Slot name, which identifies the consumer. Because multiple Flink tasks can consume the Binlog data of the same table at the same time, each task's binlogSlotName must be unique. | String | Name of the Flink mapping table |
| binlogBatchReadSize | Number of rows read from the Binlog in each batch. | Integer | 5000 |
| fullSyncBinlogBatchReadSize | Number of rows read from the Binlog in each batch during full data consumption. | Integer | 50000 |
| binlogReadTimeout | Timeout for incremental Binlog consumption, in milliseconds. | Integer | 600000 |
| fullSyncBinlogReadTimeout | Timeout for full Binlog consumption, in milliseconds. | Long | 1800000 |
| binlogSleepTime | Sleep duration, in milliseconds, when no real-time Binlog data is consumed. After consecutive read failures, the sleep duration is binlogSleepTime × number of failures, up to binlogMaxSleepTime. The value is reset after data is read successfully. | Long | 500 |
| binlogMaxSleepTime | Maximum sleep duration, in milliseconds, when no real-time Binlog data is consumed. | Long | 10000 |
| binlogMaxRetryTimes | Maximum number of retries after a Binlog consumption error. | Integer | 1 |
| binlogRetryInterval | Interval between retries after a Binlog consumption error, in milliseconds. The sleep before the n-th retry is binlogRetryInterval × n + Random(100), where n ranges from 1 to binlogMaxRetryTimes. | Long | 100 |
| binlogParallelNum | Number of threads that consume Binlog data. Applies only when task concurrency is less than the number of DNs in the data warehouse cluster. | Integer | 3 |
| connectionPoolSize | Number of connections in the JDBC connection pool. | Integer | 5 |
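The two sleep formulas in Table 1 can be checked with a little arithmetic. The sketch below runs on GaussDB(DWS) and tabulates them for the default values (binlogSleepTime = 500, binlogMaxSleepTime = 10000, binlogRetryInterval = 100, binlogMaxRetryTimes = 1). It assumes Random(100) means a random value from 0 to 99 ms; the connector's exact randomization may differ.

```sql
-- Idle back-off: LEAST(binlogSleepTime * failures, binlogMaxSleepTime)
SELECT failures,
       LEAST(500 * failures, 10000) AS sleep_ms
FROM generate_series(1, 25) AS failures;

-- Retry back-off: binlogRetryInterval * n + Random(100), n = 1..binlogMaxRetryTimes
-- (Random(100) approximated as floor(random() * 100), an assumption)
SELECT n AS retry,
       100 * n + floor(random() * 100)::int AS retry_sleep_ms
FROM generate_series(1, 1) AS n;
```

With these defaults, the idle sleep grows by 500 ms per consecutive failure and reaches the 10000 ms cap at the 20th failure, while the single retry waits roughly 100 to 199 ms.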

Data Synchronization Example

  • On GaussDB(DWS):

    To create a Binlog table, enable_hstore_binlog_table must be set to true. You can run SHOW enable_hstore_binlog_table; to check the current value of this parameter.

    -- Source table (generates Binlogs)

    CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on, enable_binlog=true);
    

    -- Target table

    CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on);
    
  • On Flink:

    Run the following Flink SQL statements to synchronize data from test_binlog_source to test_binlog_sink:

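    -- Flink source table: reads the Binlog of test_binlog_source in DWS
    CREATE TABLE test_binlog_source (
       a int,
       b int,
       c int
    ) WITH (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'binlog' = 'true',
       'tableName' = 'test_binlog_source',
       'binlogSlotName' = 'slot',
       'username' = 'xxx',
       'password' = 'xxx');

    -- Flink sink table: writes the change stream to test_binlog_sink in DWS
    CREATE TABLE test_binlog_sink (
       a int,
       b int,
       c int
    ) WITH (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'tableName' = 'test_binlog_sink',
       'ignoreUpdateBefore' = 'false',   -- process UPDATE_BEFORE rows (primary-key updates, aggregations)
       'connectionSize' = '1',           -- a single connection preserves write order
       'username' = 'xxx',
       'password' = 'xxx');

    -- Start the synchronization job
    INSERT INTO test_binlog_sink SELECT * FROM test_binlog_source;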
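Once the Flink job is running, one way to check the mirror is to generate a few changes on the source in GaussDB(DWS) and compare the two tables. A minimal sketch, assuming both tables start empty and that the changes have been consumed before the comparison runs; the row values are arbitrary.

```sql
-- Generate INSERT, UPDATE, and DELETE Binlog records on the source table
INSERT INTO test_binlog_source VALUES (1, 1, 1), (2, 2, 2);
UPDATE test_binlog_source SET c = 10 WHERE a = 1 AND b = 1;
DELETE FROM test_binlog_source WHERE a = 2 AND b = 2;

-- After consumption, both queries should return no rows
SELECT * FROM test_binlog_source EXCEPT SELECT * FROM test_binlog_sink;
SELECT * FROM test_binlog_sink EXCEPT SELECT * FROM test_binlog_source;
```

If both tables started empty, test_binlog_sink should end up holding the single row (1, 1, 10).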