文档首页/ 数据仓库服务 DWS/ 最佳实践/ 数据迁移/ 使用DataX迁移数据到DWS集群
更新时间:2025-08-22 GMT+08:00

使用DataX迁移数据到DWS集群

本实践以神通数据库迁移到DWS集群为例,演示开源离线数据同步工具DataX与DWS对接过程,实现多种异构数据源迁移到DWS。

本实践预计时长120分钟基本流程如下:

  1. 准备工作:准备DWS集群和源数据库、预装好Python和JDK环境。
  2. 步骤一:安装DataX:从GitHub开源社区获取DataX包并安装。
  3. 步骤二:获取DataX的作业json模板:获取迁移作业json模板。
  4. 步骤三:准备Reader:Reader指数据读取端,即源数据库,对于DataX不支持的源端数据库,需要单独适配Reader,如DataX已支持数据源,则跳过该步骤。
  5. 步骤四:准备Writer:Writer指数据写入端,即目标数据库,需要将DWS插件包放入DataX配置文件中。
  6. 步骤五:配置Json文件:配置源端和目标端连接信息、迁移参数等。
  7. 步骤六:运行迁移作业:完成数据导入。

什么是DataX

DataX是阿里巴巴开源的离线数据同步工具,实现了包括主流RDBMS数据库、NoSQL、大数据计算系统在内的多种异构数据源之间高效进行数据同步的功能。

图1 DataX功能图

为了解决异构数据源的同步问题,DataX将复杂的网状同步链路优化成了星型数据链路,由DataX作为中间传输载体来负责连接各种数据源,以此来降低整个异构数据源同步链路的复杂度。当需要新接入一个数据源的时候,只需要考虑将该新的数据源对接到DataX即可,就能跟已有的所有数据源无缝同步。

图2 DataX迁移链路

DataX的执行过程

DataX执行过程如下图所示:

图3 DataX作业分割
  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接收到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader > Channel > Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

总结一个DataX Job会切分成多个Task,每个Task会按TaskGroup进行分组,一个Task内部会有一组Reader > Channel > Writer的数据迁移链路。Channel是连接Reader和Writer的数据交换通道,所有的数据都会经由Channel进行传输

DataX支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,支持数据源请参见DataX网址

准备工作

  • 已注册华为账号并开通华为云,且在使用DWS 前检查账号状态,账号不能处于欠费或冻结状态。
  • 已创建虚拟私有云和子网,参见创建虚拟私有云和子网
  • 已创建目标端DWS集群。
  • 已准备源端数据源:神通数据库。
  • 准备好Linux服务器或Windows环境。
  • 安装Java运行环境(jdk1.8以上,推荐1.8)
  • 安装Python运行环境(python2以上)

步骤一:安装DataX

  1. 访问DataX的GitHub网址,下拉页面找到Quick Start,下载DataX,解压缩到合适的目录。

  2. 进入到DataX的根目录下,运行自带的示例

    python ./bin/datax.py ./job/job.json

    运行后控制台显示运行成功即表示DataX安装完成。

步骤二:获取DataX的作业json模板

  1. 使用工具自带指令,获取一个简单的job.json的模板,模板内容和参数解释如下所示。

    python ./bin/datax.py -r streamreader -w streamwrite

    运行后控制台显示

模板json:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

json配置简介

作业通过json文件定义,执行DataX时需要指定对应的json文件,json中包括Reader、Writer、和setting三个部分,常用参数如表1所示。

表1 json配置参数解释

参数大类

描述

子参数

示例

reader

用于定义数据的输入,使用name指定reader的类型,模板json中streamreader是一个从内存读取数据的插件, 它主要用来快速生成期望的数据并对写入插件进行测试。

常用的reader使用介绍可查看常用reader,例 :读取mysql数据源时,name可指定为"mysqlreader".

mysqlreader

writer

用于定义数据的写入,使用name指定writer的类型,模板json中streamreader是一个将数据写入内存的插件,一般用来将获取到的数据写到终端,用来调试读取插件的数据处理情况。

常用的reader使用介绍可查看常用writer,本文档只介绍使用dwswriter的场景,输出数据到dws数据库时,name指定为"dwswriter".

dwswriter

setting

定义任务的运行参数,例如速度控制(speed)、错误处理(errorLimit)等。

  • speed:根据Byte 、Record 、Channel 对作业的并发进行控制,子参数如下:
    • byte: 配置全局 Byte 限速以及单 Channel Byte 限速。Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速
    • record:配置全局 Record 限速以及单 Channel Record 限速。Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速
    • channel:直接配置 Channel 个数。
  • errorLimit:错误处理(脏数据控制),子参数如下
    • record:对脏数据最大记录数阈值(record值)。
    • percentage:脏数据占比阈值(percentage值)。

    当达到数量或百分比阈值时,DataX Job报错退出

-

步骤三:准备Reader

在迁移数据前,需要先准备Reader。

查看datax\plugin\reader目录,该目录下包含了DataX已适配的数据源。同时每个Reader文件夹中包含该reader的模板json(文件名:plugin_job_template.json),使用前可结合常用reader了解如何进行配置。

