文档首页> 云数据迁移 CDM> 最佳实践> 数据迁移进阶实践> 增量迁移> 通过数据开发实现数据增量迁移
更新时间:2021-11-19 GMT+08:00
分享

通过数据开发实现数据增量迁移

DGC服务的DLF组件提供了一站式的大数据协同开发平台,借助DLF的在线脚本编辑、周期调度CDM的迁移作业,也可以实现增量数据迁移。

这里以DWS导入到OBS为例,介绍DLF配合CDM实现增量迁移的流程:
  1. 获取CDM作业的JSON
  2. 修改JSON
  3. 创建DLF作业

获取CDM作业的JSON

  1. 进入CDM主界面,创建一个DWS到OBS的表/文件迁移作业。
  2. 在CDM“作业管理”界面的“表/文件迁移”页签下,找到已创建的作业,单击作业操作列的更多 > 查看作业JSON,如图1所示。

    您也可以使用其它已创建好的CDM作业JSON。

    图1 查看作业JSON
  3. 作业JSON就是创建CDM作业的请求消息体模板,URL地址中[Endpoint]、{project_id}、{cluster_id}需要替换为您实际的信息:
    • [Endpoint]:终端节点。

      终端节点(Endpoint)即调用API的请求地址,不同服务不同区域的终端节点不同。Endpoint可从终端节点及区域说明获取。

    • {project_id}:项目ID。
    • {cluster_id}:集群ID,可在CDM集管理界面,单击集群名称查看。

修改JSON

根据您的业务需要,可以修改JSON Body。这里以1天为周期,where子句作为抽取数据时的判断条件(一般使用时间字段来作为增量迁移时的判断条件),每天迁移昨天新增的数据。
  1. 修改where子句,增量某个时间段的数据:
         {
            "name": "fromJobConfig.whereClause",
            "value": "_timestamp >= '${startTime}' and _timestamp < '${currentTime}'"
         }
    • 源端数据库是数据仓库服务DWS或者MySQL时,对于时间的判断可以写成以下两种:
      _timestamp >= '2018-10-10 00:00:00' and _timestamp < '2018-10-11 00:00:00'
      或者
      _timestamp between '2018-10-10 00:00:00' and '2018-10-11 00:00:00'
    • 如果源端数据库是Oracle,where子句应该写成:
      _timestamp >= to_date (2018-10-10 00:00:00' , 'yyyy-mm-dd hh24:mi:ss' ) and _timestamp < to_date (2018-10-10 00:00:00' , 'yyyy-mm-dd hh24:mi:ss' )
  2. 每个周期的增量数据导入到不同的目录:
         {
            "name": "toJobConfig.outputDirectory",
            "value": "dws2obs/${currentTime}"
         }
  3. 作业名改成动态的,否则会因为作业重名而无法创建:
        "to-connector-name": "obs-connector",
        "from-link-name": "dws_link",
        "name": "dws2obs-${currentTime}"
