更新时间:2024-11-14 GMT+08:00
分享

指定集群创建作业

功能介绍

指定集群创建作业接口。

调用方法

请参见如何调用API

URI

POST /v1.1/{project_id}/clusters/{cluster_id}/cdm/job

表1 路径参数

参数

是否必选

参数类型

描述

project_id

String

项目ID,获取方法请参见项目ID和账号ID

cluster_id

String

集群ID。

请求参数

表2 请求Header参数

参数

是否必选

参数类型

描述

X-Auth-Token

String

用户Token。

通过调用IAM服务获取用户Token接口获取(响应消息头中X-Subject-Token的值)。

表3 请求Body参数

参数

是否必选

参数类型

描述

jobs

Array of Job objects

作业列表,请参见jobs数据结构说明。

表4 Job

参数

是否必选

参数类型

描述

job_type

String

作业类型:

  • NORMAL_JOB:表/文件迁移。

  • BATCH_JOB:整库迁移。

  • SCENARIO_JOB:场景迁移。

from-connector-name

String

源端连接类型,对应的连接参数如下:

  • generic-jdbc-connector:关系数据库连接。

  • obs-connector:OBS连接。

  • hdfs-connector:HDFS连接。

  • hbase-connector:HBase连接、CloudTable连接。

  • hive-connector:Hive连接。

  • ftp-connector/sftp-connector:FTP/SFTP连接。

  • mongodb-connector:MongoDB连接。

  • redis-connector:Redis/DCS连接。

  • kafka-connector:Kafka连接。

  • dis-connector:DIS连接。

  • elasticsearch-connector:Elasticsearch/云搜索服务连接。

  • dli-connector:DLI连接。

  • http-connector:HTTP/HTTPS连接,该连接暂无连接参数。

  • dms-kafka-connector:DMSKafka连接。

to-config-values

ConfigValues object

目的连接参数配置。根据不同目的端有不同的参数配置,具体可参考目的端作业参数说明下相应的目的端参数配置。

to-link-name

String

目的端连接名称,即为通过“创建连接”接口创建的连接对应的连接名。

driver-config-values

ConfigValues object

作业任务参数配置。例如配置作业失败重试、抽取并发数,具体可参考作业任务参数说明

from-config-values

ConfigValues object

源连接参数配置。根据不同源端有不同的参数配置,具体可参考源端作业参数说明下相应的源端参数配置。

to-connector-name

String

目的端连接类型,对应的连接参数如下:

  • generic-jdbc-connector:关系数据库连接。

  • obs-connector:OBS连接。

  • hdfs-connector:HDFS连接。

  • hbase-connector:HBase连接、CloudTable连接。

  • hive-connector:Hive连接。

  • ftp-connector/sftp-connector:FTP/SFTP连接。

  • mongodb-connector:MongoDB连接。

  • redis-connector:Redis/DCS连接。

  • kafka-connector:Kafka连接。

  • dis-connector:DIS连接。

  • elasticsearch-connector:Elasticsearch/云搜索服务连接。

  • dli-connector:DLI连接。

  • http-connector:HTTP/HTTPS连接,该连接暂无连接参数。

  • dms-kafka-connector:DMSKafka连接。

name

String

作业名称,长度在1到240个字符之间。

from-link-name

String

源连接名称,即为通过“创建连接”接口创建的连接对应的连接名。

creation-user

String

创建作业的用户。由系统生成,用户无需填写。

creation-date

Long

作业创建的时间,单位:毫秒。由系统生成,用户无需填写。

update-date

Long

作业最后更新的时间,单位:毫秒。由系统生成,用户无需填写。

is_incre_job

Boolean

是否是增量作业。已废弃。

flag

Integer

是否是定时作业标记,如果是定时作业则为1,否则为0。由系统根据定时任务配置生成,用户无需填写。

files_read

Integer

已读文件数。由系统生成,用户无需填写。

update-user

String

最后更新作业的用户。由系统生成,用户无需填写。

external_id

String

具体执行的作业id,如果是本地作业,则一般为"job_local1202051771_0002"形式,如果是DLI作业,则为DLI作业ID,比如"12345"。由系统生成,用户无需填写。

type

String

与job_type一致,作业类型:

  • NORMAL_JOB:表/文件迁移。

  • BATCH_JOB:整库迁移。

  • SCENARIO_JOB:场景迁移。

execute_start_date

Long

最近一次执行任务开始时间,单位:毫秒。由系统生成,用户无需填写。

delete_rows