如果已有对应数据源的Reader,则可以跳过此步。如果没有适配的Reader,则需要手动添加Reader。

  1. 因DataX没有适配神通数据库,需要将其添加到自定义Reader中。

    找到DataX安装目录下的plugin文件夹,该文件夹有reader和writer两个文件夹,找到reader目录下的rdbmsreader。

  2. 将神通数据库jdbc驱动包oscarJdbc.jar加入到libs文件夹下,并修改plugin.json文件,加入驱动Class全类名。

步骤四:准备Writer

dwswriter为dws适配DataX开发的writer组件包,用于输出数据到dws数据库。

  1. 访问下载地址获取dwswriter.zip
  2. 解压加入到writer目录下。

步骤五:配置Json文件

Json文件主要分两部分,reader对应的数据源名称,以及连接数据源需要配置的url、用户名密码,以及需要读取的目标表,writer同理。

表2 Reader配置

参数名

描述

示例

name

源端数据源类型,例如rdbmsreader。

rdbmsreader

parameter.username

Jdbc登录用户名。

root

parameter.password

Jdbc登录密码。

******

parameter.column

需要同步的表字段,默认*代表全部。

["*"] 或 ["id","name","age","create_time"]

parameter.connection.jdbcUrl

Jdbc地址,由host、ip和数据库名构成。

jdbc:oscar://x.x.x.x:2003/TESTDB

parameter.connection.table

需要同步的表名。

public.table1

parameter.where

Sql过滤条件。

age > 18

表3 Writer配置

参数名

描述

示例

name

目标端数据源类型,这里固定为dwswriter。

dwswriter

parameter.username

Jdbc登录用户名。

dbadmin

parameter.password

Jdbc登录密码。

******

parameter.batchSize

攒批数,攒多少数据往数据库写一次数据。

500

parameter.writeMode

入库方式,包括:AUTO、COPY_UPSERT、COPY_MERGE。

AUTO

parameter.column

需要同步的表字段,默认*代表全部。

["*"] 或 ["id","name","age","create_time"]

parameter.preSql

数据同步前需要执行的SQL,只执行一次。

truncate table public.table1

parameter.connection.jdbcUrl

JDBC地址,由host、ip和数据库名构成。

jdbc:postgresql://x.x.x.x:8000/gaussdb

parameter.connection.table

需要同步的表名。

public.table1

parameter.postSql

数据同步后需要执行的sql,只执行一次。

-

表4 Setting配置

参数名

描述

示例

speed.channel

需要启动多少个work线程。

5

speed.record

单work线程record限速。

10000

speed.byte

单work线程byte限速。

1000

示例json配置:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 10
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "password",
                        "column": ["id", "name", "age","create_time"],
                        "connection": [
                            {
                                "table": [
                                    "public.table1"
                                ],
                                "jdbcUrl": [
				    "jdbc:orscar://x.x.x.x:x/TESTDB"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "dwswriter",
		    "parameter": {
			"batchSize":500,
			"username": "dbadmin",
			"password": "password",
			"writeMode":"COPY_UPSERT",
			"conditionColumn":["id"],
			"column": ["*"],
			"preSql": [],
			"connection": [
				{
				     "jdbcUrl": "jdbc:postgresql://x.x.x.x:8000/gaussdb",
				     "table": ["public.table1"]
				}
			],
			"postSql": []
		    }
                }
            }
        ]
    }
}

reader端的table字段可以替换成querySql,用于拆表并行入库场景。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
"reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "password",
                        "column": ["id", "name", "age","create_time"],
                        "connection": [
                            {
                                "querySql": [
                                    "select * from public.table1 where id < 10000",
                                    "select * from public.table1 where id >= 10000"
                                ],
                                "jdbcUrl": [
				    "jdbc:orscar://x.x.x.x:x/TESTDB"
                                ]
                            }
                        ]
                    }

步骤六:运行迁移作业

  1. 因为DataX同步数据需要攒批到Buffer再写入DWS,需要较大内存;因此,在启动DataX进程时,可以调整JVM大小,以防止OOM出现。以作业oscartodws.json为例:

    python ./bin/datax.py --jvm="-Xms8G -Xmx8G" ./job/oscartodws.json

  2. 配置成功后,进入到datax的bin目录下,启动脚本:

    python ./bin/datax.py ./job/oscartodws.json

    执行结果如下:

常见问题

  1. 程序启动报错

    一般都是JDBC配置问题导致的timeout,如果没有权限访问源端表,需要授权。

  2. 并行入库经验

    如果单表写入速度不达标,需要排查: 源端、目标端网络带宽,DWS磁盘IO是否过大,是否存在损坏的信道。

    这些因素都排查过后,如果速度还是没有恢复,可以将单表作业拆分成多个分片,同时启动多个DataX作业进程来并行同步,以提高写入效率。

  3. 数据同步异常导致同步失败的情况

    Datax目前断点续传的原理是通过where条件来实现,找到数据同步的分界点。

总结:使用DataX不落地迁移,同步很稳定,结果准确,目前同步瓶颈主要集中在源端带宽上。