如果需要修改更多参数,请参见《云数据迁移API参考》,这里修改后的JSON样例如下:
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
{
  "jobs": [
    {
      "job_type": "NORMAL_JOB",
      "to-config-values": {
        "configs": [
          {
            "inputs": [
              {
                "name": "toJobConfig.bucketName",
                "value": "cdm-test"
              },
              {
                "name": "toJobConfig.outputDirectory",
                "value": "dws2obs/${currentTime}"
              },
              {
                "name": "toJobConfig.outputFormat",
                "value": "CSV_FILE"
              },
              {
                "name": "toJobConfig.fieldSeparator",
                "value": ","
              },
              {
                "name": "toJobConfig.writeToTempFile",
                "value": "false"
              },
              {
                "name": "toJobConfig.validateMD5",
                "value": "false"
              },
              {
                "name": "toJobConfig.encodeType",
                "value": "UTF-8"
              },
              {
                "name": "toJobConfig.duplicateFileOpType",
                "value": "REPLACE"
              },
              {
                "name": "toJobConfig.kmsEncryption",
                "value": "false"
              }
            ],
            "name": "toJobConfig"
          }
        ]
      },
      "from-config-values": {
        "configs": [
          {
            "inputs": [
              {
                "name": "fromJobConfig.schemaName",
                "value": "dws_database"
              },
              {
                "name": "fromJobConfig.tableName",
                "value": "dws_from"
              },
              {
                "name": "fromJobConfig.whereClause",
                "value": "_timestamp >= '${startTime}' and _timestamp < '${currentTime}'"
              },
              {
                "name": "fromJobConfig.columnList",
                "value": "_tiny&_small&_int&_integer&_bigint&_float&_double&_date&_timestamp&_char&_varchar&_text"
              }
            ],
            "name": "fromJobConfig"
          }
        ]
      },
      "from-connector-name": "generic-jdbc-connector",
      "to-link-name": "obs_link",
      "driver-config-values": {
        "configs": [
          {
            "inputs": [
              {
                "name": "throttlingConfig.numExtractors",
                "value": "1"
              },
              {
                "name": "throttlingConfig.submitToCluster",
                "value": "false"
              },
              {
                "name": "throttlingConfig.numLoaders",
                "value": "1"
              },
              {
                "name": "throttlingConfig.recordDirtyData",
                "value": "false"
              },
              {
                "name": "throttlingConfig.writeToLink",
                "value": "obs_link"
              }
            ],
            "name": "throttlingConfig"
          },
          {
            "inputs": [],
            "name": "jarConfig"
          },
          {
            "inputs": [],
            "name": "schedulerConfig"
          },
          {
            "inputs": [],
            "name": "transformConfig"
          },
          {
            "inputs": [],
            "name": "smnConfig"
          },
          {
            "inputs": [],
            "name": "retryJobConfig"
          }
        ]
      },
      "to-connector-name": "obs-connector",
      "from-link-name": "dws_link",
      "name": "dws2obs-${currentTime}"
    }
  ]
}

