Real-Time Binlog Consumption by Flink
Precautions
- Only version 8.3.0.100 and later support recording binlogs for HStore and HStore-opt tables. V3 is currently in trial commercial use and must be evaluated before use.
- The binlog function is supported only for HStore and HStore-opt tables in GaussDB(DWS). These tables must have a primary key, and either enable_binlog or enable_binlog_timestamp must be set to on.
- The name of the consumed binlog table cannot contain special characters, such as periods (.) and double quotation marks (").
- If multiple tasks consume binlog data of a single table, ensure that binlogSlotName of each task is unique.
- For maximum consumption speed, match task concurrency with the number of DNs in your GaussDB(DWS) cluster.
- If you use the sink capability of dws-connector-flink to write binlog data, note the following:
  - To preserve the write order on DNs, set connectionSize to 1.
  - If the primary key is updated on the source or Flink performs aggregation, set ignoreUpdateBefore to false. Otherwise, keep the default value true.
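The table-name and slot-name precautions can be checked before a job is submitted. The following is a minimal sketch built around a hypothetical BinlogTaskPrecheck class (not part of dws-connector-flink): it rejects table names that contain the two characters named above and flags two tasks that read the same table with the same binlogSlotName. The connector may reject other special characters too; this sketch checks only the listed ones.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-flight checks for the binlog precautions; not a connector API.
final class BinlogTaskPrecheck {

    // Source settings of one Flink task (hypothetical type for this sketch).
    record TaskSpec(String tableName, String binlogSlotName) {}

    private BinlogTaskPrecheck() {}

    // Rejects names containing a period (.) or a double quotation mark (").
    static boolean isValidTableName(String tableName) {
        return tableName != null
                && !tableName.isEmpty()
                && tableName.indexOf('.') < 0
                && tableName.indexOf('"') < 0;
    }

    // Returns false if two tasks consume the same table with the same slot name.
    static boolean slotNamesUnique(List<TaskSpec> tasks) {
        Set<TaskSpec> seen = new HashSet<>(); // records compare by (tableName, binlogSlotName)
        for (TaskSpec task : tasks) {
            if (!seen.add(task)) {
                return false;
            }
        }
        return true;
    }
}
```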
Real-Time Binlog Consumption by Flink
Use DWS Connector to consume binlogs in real time. For details, see DWS-Connector.
If the full data has already been synchronized to the target by another synchronization tool and only incremental synchronization is required, call the following system function to update the synchronization point:
SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name');
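The same function can be called from a Java program through standard JDBC. This sketch assumes that a GaussDB(DWS) JDBC driver accepting the jdbc:gaussdb:// URL is on the classpath, and that the slot name passed here is the binlogSlotName the Flink task will use later. The class and method names are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical helper: calls pgxc_register_full_sync_point over plain JDBC.
final class FullSyncPointRegistrar {

    // The system function shown above, with bind parameters for table and slot.
    static final String REGISTER_SQL =
            "SELECT * FROM pg_catalog.pgxc_register_full_sync_point(?, ?)";

    private FullSyncPointRegistrar() {}

    // Registers the full-sync point for one table/slot pair and returns the
    // number of result rows. Requires a driver for the given URL on the classpath.
    static int register(String url, String user, String password,
                        String tableName, String slotName) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement ps = conn.prepareStatement(REGISTER_SQL)) {
            ps.setString(1, tableName); // binlog source table, e.g. test_binlog_source
            ps.setString(2, slotName);  // assumed to equal the task's binlogSlotName
            int rows = 0;
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    rows++; // result columns are not documented here, so only count rows
                }
            }
            return rows;
        }
    }
}
```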
Source Table DDL
The source assigns each row the appropriate Flink RowKind (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER) based on the operation type. This lets table data be synchronized as a mirror, similar to Change Data Capture (CDC) for MySQL and PostgreSQL databases.
CREATE TABLE test_binlog_source (
  a int,
  b int,
  c int,
  primary key(a) NOT ENFORCED
) with (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://ip:port/gaussdb',
  'binlog' = 'true',
  'tableName' = 'test_binlog_source',
  'binlogSlotName' = 'slot',
  'username'='xxx',
  'password'='xxx')
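To see why the RowKind of each row is enough to mirror a table, the sketch below applies a changelog to an in-memory map keyed by the primary key. RowKind here is a local enum that copies the four constants of Flink's org.apache.flink.types.RowKind so the example runs without Flink; ChangelogMirror is a hypothetical name, and this is not how the connector itself writes to GaussDB(DWS).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: keeps an in-memory copy of a table in sync from RowKind-tagged rows.
final class ChangelogMirror {

    // Local stand-in for org.apache.flink.types.RowKind (same four constants).
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One change event for a table shaped like test_binlog_source(a, b, c); a is the primary key.
    record Change(RowKind kind, int a, int b, int c) {}

    private final Map<Integer, List<Integer>> rowsByKey = new HashMap<>();

    void apply(Change ch) {
        switch (ch.kind()) {
            // INSERT and UPDATE_AFTER carry the new row image: upsert by primary key.
            case INSERT, UPDATE_AFTER -> rowsByKey.put(ch.a(), List.of(ch.a(), ch.b(), ch.c()));
            // DELETE and UPDATE_BEFORE carry the old row image: delete by primary key.
            case DELETE, UPDATE_BEFORE -> rowsByKey.remove(ch.a());
        }
    }

    void applyAll(List<Change> changes) {
        changes.forEach(this::apply);
    }

    // Read-only view of the mirrored rows.
    Map<Integer, List<Integer>> snapshot() {
        return Collections.unmodifiableMap(rowsByKey);
    }
}
```

In this model, changing the primary key of row 1 to 5 arrives as UPDATE_BEFORE for key 1 followed by UPDATE_AFTER for key 5. If UPDATE_BEFORE were dropped, a stale row would remain under key 1, which is the situation the precautions address by setting ignoreUpdateBefore to false when primary keys are updated.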
Binlog Parameters
The following table describes the parameters involved in binlog consumption.
Parameter | Description | Data Type | Default Value
---|---|---|---
binlog | Specifies whether to read binlog data. | Boolean | false
binlogSlotName | Slot name, which serves as an identifier. Multiple Flink tasks can consume binlog data of the same table at the same time, so each task's binlogSlotName must be unique. | String | Name of the Flink mapping table
binlogBatchReadSize | Number of binlog rows read per batch. | Integer | 5000
fullSyncBinlogBatchReadSize | Number of binlog rows read per batch during full synchronization. | Integer | 50000
binlogReadTimeout | Timeout for incremental binlog consumption, in milliseconds. | Integer | 600000
fullSyncBinlogReadTimeout | Timeout for full binlog consumption, in milliseconds. | Long | 1800000
binlogSleepTime | Sleep duration, in milliseconds, when no real-time binlog data is consumed. After N consecutive failed reads, the sleep duration is binlogSleepTime * N, capped at binlogMaxSleepTime. The count is reset after a successful read. | Long | 500
binlogMaxSleepTime | Maximum sleep duration, in milliseconds, when no real-time binlog data is consumed. | Long | 10000
binlogMaxRetryTimes | Maximum number of retries after a binlog consumption error. | Integer | 1
binlogRetryInterval | Interval, in milliseconds, between retries after a binlog consumption error. The sleep before retry n (n = 1 to binlogMaxRetryTimes) is binlogRetryInterval * n + Random(100). | Long | 100
binlogParallelNum | Number of threads for consuming binlog data. This parameter takes effect only when the task concurrency is less than the number of DNs in the GaussDB(DWS) cluster. | Integer | 3
connectionPoolSize | Number of connections in the JDBC connection pool. | Integer | 5
needRedistribution | Specifies whether to be compatible with scale-out redistribution. Compatibility requires upgrading the kernel to the corresponding version; if the kernel is an older version, set this parameter to false. If this parameter is set to true, Flink's restart-strategy cannot be set to none. | Boolean | true
newSystemValue | Specifies whether to use the new system fields when reading binlog data. This requires upgrading the kernel to the corresponding version; if the kernel is an older version, set this parameter to false. | Boolean | true
checkNodeChangeInterval | Interval for detecting node changes. This parameter takes effect only when needRedistribution is set to true. | Long | 10000
connectionSocketTimeout | Timeout for connection processing, in milliseconds, which also acts as the client-side timeout for executing SQL statements. The default value 0 means that no timeout is set. | Integer | 0
binlogIgnoreUpdateBefore | Specifies whether to filter out before_update records in binlogs and whether to return only primary key information for delete records. Supported only in 9.1.0.200 and later versions. | Boolean | false
binlogStartTime | Time point from which binlogs are consumed, in the format yyyy-MM-dd hh:mm:ss. enable_binlog_timestamp must be enabled for the table. Supported only in 9.1.0.200 and later versions. | String | N/A
binlogSyncPointSize | Size of the synchronization point range for incrementally reading binlogs. Use it to control data flushing when the data volume is large. Supported only in 9.1.0.200 and later versions. | Integer | 5000
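The two wait formulas in the table (for binlogSleepTime and binlogRetryInterval) can be written out as plain arithmetic. The following sketch is illustrative only: BinlogWaits is a hypothetical class, the connector's own code may differ, and reading Random(100) as a uniform jitter in [0, 100) ms is an assumption.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of the binlog wait formulas; not connector code.
final class BinlogWaits {

    private BinlogWaits() {}

    // binlogSleepTime * consecutiveFailures, capped at binlogMaxSleepTime.
    // The caller resets consecutiveFailures to 0 after a successful read.
    static long idleSleepMs(long binlogSleepTime, long binlogMaxSleepTime, int consecutiveFailures) {
        return Math.min(binlogSleepTime * consecutiveFailures, binlogMaxSleepTime);
    }

    // binlogRetryInterval * attempt + Random(100), for attempt 1..binlogMaxRetryTimes.
    static long retrySleepMs(long binlogRetryInterval, int attempt, int binlogMaxRetryTimes) {
        if (attempt < 1 || attempt > binlogMaxRetryTimes) {
            throw new IllegalArgumentException("attempt must be in 1.." + binlogMaxRetryTimes);
        }
        // Assumption: Random(100) is a uniform jitter in [0, 100) ms.
        long jitter = ThreadLocalRandom.current().nextLong(100);
        return binlogRetryInterval * attempt + jitter;
    }
}
```

With the defaults (binlogSleepTime 500, binlogMaxSleepTime 10000), the idle sleep grows by 500 ms per consecutive failed read and reaches the 10000 ms cap at the 20th. With binlogRetryInterval 100 and binlogMaxRetryTimes 1, the single retry waits between 100 and 199 ms.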
Data Synchronization Example
- On GaussDB(DWS):
When creating a binlog table, set enable_hstore_binlog_table to true. You can run show enable_hstore_binlog_table to check the current value of this parameter.
-- Source table (generating binlogs)
CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
-- Target table
CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on);
- On Flink:
Run the following commands to perform complete data synchronization:
-- Create a mapping table for the source table.
CREATE TABLE test_binlog_source (
  a int,
  b int,
  c int,
  primary key(a) NOT ENFORCED
) with (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://ip:port/gaussdb',
  'binlog' = 'true',
  'tableName' = 'test_binlog_source',
  'binlogSlotName' = 'slot',
  'username'='xxx',
  'password'='xxx');

-- Create a mapping table for the target table.
CREATE TABLE test_binlog_sink (
  a int,
  b int,
  c int,
  primary key(a) NOT ENFORCED
) with (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://ip:port/gaussdb',
  'tableName' = 'test_binlog_sink',
  'ignoreUpdateBefore'='false',
  'connectionSize' = '1',
  'username'='xxx',
  'password'='xxx');

INSERT INTO test_binlog_sink select * from test_binlog_source;
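When mapping tables are generated from code, for example one pair per synchronized table, a small helper can keep the sink options from the precautions in one place. DwsMappingDdl below is hypothetical (not part of the connector) and only builds the DDL string, so it runs without Flink; the option names are the ones used in the sink mapping table above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that renders a Flink mapping-table DDL for the dws connector.
final class DwsMappingDdl {

    private DwsMappingDdl() {}

    // Quotes a value as a SQL string literal, doubling embedded single quotes.
    private static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    // Renders CREATE TABLE <name> (<columns>) WITH (<options>), options in insertion order.
    static String createTable(String flinkTable, String columnsAndKey, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  " + quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + flinkTable + " (\n  " + columnsAndKey + "\n) WITH (\n" + with + "\n)";
    }

    // Sink options that apply the sink precautions on this page.
    static Map<String, String> sinkOptions(String url, String dwsTable, String user,
                                           String password, boolean keepUpdateBefore) {
        Map<String, String> o = new LinkedHashMap<>();
        o.put("connector", "dws");
        o.put("url", url);
        o.put("tableName", dwsTable);
        o.put("username", user);
        o.put("password", password);
        // Keep UPDATE_BEFORE rows only when primary keys change or Flink aggregates;
        // otherwise the option is omitted and the default (true) applies.
        if (keepUpdateBefore) {
            o.put("ignoreUpdateBefore", "false");
        }
        // A single connection preserves the write order on DNs.
        o.put("connectionSize", "1");
        return o;
    }
}
```

The returned string could then be submitted with Flink's TableEnvironment#executeSql in the same job that runs the INSERT statement.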
Example of Using Java Programs
Create a source table and a target table.
-- source
create table binlog_test_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
-- sink
create table binlog_test_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
Demo program:
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
// DwsClient, DwsConfig, TableConfig, TableSchema, TableName, DwsConnectionPool,
// BinlogReader, BinlogRecord, BinlogRecordType, Operate, DwsClientException and
// ExceptionCode come from the DWS connector; import them from the packages of
// the connector version you use.

public class BinlogDemo {
    // Name of the binlog table
    private static final String BINLOG_TABLE_NAME = "binlog_test_source";
    // Slot name of the binlog table
    private static final String BINLOG_SLOT_NAME = "binlog_test_slot";
    // Name of the table to be written
    private static final String SINK_TABLE_NAME = "binlog_test_sink";

    public static void main(String[] args) throws Exception {
        DwsConfig dwsConfig = buildDwsConfig();
        DwsClient dwsClient = new DwsClient(dwsConfig);
        TableSchema sourceTableSchema = dwsClient.getTableSchema(TableName.valueOf(BINLOG_TABLE_NAME));
        TableSchema sinkTableSchema = dwsClient.getTableSchema(TableName.valueOf(SINK_TABLE_NAME));
        // Columns to be written
        List<String> sinkColumns = sinkTableSchema.getColumnNames();
        // Connection pool
        DwsConnectionPool dwsConnectionPool = new DwsConnectionPool(dwsConfig);
        // Queue for storing data
        BlockingQueue<BinlogRecord> queue = new LinkedBlockingQueue<>();
        // Columns to be synchronized
        List<String> sourceColumnNames = sourceTableSchema.getColumnNames();
        BinlogReader binlogReader = new BinlogReader(dwsConfig, queue, sourceColumnNames, dwsConnectionPool);
        // Start the read task.
        binlogReader.start();
        binlogReader.getRecords();
        while (binlogReader.isStart()) {
            try {
                while (!queue.isEmpty() && !binlogReader.hasException()) {
                    // Read data.
                    BinlogRecord record = queue.poll();
                    if (Objects.isNull(record)) {
                        continue;
                    }
                    BinlogRecordType type = BinlogRecordType.toBinlogRecordType(record.getType());
                    List<Object> columnValues = record.getColumnValues();
                    // Write data. Values are matched to sink columns by position,
                    // so the source and sink tables must have the same column order.
                    if (BinlogRecordType.INSERT.equals(type) || BinlogRecordType.UPDATE_AFTER.equals(type)) {
                        Operate upsert = dwsClient.write(sinkTableSchema);
                        for (int i = 0; i < sinkColumns.size(); i++) {
                            upsert.setObject(i, columnValues.get(i), false);
                        }
                        upsert.commit();
                    } else if (BinlogRecordType.DELETE.equals(type) || BinlogRecordType.UPDATE_BEFORE.equals(type)) {
                        Operate delete = dwsClient.delete(sinkTableSchema);
                        for (int i = 0; i < sinkColumns.size(); i++) {
                            String field = sinkColumns.get(i);
                            // Only primary key columns are needed to locate the row to delete.
                            if (!sinkTableSchema.isPrimaryKey(field)) {
                                continue;
                            }
                            delete.setObject(i, columnValues.get(i), false);
                        }
                        delete.commit();
                    }
                }
                binlogReader.checkException();
            } catch (Exception e) {
                throw new DwsClientException(ExceptionCode.GET_BINLOG_ERROR, "get binlog has error", e);
            }
        }
    }

    private static DwsConfig buildDwsConfig() {
        // Initialize configuration information. (Only necessary parameters are listed.
        // For more information about the configuration, see the document.)
        TableConfig tableConfig = new TableConfig().withBinlog(true)
                .withNewSystemValue(true)
                .withNeedRedistribution(false)
                .withBinlogSlotName(BINLOG_SLOT_NAME);
        return DwsConfig.builder()
                .withUrl("Link information")
                .withUsername("Username")
                .withPassword("Password")
                .withBinlogTableName(BINLOG_TABLE_NAME)
                .withTableConfig(BINLOG_TABLE_NAME, tableConfig)
                .build();
    }
}
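The demo drains the queue in a loop around queue.isEmpty(), which keeps a CPU core busy while no binlog data arrives. One possible alternative, sketched below, waits on BlockingQueue.poll with a timeout instead. QueueDrainer is a hypothetical name; in the demo, running would correspond to binlogReader::isStart and the handler to the upsert/delete block, and the reader's exception checks would still need to be called from the loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical generic form of the demo's consume loop; E stands in for BinlogRecord.
final class QueueDrainer {

    private QueueDrainer() {}

    // Handles items while the producer runs, then flushes whatever is left.
    // poll(timeout) blocks briefly instead of spinning when the queue is empty.
    static <E> void drain(BlockingQueue<E> queue, BooleanSupplier running,
                          Consumer<E> handler) throws InterruptedException {
        while (running.getAsBoolean() || !queue.isEmpty()) {
            E item = queue.poll(200, TimeUnit.MILLISECONDS);
            if (item != null) {
                handler.accept(item);
            }
        }
    }
}
```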