Integer

增量作业删除行数,已废弃。

enabled

Boolean

是否激活连接。由系统生成,用户无需填写。

bytes_written

Long

作业写入的字节。由系统生成,用户无需填写。

id

Integer

作业ID。由系统生成,用户无需填写。

is_use_sql

Boolean

用户是否使用sql。由系统根据源端抽取是否使用sql语句生成,用户无需填写。

update_rows

Integer

增量作业更新行数,已废弃。

group_name

String

组名。

bytes_read

Long

作业读取的字节。由系统生成,用户无需填写。

execute_update_date

Long

最近一次执行任务更新时间,单位:毫秒。由系统生成,用户无需填写。

write_rows

Integer

增量作业写入行数,已废弃。

rows_written

Integer

作业写入的行数。由系统生成,用户无需填写。

rows_read

Long

作业读取的行数。由系统生成,用户无需填写。

files_written

Integer

写入文件数。由系统生成,用户无需填写。

is_incrementing

Boolean

是否是增量作业,同is_incre_job,已废弃。

execute_create_date

Long

最近一次执行任务创建时间,单位:毫秒。由系统生成,用户无需填写。

status

String

作业最后的执行状态:

  • BOOTING:启动中。

  • RUNNING:运行中。

  • SUCCEEDED:成功。

  • FAILED:失败。

  • NEW:未被执行。

表5 ConfigValues

参数

是否必选

参数类型

描述

configs

Array of configs objects

源连接参数、目的连接参数和作业任务参数,它们的配置数据结构相同,其中“inputs”里的参数不一样,详细请参见configs数据结构说明。

extended-configs

extended-configs object

扩展配置,请参见extended-configs参数说明。扩展配置暂不对外开放,用户无需填写。

表6 configs

参数

是否必选

参数类型

描述

inputs

Array of Input objects

输入参数列表,列表中的每个参数为“name,value”结构,请参考inputs数据结构参数说明。在“from-config-values”数据结构中,不同的源连接类型有不同的“inputs”参数列表,请参见源端作业参数说明下的章节。在“to-config-values”数据结构中,不同的目的连接类型有不同的“inputs”参数列表,请参见目的端作业参数说明下面的子章节。在“driver-config-values”数据结构中,“inputs”具体参数请参见作业任务参数说明。

name

String

配置名称:源端作业的配置名称为“fromJobConfig”。目的端作业的配置名称为“toJobConfig”,连接的配置名称固定为“linkConfig”。

id

Integer

配置ID,由系统生成,用户无需填写。

type

String

配置类型,由系统生成,用户无需填写。值为LINK或者JOB,如果是连接管理API,则为LINK;如果是作业管理API,则为JOB。

表7 Input

参数

是否必选

参数类型

描述

name

String

参数名:

  • 如果是连接管理API,则以“linkConfig.”开头,对于不同连接类型有不同的参数,具体可参见连接参数说明下相应连接的参数说明。

  • 如果是作业管理API,对于源端连接参数,则以“fromJobConfig.”开头,具体可参见源端作业参数说明下相应的源端参数说明;对于目的端连接参数,则以“toJobConfig.”开头,具体可参见目的端作业参数说明下相应的目的端参数说明;对于作业任务参数,请参见作业任务参数说明下相应的任务参数说明。

value

Object

参数值,参数名对应的值,必须填写为字符串。

type

String

值类型,如STRING、INTEGER,由系统设定,用户无需填写。

表8 extended-configs

参数

是否必选

参数类型

描述

name

String

扩展配置名称,暂不对外开放,用户无需填写。

value

String

扩展配置值,暂不对外开放,用户无需填写。

响应参数

状态码: 200

表9 响应Body参数

参数

参数类型

描述

name

String

作业名称。

validation-result

Array of JobValidationResult objects

校验结果:

  • 如果修改失败,返回失败原因。

  • 如果修改成功,返回空列表。

表10 JobValidationResult

参数

参数类型

描述

message

String

错误描述。

status

String

错误级别,如:ERROR、WARNING。

状态码: 400

表11 响应Body参数

参数

参数类型

描述

code

String

返回编码。

errCode

String

错误码。

message

String

报错信息。

externalMessage

String

附加信息。

请求示例

创建一个源端为Elasticsearch数据连接,目的端为DIS数据连接,作业名为es_css的数据迁移作业。

POST /v1.1/1551c7f6c808414d8e9f3c514a170f2e/clusters/6ec9a0a4-76be-4262-8697-e7af1fac7920/cdm/job

