Synchronizing Data to RDS and DWS
Overview
CS can synchronize data to Relational Database Service (RDS) and Data Warehouse Service (DWS) in real time based on the binlog format. Currently, PostgreSQL and MySQL databases are supported. The PostgreSQL database can store data of more complex types and provides spatial information services, multi-version concurrency control (MVCC), and high concurrency. It is suitable for location-based applications, financial insurance, and e-commerce. The MySQL database reduces IT deployment and maintenance costs in scenarios such as web applications, e-commerce, enterprise applications, and mobile applications.
RDS is a cloud-based web service that supports the following database engines: MySQL, HWSQL, PostgreSQL, and Microsoft SQL Server.
For more information about RDS, see the Relational Database Service User Guide.
DWS is a high-performance, fully hosted, and large-scale parallel processing database service. It supports row- and column-based storage and provides features such as one-click deployment and rapid data import to the database. It aims to meet the requirements of cloud users for secure, reliable, and fast data mining, storage, and analysis.
For more information about DWS, see the Data Warehouse Service Management Guide.
Prerequisites
- Ensure that you have created a PostgreSQL or MySQL RDS instance in RDS.
For details about how to create an RDS instance, see Buying an Instance in the Relational Database Service Getting Started Guide.
- In this scenario, jobs must run on an exclusive CS cluster. Therefore, CS must interconnect with the VPC to which the RDS instance is connected. You can also set security group rules as required.
For details about how to set up the VPC peering connection, see VPC Peering Connection in the Cloud Stream Service User Guide.
For details about how to configure security group rules, see Security Group in the Virtual Private Cloud User Guide.
Syntax
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH (
  type = "db_sync",
  region = "",
  db_url = "",
  username = "",
  password = "",
  table_name = "${attr_name}",
  operation_field = "${attr_name}",
  before = "${attr_name}",
  after = "${attr_name}",
  tranx_id = "${attr_name}",
  commit = "${attr_name}",
  sql = "${attr_name}",
  table_name_map = "${attr_name}",
  column_name_map = "${attr_name}",
  schema_case_sensitive = "false",
  db_type = "dws"
);
Description
This feature supports database synchronization for MySQL, PostgreSQL, and DWS. Multiple tables in the same database can be synchronized.
| Parameter | Mandatory | Description |
|---|---|---|
| db_url | Yes | Indicates the database connection address, in the format {database_type}://ip:port/database. Currently, MySQL and PostgreSQL database connections are supported. |
| username | Yes | Indicates the username of a database. |
| password | Yes | Indicates the password of a database. |
| table_name | Yes | Indicates the table name, that is, the table name field in the sink stream. The format is ${attribute name}. This attribute is parsed from the table field in the binlog. |
| operation_field | Yes | Indicates the operation type, that is, the operation type field in the sink stream. The format is ${attribute name}. The operation type can be I, U, D, INSERT, UPDATE, or DELETE. This attribute is parsed from the operation type field in the binlog. |
| before | Yes | Indicates the original content field in the sink stream. The format is ${attribute name}. |
| after | Yes | Indicates the new content field in the sink stream. The format is ${attribute name}. |
| tranx_id | No | Indicates the event ID field in the sink stream. The format is ${attribute name}. |
| commit | No | Indicates the event completed field in the sink stream. The format is ${attribute name}. |
| sql | No | Indicates the field of DDL SQL statements in the sink stream. The format is ${attribute name}. |
| table_name_map | No | If the table name in the original database differs from that in the destination database, set this parameter to a constant or variable. If this parameter is not set, the destination table name is the same as the original table name by default. |
| column_name_map | No | If the column name in the original database differs from that in the destination database, set this parameter to a constant or variable. If this parameter is not set, the destination column names are the same as the original column names by default. |
| schema_case_sensitive | No | Indicates whether the schema of the target table is case-sensitive. The default value is false, indicating that the schema is case-insensitive. |
| db_type | No | Indicates the type of the destination database. The default value is dws. |
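As an illustration of the db_url format described above, the following fragment sketches the two supported connection styles. The scheme names follow the {database_type}://ip:port/database convention; the IP addresses, ports, and database names are placeholders, not real values:

```sql
-- MySQL destination (illustrative placeholders):
db_url = "mysql://192.168.0.10:3306/test_db",
-- PostgreSQL destination (illustrative placeholders):
db_url = "postgresql://192.168.0.11:5432/test_db",
```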
Precautions
1. Inserted data and the new values of updated data must be mapped to the after field. Deleted data and the old values of updated data must be mapped to the before field.
2. The binlogs of the sink stream are synchronized to the target database in sequence. Therefore, ensure that the binlogs of the source stream meet the expected time sequence and transaction division. You are advised to route the binlogs of transaction-associated tables to the same source partition based on their transaction association at the source end.
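One way to satisfy precaution 2 is to derive the source partition from the transaction ID, so that all binlogs of one transaction land in the same partition and replay in order. The following Python sketch is illustrative only; the partition count and record layout are assumptions, and the helper is not part of CS or DIS:

```python
import zlib

def partition_for(tranx_id: str, num_partitions: int) -> int:
    """Stable partition index derived from the transaction ID.

    Records sharing a transaction ID always map to the same partition,
    preserving their relative order within that partition.
    """
    return zlib.crc32(tranx_id.encode("utf-8")) % num_partitions

# All binlogs of transaction "tx-1001" go to one partition:
p = partition_for("tx-1001", 4)
```

A hash of the transaction ID (rather than, say, the table name) keeps multi-table transactions together, which is what the precaution asks for.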
Example
This example shows how to receive Maxwell binlogs from DIS and synchronize the data to the database.
Assume that there are three pieces of data. The first piece is an insert operation, the second piece is an update operation, and the third piece is a delete operation.
{
  "table": "TEST.T1",
  "op_type": "I",
  "current_ts": "2019-04-05T10:21:51.200000",
  "after": {
    "ID": 111,
    "NAME": "karl",
    "AGE": 21
  }
}
{
  "table": "TEST.T2",
  "op_type": "U",
  "current_ts": "2019-04-05T10:21:51.200000",
  "before": { "ID": 22 },
  "after": {
    "ID": 22,
    "NAME": "sherryUpdate",
    "AGE": 23
  }
}
{
  "table": "TEST.T3",
  "op_type": "D",
  "current_ts": "2019-04-05T10:21:51.200000",
  "before": {
    "ID": 111,
    "NAME": "karl",
    "AGE": 21
  }
}
T1 corresponds to table1 in the database, T2 corresponds to table2, and T3 corresponds to table3. The SQL statements are as follows:
CREATE SOURCE STREAM mysqlBinLog (
dbName String,
tableName STRING,
op_type STRING,
beforeData STRING,
afterData STRING,
mysql STRING,
xid LONG,
tranxCommit BOOLEAN)
WITH (
type = "dis",
region = "cn-north-4",
channel = "dis-input",
encode = "json",
json_config = "dbName=database;tableName=table;op_type=type;beforeData=old;afterData=data;mysql=sql;xid=xid;tranxCommit=commit",
enable_checkpoint = "true",
checkpoint_app_name = "dis-sync-app",
checkpoint_interval = "600",
offset = "-1000"
);
CREATE SINK STREAM dwsRepo (
tableName STRING,
op_type STRING,
beforeData STRING,
afterData STRING,
xid LONG,
tranxCommit BOOLEAN,
mysql String)
WITH (
type = "db_sync",
region = "cn-north-4",
db_url = "",
username = "",
password = "",
cache_time = "86400000",
table_name = "${tableName}",
operation_field = "${op_type}",
before = "${beforeData}",
after = "${afterData}",
tranx_id = "${xid}",
commit = "${tranxCommit}",
sql = "${mysql}",
schema_case_sensitive = "false",
db_type = "dws"
);
INSERT INTO dwsRepo
SELECT
"schema." || tableName,
op_type,
beforeData,
afterData,
xid,
tranxCommit,
mysql
FROM mysqlBinLog;
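As a rough illustration of precaution 1, the following Python sketch shows which payloads of the sample records above the sink consumes for each operation type. The helper and record layout are hypothetical, mirroring the sample binlogs, and are not part of CS:

```python
def payloads_used(record: dict) -> dict:
    """Return the payloads the sink reads for a given binlog record:
    inserts use "after", deletes use "before", updates use both."""
    op = record["op_type"]
    if op in ("I", "INSERT"):
        return {"after": record["after"]}
    if op in ("D", "DELETE"):
        return {"before": record["before"]}
    if op in ("U", "UPDATE"):
        return {"before": record["before"], "after": record["after"]}
    raise ValueError(f"unknown op_type: {op}")

# The sample update to TEST.T2 supplies both the old and new values:
update = {
    "table": "TEST.T2", "op_type": "U",
    "before": {"ID": 22},
    "after": {"ID": 22, "NAME": "sherryUpdate", "AGE": 23},
}
```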