Synchronizing Data to RDS and DWS

Overview

CS can synchronize data to Relational Database Service (RDS) and Data Warehouse Service (DWS) in real time based on the binlog format. Currently, PostgreSQL and MySQL databases are supported. PostgreSQL can store more complex data types and provides spatial information services, multi-version concurrency control (MVCC), and high concurrency. It is suitable for location-based applications, financial insurance, and e-commerce scenarios. MySQL reduces IT deployment and maintenance costs in scenarios such as web applications, e-commerce, enterprise applications, and mobile applications.

RDS is a cloud-based web service that includes the following database types: MySQL, HWSQL, PostgreSQL, and Microsoft SQL Server.

For more information about RDS, see the Relational Database Service User Guide.

DWS is a high-performance, fully managed, massively parallel processing database service. It supports both row-based and column-based storage and provides features such as one-click deployment and rapid data import. It meets cloud users' requirements for secure, reliable, and fast data mining, storage, and analysis.

For more information about DWS, see the Data Warehouse Service Management Guide.

Prerequisites

  • Ensure that you have created a PostgreSQL or MySQL RDS instance in RDS.

    For details about how to create an RDS instance, see Buying an Instance in the Relational Database Service Getting Started Guide.

  • In this scenario, jobs must run on an exclusive CS cluster. Therefore, the CS cluster must be interconnected with the VPC to which the RDS instance belongs. You can also configure security group rules as required.

    For details about how to set up the VPC peering connection, see VPC Peering Connection in the Cloud Stream Service User Guide.

    For details about how to configure security group rules, see Security Group in the Virtual Private Cloud User Guide.

Syntax

CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH (
      type = "db_sync",
      region = "",
      db_url = "",
      username = "",
      password = "",
      table_name = "${attr_name}",
      operation_field = "${attr_name}",
      before = "${attr_name}",
      after = "${attr_name}",
      tranx_id = "${attr_name}",
      commit = "${attr_name}",
      sql = "${attr_name}",
      table_name_map = "${attr_name}",
      column_name_map = "${attr_name}",
      schema_case_sensitive = "false",
      db_type = "dws"
     );

Description

This feature supports database synchronization for MySQL, PostgreSQL, and DWS. Multiple tables in the same database can be synchronized.

Table 1 Parameter description

db_url
  Mandatory: Yes
  Indicates the database connection address, for example, {database_type}://ip:port/database.
  Currently, the following database connections are supported:
    • MySQL: 'mysql://ip:port/database'
    • PostgreSQL and DWS: 'postgresql://ip:port/database'

username
  Mandatory: Yes
  Indicates the username of the database.

password
  Mandatory: Yes
  Indicates the password of the database.

table_name
  Mandatory: Yes
  Indicates the table name field in the sink stream. The format is ${attribute name}. This attribute is parsed from the table field in the binlog.

operation_field
  Mandatory: Yes
  Indicates the operation type field in the sink stream. The format is ${attribute name}. The operation type can be I, U, D, INSERT, UPDATE, or DELETE. This attribute is parsed from the operation type field in the binlog.

before
  Mandatory: Yes
  Indicates the original content field in the sink stream. The format is ${attribute name}.

after
  Mandatory: Yes
  Indicates the new content field in the sink stream. The format is ${attribute name}.

tranx_id
  Mandatory: No
  Indicates the event ID field in the sink stream. The format is ${attribute name}.

commit
  Mandatory: No
  Indicates the event completed field in the sink stream. The format is ${attribute name}.

sql
  Mandatory: No
  Indicates the field of DDL SQL statements in the sink stream. The format is ${attribute name}.

table_name_map
  Mandatory: No
  Set this parameter to a constant or a variable if the table name in the source database differs from that in the destination database. A combined example is provided in the sketch after this table.
    • Constant configuration: table_name_map="distdbtable" indicates that all records are written to the distdbtable table in the destination database.
    • Variable configuration: table_name_map="${attribute name}". For example, table_name_map="${dbTableName}" indicates that the destination table name is determined by the dbTableName attribute.
  If this parameter is not set, the destination table name is the same as the source table name by default.

column_name_map
  Mandatory: No
  Set this parameter to a constant or a variable if the column names in the source database differ from those in the destination database. A combined example is provided in the sketch after this table.
    • Constant configuration: column_name_map="originAttr1=distAttr1,originAttr2=distAttr2" indicates that the originAttr1 column is mapped to the distAttr1 column and the originAttr2 column is mapped to the distAttr2 column in the destination database.
    • Variable configuration: column_name_map="${attribute name}". For example, column_name_map="${dbColumnMap}" indicates that the destination column names are determined by the dbColumnMap attribute.
  If this parameter is not set, the destination column names are the same as the source column names by default.

schema_case_sensitive
  Mandatory: No
  Indicates whether the schema of the destination table is case-sensitive. The default value is false, indicating that the schema is case-insensitive.

db_type
  Mandatory: No
  Indicates the type of the destination database. The default value is dws.
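
For example, the following minimal sketch combines table_name_map and column_name_map in a sink definition. The connection address, the destination table name distdbtable, and the destination column names USER_ID and USER_NAME are placeholders, not values defined by this document; adjust them to match your environment.

CREATE SINK STREAM mappedRepo (
  tableName STRING,
  op_type STRING,
  beforeData STRING,
  afterData STRING)
WITH (
      type = "db_sync",
      region = "cn-north-4",
      db_url = "postgresql://192.168.0.12:8000/testdb",
      username = "",
      password = "",
      table_name = "${tableName}",
      operation_field = "${op_type}",
      before = "${beforeData}",
      after = "${afterData}",
      table_name_map = "distdbtable",
      column_name_map = "ID=USER_ID,NAME=USER_NAME",
      db_type = "dws"
     );

With this configuration, every record is written to the distdbtable table, and the source columns ID and NAME are written to the destination columns USER_ID and USER_NAME. If table_name_map were omitted, records would be written to the table named by the tableName attribute; if column_name_map were omitted, the source column names would be kept.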

Precautions

1. For insert and update operations, the new data must be mapped to the after field. For delete and update operations, the original data must be mapped to the before field, as illustrated below.

2. The binlogs received by the sink stream are synchronized to the destination database in sequence. Therefore, ensure that the binlogs in the source stream follow the expected time order and transaction boundaries. You are advised to send the binlogs of transaction-associated tables to the same source partition, based on their transaction association at the source end.
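
For example, with the sample records in the Example section below, the insert record for TEST.T1 populates only the attribute mapped to after, the update record for TEST.T2 carries its original content {"ID":22} in the attribute mapped to before and the new row content in the attribute mapped to after, and the delete record for TEST.T3 populates only the attribute mapped to before.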

Example

This example shows how to receive Maxwell binlogs from DIS and synchronize the data to the database.

Assume that there are three pieces of data. The first piece is an insert operation, the second piece is an update operation, and the third piece is a delete operation.

{
  "table":"TEST.T1",
  "op_type":"I",
  "current_ts":"2019-04-05T10:21:51.200000",
  "after":{
    "ID":111,
    "NAME":"karl",
    "AGE":21
  }
}
{
  "table":"TEST.T2",
  "op_type":"U",
  "current_ts":"2019-04-05T10:21:51.200000",
  "before":{"ID":22},
  "after":{
    "ID":22,
    "NAME":"sherryUpdate",
    "AGE":23
  }
}
{
  "table":"TEST.T3",
  "op_type":"D",
  "current_ts":"2019-04-05T10:21:51.200000",
  "before":{
    "ID":111,
    "NAME":"karl",
    "AGE":21
  }
}

T1 corresponds to table1 in the database, T2 corresponds to table2, and T3 corresponds to table3. The SQL statements are as follows:

CREATE SOURCE STREAM mysqlBinLog (
  dbName STRING,
  tableName STRING,
  op_type STRING,
  beforeData STRING,
  afterData STRING,
  mysql STRING,
  xid LONG,
  tranxCommit BOOLEAN)
WITH (
      type = "dis",
      region = "cn-north-4",
      channel = "dis-input",
      encode = "json",
      json_config = "dbName=database;tableName=table;op_type=type;beforeData=old;afterData=data;mysql=sql;xid=xid;tranxCommit=commit",
      enable_checkpoint = "true",
      checkpoint_app_name = "dis-sync-app",
      checkpoint_interval = "600",
      offset = "-1000"
     );

CREATE SINK STREAM dwsRepo (
  tableName STRING,
  op_type STRING,
  beforeData STRING,
  afterData STRING,
  xid LONG,
  tranxCommit BOOLEAN,
  mysql STRING)
WITH (
      type = "db_sync",
      region = "cn-north-4",
      db_url = "",
      username = "",
      password = "",
      cache_time = "86400000",
      table_name = "${tableName}",
      operation_field = "${op_type}",
      before = "${beforeData}",
      after = "${afterData}",
      tranx_id = "${xid}",
      commit = "${tranxCommit}",
      sql = "${mysql}",
      schema_case_sensitive = "false",
      db_type = "dws"
     );

INSERT INTO dwsRepo
SELECT
    "schema." || tableName,
    op_type,
    beforeData,
    afterData,
    xid,
    tranxCommit,
    mysql
FROM mysqlBinLog;
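
In the INSERT statement, "schema." || tableName concatenates the literal prefix schema. with the table name carried in each record (for example, TEST.T1 in the first sample record). Here, schema. is presumably a placeholder for the actual schema name of the destination tables; replace it as required for your environment.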