CDL数据集成概述
CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。
CDL服务包含了两个重要的角色:CDLConnector和CDLService。CDLConnector是具体执行数据抓取任务的实例,CDLService是负责管理和创建任务的实例。
CDL支持在CDLService WebUI界面创建数据同步任务和数据比较任务,使用流程如图1所示。
数据同步任务
- CDL支持的数据同步任务类型:
表1 CDL支持的数据同步任务类型 数据源
目的端
描述
MySQL
Hudi
该任务支持从MySQL同步数据到Hudi。
Kafka
该任务支持从MySQL同步数据到Kafka。
PgSQL
Hudi
该任务支持从PgSQL同步数据到Hudi。
Kafka
该任务支持从PgSQL同步数据到Kafka。
Hudi
DWS
该任务支持从Hudi同步数据到DWS。
ClickHouse
该任务支持从Hudi同步数据到ClickHouse。
ThirdKafka
Hudi
该任务支持从ThirdKafka同步数据到Hudi。
Kafka
该任务支持从ThirdKafka同步数据到Kafka。
openGauss(MRS 3.3.0及之后版本支持)
ThirdKafka(DMS/DRS)->Hudi
该任务支持openGauss通过ThirdKafka(DMS/DRS)同步数据到Hudi。
Hudi
该任务支持从openGauss同步数据到Hudi。
Kafka
该任务支持从openGauss同步数据到Kafka。
ogg-oracle-avro(MRS 3.3.0及之后版本支持)
ThirdKafka(DMS/DRS)->Hudi
该任务支持avro-oracle通过ThirdKafka(DMS/DRS)同步数据到Hudi。
- CDL支持的数据同步任务中数据库版本范围: Kafka(包括ThirdKafka使用MRS Kafka作为源端)、Hudi和ClickHouse数据源是直接使用MRS集群内的相关组件作为数据源,版本号介绍请参见MRS组件版本一览表,其他数据库版本号如表2所示。
- 使用约束:
- 如果需要使用CDL, Kafka服务的配置参数“log.cleanup.policy”参数值必须为“delete”。
- MRS集群中已安装CDL服务。
- CDL仅支持抓取非系统表下的增量数据,MySQL、PostgreSQL等数据库的内置数据库不支持抓取增量数据。
- 从Hudi同步数据到DWS或ClickHouse任务中,在Hudi中物理删除的数据目的端不会同步删除。例如,在Hudi中执行delete from tableName命令硬删除表数据,目的端DWS或ClickHouse仍存在该表数据。
- MySQL数据库需要开启MySQL的bin log功能(默认情况下是开启的)和GTID功能,CDL不支持抓取表名包含“$”或者中文等特殊字符的表。
- 查看MySQL是否开启bin log:
使用工具或者命令行连接MySQL数据库(本示例使用Navicat工具连接),执行show variables like 'log_%'命令查看。
例如在navicat工具选择“File > New Query”新建查询,输入如下SQL命令,单击“Run”在结果中“log_bin”显示为“ON”则表示开启成功。
show variables like 'log_%'

