- 最新动态
- 功能总览
- 产品介绍
- 数据治理方法论
- 准备工作
- 快速入门
-
用户指南
- DataArts Studio使用流程
- 购买并配置DataArts Studio
- 授权用户使用DataArts Studio
-
管理中心
- DataArts Studio支持的数据源
- 创建DataArts Studio数据连接
-
配置DataArts Studio数据连接参数
- DWS数据连接参数说明
- DLI数据连接参数说明
- MRS Hive数据连接参数说明
- MRS HBase数据连接参数说明
- MRS Kafka数据连接参数说明
- MRS Spark数据连接参数说明
- MRS Clickhouse数据连接参数说明
- MRS Hetu数据连接参数说明
- MRS Impala数据连接参数说明
- MRS Ranger数据连接参数说明
- MRS Presto数据连接参数说明
- Doris数据连接参数说明
- OpenSource ClickHouse数据连接参数说明
- RDS数据连接参数说明
- ORACLE数据连接参数说明
- DIS数据连接参数说明
- 主机连接参数说明
- Rest Client数据连接参数说明
- Redis数据连接参数说明
- SAP HANA数据连接参数说明
- LTS数据连接参数说明
- 配置DataArts Studio资源迁移
- 配置DataArts Studio企业模式环境隔离
- 管理中心典型场景教程
-
数据集成(CDM作业)
- 数据集成概述
- 约束与限制
- 支持的数据源
- 创建并管理CDM集群
-
在CDM集群中创建连接
- 创建CDM与数据源之间的连接
-
配置连接参数
- OBS连接参数说明
- PostgreSQL/SQLServer连接参数说明
- 数据仓库服务(DWS)连接参数说明
- 云数据库MySQL/MySQL数据库连接参数说明
- Oracle数据库连接参数说明
- DLI连接参数说明
- Hive连接参数说明
- HBase连接参数说明
- HDFS连接参数说明
- FTP/SFTP连接参数说明
- Redis连接参数说明
- DDS连接参数说明
- CloudTable连接参数说明
- MongoDB连接参数说明
- Cassandra连接参数说明
- DIS连接参数说明
- Kafka连接参数说明
- DMS Kafka连接参数说明
- 云搜索服务(CSS)连接参数说明
- Elasticsearch连接参数说明
- 达梦数据库 DM连接参数说明
- SAP HANA连接参数说明
- 分库连接参数说明
- MRS Hudi连接参数说明
- MRS ClickHouse连接参数说明
- 神通(ST)连接参数说明
- CloudTable OpenTSDB连接参数说明
- GBASE连接参数说明
- YASHAN连接参数说明
- 上传CDM连接驱动
- 新建Hadoop集群配置
-
在CDM集群中创建作业
- 新建表/文件迁移作业
- 新建整库迁移作业
-
配置CDM作业源端参数
- 配置OBS源端参数
- 配置HDFS源端参数
- 配置HBase/CloudTable源端参数
- 配置Hive源端参数
- 配置DLI源端参数
- 配置FTP/SFTP源端参数
- 配置HTTP源端参数
- 配置PostgreSQL/SQL Server源端参数
- 配置DWS源端参数
- 配置SAP HANA源端参数
- 配置MySQL源端参数
- 配置Oracle源端参数
- 配置分库源端参数
- 配置MongoDB/DDS源端参数
- 配置Redis源端参数
- 配置DIS源端参数
- 配置Kafka/DMS Kafka源端参数
- 配置Elasticsearch/云搜索服务源端参数
- 配置OpenTSDB源端参数
- 配置MRS Hudi源端参数
- 配置MRS ClickHouse源端参数
- 配置神通(ST)源端参数
- 配置达梦数据库 DM源端参数
- 配置YASHAN源端参数
- 配置CDM作业目的端参数
- 配置CDM作业字段映射
- 配置CDM作业定时任务
- CDM作业配置管理
- 管理单个CDM作业
- 批量管理CDM作业
- 时间宏变量使用解析
- 优化迁移性能
- 关键操作指导
- 使用教程
- 常见错误码参考
-
数据集成(离线作业)
- 离线作业概述
- 支持的数据源
- 新建离线处理集成作业
- 配置离线处理集成作业
-
配置作业源端参数
- 配置MySQL源端参数
- 配置Hive源端参数
- 配置HDFS源端参数
- 配置Hudi源端参数
- 配置PostgreSQL源端参数
- 配置SQLServer源端参数
- 配置Oracle源端参数
- 配置DLI源端参数
- 配置OBS源端参数
- 配置SAP HANA源端参数
- 配置Kafka源端参数
- 配置Rest Client源端参数
- 配置DWS源端参数
- 配置FTP/SFTP源端参数
- 配置Doris源端参数
- 配置HBase 源端参数
- 配置ClickHouse源端参数
- 配置ElasticSearch源端参数
- 配置MongoDB源端参数
- 配置RestApi源端参数
- 配置GBase源端参数
- 配置Redis源端参数
- 配置LTS源端参数
- 配置作业目的端参数
- 字段转换器配置指导
- 新增字段操作指导
- 数据集成(实时作业)
- 数据架构
-
数据开发
- 数据开发概述
- 数据管理
- 脚本开发
- 作业开发
- 集成作业开发
- Notebook开发
- 解决方案
- 运行历史
- 运维调度
- 配置管理
- 审批中心
- 下载中心
-
节点参考
- 节点概述
- 节点数据血缘
- CDM Job
- Data Migration
- DIS Stream
- DIS Dump
- DIS Client
- Rest Client
- Import GES
- MRS Kafka
- Kafka Client
- ROMA FDI Job
- DLI Flink Job
- DLI SQL
- DLI Spark
- DWS SQL
- MRS Spark SQL
- MRS Hive SQL
- MRS Presto SQL
- MRS Spark
- MRS Spark Python
- MRS ClickHouse
- MRS Impala SQL
- MRS Flink Job
- MRS MapReduce
- CSS
- Shell
- RDS SQL
- ETL Job
- Python
- DORIS SQL
- ModelArts Train
- Create OBS
- Delete OBS
- OBS Manager
- Open/Close Resource
- Data Quality Monitor
- Sub Job
- For Each
- SMN
- Dummy
- EL表达式参考
- 简易变量集参考
- 使用教程
- 数据质量
- 数据目录
- 数据安全
- 数据服务
- 审计日志
- 最佳实践
- SDK参考
-
API参考
- 使用前必读
- API概览
- 如何调用API
- 数据集成API
- 数据开发API(V1)
- 数据开发API(V2)
- 数据架构API
- 数据质量API
- 数据服务API
- 应用示例
- 附录
-
常见问题
-
咨询与计费
- 区域和可用区如何选择?
- 数据库、数据仓库、数据湖与华为智能数据湖方案是什么,有哪些区别和联系?
- DataArts Studio和沃土是什么关系?
- DataArts Studio和ROMA有什么差异?
- DataArts Studio是否支持私有化部署到本地或私有云?
- 如何在IAM中创建细粒度权限策略?
- 如何实现用户的工作空间隔离,使其无法查看其他未授权工作空间?
- 用户已添加权限,还是无法查看工作空间?
- IAM用户操作时报错“无xx权限”怎么办?
- DataArts Studio的工作空间可以删除吗?
- 实例试用/购买成功后,可以转移到其他账号下吗?
- DataArts Studio是否支持版本升级?
- DataArts Studio是否支持版本降级?
- 如何查看DataArts Studio的版本?
- 购买DataArts Studio实例时为什么选不到指定的IAM项目?
- DataArts Studio的会话超时时间是多少,是否支持修改?
- 套餐包到期未续订或按需资源欠费时,我的数据会保留吗?
- 如何查看套餐包的剩余时长?
- DataArts Studio实例中的CDM没有计费是什么原因?
- 为什么会提示每日执行节点个数超过上限,应该怎么处理?
- 管理中心
-
数据集成(CDM作业)
- CDM与其他数据迁移服务有什么区别,如何选择?
- CDM有哪些优势?
- CDM有哪些安全防护?
- 如何降低CDM使用成本?
- CDM未使用数据传输功能时,是否会计费?
- 已购买包年包月的CDM套餐包,为什么还会产生按需计费的费用?
- 如何查看套餐包的剩余时长?
- CDM可以跨账户使用吗?
- CDM集群是否支持升级操作?
- CDM迁移性能如何?
- CDM不同集群规格对应并发的作业数是多少?
- 是否支持增量迁移?
- 是否支持字段转换?
- Hadoop类型的数据源进行数据迁移时,建议使用的组件版本有哪些?
- 数据源为Hive时支持哪些数据格式?
- 是否支持同步作业到其他集群?
- 是否支持批量创建作业?
- 是否支持批量调度作业?
- 如何备份CDM作业?
- 如何解决HANA集群只有部分节点和CDM集群网络互通?
- 如何使用Java调用CDM的Rest API创建数据迁移作业?
- 如何将云下内网或第三方云上的私网与CDM连通?
- CDM是否支持参数或者变量?
- CDM迁移作业的抽取并发数应该如何设置?
- CDM是否支持动态数据实时迁移功能?
- CDM是否支持集群关机功能?
- 如何使用表达式方式获取当前时间?
- 日志提示解析日期格式失败时怎么处理?
- 字段映射界面无法显示所有列怎么处理?
- CDM迁移数据到DWS时如何选取分布列?
- 迁移到DWS时出现value too long for type character varying怎么处理?
- OBS导入数据到SQL Server时出现Unable to execute the SQL statement怎么处理?
- 获取集群列表为空/没有权限访问/操作时报当前策略不允许执行?
- Oracle迁移到DWS报错ORA-01555
- MongoDB连接迁移失败时如何处理?
- Hive迁移作业长时间卡顿怎么办?
- 使用CDM迁移数据由于字段类型映射不匹配导致报错怎么处理?
- MySQL迁移时报错“JDBC连接超时”怎么办?
- 创建了Hive到DWS类型的连接,进行CDM传输任务失败时如何处理?
- 如何使用CDM服务将MySQL的数据导出成SQL文件,然后上传到OBS桶?
- 如何处理CDM从OBS迁移数据到DLI出现迁移中断失败的问题?
- 如何处理CDM连接器报错“配置项 [linkConfig.iamAuth] 不存在”?
- 报错“配置项[linkConfig.createBackendLinks]不存在”或“配置项 [throttlingConfig.concurrentSubJobs] 不存在怎么办”?
- 新建MRS Hive连接时,提示:CORE_0031:Connect time out. (Cdm.0523) 怎么解决?
- 迁移时已选择表不存在时自动创表,提示“CDM not support auto create empty table with no column”怎么处理?
- 创建Oracle关系型数据库迁移作业时,无法获取模式名怎么处理?
- MySQL迁移时报错:invalid input syntax for integer: "true"
- 数据集成(实时作业)
- 数据架构
-
数据开发
- 数据开发可以创建多少个作业,作业中的节点数是否有限制?
- DataArts Studio支持自定义的Python脚本吗?
- 作业关联的CDM集群删除后,如何快速修复?
- 作业的计划时间和开始时间相差大,是什么原因?
- 相互依赖的几个作业,调度过程中某个作业执行失败,是否会影响后续作业?这时该如何处理?
- 通过DataArts Studio调度大数据服务时需要注意什么?
- 环境变量、作业参数、脚本参数有什么区别和联系?
- 打不开作业日志,返回404报错?
- 配置委托时获取委托列表失败如何处理?
- 数据开发创建数据连接,为什么选不到指定的周边资源?
- 配置了SMN通知,却收不到作业失败告警通知?
- 作业配置了周期调度,但是实例监控没有作业运行调度记录?
- Hive SQL和Spark SQL脚本执行失败,界面只显示执行失败,没有显示具体的错误原因?
- 数据开发节点运行中报TOKEN不合法?
- 作业开发时,测试运行后如何查看运行日志?
- 月周期的作业依赖天周期的作业,为什么天周期作业还未跑完,月周期的作业已经开始运行?
- 执行DLI脚本,报Invalid authentication怎么办?
- 创建数据连接时,在代理模式下为什么选不到需要的CDM集群?
- 作业配置了每日调度,但是实例没有作业运行调度记录?
- 查看作业日志,但是日志中没有内容?
- 创建了2个作业,但是为什么无法建立依赖关系?
- DataArts Studio执行调度时,报错提示“作业没有可以提交的版本”怎么办?
- DataArts Studio执行调度时,报错提示“作业中节点XXX关联的脚本没有提交的版本”怎么办?
- 提交调度后的作业执行失败,报depend job [XXX] is not running or pause怎么办?
- 如何创建数据库和数据表,数据库对应的是不是数据连接?
- 为什么执行完HIVE任务什么结果都不显示?
- 在作业监控页面里的“上次实例状态”只有运行成功、运行失败,这是为什么?
- 如何创建通知配置对全量作业都进行结果监控?
- 数据开发的并行执行节点数是多少?
- DataArts Studio是否支持修改时区?
- CDM作业改名后,在数据开发中如何同步?
- 执行RDS SQL,报错hll不存在,在DataArts Studio可以执行成功?
- 创建DWS数据连接时报错提示“The account has been locaked”怎么处理?
- 作业实例取消了,日志提示“The node start execute failed, so the current node status is set to cancel.”怎么处理?
- 调用数据开发接口报错“Workspace does not exists”怎么处理?
- Postman调用接口返回结果正常,为什么测试环境调用接口的URL参数不生效?
- 执行Python脚本报错:Agent need to be updated?
- 节点状态为成功,为什么日志显示运行失败?
- 调用数据开发API报错Unknown Exception?
- 调用创建资源的API报错“资源名不合法”是什么原因?
- 补数据的作业实例都是成功的,为什么补数据任务失败了?
- DWS数据连接可视化建表,报错提示“表已存在”,但是展开数据连接看不到该表?
- 调度MRS spark作业报错“The throttling threshold has been reached: policy user over ratelimit,limit:60,time:1 minute.”怎么处理?
- 执行Python脚本,报错“UnicodeEncodeError :‘ascii' codec cant encode characters in position 63-64 : ordinal not in range ( 128 )”怎么处理?
- 查看日志时,系统提示“OBS日志文件不存在,请检查文件是否被删除或者没有OBS写入权限。”怎么办?
- Shell/Python节点执行失败,后台报错session is down
- 请求头中参数值长度超过512个字符时,何如处理?
- 执行DWS SQL脚本时,提示id不存在,如何处理?
- 如何查看CDM作业被哪些作业进行调用?
- 使用python调用执行脚本的api报错:The request parameter invalid,如何处理?
- 在数据开发子模块中,新建的DLI SQL脚本默认队列是一个已删除的队列,怎么处理?
- 数据开发中的事件驱动是否支持线下kafka?
- 数据质量
- 数据目录
- 数据安全
- 数据服务
-
咨询与计费
-
更多文档
-
用户指南(吉隆坡区域)
- 产品介绍
- 准备工作
-
用户指南
- 使用DataArts Studio前的准备
- 管理中心
- 数据集成
-
数据开发
- 数据开发概述
- 数据管理
- 脚本开发
- 作业开发
- 解决方案
- 运行历史
- 运维调度
- 配置管理
-
节点参考
- 节点概述
- CDM Job
- Rest Client
- Import GES
- MRS Kafka
- Kafka Client
- ROMA FDI Job
- DLI Flink Job
- DLI SQL
- DLI Spark
- DWS SQL
- MRS Spark SQL
- MRS Hive SQL
- MRS Presto SQL
- MRS Spark
- MRS Spark Python
- MRS Flink Job
- MRS MapReduce
- CSS
- Shell
- RDS SQL
- ETL Job
- Python
- Create OBS
- Delete OBS
- OBS Manager
- Open/Close Resource
- Sub Job
- For Each
- SMN
- Dummy
- EL表达式参考
- 使用教程
-
常见问题
- 咨询
- 管理中心
-
数据集成
- 通用类
- 功能类
-
故障处理类
- OBS导入数据到SQL Server时出现Unable to execute the SQL statement怎么处理?
- Oracle迁移到DWS报错ORA-01555
- MongoDB连接迁移失败时如何处理?
- Hive迁移作业长时间卡住怎么办?
- 使用CDM迁移数据由于字段类型映射不匹配导致报错怎么处理?
- MySQL迁移时报错“JDBC连接超时”怎么办?
- 创建了Hive到DWS类型的连接,进行CDM传输任务失败时如何处理?
- 如何使用CDM服务将MySQL的数据导出成SQL文件,然后上传到OBS桶?
- 如何处理CDM从OBS迁移数据到DLI出现迁移中断失败的问题?
- 如何处理CDM连接器报错“配置项 [linkConfig.iamAuth] 不存在”?
- 创建数据连接时报错“配置项[linkConfig.createBackendLinks]不存在”或创建作业时报错“配置项 [throttlingConfig.concurrentSubJobs] 不存在”怎么办?
- 新建MRS Hive连接时,提示:CORE_0031:Connect time out. (Cdm.0523) 怎么解决?
- 迁移时已选择表不存在时自动创表,提示“CDM not support auto create empty table with no column”怎么处理?
- 创建Oracle关系型数据库迁移作业时,无法获取模式名怎么处理?
-
数据开发
- 数据开发可以创建多少个作业,作业中的节点数是否有限制?
- 作业的计划时间和开始时间相差大,是什么原因?
- 相互依赖的几个作业,调度过程中某个作业执行失败,是否会影响后续作业?这时该如何处理?
- 通过DataArts Studio调度大数据服务时需要注意什么?
- 环境变量、作业参数、脚本参数有什么区别和联系?
- 作业失败无法查看节点错误日志?
- 配置委托时获取委托列表失败如何处理?
- 每日执行节点个数超过上限,怎么排查哪些作业调度节点比较多?
- 数据开发创建数据连接,为什么选不到指定的周边资源?
- 作业配置了周期调度,但是实例监控没有作业运行调度记录?
- Hive SQL和Spark SQL脚本脚本执行失败,界面只显示执行失败,没有显示具体的错误原因?
- 数据开发节点运行中报TOKEN不合法?
- 作业开发时,测试运行后如何查看运行日志?
- 月周期的作业依赖天周期的作业,为什么天周期作业还未跑完,月周期的作业已经开始运行?
- 执行DLI脚本,报Invalid authentication怎么办?
- 创建数据连接时,在代理模式下为什么选不到需要的CDM集群?
- 作业配置了每日调度,但是实例没有作业运行调度记录?
- 查看作业日志,但是日志中没有内容?
- 创建了2个作业,但是为什么无法建立依赖关系?
- DataArts Studio执行调度时报错:提示作业没有可以提交的版本怎么办?
- DataArts Studio执行调度时报错:作业中节点XXX关联的脚本没有提交的版本?
- 提交调度后的作业执行失败,报depend job [XXX] is not running or pause怎么办?
- 如何创建数据库和数据表,数据库对应的是不是数据连接?
- 为什么执行完HIVE任务什么结果都不显示?
- 在作业监控页面里的 “上次实例状态” 只有运行成功、运行失败,这是为什么?
- 如何创建通知配置对全量作业都进行结果监控?
- DataArts Studio的版本规格与并行执行节点数之间有什么关系?
- 启动用户、执行用户、工作空间委托、作业委托它们之间的优先级顺序是什么?
- API参考(吉隆坡区域)
-
用户指南(吉隆坡区域)
- 通用参考
链接复制成功!
MySQL同步到Kafka作业配置
支持的源端和目的端数据库版本
源端数据库 |
目的端数据库 |
---|---|
MySQL数据库(5.6、5.7、8.x版本) |
Kafka集群(2.7、3.x版本) |
数据库账号权限要求
在使用Migration进行同步时,源端和目的端所使用的数据库账号需要满足以下权限要求,才能启动实时同步任务。不同类型的同步任务,需要的账号权限也不同,详细可参考下表进行赋权。
类型名称 |
权限要求 |
---|---|
源数据库连接账号 |
需要具备如下最小权限:SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT,即执行SQL: GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '用户名'@'%'; |
目标数据库连接账号 |
MRS用户需要拥有Kafka对应Topic的读写权限,即必须属于kafka/kafkaadmin/kafkasuperuser用户组。 kafka普通用户需要被Kafka管理员用户授予特定Topic的读写权限,才能访问对应Topic。 |
- 建议创建单独用于Migration任务连接的数据库账号,避免因为数据库账号密码修改,导致的任务连接失败。
- 连接源和目标数据库的账号密码修改后,请同步修改管理中心对应的连接信息,避免任务连接失败后自动重试,导致数据库账号被锁定影响使用。
支持的同步对象范围
在使用Migration进行同步时,不同类型的链路,支持的同步对象范围不同,详细情况可参考下表。
类型名称 |
使用须知 |
---|---|
同步对象范围 |
|
注意事项
除了数据源版本、连接账号权限及同步对象范围外,您还需要注意的事项请参见下表。
类型名称 |
使用和操作限制 |
---|---|
数据库限制 |
源端数据库中的库名、表名、字段名不能包含:.<'>/\"以及非ASCII字符,建议尽量使用常规字符避免任务失败。 |
使用限制 |
通用:
全量同步阶段: 任务启动和全量数据同步阶段,请不要在源数据库执行DDL操作,否则可能导致任务异常。 增量同步阶段: 增量同步过程中,分库分表场景下,在多个分表执行的DDL,会同步多条数据到Kafka的Topic中。 常见故障排查: 在任务创建、启动、全量同步、增量同步、结束等过程中,如有遇到问题,可先参考常见问题章节进行排查。 |
其他限制 |
重命名表仅支持rename后库表在同步范围中的DDL操作(例如:RENAME TABLE A TO B,B需要在同步范围内)。 |
操作步骤
本小节以RDS for MySQL到DMS Kafka实时同步为示例,介绍如何配置Migration实时集成作业。配置作业前请务必阅读使用前自检概览, 确认已做好所有准备工作。
- 参见新建实时集成作业创建一个实时集成作业并进入作业配置界面。
- 选择数据连接类型:源端选MySQL,目的端选DMS Kafka。
图1 选择数据连接类型
- 选择集成作业类型:同步类型默认为实时,同步场景包含整库和分库分表场景。
图2 选择集成作业类型
- 配置网络资源:选择已创建的MySQL、DMS Kafka数据连接和已配置好网络连接的资源组。
图3 选择数据连接及资源组
说明:
无可选数据连接时,可单击“新建”跳转至管理中心数据连接界面,单击“创建数据连接”创建数据连接,详情请参见配置DataArts Studio数据连接参数进行配置。
无可选资源组时,可单击“新建”跳转至购买资源组页面创建资源组配置,详情请参见购买创建数据集成资源组增量包进行配置。
- 检测网络连通性:数据连接和资源组配置完成后需要测试整个迁移任务的网络连通性,可通过以下方式进行数据源和资源组之间的连通性测试。
- 单击展开“源端配置”触发连通性测试,会对整个迁移任务的连通性做校验。
- 单击源端和目的端数据源和资源组中的“测试”按钮进行检测。
- 配置源端参数。
各同步场景下选择需要同步库表的方式请参考下表。
表5 选择需要同步的库表 同步场景
配置方式
整库
选择需要迁移的MySQL库表。图4 选择库表库与表均支持自定义选择,即可选择一库一表,也可选择多库多表。
分库分表
添加逻辑表。- 逻辑表名:即最终写入到DMS Kafka的Topic名。
- 源库过滤条件:支持填入正则表达式,在所有MySQL实例中通过该正则表达式过滤出要抽取数据写入目标端Kafka Topic的所有分库。
- 源表过滤条件:支持填入正则表达式,在过滤出的源端分库中再次过滤出要抽取数据写入目标端Kafka Topic的所有分表。
图5 添加逻辑表
已添加的逻辑表支持预览表结构及来源库表,单击“操作”列的预览即可。预览逻辑表时,源表数量越多,等待时间可能越长,请耐心等待。
图6 逻辑表预览 - 配置目的端参数。
图7 Kafka目的端配置项
- 目标Topic名称规则。
配置源端MySQL库表与目的端Kafka Topic的映射规则
表6 目标Topic名称规则 同步场景
配置方式
整库
配置源端MySQL库表与目的端Kafka Topic的映射规则,可指定为固定的一个Topic,也可使用内置变量做映射,将不同源表数据同步到不同的Topic中。
可以使用的内置变量有:
- 源库名:#{source_db_name}。
- 源表名:#{source_table_name}。
分库分表
无该配置项,默认使用源端配置的逻辑表名作为目的端同步的Topic名。
- 同步kafka partition策略
支持以下三种投递策略将源端的数据按规则同步到Kafka Topic的特定Partition:
- 全部投递到Partition 0。
- 按库名+表名的hash值投递到不同Partition。
- 按表的主键值hash值投递到不同的Partition。
说明:
源端无主键情况下,目的端默认投递到partition 0。
- 需要同步的数据库操作
- 投递到Kafka的数据格式
- 新建Topic的Partition数量
- Kafka目标端属性配置
支持设置Kafka的配置项,需要增加 properties. 前缀,作业将自动移除前缀并传入底层Kafka客户端,具体参数可参考Apache Kafka官方文档中的配置说明。
- 高级配置
支持在作业“任务配置”中添加自定义属性来开启部分高级功能,参数详情可参考MySQL->Kafka高级参数一览表。
图8 添加自定义属性表7 MySQL > Kafka高级参数一览表 参数名
参数类型
默认值
单位
参数说明
source.server.timezone
string
本地时区
-
连接源端数据库时指定的session时区,支持时区标准写法,例如utc+8等。
source.convert.timestampWithServerTimeZone
boolean
true
-
timestamp类型数据输出时转为按源端时区。
source.convert.bit1AsInt
boolean
true
-
是否将bit1输出成int类型。
sink.delivery-guarantee
string
at-least-once
-
Flink写Kafka时的语义保证机制。
- at-least-once:在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
- exactly-once:该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据,在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。
- 目标Topic名称规则。
- 刷新源表和目标表映射,检查映射关系是否正确。
表8 源表与目标表映射 同步场景
配置方式
整库
支持用户根据实际需求修改映射后的目的端Topic名称,可以配置为一对一、多对一的映射关系。
图9 整库场景下源表与目标表映射分库分表
默认使用源端配置的逻辑表名作为目的端的Topic名称。
图10 分库分表场景下源表与目标表映射 - 配置任务属性。
表9 任务配置参数说明 参数
说明
默认值
执行内存
作业执行分配内存,跟随处理器核数变化而自动变化。
8GB
处理器核数
范围:2-32。
每增加1处理核数,则自动增加4G执行内存和1并发数。
2
并发数
作业执行支持并发数。该参数无需配置,跟随处理器核数变化而自动变化。
1
自动重试
作业失败时是否开启自动重试。
否
最大重试次数
“自动重试”为是时显示该参数。
1
重试间隔时间
“自动重试”为是时显示该参数。
120秒
是否写入脏数据
选择是否记录脏数据,默认不记录脏数据,当脏数据过多时,会影响同步任务的整体同步速度。
链路是否支持写入脏数据,以实际界面为准。
- 否:默认为否,不记录脏数据。
- 是:允许脏数据,即任务产生脏数据时不影响任务执行。
允许脏数据并设置其阈值时:
- 若产生的脏数据在阈值范围内,同步任务将忽略脏数据(即不会写入目标端),并正常执行。
- 若产生的脏数据超出阈值范围,同步任务将失败退出。
说明:
脏数据认定标准:脏数据是对业务没有意义,格式非法或者同步过程中出现问题的数据;单条数据写入目标数据源过程中发生了异常,则此条数据为脏数据。 因此只要是写入失败的数据均被归类于脏数据。
例如,源端是VARCHAR类型的数据写到INT类型的目标列中,则会因为转换不合理导致脏数据不会成功写入目的端。用户可以在同步任务配置时,配置同步过程中是否写入脏数据,配置脏数据条数(单个分片的最大错误记录数)保证任务运行,即当脏数据超过指定条数时,任务失败退出。
否
脏数据策略
“是否写入脏数据”为是时显示该参数,当前支持以下策略:
- 不归档:不对脏数据进行存储,仅记录到任务日志中。
- 归档到OBS:将脏数据存储到OBS中,并打印到任务日志中。
不归档
脏数据写入连接
“脏数据策略”选择归档到OBS时显示该参数。
脏数据要写入的连接,目前只支持写入到OBS连接。
-
脏数据目录
脏数据写入的OBS目录。
-
脏数据阈值
是否写入脏数据为是时显示该参数。
用户根据实际设置脏数据阈值。
说明:
- 脏数据阈值仅针对每个并发生效。比如阈值为100,并发为3,则该作业可容忍的脏数据条数最多为300。
- 输入-1表示不限制脏数据条数。
100
添加自定义属性
支持通过自定义属性修改部分作业参数及开启部分高级功能,详情可参见任务性能调优章节。
-
- 提交并运行任务。
作业配置完毕后,单击作业开发页面左上角“提交”,完成作业提交。
图11 提交作业提交成功后,单击作业开发页面“启动”按钮,在弹出的启动配置对话框按照实际情况配置同步位点参数,单击“确定”启动作业。
图12 启动配置表10 启动配置参数 参数
说明
同步模式
- 增量同步:从指定时间位点开始同步增量数据。
- 全量+增量:先同步全量数据,随后实时同步增量数据。
时间
增量同步需要设置该参数,指示增量同步起始的时间位点。
说明:
配置的位点时间早于Binlog日志最早时间点时,默认会以日志最新时间点开始消费。
- 监控作业。
通过单击作业开发页面导航栏的“前往监控”按钮,可前往作业监控页面查看运行情况、监控日志等信息,并配置对应的告警规则,详情请参见实时集成任务运维。
图13 前往监控
性能调优
若链路同步速度过慢,可参考参见任务性能调优章节中对应链路文档进行排查及处理。