文档首页 > > SQL语法参考> 数据定义语句> 创建输出流> RDS和DWS数据同步输出流

RDS和DWS数据同步输出流

分享
更新时间:2020/09/29 GMT+08:00

概述

CS将支持根据binlog格式实时同步数据输出到关系型数据库(RDS)及数据仓库(DWS)中。目前支持PostgreSQL和MySQL两种数据库。PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。MySQL数据库适用于各种WEB应用、电子商务应用、企业应用、移动应用等场景,减少IT部署和维护成本。

关系型数据库(Relational Database Service,简称RDS)是一种基于云计算平台的在线关系型数据库服务。RDS包括如下几种类型的数据库:MySQL、HWSQL、PostgreSQL和Microsoft SQL Server。

RDS的更多信息,请参见《关系型数据库用户指南》

数据仓库服务(Data Warehouse Service) 是一种高性能且完全托管的大规模并行处理的数据库服务:支持行、列存储,提供一键部署、数据高速入库等特性。旨在满足云上用户的安全、可靠、快速,基于海量数据进行数据挖掘和数据存储、分析的需求。

DWS的更多信息,请参见《数据仓库服务管理指南》

前提条件

  • 请务必确保您的账户下已在关系型数据库(RDS)里创建了PostgreSQL或MySQL类型的RDS实例。

    如何创建RDS实例,请参见《关系型数据库快速入门》“购买实例”章节。

  • 该场景作业需要运行在CS的独享集群上,因此要与RDS实例建立VPC对等连接,且用户可以根据实际所需设置相应安全组规则。

    如何建立VPC对等连接,请参考《实时流计算服务用户指南》对等连接章节。

    如何设置安全组规则,请参见《虚拟私有云用户指南》“安全组”章节。

语法

语法格式

CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )

WITH (

type = "db_sync",

region = "",

db_url = "",

username = "",

password = "",

table_name = "${attr_name}",

operation_field = "${attr_name}",

before = "${attr_name}",

after = "${attr_name}",

tranx_id = "${attr_name}",

commit = "${attr_name}",

sql = "${attr_name}",

table_name_map = "${attr_name}",

column_name_map ="${attr_name}",

schema_case_sensitive = "false",

db_type = "dws",

);

语法说明

该特性支持对mysql、postgresql、dws进行数据库同步操作,支持对同一数据库多张表进行同步。

表1 参数说明

参数

是否必选

说明

db_url

数据库连接地址,格式为:"{database_type}://ip:port/database"

目前支持以下数据库连接:

  • MySQL: 'mysql://ip:port/database'
  • PostgreSQL和DWS: 'postgresql://ip:port/database'

username

数据库连接用户名。

password

数据库连接密码。

table_name

表名列,即说明sink流中的哪个字段用来表示表名,格式为${sink中的属性名},该属性一般是从binlog日志中的表字段解析而来。

operation_field

操作类型列,即说明sink流中的哪个字段用来表示操作类型,格式为${sink中的属性名},操作类型值支持I/U/D/INSERT/UPDATE/DELETE。该属性一般是从binlog日志中的操作类型字段解析而来。

before

修改前内容列,即说明sink流中的哪个字段用来表示更新前的记录内容,格式为${sink中的属性名}。

after

即说明sink流中的哪个字段用来表示更新后的记录内容,格式为${sink中的属性名} 。

tranx_id

即说明sink流中的哪个字段用来表示事务Id,格式为${sink中的属性名}。

commit

即说明sink流中的哪个字段用来表示事务是否结束,格式为${sink中的属性名}。

sql

即说明sink流中的哪个字段用来表示DDL操作SQL,格式为${sink中的属性名}。

table_name_map

当原始数据库里的表名和最终目的数据库里的表名不一致时需要进行配置,可以配置为常量,也可以配置为变量。

  • 常量配置方式: table_name_map="distdbtable", 表示所有记录最终都输出到数据库的distdbtable这张表。
  • 变量配置方式: table_name_map="${sink中的属性名}",例如,table_name_map="${dbTableName},表示当前记录的目的表由dbTableName这个属性决定。