创建DLF作业

  1. 在DLF创建如图2所示的数据开发作业,详细操作请参见《数据湖治理中心DGC 用户指南》的新建作业
    各节点与作业的配置详情请参见后续步骤。
    图2 DLF作业
  2. 配置“创建作业”节点。
    DLF通过RestAPI算子调用REST接口创建CDM迁移作业。配置RestAPI算子的属性如下 :
    1. 节点名称:您自定义名称,例如“创建CDM作业”。注意区分:在DLF作业中,CDM的迁移作业只是作为节点运行。
    2. URL地址:配置为获取CDM作业的JSON中获取的URL,格式为https://{Endpoint}/cdm/v1.0/{project_id}/clusters/{cluster_id}/cdm/job。例如“https://cdm.cn-north-1.myhuaweicloud.com/cdm/v1.0/1551c7f6c808414d8e9f3c514a170f2e/clusters/6ec9a0a4-76be-4262-8697-e7af1fac7920/cdm/job”
    3. HTTP方法:创建CDM作业的HTTP请求方法为“POST”
    4. 添加如下两个请求头:
      • Content-Type = application/json
      • X-Language = en-us
    5. 请求消息体:输入修改JSON里面修改完成后的CDM作业JSON。
    图3 创建CDM作业的算子属性
  3. 配置“运行作业”节点。
    创建CDM作业的算子配置完后,还需要在后面添加运行CDM作业的REST算子,具体请参见《云数据迁移API参考》中的“启动作业”章节。配置RestAPI算子的属性如下 :
    1. 节点名称:运行作业。
    2. URL地址:其中project_id、cluster_id和2. 配置“创建作业”节点中的保持一致,作业名需要配置为“dws2obs-${currentTime}”。格式为https://{Endpoint}/cdm/v1.0/{project_id}/clusters/{cluster_id}/cdm/job/{job_name}/start。

      例如“https://cdm.cn-north-1.myhuaweicloud.com/cdm/v1.0/1551c7f6c808414d8e9f3c514a170f2e/clusters/6ec9a0a4-76be-4262-8697-e7af1fac7920/cdm/job/dws2obs-${currentTime}/start”

    3. HTTP方法:运行CDM作业的HTTP请求方法为“PUT”
    4. 请求头:
      • Content-Type = application/json
      • X-Language = en-us
    图4 运行CDM作业的算子属性
  4. 配置“等待作业运行完成”节点。

    由于CDM作业是异步运行的,运行作业的REST请求返回200,不代表数据已经迁移成功。后续有计算作业依赖CDM的迁移作业时,需要一个RestAPI算子去周期判断迁移是否成功,如果CDM迁移成功,再去做计算操作。查询CDM迁移是否成功的API,具体请参见《云数据迁移API参考》中“查询作业状态”章节。

    运行CDM作业的REST算子配置完成后,添加等待CDM作业完成算子,算子属性为:
    1. 节点名称:等待作业运行完成。
    2. URL地址:格式为https://{Endpoint}/cdm/v1.0/{project_id}/clusters/{cluster_id}/cdm/job/{job_name}/status。其中project_id、cluster_id和2. 配置“创建作业”节点中的保持一致,作业名需要配置为“dws2obs-${currentTime}”。例如“https://cdm.cn-north-1.myhuaweicloud.com/cdm/v1.0/1551c7f6c808414d8e9f3c514a170f2e/clusters/6ec9a0a4-76be-4262-8697-e7af1fac7920/cdm/job/dws2obs-${currentTime}/status”
    3. HTTP方法:查询CDM作业状态的HTTP请求方法为“GET”
    4. 请求头:
      • Content-Type = application/json
      • X-Language = en-us
    5. 是否需要判断返回值:选择“YES”
    6. 返回值字段路径:配置为submissions[0].status。
    7. 请求成功标志位:配置为SUCCEEDED。
    8. 其他参数保持默认即可。
  5. (可选)配置“删除作业运行完成”节点。

    这里的删除作业可根据实际需要选择。由于DLF是通过周期创建CDM作业来实现增量迁移,因此会累积大量的作业在CDM集群上,所以可在迁移成功后,删除已经运行成功的作业。如果您需要删除,在查询CDM作业状态的算子后面,添加删除CDM作业的RestAPI算子即可,DLF会调用《云数据迁移API参考》中的“删除作业”接口。

    删除CDM作业的算子属性为:
    1. 节点名称:删除作业。
    2. URL地址:格式为https://{Endpoint}/cdm/v1.0/{project_id}/clusters/{cluster_id}/cdm/job/{job_name}。其中project_id、cluster_id和2. 配置“创建作业”节点中的保持一致,作业名需要配置为“dws2obs-${currentTime}”。例如“https://cdm.cn-north-1.myhuaweicloud.com/cdm/v1.0/1551c7f6c808414d8e9f3c514a170f2e/clusters/6ec9a0a4-76be-4262-8697-e7af1fac7920/cdm/job/dws2obs-${currentTime}”
    3. HTTP方法:删除CDM作业的HTTP请求方法为“DELETE”
    4. 请求头:
      • Content-Type = application/json
      • X-Language = en-us
    5. 其他参数保持默认即可。
    图5 删除CDM作业算子配置
  6. 如果需要在迁移完后进行计算操作,可在后续添加各种计算算子,完成数据计算。
  7. 配置DLF作业参数。
    1. 配置DLF作业参数,如图6所示。
      • startTime = $getTaskPlanTime(plantime,@@yyyyMMddHHmmss@@,-24*60*60)
      • currentTime = $getTaskPlanTime(plantime,@@yyyyMMdd-HHmm@@,0)
      图6 DLF作业参数配置
    2. 保存DLF作业后,选择调度配置 > 周期调度,调度周期配置为1天。

      这样,DLF配合CDM就实现了每天迁移昨天新增的数据。

分享:

    相关文档

    相关产品

close