Real-Time Binlog Consumption by Flink

Updated on 2025-01-07 GMT+08:00

Precautions

  • Only versions 8.3.0.100 and later support recording binlogs for HStore and HStore-opt tables. V3 tables are in the trial commercial use phase and must be evaluated before use.
  • In GaussDB(DWS), binlogs are supported only for HStore and HStore-opt tables. Such a table must have a primary key, and either enable_binlog or enable_binlog_timestamp must be set to on.
  • The name of the consumed binlog table cannot contain special characters, such as periods (.) or double quotation marks (").
  • If multiple tasks consume the binlog data of a single table, ensure that binlogSlotName is unique for each task.
  • For maximum consumption speed, match the task concurrency with the number of DNs in your GaussDB(DWS) cluster.
  • If you use the sink capability of dws-connector-flink to write binlog data, pay attention to the following:
    • To ensure the data write sequence on DNs, set connectionSize to 1.
    • If the primary key is updated on the source end, or if Flink needs to perform aggregation, set ignoreUpdateBefore to false. Otherwise, keep the default value (ignoreUpdateBefore = true).

Real-Time Binlog Consumption by Flink

Use DWS Connector to consume binlogs in real time. For details, see DWS-Connector.

If full data has already been synchronized to the target end by another synchronization tool and only incremental synchronization is required, call the following system function to update the synchronization point:

SELECT * FROM pg_catalog.pgxc_register_full_sync_point('table_name', 'slot_name');

Source Table DDL

Based on the operation type, the source connector assigns each row the appropriate Flink RowKind (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER). This makes it possible to synchronize table data in a mirrored way, similar to the Change Data Capture (CDC) feature of MySQL and PostgreSQL.

CREATE TABLE test_binlog_source (
   a int,
   b int,
   c int,
   primary key(a) NOT ENFORCED
) with (
   'connector' = 'dws',
   'url' = 'jdbc:gaussdb://ip:port/gaussdb',
   'binlog' = 'true',
   'tableName' = 'test_binlog_source',
   'binlogSlotName' = 'slot',
   'username' = 'xxx',
   'password' = 'xxx'
);
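As a sketch of the changelog model described above (illustrative only; the helper function and row images are hypothetical, not part of the connector API), an UPDATE on a primary-key row is delivered as the old row image followed by the new one:

```python
# Sketch of the CDC-style changelog records emitted for one UPDATE.
# RowKind names match Flink's; the dict row images are made-up values.
def changelog_for_update(before_row, after_row):
    # An UPDATE on a primary-key table is split into two records:
    # the old row image (UPDATE_BEFORE) and the new one (UPDATE_AFTER).
    return [("UPDATE_BEFORE", before_row), ("UPDATE_AFTER", after_row)]

events = changelog_for_update({"a": 1, "b": 10}, {"a": 1, "b": 20})
# events[0] -> ('UPDATE_BEFORE', {'a': 1, 'b': 10})
# events[1] -> ('UPDATE_AFTER',  {'a': 1, 'b': 20})
```

A sink that consumes this changelog can therefore mirror the source table exactly, as the retraction (UPDATE_BEFORE) removes the stale row before the new image is applied.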

Binlog Parameters

The following parameters are involved in binlog consumption.

Table 1 Parameters

binlog: Whether to read binlog data. Boolean; default: false.

binlogSlotName: Slot name that serves as an identifier. Multiple Flink tasks can consume the binlog data of the same table simultaneously, so binlogSlotName must be unique for each task. String; default: name of the Flink mapping table.

binlogBatchReadSize: Number of binlog rows read per batch. Integer; default: 5000.

fullSyncBinlogBatchReadSize: Number of binlog rows read per batch during full synchronization. Integer; default: 50000.

binlogReadTimeout: Timeout for incrementally consuming binlog data, in milliseconds. Integer; default: 600000.

fullSyncBinlogReadTimeout: Timeout for fully consuming binlog data, in milliseconds. Long; default: 1800000.

binlogSleepTime: Sleep duration, in milliseconds, when no real-time binlog data is available. After consecutive read failures, the sleep duration is binlogSleepTime * number of failures, up to binlogMaxSleepTime; it is reset after a successful read. Long; default: 500.

binlogMaxSleepTime: Maximum sleep duration, in milliseconds, when no real-time binlog data is available. Long; default: 10000.

binlogMaxRetryTimes: Maximum number of retries after a binlog consumption error. Integer; default: 1.

binlogRetryInterval: Base retry interval, in milliseconds, after a binlog consumption error. The sleep before retry n (1 ≤ n ≤ binlogMaxRetryTimes) is binlogRetryInterval * n + Random(100). Long; default: 100.

binlogParallelNum: Number of threads for consuming binlog data. Valid only when the task concurrency is less than the number of DNs in the GaussDB(DWS) cluster. Integer; default: 3.

connectionPoolSize: Number of connections in the JDBC connection pool. Integer; default: 5.

needRedistribution: Whether to remain compatible with scale-out redistribution. This requires the kernel to be upgraded to the corresponding version; if the kernel is an older version, set this parameter to false. If set to true, the restart-strategy of Flink cannot be none. Boolean; default: true.

newSystemValue: Whether to use the new system field when reading binlog data. This requires the kernel to be upgraded to the corresponding version; if the kernel is an older version, set this parameter to false. Boolean; default: true.

checkNodeChangeInterval: Interval, in milliseconds, for detecting node changes. Valid only when needRedistribution is set to true. Long; default: 10000.

connectionSocketTimeout: Socket timeout for connection processing, in milliseconds; effectively the client-side timeout for executing SQL statements. The value 0 means no timeout. Integer; default: 0.

binlogIgnoreUpdateBefore: Whether to filter out update_before records in binlogs and return only primary key information for delete records. Supported only in 9.1.0.200 and later versions. Boolean; default: false.

binlogStartTime: Time point from which binlogs are consumed, in the format yyyy-MM-dd hh:mm:ss. enable_binlog_timestamp must be enabled for the table. Supported only in 9.1.0.200 and later versions. String; default: none.

binlogSyncPointSize: Size of the synchronization point range for incremental binlog reads. It can be used to control data flushing when the data volume is large. Supported only in 9.1.0.200 and later versions. Integer; default: 5000.
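The sleep and retry parameters above combine into a simple linear backoff. A minimal sketch of that arithmetic, assuming the default values listed above (the helper names are hypothetical, not connector API):

```python
import random

def idle_sleep_ms(consecutive_failures, binlog_sleep_time=500, binlog_max_sleep_time=10000):
    # Sleep grows linearly with consecutive read failures, capped at the maximum,
    # and is reset once a read succeeds.
    return min(binlog_sleep_time * consecutive_failures, binlog_max_sleep_time)

def retry_sleep_ms(attempt, binlog_retry_interval=100):
    # Sleep before retry n is binlogRetryInterval * n plus a random jitter below 100 ms.
    return binlog_retry_interval * attempt + random.randrange(100)
```

With the defaults, the 20th consecutive failed read already sleeps for the full 10000 ms cap.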

Data Synchronization Example

  • On GaussDB(DWS):
    NOTE:

    When creating a binlog table, set enable_hstore_binlog_table to true. You can run show enable_hstore_binlog_table to query the current value of this parameter.

    -- Source table (generating binlogs)

    CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
    

    -- Target table

    CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on);
    
  • On Flink:

    Run the following commands to perform complete data synchronization:

    -- Create a mapping table for the source table.
    CREATE TABLE test_binlog_source ( 
       a int,
       b int,
       c int,
       primary key(a) NOT ENFORCED
    ) with (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'binlog' = 'true',
       'tableName' = 'test_binlog_source',   
       'binlogSlotName' = 'slot',   
       'username'='xxx',   
       'password'='xxx');
    
    -- Create a mapping table for the target table:
    CREATE TABLE test_binlog_sink (  
       a int,
       b int,
       c int,
       primary key(a) NOT ENFORCED
    ) with (
       'connector' = 'dws',
       'url' = 'jdbc:gaussdb://ip:port/gaussdb',
       'tableName' = 'test_binlog_sink',   
       'ignoreUpdateBefore'='false',   
       'connectionSize' = '1',
       'username'='xxx',
       'password'='xxx');
    
    INSERT INTO test_binlog_sink select * from test_binlog_source;
    

Example of Using Java Programs

Create a source table and a target table.

-- source
create table binlog_test_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);
-- sink
create table binlog_test_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);

Demo program:

public class BinlogDemo {

    // Name of the binlog source table
    private static final String BINLOG_TABLE_NAME = "binlog_test_source";

    // Slot name of the binlog table
    private static final String BINLOG_SLOT_NAME = "binlog_test_slot";

    // Name of the target table to write to
    private static final String SINK_TABLE_NAME = "binlog_test_sink";

    public static void main(String[] args) throws Exception {
        DwsConfig dwsConfig = buildDwsConfig();
        DwsClient dwsClient = new DwsClient(dwsConfig);

        TableSchema sourceTableSchema = dwsClient.getTableSchema(TableName.valueOf(BINLOG_TABLE_NAME));
        TableSchema sinkTableSchema = dwsClient.getTableSchema(TableName.valueOf(SINK_TABLE_NAME));

        // Columns to be written
        List<String> sinkColumns = sinkTableSchema.getColumnNames();

        // Connection pool
        DwsConnectionPool dwsConnectionPool = new DwsConnectionPool(dwsConfig);
        // Queue for buffering binlog records
        BlockingQueue<BinlogRecord> queue = new LinkedBlockingQueue<>();
        // Columns to be synchronized
        List<String> sourceColumnNames = sourceTableSchema.getColumnNames();

        BinlogReader binlogReader = new BinlogReader(dwsConfig, queue, sourceColumnNames, dwsConnectionPool);

        // Start the read task.
        binlogReader.start();
        binlogReader.getRecords();

        while (binlogReader.isStart()) {
            try {
                while (!queue.isEmpty() && !binlogReader.hasException()) {
                    // Read data.
                    BinlogRecord record = queue.poll();
                    if (Objects.isNull(record)) {
                        continue;
                    }
                    BinlogRecordType type = BinlogRecordType.toBinlogRecordType(record.getType());
                    List<Object> columnValues = record.getColumnValues();

                    // Write data.
                    if (BinlogRecordType.INSERT.equals(type) || BinlogRecordType.UPDATE_AFTER.equals(type)) {
                        Operate upsert = dwsClient.write(sinkTableSchema);
                        for (int i = 0; i < sinkColumns.size(); i++) {
                            upsert.setObject(i, columnValues.get(i), false);
                        }
                        upsert.commit();
                    } else if (BinlogRecordType.DELETE.equals(type) || BinlogRecordType.UPDATE_BEFORE.equals(type)) {
                        Operate delete = dwsClient.delete(sinkTableSchema);
                        for (int i = 0; i < sinkColumns.size(); i++) {
                            String field = sinkColumns.get(i);
                            if (!sinkTableSchema.isPrimaryKey(field)) {
                                continue;
                            }
                            delete.setObject(i, columnValues.get(i), false);
                        }
                        delete.commit();
                    }
                }
                binlogReader.checkException();
            } catch (Exception e) {
                throw new DwsClientException(ExceptionCode.GET_BINLOG_ERROR, "get binlog has error", e);
            }
        }
    }

    private static DwsConfig buildDwsConfig() {
        // Initialize the configuration. Only required parameters are listed; see the documentation for the full set.
        TableConfig tableConfig = new TableConfig().withBinlog(true)
                .withNewSystemValue(true)
                .withNeedRedistribution(false)
                .withBinlogSlotName(BINLOG_SLOT_NAME);
        return DwsConfig.builder()
                .withUrl("Link information")
                .withUsername("Username")
                .withPassword("Password")
                .withBinlogTableName(BINLOG_TABLE_NAME)
                .withTableConfig(BINLOG_TABLE_NAME, tableConfig)
                .build();
    }
}