{
  "jobs" : [ {
    "job_type" : "NORMAL_JOB",
    "from-connector-name" : "elasticsearch-connector",
    "to-config-values" : {
      "configs" : [ {
        "inputs" : [ {
          "name" : "toJobConfig.streamName",
          "value" : "dis-lkGm"
        }, {
          "name" : "toJobConfig.separator",
          "value" : "|"
        }, {
          "name" : "toJobConfig.columnList",
          "value" : "1&2&3"
        } ],
        "name" : "toJobConfig"
      } ]
    },
    "to-link-name" : "dis",
    "driver-config-values" : {
      "configs" : [ {
        "inputs" : [ {
          "name" : "throttlingConfig.numExtractors",
          "value" : "1"
        }, {
          "name" : "throttlingConfig.submitToCluster",
          "value" : "false"
        }, {
          "name" : "throttlingConfig.numLoaders",
          "value" : "1"
        }, {
          "name" : "throttlingConfig.recordDirtyData",
          "value" : "false"
        } ],
        "name" : "throttlingConfig"
      }, {
        "inputs" : [ ],
        "name" : "jarConfig"
      }, {
        "inputs" : [ {
          "name" : "schedulerConfig.isSchedulerJob",
          "value" : "false"
        }, {
          "name" : "schedulerConfig.disposableType",
          "value" : "NONE"
        } ],
        "name" : "schedulerConfig"
      }, {
        "inputs" : [ ],
        "name" : "transformConfig"
      }, {
        "inputs" : [ {
          "name" : "retryJobConfig.retryJobType",
          "value" : "NONE"
        } ],
        "name" : "retryJobConfig"
      } ]
    },
    "from-config-values" : {
      "configs" : [ {
        "inputs" : [ {
          "name" : "fromJobConfig.index",
          "value" : "52est"
        }, {
          "name" : "fromJobConfig.type",
          "value" : "est_array"
        }, {
          "name" : "fromJobConfig.columnList",
          "value" : "array_f1_int:long&array_f2_text:string&array_f3_object:nested"
        }, {
          "name" : "fromJobConfig.splitNestedField",
          "value" : "false"
        } ],
        "name" : "fromJobConfig"
      } ]
    },
    "to-connector-name" : "dis-connector",
    "name" : "es_css",
    "from-link-name" : "css"
  } ]
}

响应示例

状态码: 200

OK。

{
  "name" : "mysql2hive"
}

状态码: 400

请求报错。

{
  "code" : "Cdm.0104",
  "errCode" : "Cdm.0104",
  "message" : "Job name already exist or created by other.",
  "ternalMessage" : "Job name already exist or created by other."
}

SDK代码示例

SDK代码示例如下。

创建一个源端为Elasticsearch数据连接,目的端为DIS数据连接,作业名为es_css的数据迁移作业。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package com.huaweicloud.sdk.test;

import com.huaweicloud.sdk.core.auth.ICredential;
import com.huaweicloud.sdk.core.auth.BasicCredentials;
import com.huaweicloud.sdk.core.exception.ConnectionException;
import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
import com.huaweicloud.sdk.core.exception.ServiceResponseException;
import com.huaweicloud.sdk.cdm.v1.region.CdmRegion;
import com.huaweicloud.sdk.cdm.v1.*;
import com.huaweicloud.sdk.cdm.v1.model.*;

import java.util.List;
import java.util.ArrayList;

public class CreateJobSolution {