- 如果当前MySQL未开启bin log功能,需执行以下操作:
可以通过修改MySQL的配置文件“my.cnf” (Windows系统是“my.ini”)开启,操作如下:
server-id = 223344 log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 10
修改完成之后需要重启MySQL服务使配置生效。
- 查看MySQL是否开启GTID功能:
使用show global variables like '%gtid%'命令查看是否开启, 具体开启方法参考MySQL对应版本的官方文档。
(MySQL 8.x版本开启指导请参见https://dev.mysql.com/doc/refman/8.0/en/replication-mode-change-online-enable-gtids.html)

- 用户权限设置:
用户执行MySQL任务需要的权限需要包括SELECT、RELOAD、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT。
可执行以下命令进行赋权,命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '数据库用户名' IDENTIFIED BY '数据库用户密码';
执行以下命令刷新权限:
FLUSH PRIVILEGES;
- 查看MySQL是否开启bin log:
- PostgreSQL数据库需要修改预写日志的策略。
- 连接PostgreSQL数据库的用户需要具有replication权限和对数据库的create权限,对表要有owner权限。
- CDL不支持抓取表名包含“$”或者中文等特殊字符的表。
- PostgreSQL数据库需要有修改“statement_timeout”和“lock_timeout”两个超时参数的设置权限以及查询删除Slot和publication权限。
- “max_wal_senders”建议设置为Slot的1.5倍或2倍。
- 在PostgreSQL表的复制标识是default的情况下,如果存在以下场景,需要开启全字段补全功能:
- 场景一:
在源端数据库存在delete操作场景下,delete事件只包含主键信息, 在这时写入到Hudi的delete数据会出现只有主键字段有值, 其他业务字段都是null的情况。
- 场景二:
在数据库单条数据大小超过8k(包括8k)场景下,update事件只包含变更字段,此时Hudi数据中会出现部分字段的值为__debezium_unavailable_value的情况。
相关命令如下,其中:
- 场景一:
- 修改数据库配置文件“postgresql.conf”(默认在PostgreSQL安装目录的data文件夹下)中的参数项“wal_level = logical”。
#------------------------------------------------ #WRITE-AHEAD LOG #------------------------------------------------ # - Settings - wal_level = logical # minimal, replica, or logical # (change requires restart) #fsync = on #flush data to disk for crash safety ... - 重启数据库服务:
#停止 pg_ctl stop #启动 pg_ctl start
- DWS数据库前置准备。
同步任务启动前,源表和目标表必须存在,且表结构保持一致。DWS表中“ads_last_update_date”取值为系统当前时间。
- ThirdPartyKafka前置准备。
上层源支持opengauss和ogg,源端Kafka Topic可被MRS集群Kafka消费。
ThirdKafka不支持同步分布式Opengauss数据库数据到CDL。
- ClickHouse前置准备。
用户需要有操作ClickHouse的权限,相关操作请参见创建具有ClickHouse权限的用户。
- openGauss数据库需要开启预写日志功能(MRS 3.3.0及之后版本支持)。
- 连接openGauss数据库的用户需要具有逻辑复制权限。
- 不支持同步openGauss间隔分区表、中文名称表。
- 源端openGauss数据库中的表如果不存在主键,则不支持delete该表的数据。
- 如果源端openGauss数据库存在delete数据操作,则需要开启全字段补全功能,命令如下:
- 修改数据库配置文件“postgresql.conf”(默认在Opengauss安装目录的data文件夹下)中的参数项“wal_level = logical”。
#------------------------------------------------ #WRITE-AHEAD LOG #------------------------------------------------ # - Settings - wal_level = logical # minimal, replica, or logical # (change requires restart) #fsync = on #flush data to disk for crash safety ... - 重启数据库服务:
#停止 pg_ctl stop #启动 pg_ctl start
CDL同步任务支持的数据类型及映射关系
主要介绍CDL同步任务支持的数据类型,以及源端数据库数据类型跟Spark数据类型的映射关系。
| PostgreSQL数据类型 | Spark(Hudi)数据类型 |
|---|---|
| int2 | int |
| int4 | int |
| int8 | bigint |
| numeric(p, s) | decimal[p,s] |
| bool | boolean |
| char | string |
| varchar | string |
| text | string |
| timestamptz | timestamp |
| timestamp | timestamp |
| date | date |
| json, jsonb | string |
| float4 | float |
| float8 | double |
| MySQL数据类型 | Spark(Hudi)数据类型 |
|---|---|
| int | int |
| integer | int |
| bigint | bigint |
| double | double |
| decimal[p,s] | decimal[p,s] |
| varchar | string |
| char | string |
| text | string |
| timestamp | timestamp |
| datetime | timestamp |
| date | date |
| json | string |
| float | double |
| Oracle数据类型 | Spark(Hudi)数据类型 |
|---|---|
| NUMBER(3),NUMBER(5) | bigint |
| INTEGER | decimal |
| NUMBER(20) | decimal |
| NUMBER | decimal |
| BINARY_DOUBLE | double |
| CHAR | string |
| VARCHAR | string |
| TIMESTAMP, DATETIME | timestamp |
| timestamp with time zone | timestamp |
| DATE | timestamp |
| Opengauss Json数据类型 | Spark(Hudi)数据类型 |
|---|---|
| int2 | int |
| int4 | int |
| int8 | bigint |
| numeric(p,s) | decimal[p,s] |
| bool | boolean |
| varchar | string |
| timestamp | timestamp |
| timestampz | timestamp |
| date | date |
| jsonb | string |
| json | string |
| float4 | float |
| float8 | double |
| text | string |
| Oracle Json数据类型 | Spark(Hudi)数据类型 |
|---|---|
| number(p,s) | decimal[p,s] |
| binary double | double |
| char | string |
| varchar2 | string |
| nvarchar2 | string |
| timestamp | timestamp |
| timestamp with time zone | timestamp |
| date | timestamp |
| Oracle Avro数据类型 | Spark(Hudi)数据类型 |
|---|---|
| number[p,s] | decimal[p,s] |
| float(p) | float |
| binary_double | double |
| char(p) | string |
| varchar2(p) | string |
| timestamp(p) | timestamp |
| date | timestamp |
| Opengauss数据类型 | Spark(Hudi)数据类型 |
|---|---|
| int1 | int |
| int2 | int |
| int4 | int |
| int8 | bigint |
| numeric(p,s) | decimal[p,s] |
| bool | boolean |
| char | string |
| bpchar | string |
| nvarchar2 | string |
| text | string |
| date | date |
| timestamp | timestamp |
| timestampz | timestamp |
| json | string |
| jsonb | string |
| float4 | float |
| float8 | double |
| real | float |
| Spark(Hudi)数据类型 | DWS数据类型 |
|---|---|
| int | int |
| long | bigint |
| float | float |
| double | double |
| decimal[p,s] | decimal[p,s] |
| boolean | boolean |
| string | varchar |
| date | date |
| timestamp | timestamp |
| Spark(Hudi)数据类型 | ClickHouse数据类型 |
|---|---|
| int | Int32 |
| long | Int64 (bigint) |
| float | Float32 (float) |
| double | Float64 (double) |
| decimal[p,s] | Decimal(P,S) |
| boolean | bool |
| string | String (LONGTEXT, MEDIUMTEXT, TINYTEXT, TEXT, LONGBLOB, MEDIUMBLOB, TINYBLOB, BLOB, VARCHAR, CHAR) |
| date | Date |
| timestamp | DateTime |
数据比较任务
数据比对即是对源端数据库中的数据和目标端Hive中的数据做数据一致性校验,如果数据不一致,CDL可以尝试修复不一致的数据。相关操作请参见创建CDL数据比较任务作业。
