更新时间:2024-11-06 GMT+08:00

数据集成应用示例

本节通过cURL调用CDM API,迁移本地MySQL数据库中的数据到云上服务DWS为例,介绍使用CDM API的基本流程。
  1. 获取token

    获取用户的token,因为在后续的请求中需要将token放到请求消息头中作为认证。

  2. 创建CDM集群
    • 如果您已经创建过CDM集群,可以跳过该步骤,直接使用已创建的集群ID。
    • 如果您需要使用新的集群执行迁移任务,调用创建集群API创建。
  3. 创建连接

    调用创建连接API创建MySQL连接和DWS连接。

  4. 创建迁移作业

    调用指定集群创建作业API创建MySQL到DWS的迁移作业。

  5. 查看作业结果

    调用启动作业API开始执行作业。

准备数据

在调用API之前,您需要准备如下数据。

表1 准备数据

数据项

名称

说明

样例

云账户信息

项目名

CDM所属的项目名。

Project Name

项目ID

CDM所属的项目ID。

1551c7f6c808414d8e9f3c514a170f2e

账号名

用户所属的企业账户名称。

Account Name

用户名

使用云服务的用户名,该用户需要拥有CDM的操作权限。

Username

密码

用户密码。

password

VPC信息

VPC的ID

CDM所属的VPC必须与DWS一致。

6b47302a-bf79-4b20-bf7a-80987408e196

子网ID

CDM所属的子网必须与DWS一致。

63bdc3cb-a4e7-486f-82ee-d9bf208c8f8c

安全组ID

CDM所属的安全组必须与DWS一致。

005af77a-cce5-45ac-99c7-2ea50ea8addf

Endpoint

IAM的Endpoint

终端节点(Endpoint)即调用API的请求地址,不同服务不同区域的终端节点不同。Endpoint您可以从地区和终端节点获取。

iam_endpoint

CDM的Endpoint

终端节点(Endpoint)即调用API的请求地址,不同服务不同区域的终端节点不同。本服务的Endpoint您可以从终端节点Endpoint获取。

cdm_endpoint

MySQL数据库

IP地址

本地的MySQL数据库的IP地址,且该地址允许CDM通过公网IP访问。

1xx.120.85.24

端口

MySQL数据库的端口。

3306

数据库名称

待导出数据的MySQL数据库名称。

DB_name

用户名

访问MySQL数据库的用户,该用户拥有MySQL数据库的读、写和删除权限。

username

密码

访问MySQL数据库的用户密码。

DB_password

DWS数据库

IP地址

DWS数据库的IP地址,CDM可通过内网访问该地址。

10.120.85.24

端口

DWS数据库的端口。

3306

数据库名称

待写入数据的DWS数据库名称。

DWS

用户名

访问DWS数据库的用户,该用户拥有DWS数据库的读、写和删除权限。

user_dws

密码

访问DWS数据库的用户密码。

dws_password

获取token

  1. 调用其他API前,需要获取token,并设置成环境变量。
    curl -H "Content-Type:application/json" https://{iam_endpoint}/v3/auth/tokens -X POST -d '
    {
        "auth": {
            "identity": {
                "methods": [
                    "password"
                ],
                "password": {
                    "user": {
                        "name": "Username",
                        "password": "password",
                        "domain": {
                            "name": "Account Name"
                        }
                    }
                }
            },
            "scope": {
                "project": {
                    "id": "1551c7f6c808414d8e9f3c514a170f2e"
                }
            }
        }
    }
    ' -v -k

    响应Header中“X-Subject-Token”的值即为Token:

    X-Subject-Token:MIIDkgYJKoZIhvcNAQcCoIIDgzCCA38CAQExDTALBglghkgBZQMEAgEwgXXXXX...
  2. 使用如下命令将token设置为环境变量,方便后续事项。
    export Token = MIIDkgYJKoZIhvcNAQcCoIIDgzCCA38CAQExDTALBglghkgBZQMEAgEwgXXXXX...