    public static void main(String[] args) {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        String ak = System.getenv("CLOUD_SDK_AK");
        String sk = System.getenv("CLOUD_SDK_SK");
        String projectId = "{project_id}";

        ICredential auth = new BasicCredentials()
                .withProjectId(projectId)
                .withAk(ak)
                .withSk(sk);

        CdmClient client = CdmClient.newBuilder()
                .withCredential(auth)
                .withRegion(CdmRegion.valueOf("<YOUR REGION>"))
                .build();
        CreateJobRequest request = new CreateJobRequest();
        request.withClusterId("{cluster_id}");
        CdmCreateJobJsonReq body = new CdmCreateJobJsonReq();
        List<Input> listConfigsInputs = new ArrayList<>();
        listConfigsInputs.add(
            new Input()
                .withName("fromJobConfig.index")
                .withValue("52est")
        );
        listConfigsInputs.add(
            new Input()
                .withName("fromJobConfig.type")
                .withValue("est_array")
        );
        listConfigsInputs.add(
            new Input()
                .withName("fromJobConfig.columnList")
                .withValue("array_f1_int:long&array_f2_text:string&array_f3_object:nested")
        );
        listConfigsInputs.add(
            new Input()
                .withName("fromJobConfig.splitNestedField")
                .withValue("false")
        );
        List<Configs> listFromConfigValuesConfigs = new ArrayList<>();
        listFromConfigValuesConfigs.add(
            new Configs()
                .withInputs(listConfigsInputs)
                .withName("fromJobConfig")
        );
        ConfigValues fromconfigvaluesJobs = new ConfigValues();
        fromconfigvaluesJobs.withConfigs(listFromConfigValuesConfigs);
        List<Input> listConfigsInputs1 = new ArrayList<>();
        listConfigsInputs1.add(
            new Input()
                .withName("retryJobConfig.retryJobType")
                .withValue("NONE")
        );
        List<Input> listConfigsInputs2 = new ArrayList<>();
        listConfigsInputs2.add(
            new Input()
                .withName("schedulerConfig.isSchedulerJob")
                .withValue("false")
        );
        listConfigsInputs2.add(
            new Input()
                .withName("schedulerConfig.disposableType")
                .withValue("NONE")
        );
        List<Input> listConfigsInputs3 = new ArrayList<>();
        listConfigsInputs3.add(
            new Input()
                .withName("throttlingConfig.numExtractors")
                .withValue("1")
        );
        listConfigsInputs3.add(
            new Input()
                .withName("throttlingConfig.submitToCluster")
                .withValue("false")
        );
        listConfigsInputs3.add(
            new Input()
                .withName("throttlingConfig.numLoaders")
                .withValue("1")
        );
        listConfigsInputs3.add(
            new Input()
                .withName("throttlingConfig.recordDirtyData")
                .withValue("false")
        );
        List<Configs> listDriverConfigValuesConfigs = new ArrayList<>();
        listDriverConfigValuesConfigs.add(
            new Configs()
                .withInputs(listConfigsInputs1)
                .withName("retryJobConfig")
        );
        ConfigValues driverconfigvaluesJobs = new ConfigValues();
        driverconfigvaluesJobs.withConfigs(listDriverConfigValuesConfigs);
        List<Input> listConfigsInputs4 = new ArrayList<>();
        listConfigsInputs4.add(
            new Input()
                .withName("toJobConfig.streamName")
                .withValue("dis-lkGm")
        );
        listConfigsInputs4.add(
            new Input()
                .withName("toJobConfig.separator")
                .withValue("|")
        );
        listConfigsInputs4.add(
            new Input()
                .withName("toJobConfig.columnList")
                .withValue("1&2&3")
        );
        List<Configs> listToConfigValuesConfigs = new ArrayList<>();
        listToConfigValuesConfigs.add(
            new Configs()
                .withInputs(listConfigsInputs4)
                .withName("toJobConfig")
        );
        ConfigValues toconfigvaluesJobs = new ConfigValues();
        toconfigvaluesJobs.withConfigs(listToConfigValuesConfigs);
        List<Job> listbodyJobs = new ArrayList<>();
        listbodyJobs.add(
            new Job()
                .withJobType(Job.JobTypeEnum.fromValue("NORMAL_JOB"))
                .withFromConnectorName("elasticsearch-connector")
                .withToConfigValues(toconfigvaluesJobs)
                .withToLinkName("dis")
                .withDriverConfigValues(driverconfigvaluesJobs)
                .withFromConfigValues(fromconfigvaluesJobs)
                .withToConnectorName("dis-connector")
                .withName("es_css")
                .withFromLinkName("css")
        );
        body.withJobs(listbodyJobs);
        request.withBody(body);
        try {
            CreateJobResponse response = client.createJob(request);
            System.out.println(response.toString());
        } catch (ConnectionException e) {
            e.printStackTrace();
        } catch (RequestTimeoutException e) {
            e.printStackTrace();
        } catch (ServiceResponseException e) {
            e.printStackTrace();
            System.out.println(e.getHttpStatusCode());
            System.out.println(e.getRequestId());
            System.out.println(e.getErrorCode());
            System.out.println(e.getErrorMsg());
        }
    }
}

创建一个源端为Elasticsearch数据连接,目的端为DIS数据连接,作业名为es_css的数据迁移作业。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# coding: utf-8

import os
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkcdm.v1.region.cdm_region import CdmRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkcdm.v1 import *

