Real-Time Binlog Consumption by Flink
Precautions
- Binlog is currently in beta and is only available for commercial use on a restricted basis. To use Binlog, contact technical support.
- Binlog recording for HStore and HStore-op operations is only supported in versions 8.3.0.100 and later.
- Binlog is available only for HStore tables in GaussDB(DWS). The table must have a primary key and must be created with enable_binlog enabled.
- The name of the consumed Binlog table cannot contain special characters, such as periods (.) and double quotation marks (").
- If multiple tasks consume the Binlog data of the same table, ensure that binlogSlotName of each task is unique.
- For maximum consumption speed, match task concurrency with the number of Data Nodes (DNs) in your data warehouse cluster.
- When using dws-connector-flink for Binlog data writing, set connectionSize to 1 to maintain the sequence of data writes across DNs. When updating primary keys or performing aggregation calculations, set ignoreUpdateBefore to false. (In other scenarios, this setting is not recommended.)
Real-Time Binlog Consumption by Flink
Use the DWS Connector to consume Binlogs in real time. For details, see DWS-Connector.
If full data has been synchronized to the target end using other synchronization tools, and only incremental synchronization is required, you can call the following system function to update the synchronization points.
SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name');
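As an illustration, the call below fills in the placeholders with the table and slot names used in the examples on this page. It is a sketch that assumes the first argument corresponds to the source table (tableName) and the second to the slot used by the Flink task (binlogSlotName); whether the table name must be schema-qualified is not stated here.

```sql
-- Assumed mapping: 'test_binlog_source' is the source table name,
-- 'slot' is the binlogSlotName used by the Flink task.
SELECT * FROM pg_catalog.pgxc_register_full_sync_point('test_binlog_source', 'slot');
```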
Source Table DDL
The source automatically assigns a Flink RowKind (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER) to each row based on the operation type. This lets the target mirror the source table, similar to the Change Data Capture (CDC) feature in MySQL and PostgreSQL databases.
CREATE TABLE test_binlog_source (
  a int,
  b int,
  c int
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://ip:port/gaussdb',
  'binlog' = 'true',
  'tableName' = 'test_binlog_source',
  'binlogSlotName' = 'slot',
  'username' = 'xxx',
  'password' = 'xxx'
);
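When several Flink tasks consume the Binlog of the same table, each task needs its own binlogSlotName. The following sketch shows two source definitions, one per task, that read the same GaussDB(DWS) table with different slots. The mapping table names (binlog_source_task_a, binlog_source_task_b) and slot names (slot_task_a, slot_task_b) are hypothetical and chosen only for illustration.

```sql
-- Task A: reads test_binlog_source through its own slot.
CREATE TABLE binlog_source_task_a (
  a int,
  b int,
  c int
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://ip:port/gaussdb',
  'binlog' = 'true',
  'tableName' = 'test_binlog_source',
  'binlogSlotName' = 'slot_task_a',  -- hypothetical slot name, unique to task A
  'username' = 'xxx',
  'password' = 'xxx'
);

-- Task B: reads the same table, but with a different slot.
CREATE TABLE binlog_source_task_b (
  a int,
  b int,
  c int
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://ip:port/gaussdb',
  'binlog' = 'true',
  'tableName' = 'test_binlog_source',
  'binlogSlotName' = 'slot_task_b',  -- hypothetical slot name, unique to task B
  'username' = 'xxx',
  'password' = 'xxx'
);
```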
Binlog Parameters
The following table describes the parameters involved in Binlog consumption.
Parameter | Description | Data Type | Default Value |
---|---|---|---|
binlog | Enables reading of Binlog information. | Boolean | false |
binlogSlotName | Slot, which can be understood as an identifier. Multiple Flink tasks can simultaneously consume Binlog data of the same table, so each task's binlogSlotName must be unique. | String | Name of the Flink mapping table |
binlogBatchReadSize | Number of data rows read from Binlogs in batches. | Integer | 5000 |
fullSyncBinlogBatchReadSize | Number of data rows that are fully read from Binlogs. | Integer | 50000 |
binlogReadTimeout | Timeout for incrementally consuming Binlog data (milliseconds). | Integer | 600000 |
fullSyncBinlogReadTimeout | Timeout for fully consuming Binlog data (milliseconds). | Long | 1800000 |
binlogSleepTime | Sleep duration when no real-time Binlog data is consumed (milliseconds). The sleep duration with consecutive read failures is binlogSleepTime * failures, up to binlogMaxSleepTime. The value is reset after successful data read. | Long | 500 |
binlogMaxSleepTime | Maximum sleep duration when no real-time Binlog data is consumed (milliseconds). | Long | 10000 |
binlogMaxRetryTimes | Maximum number of retries after a Binlog data consumption error. | Integer | 1 |
binlogRetryInterval | Interval between retries after a Binlog data consumption error (milliseconds). Sleep duration during retry is calculated as binlogRetryInterval * (1~binlogMaxRetryTimes) + Random(100). | Long | 100 |
binlogParallelNum | Number of threads for consuming Binlog data. Applicable only when task concurrency is less than the number of DNs in the data warehouse cluster. | Integer | 3 |
connectionPoolSize | Number of connections in the JDBC connection pool. | Integer | 5 |
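The sketch below shows how some of these parameters might be set on a source table. It assumes they are passed as string options in the WITH clause, in the same way as binlog and binlogSlotName. The values are illustrative only and are not tuning recommendations.

```sql
CREATE TABLE test_binlog_source (
  a int,
  b int,
  c int
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://ip:port/gaussdb',
  'binlog' = 'true',
  'tableName' = 'test_binlog_source',
  'binlogSlotName' = 'slot',
  -- Assumed to be accepted as WITH options like the parameters above.
  'binlogBatchReadSize' = '5000',   -- rows per incremental batch read
  'binlogSleepTime' = '500',        -- base sleep (ms) when no data is consumed
  'binlogMaxSleepTime' = '10000',   -- upper bound (ms) for that sleep
  'binlogMaxRetryTimes' = '3',      -- illustrative; the default is 1
  'binlogParallelNum' = '3',        -- used only if concurrency < number of DNs
  'username' = 'xxx',
  'password' = 'xxx'
);
```

With binlogSleepTime set to 500 and binlogMaxSleepTime set to 10000, the rule binlogSleepTime * failures gives a sleep of 1500 ms after the third consecutive failure, and the 10000 ms cap is reached at the twentieth.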
Data Synchronization Example
- On GaussDB(DWS):
When creating a Binlog table, set enable_hstore_binlog_table to true. You can run the SHOW enable_hstore_binlog_table command to check the current value of this parameter.
-- Source table (generates Binlogs)
CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on, enable_binlog=true);
-- Target table
CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on);
- On Flink:
Run the following statements to perform complete data synchronization:
CREATE TABLE test_binlog_source (
  a int,
  b int,
  c int
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://ip:port/gaussdb',
  'binlog' = 'true',
  'tableName' = 'test_binlog_source',
  'binlogSlotName' = 'slot',
  'username' = 'xxx',
  'password' = 'xxx'
);

CREATE TABLE test_binlog_sink (
  a int,
  b int,
  c int
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://ip:port/gaussdb',
  'tableName' = 'test_binlog_sink',
  'ignoreUpdateBefore' = 'false',
  'connectionSize' = '1',
  'username' = 'xxx',
  'password' = 'xxx'
);

INSERT INTO test_binlog_sink SELECT * FROM test_binlog_source;
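One way to check the synchronization is to change a few rows in the source table on GaussDB(DWS) while the Flink job is running, then query the target table. The statements below are a minimal sketch with made-up sample values. How long the changes take to appear in the target depends on the job and is not specified here.

```sql
-- Run on GaussDB(DWS) while the Flink job is running.
-- Sample values are made up for illustration.
INSERT INTO test_binlog_source VALUES (1, 1, 10), (2, 2, 20);
UPDATE test_binlog_source SET c = 11 WHERE a = 1 AND b = 1;
DELETE FROM test_binlog_source WHERE a = 2 AND b = 2;

-- Once the job has consumed these changes, the target table is expected
-- to mirror the source table.
SELECT * FROM test_binlog_sink ORDER BY a, b;
```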