创建CDM集群

  1. 调用创建集群API创建集群,假设集群详情如下:
    • 集群名称为“cdm-ab82”
    • 集群规格为“cdm.medium”
    • VPC、子网、安全组与DWS一致,且自动绑定弹性IP。

    如果返回状态码为200,则说明创建命令执行成功。

    curl -X POST -H 'Content-Type:application/json;charset=utf-8' -H "X-Auth-Token:$Token" -d '
    {
      "cluster": {
        "name": "cdm-ab82",
        "vpcId": "6b47302a-bf79-4b20-bf7a-80987408e196", 
        "instances": [{
                "flavorRef": "fb8fe666-6734-4b11-bc6c-43d11db3c745",
                "nics": [{
                    "net-id": "63bdc3cb-a4e7-486f-82ee-d9bf208c8f8c",
                    "securityGroupId": "005af77a-cce5-45ac-99c7-2ea50ea8addf"
                }],
                "availability_zone": "Project Name",
                "type": "cdm"
            }],
        "datastore": {
                "version": "1.8.5",
                "type": "cdm"
            },
        "isScheduleBootOff": false,
        "scheduleBootTime": "null",
        "scheduleOffTime": "null",
        "isAutoOff": false,
        "sys_tags": [{
                "key": "_sys_enterprise_project_id",
                "value": "1ce45885-4033-40d2-bdde-d4dbaceb387d"
            }]
        },
      "autoRemind": false,
      "phoneNum": "null",
      "email": "null"
    }' 
    https://{cdm_endpoint}/v1.1/1551c7f6c808414d8e9f3c514a170f2e/clusters -v -k
  2. 调用查询集群列表查询集群信息,获取集群的ID,并设置为全局变量。
    curl -X GET -H 'Content-Type:application/json;charset=utf-8' -H "X-Auth-Token:$Token" https://{cdm_endpoint}/v1.1/1551c7f6c808414d8e9f3c514a170f2e/clusters -k -v

    获取响应如下所示。

    {
      "clusters": [{
        "version": "x.x.x",
        "updated": "2018-09-05T08:38:25",
        "name": "cdm-ab82",
        "created": "2018-09-05T08:38:25",
        "id": "bae65496-643e-47ca-84af-948672de7eeb",
        "status": "200",
        "isFrozen": "0",
        "statusDetail": "Normal",
        "actionProgress": {},
        "config_status": "In-Sync"
      }]
    }

    “status”的状态如果为200则表示集群创建成功,集群对应的ID为bae65496-643e-47ca-84af-948672de7eeb

  3. 使用如下命令将集群对应的ID设置为全局变量,方便后续事项。
    export ID = bae65496-643e-47ca-84af-948672de7eeb

创建连接

  1. 调用创建连接API创建MySQL连接,连接名称为mysql_link。这里假设本地MySQL数据库信息如下:
    • IP地址为1xx.120.85.24。
    • 端口为3306
    • 数据库名称为DB_name
    • 登录用户为username
    • 密码为DB_password

    如果返回状态码为200,则说明创建命令执行成功。

    curl -X POST -H "Content-Type:application/json" -H "X-Auth-Token:$Token" -d '{
      "links": [{
        "enabled": true,
        "update-user": null,
        "name": "mysql_link",
        "link-config-values": {
          "configs": [
                        {
                            "name": "linkConfig",
                            "inputs": [
                                {
                                    "name": "linkConfig.databaseType",
                                    "value": "MYSQL"
                                },
                                {
                                    "name": "linkConfig.host",
                                    "value": "1xx.120.85.24"
                                },
                                {
                                    "name": "linkConfig.port",
                                    "value": "3306"
                                },
                                {
                                    "name": "linkConfig.database",
                                    "value": "DB_name"
                                },
                                {
                                    "name": "linkConfig.username",
                                    "value": "username"
                                },
                                {
                                    "name": "linkConfig.password",
                                    "value": "DB_password"
                                },
                                {
                                    "name": "linkConfig.fetchSize",
                                    "value": "100000"
                                },
                                {
                                    "name": "linkConfig.usingNative",
                                    "value": "true"
                                }
                            ]
                        }
                    ]
        },
        "connector-name": "generic-jdbc-connector",
        "creation-date": 1536654788622,
        "update-date": 1536654788622,
        "creation-user": null
      }]
    }' 
    https://{cdm_endpoint}/v1.1/1551c7f6c808414d8e9f3c514a170f2e/clusters/bae65496-643e-47ca-84af-948672de7eeb/cdm/link -k -v
  2. 调用创建连接API创建DWS连接,连接名称为dws_link。这里假设DWS数据库信息如下:
    • 数据库的IP地址为10.120.85.24
    • 端口为3306
    • 数据库的名称为DWS
    • 登录用户为user_dws
    • 密码为dws_password
    curl -X POST -H "Content-Type:application/json" -H "X-Auth-Token:$Token" -d '{
      "links": [{
        "enabled": true,
        "update-user": null,
        "name": "dws_link",
        "link-config-values": {
          "configs": [
                        {
                            "name": "linkConfig",
                            "inputs": [
                                {
                                    "name": "linkConfig.databaseType",
                                    "value": "DWS"
                                },
                                {
                                    "name": "linkConfig.host",
                                    "value": "10.120.85.24"
                                },
                                {
                                    "name": "linkConfig.port",
                                    "value": "3306"
                                },
                                {
                                    "name": "linkConfig.database",
                                    "value": "DWS"
                                },
                                {
                                    "name": "linkConfig.username",
                                    "value": "user_dws"
                                },
                                {
                                    "name": "linkConfig.password",
                                    "value": "dws_password"
                                },
                                {
                                    "name": "linkConfig.fetchSize",
                                    "value": "100000"
                                },
                                {
                                    "name": "linkConfig.usingNative",
                                    "value": "true"
                                }
                            ]
                        }
                    ]
        },
        "connector-name": "generic-jdbc-connector",
        "creation-date": 1536654788622,
        "update-date": 1536654788622,
        "creation-user": null
      }]
    }' 
    https://{cdm_endpoint}/v1.1/1551c7f6c808414d8e9f3c514a170f2e/clusters/bae65496-643e-47ca-84af-948672de7eeb/cdm/link -k -v