if __name__ == "__main__":
    # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
    # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
    ak = os.environ["CLOUD_SDK_AK"]
    sk = os.environ["CLOUD_SDK_SK"]
    projectId = "{project_id}"

    credentials = BasicCredentials(ak, sk, projectId)

    client = CdmClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(CdmRegion.value_of("<YOUR REGION>")) \
        .build()

    try:
        request = CreateJobRequest()
        request.cluster_id = "{cluster_id}"
        listInputsConfigs = [
            Input(
                name="fromJobConfig.index",
                value="52est"
            ),
            Input(
                name="fromJobConfig.type",
                value="est_array"
            ),
            Input(
                name="fromJobConfig.columnList",
                value="array_f1_int:long&array_f2_text:string&array_f3_object:nested"
            ),
            Input(
                name="fromJobConfig.splitNestedField",
                value="false"
            )
        ]
        listConfigsFromconfigvalues = [
            Configs(
                inputs=listInputsConfigs,
                name="fromJobConfig"
            )
        ]
        fromconfigvaluesJobs = ConfigValues(
            configs=listConfigsFromconfigvalues
        )
        listInputsConfigs1 = [
            Input(
                name="retryJobConfig.retryJobType",
                value="NONE"
            )
        ]
        listInputsConfigs2 = [
            Input(
                name="schedulerConfig.isSchedulerJob",
                value="false"
            ),
            Input(
                name="schedulerConfig.disposableType",
                value="NONE"
            )
        ]
        listInputsConfigs3 = [
            Input(
                name="throttlingConfig.numExtractors",
                value="1"
            ),
            Input(
                name="throttlingConfig.submitToCluster",
                value="false"
            ),
            Input(
                name="throttlingConfig.numLoaders",
                value="1"
            ),
            Input(
                name="throttlingConfig.recordDirtyData",
                value="false"
            )
        ]
        listConfigsDriverconfigvalues = [
            Configs(
                inputs=listInputsConfigs1,
                name="retryJobConfig"
            )
        ]
        driverconfigvaluesJobs = ConfigValues(
            configs=listConfigsDriverconfigvalues
        )
        listInputsConfigs4 = [
            Input(
                name="toJobConfig.streamName",
                value="dis-lkGm"
            ),
            Input(
                name="toJobConfig.separator",
                value="|"
            ),
            Input(
                name="toJobConfig.columnList",
                value="1&2&3"
            )
        ]
        listConfigsToconfigvalues = [
            Configs(
                inputs=listInputsConfigs4,
                name="toJobConfig"
            )
        ]
        toconfigvaluesJobs = ConfigValues(
            configs=listConfigsToconfigvalues
        )
        listJobsbody = [
            Job(
                job_type="NORMAL_JOB",
                from_connector_name="elasticsearch-connector",
                to_config_values=toconfigvaluesJobs,
                to_link_name="dis",
                driver_config_values=driverconfigvaluesJobs,
                from_config_values=fromconfigvaluesJobs,
                to_connector_name="dis-connector",
                name="es_css",
                from_link_name="css"
            )
        ]
        request.body = CdmCreateJobJsonReq(
            jobs=listJobsbody
        )
        response = client.create_job(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

创建一个源端为Elasticsearch数据连接,目的端为DIS数据连接,作业名为es_css的数据迁移作业。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package main

import (
	"fmt"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    cdm "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/cdm/v1"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/cdm/v1/model"
    region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/cdm/v1/region"
)

func main() {
    // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
    // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
    ak := os.Getenv("CLOUD_SDK_AK")
    sk := os.Getenv("CLOUD_SDK_SK")
    projectId := "{project_id}"

    auth := basic.NewCredentialsBuilder().
        WithAk(ak).
        WithSk(sk).
        WithProjectId(projectId).
        Build()

    client := cdm.NewCdmClient(
        cdm.CdmClientBuilder().
            WithRegion(region.ValueOf("<YOUR REGION>")).
            WithCredential(auth).
            Build())

    request := &model.CreateJobRequest{}
	request.ClusterId = "{cluster_id}"
	valueInputs:= "52est"
	var valueInputsInterface interface{} = valueInputs
	valueInputs1:= "est_array"
	var valueInputsInterface1 interface{} = valueInputs1
	valueInputs2:= "array_f1_int:long&array_f2_text:string&array_f3_object:nested"
	var valueInputsInterface2 interface{} = valueInputs2
	valueInputs3:= "false"
	var valueInputsInterface3 interface{} = valueInputs3
	var listInputsConfigs = []model.Input{
        {
            Name: "fromJobConfig.index",
            Value: &valueInputsInterface,
        },
        {
            Name: "fromJobConfig.type",
            Value: &valueInputsInterface1,
        },
        {
            Name: "fromJobConfig.columnList",
            Value: &valueInputsInterface2,
        },
        {
            Name: "fromJobConfig.splitNestedField",
            Value: &valueInputsInterface3,
        },
    }
	var listConfigsFromConfigValues = []model.Configs{
        {
            Inputs: listInputsConfigs,
            Name: "fromJobConfig",
        },
    }
	fromconfigvaluesJobs := &model.ConfigValues{
		Configs: listConfigsFromConfigValues,
	}
	valueInputs4:= "NONE"
	var valueInputsInterface4 interface{} = valueInputs4
	var listInputsConfigs1 = []model.Input{
        {
            Name: "retryJobConfig.retryJobType",
            Value: &valueInputsInterface4,
        },
    }
	valueInputs5:= "false"
	var valueInputsInterface5 interface{} = valueInputs5
	valueInputs6:= "NONE"
	var valueInputsInterface6 interface{} = valueInputs6
	var listInputsConfigs2 = []model.Input{
        {
            Name: "schedulerConfig.isSchedulerJob",
            Value: &valueInputsInterface5,
        },
        {
            Name: "schedulerConfig.disposableType",
            Value: &valueInputsInterface6,
        },
    }
	valueInputs7:= "1"
	var valueInputsInterface7 interface{} = valueInputs7
	valueInputs8:= "false"
	var valueInputsInterface8 interface{} = valueInputs8
	valueInputs9:= "1"
	var valueInputsInterface9 interface{} = valueInputs9
	valueInputs10:= "false"
	var valueInputsInterface10 interface{} = valueInputs10
	var listInputsConfigs3 = []model.Input{
        {
            Name: "throttlingConfig.numExtractors",
            Value: &valueInputsInterface7,
        },
        {
            Name: "throttlingConfig.submitToCluster",
            Value: &valueInputsInterface8,
        },
        {
            Name: "throttlingConfig.numLoaders",
            Value: &valueInputsInterface9,
        },
        {
            Name: "throttlingConfig.recordDirtyData",
            Value: &valueInputsInterface10,
        },
    }
	var listConfigsDriverConfigValues = []model.Configs{
        {
            Inputs: listInputsConfigs1,
            Name: "retryJobConfig",
        },
    }
	driverconfigvaluesJobs := &model.ConfigValues{
		Configs: listConfigsDriverConfigValues,
	}
	valueInputs11:= "dis-lkGm"
	var valueInputsInterface11 interface{} = valueInputs11
	valueInputs12:= "|"
	var valueInputsInterface12 interface{} = valueInputs12
	valueInputs13:= "1&2&3"
	var valueInputsInterface13 interface{} = valueInputs13
	var listInputsConfigs4 = []model.Input{
        {
            Name: "toJobConfig.streamName",
            Value: &valueInputsInterface11,
        },
        {
            Name: "toJobConfig.separator",
            Value: &valueInputsInterface12,
        },
        {
            Name: "toJobConfig.columnList",
            Value: &valueInputsInterface13,
        },
    }
	var listConfigsToConfigValues = []model.Configs{
        {
            Inputs: listInputsConfigs4,
            Name: "toJobConfig",
        },
    }
	toconfigvaluesJobs := &model.ConfigValues{
		Configs: listConfigsToConfigValues,
	}
	var listJobsbody = []model.Job{
        {
            JobType: model.GetJobJobTypeEnum().NORMAL_JOB,
            FromConnectorName: "elasticsearch-connector",
            ToConfigValues: toconfigvaluesJobs,
            ToLinkName: "dis",
            DriverConfigValues: driverconfigvaluesJobs,
            FromConfigValues: fromconfigvaluesJobs,
            ToConnectorName: "dis-connector",
            Name: "es_css",
            FromLinkName: "css",
        },
    }
	request.Body = &model.CdmCreateJobJsonReq{
		Jobs: listJobsbody,
	}
	response, err := client.CreateJob(request)
	if err == nil {
        fmt.Printf("%+v\n", response)
    } else {
        fmt.Println(err)
    }
}

更多编程语言的SDK代码示例,请参见API Explorer的代码示例页签,可生成自动对应的SDK代码示例。

状态码

状态码

描述

200

OK。

400

请求报错。

错误码

请参见错误码

相关文档