当不配置的时候,默认目的表和原始表一样。

column_name_map

当原始数据库里的列名和最终目的数据库里的列名不一致时需要进行配置,可以配置为常量,也可以配置为变量。

  • 常量配置方式:column_name_map="originAttr1=distAttr1,originAttr2=distAttr2", 表示所有记录里涉及列名为originAttr1的最终都映射到数据库里的distAttr1字段。
  • 变量配置方式:column_name_map="${sink中的属性名}",例如,column_name_map="${dbColumnMap},表示当前记录的目的列名由dbColumnMap这个属性决定。

当不配置的时候,默认目的列名和原始列名一样。

schema_case_sensitive

说明目标表中schema是否大小写敏感,默认为“false”,即大小写不敏感。

db_type

说明目标数据库的类型,默认为DWS。

注意事项

1. 对于待插入及更新后数据,需要映射到after字段,对于更新前及待删除数据,需要映射到before字段。

2. 该输出流按顺序同步binlog到目标库,所以需要确保进入输出流的binlog是符合预期时序及事务划分的。建议在源端,根据binlog的事务关联性,进行表级别的划分,将事务相关联的表的binlog,划分到同一个源分区。

示例

该示例为从DIS接收由maxwell生成的binlog数据,并将数据同步到数据库中。

假设有如下三条数据:第一条为插入操作,第二条为更新操作,第三条为删除操作。

{
  "table":"TEST.T1",
  "op_type":"I",
  "current_ts":"2019-04-05T10:21:51.200000",
  "after":{
  "ID":111,
  "NAME":"karl",
  "AGE":21 }
}
{
  "table":"TEST.T2",
  "op_type":"U",
  "current_ts":"2019-04-05T10:21:51.200000",
  "before":{"ID":22},
  "after":{
  "ID":22,
  "NAME":"sherryUpdate",
  "AGE":23 }
}
{
  "table":"TEST.T3",
  "op_type":"D",
  "current_ts":"2019-04-05T10:21:51.200000",
  "before":{
  "ID":111,
  "NAME":"karl",
  "AGE":21 }
}

T1表对应数据库中Table1表,T2对应Table2表,T3对应Table3,则相应的SQL描述如下:

CREATE SOURCE STREAM mysqlBinLog (
  dbName String,
  tableName STRING,
  op_type STRING,
  beforeData STRING,
  afterData STRING,
  mysql STRING,
  xid LONG,
  tranxCommit BOOLEAN)
WITH (
      type = "dis",
      region = "cn-north-4",
      channel = "dis-input",
      encode = "json",
      json_config = "dbName=database;tableName=table;op_type=type;beforeData=old;afterData=data;mysql=sql;xid=xid;tranxCommit=commit",
      enable_checkpoint = "true",
      checkpoint_app_name = "dis-sync-app",
      checkpoint_interval = "600",
      offset = "-1000"
     );

CREATE SINK STREAM dwsRepo (
  tableName STRING,
  op_type STRING,
  beforeData STRING,
  afterData STRING,
  xid LONG,
  tranxCommit BOOLEAN,
  mysql String)
WITH (
      type = "db_sync",
      region = "cn-north-4",
      db_url = "",
      username = "",
      password = "",
      cache_time = "86400000",
      table_name = "${tableName}",
      operation_field = "${op_type}",
      before = "${beforeData}",
      after = "${afterData}",
      tranx_id = "${xid}",
      commit = "${tranxCommit}",
      sql = "${mysql}",
      schema_case_sensitive = "false",
      db_type = "dws"
     );

INSERT INTO dwsRepo
SELECT
    "schema." || tableName,
    op_type,
    beforeData,
    afterData,
    xid,
    tranxCommit,
    mysql
FROM mysqlBinLog;
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!非常感谢您的反馈,我们会继续努力做到更好!
反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问