创建迁移作业

  1. 连接创建成功后,调用指定集群创建作业API创建迁移作业,作业样例如下:
    • 作业名称:mysql2dws。
    • 从MySQL导出数据的数据库名称为default,导出的表名称为mysql_tbl,依据id字段将作业分割为多个任务并发执行。
    • 导入DWS的数据库名称为public,表名为cdm_all_type,导入前不清空数据。
    • 当DWS数据库里没有本地MySQL数据库中的表时,CDM自动在DWS端创建该表。
    • DWS端加载的字段列表为id&gid&name
    • 作业抽取数据时,并发执行的Extractor数量为3。

    如果返回状态码为200,则说明创建命令执行成功。

    curl -X POST -H "Content-Type:application/json" -H "X-Cluster-ID:$ID" -H "X-Auth-Token:$Token" -d '{
      "jobs": [{
        "job_type": "NORMAL_JOB",
        "name": "mysql2dws",
        "from-link-name": "mysql_link",
        "from-connector-name": "generic-jdbc-connector",
        "to-link-name": "dws_link",
        "to-connector-name": "generic-jdbc-connector",
        "from-config-values": {
          "configs": [{
            "name": "fromJobConfig",
            "inputs": [{
              "name": "fromJobConfig.schemaName",
              "value": "default"
            },
            {
              "name": "fromJobConfig.tableName",
              "value": "mysql_tbl"
            },
            {
              "name": "fromJobConfig.partitionColumn",
              "value": "id"
            }]
          }]
        },
    "to-config-values": {
                 "configs": [
                     {
                         "inputs": [
                             {
                                 "name": "toJobConfig.schemaName",
                                 "value": "public"
                             },
                             {
                                 "name": "toJobConfig.tablePreparation",
                                 "value": "CREATE_WHEN_NOT_EXIST"
                             },
                             {
                                 "name": "toJobConfig.tableName",
                                 "value": "cdm_all_type"
                             },
                             {
                                 "name": "toJobConfig.columnList",
                                 "value": "id&gid&name"
                             },
                             {
                                 "name": "toJobConfig.shouldClearTable",
                                 "value": "false"
                             }
                         ],
                         "name": "toJobConfig"
                     }
                 ]
             },
        "driver-config-values": {
          "configs": [{
            "name": "throttlingConfig",
            "inputs": [{
              "name": "throttlingConfig.numExtractors",
              "value": "3"
            }]
          }]
        }
      }]
    }' https://{cdm_endpoint}/v1.1/1551c7f6c808414d8e9f3c514a170f2e/clusters/bae65496-643e-47ca-84af-948672de7eeb/cdm/job -k -v
  2. 调用启动作业API开始执行作业。
    curl -X GET -H 'Content-Type:application/json;charset=utf-8' -H "X-Cluster-ID:$ID" -H "X-Auth-Token:$Token" https://{cdm_endpoint}/v1.1/1551c7f6c808414d8e9f3c514a170f2e/clusters/bae65496-643e-47ca-84af-948672de7eeb/cdm/job/mysql2dws/start -k -v

    响应如下:

    {
      "submissions": [{
        "progress": 1,
        "job-name": "mysql2dws",
        "status": "BOOTING",
        "creation-date": 1536654788622,
        "creation-user": "cdm"
      }]
    }

查看作业结果

  1. 调用查询作业状态API查询作业状态。
    curl -X GET -H 'Content-Type:application/json;charset=utf-8' -H "X-Cluster-ID:$ID" -H "X-Auth-Token:$Token" https://{cdm_endpoint}/v1.1/1551c7f6c808414d8e9f3c514a170f2e/clusters/6ec9a0a4-76be-4262-8697-e7af1fac7920/cdm/job/mysql2dws/status -k -v
  2. 查看作业执行结果,作业执行成功的响应如下:
    {
      "submissions": [{
        "progress": 0,
        "job-name": "mysql2dws",
        "status": "SUCCEEDED",
        "creation-date": 1536654788622,
        "creation-user": "cdm",
        "isStopingIncrement": "",
        "last-update-date": 1536654888622,
        "is-execute-auto": false,
        "last-udpate-user": "cdm",
        "isDeleteJob": false,
        "isIncrementing": false,
        "external-id": "job_local1127970451_0009",
        "counters": {
          "org.apache.sqoop.submission.counter.SqoopCounters": {
            "BYTES_WRITTEN": -1,
            "TOTAL_FILES": -1,
            "BYTES_READ": -1,
            "FILES_WRITTEN": -1,
            "TOTAL_SIZE": -1,
            "FILES_READ": -1,
            "ROWS_WRITTEN": 80,
            "ROWS_READ": 80
          }
        }
      }]
    }
    • BYTES_WRITTEN:表示写入的字节数。
    • BYTES_READ:表示读取的字节数。
    • TOTAL_FILES:表示总文件数。
    • FILES_WRITTEN:表示写入的文件数。
    • FILES_READ:表示读取的文件数。
    • ROWS_WRITTEN:表示写入成功的行数。
    • ROWS_READ:表示读取成功的行数。