指定集群创建作业
功能介绍
指定集群创建作业接口。
调用方法
请参见如何调用API。
URI
POST /v1.1/{project_id}/clusters/{cluster_id}/cdm/job
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
project_id |
是 |
String |
项目ID,获取方法请参见项目ID和账号ID |
cluster_id |
是 |
String |
集群ID |
请求参数
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
X-Auth-Token |
是 |
String |
用户Token。 通过调用IAM服务获取用户Token接口获取(响应消息头中X-Subject-Token的值)。 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
jobs |
是 |
Array of Job objects |
作业列表,请参见jobs数据结构说明。 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
job_type |
是 |
String |
作业类型:
枚举值:
|
from-connector-name |
是 |
String |
源端连接类型,对应的连接参数如下:
|
to-config-values |
是 |
ConfigValues object |
目的连接参数配置。根据不同目的端有不同的参数配置,具体可参考目的端作业参数说明下相应的目的端参数配置。 |
to-link-name |
是 |
String |
目的端连接名称,即为通过“创建连接”接口创建的连接对应的连接名。 |
driver-config-values |
是 |
ConfigValues object |
作业任务参数配置。例如配置作业失败重试、抽取并发数,具体可参考作业任务参数说明。 |
from-config-values |
是 |
ConfigValues object |
源连接参数配置。根据不同源端有不同的参数配置,具体可参考源端作业参数说明下相应的源端参数配置。 |
to-connector-name |
是 |
String |
目的端连接类型,对应的连接参数如下:
|
name |
是 |
String |
作业名称,长度在1到240个字符之间 最小长度:1 最大长度:240 |
from-link-name |
是 |
String |
源连接名称,即为通过“创建连接”接口创建的连接对应的连接名。 |
creation-user |
否 |
String |
创建作业的用户。由系统生成,用户无需填写。 |
creation-date |
否 |
Long |
作业创建的时间,单位:毫秒。由系统生成,用户无需填写。 |
update-date |
否 |
Long |
作业最后更新的时间,单位:毫秒。由系统生成,用户无需填写。 |
is_incre_job |
否 |
Boolean |
是否是增量作业。已废弃 |
flag |
否 |
Integer |
是否是定时作业标记,如果是定时作业则为1,否则为0。由系统根据定时任务配置生成,用户无需填写。 |
files_read |
否 |
Integer |
已读文件数。由系统生成,用户无需填写。 |
update-user |
否 |
String |
最后更新作业的用户。由系统生成,用户无需填写。 |
external_id |
否 |
String |
具体执行的作业id,如果是本地作业,则一般为"job_local1202051771_0002"形式,如果是DLI作业,则为DLI作业ID,比如"12345"。由系统生成,用户无需填写。 |
type |
否 |
String |
与job_type一致,作业类型:
|
execute_start_date |
否 |
Long |
最近一次执行任务开始时间,单位:毫秒。由系统生成,用户无需填写。 |
delete_rows |
否 |
Integer |
增量作业删除行数,已废弃。 |
enabled |
否 |
Boolean |
是否激活连接。由系统生成,用户无需填写。 |
bytes_written |
否 |
Long |
作业写入的字节。由系统生成,用户无需填写。 |
id |
否 |
Integer |
作业ID。由系统生成,用户无需填写。 |
is_use_sql |
否 |
Boolean |
用户是否使用sql。由系统根据源端抽取是否使用sql语句生成,用户无需填写 |
update_rows |
否 |
Integer |
增量作业更新行数,已废弃。 |
group_name |
否 |
String |
组名 |
bytes_read |
否 |
Long |
作业读取的字节。由系统生成,用户无需填写。 |
execute_update_date |
否 |
Long |
最近一次执行任务更新时间,单位:毫秒。由系统生成,用户无需填写。 |
write_rows |
否 |
Integer |
增量作业写入行数,已废弃。 |
rows_written |
否 |
Integer |
作业写入的行数。由系统生成,用户无需填写。 |
rows_read |
否 |
Long |
作业读取的行数。由系统生成,用户无需填写。 |
files_written |
否 |
Integer |
写入文件数。由系统生成,用户无需填写。 |
is_incrementing |
否 |
Boolean |
是否是增量作业,同is_incre_job,已废弃。 |
execute_create_date |
否 |
Long |
最近一次执行任务创建时间,单位:毫秒。由系统生成,用户无需填写。 |
status |
否 |
String |
作业最后的执行状态:
|
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
configs |
是 |
Array of configs objects |
源连接参数、目的连接参数和作业任务参数,它们的配置数据结构相同,其中“inputs”里的参数不一样,详细请参见configs数据结构说明。 |
extended-configs |
否 |
extended-configs object |
扩展配置,请参见extended-configs参数说明。扩展配置暂不对外开放,用户无需填写。 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
inputs |
是 |
Array of Input objects |
输入参数列表,列表中的每个参数为“name,value”结构,请参考inputs数据结构参数说明。在“from-config-values”数据结构中,不同的源连接类型有不同的“inputs”参数列表,请参见源端作业参数说明下的章节。在“to-config-values”数据结构中,不同的目的连接类型有不同的“inputs”参数列表,请参见目的端作业参数说明下面的子章节。在“driver-config-values”数据结构中,“inputs”具体参数请参见作业任务参数说明。 |
name |
是 |
String |
配置名称:源端作业的配置名称为“fromJobConfig”。目的端作业的配置名称为“toJobConfig”,连接的配置名称固定为“linkConfig”。 |
id |
否 |
Integer |
配置ID,由系统生成,用户无需填写。 |
type |
否 |
String |
配置类型,由系统生成,用户无需填写。值为LINK或者JOB,如果是连接管理API,则为LINK;如果是作业管理API,则为JOB。 |
响应参数
状态码: 200
参数 |
参数类型 |
描述 |
---|---|---|
name |
String |
作业名称。 |
validation-result |
Array of JobValidationResult objects |
校验结果:
|
参数 |
参数类型 |
描述 |
---|---|---|
message |
String |
错误描述 |
status |
String |
ERROR,WARNING 枚举值:
|
状态码: 400
参数 |
参数类型 |
描述 |
---|---|---|
code |
String |
返回编码 |
errCode |
String |
错误码 |
message |
String |
报错信息 |
externalMessage |
String |
附加信息 |
请求示例
创建一个源端为Elasticsearch数据连接,目的端为DIS数据连接,作业名为es_css的数据迁移作业。
POST /v1.1/1551c7f6c808414d8e9f3c514a170f2e/clusters/6ec9a0a4-76be-4262-8697-e7af1fac7920/cdm/job { "jobs" : [ { "job_type" : "NORMAL_JOB", "from-connector-name" : "elasticsearch-connector", "to-config-values" : { "configs" : [ { "inputs" : [ { "name" : "toJobConfig.streamName", "value" : "dis-lkGm" }, { "name" : "toJobConfig.separator", "value" : "|" }, { "name" : "toJobConfig.columnList", "value" : "1&2&3" } ], "name" : "toJobConfig" } ] }, "to-link-name" : "dis", "driver-config-values" : { "configs" : [ { "inputs" : [ { "name" : "throttlingConfig.numExtractors", "value" : "1" }, { "name" : "throttlingConfig.submitToCluster", "value" : "false" }, { "name" : "throttlingConfig.numLoaders", "value" : "1" }, { "name" : "throttlingConfig.recordDirtyData", "value" : "false" } ], "name" : "throttlingConfig" }, { "inputs" : [ ], "name" : "jarConfig" }, { "inputs" : [ { "name" : "schedulerConfig.isSchedulerJob", "value" : "false" }, { "name" : "schedulerConfig.disposableType", "value" : "NONE" } ], "name" : "schedulerConfig" }, { "inputs" : [ ], "name" : "transformConfig" }, { "inputs" : [ { "name" : "retryJobConfig.retryJobType", "value" : "NONE" } ], "name" : "retryJobConfig" } ] }, "from-config-values" : { "configs" : [ { "inputs" : [ { "name" : "fromJobConfig.index", "value" : "52est" }, { "name" : "fromJobConfig.type", "value" : "est_array" }, { "name" : "fromJobConfig.columnList", "value" : "array_f1_int:long&array_f2_text:string&array_f3_object:nested" }, { "name" : "fromJobConfig.splitNestedField", "value" : "false" } ], "name" : "fromJobConfig" } ] }, "to-connector-name" : "dis-connector", "name" : "es_css", "from-link-name" : "css" } ] }
响应示例
状态码: 200
ok
{ "name" : "mysql2hive" }
状态码: 400
请求报错
{ "code" : "Cdm.0104", "errCode" : "Cdm.0104", "message" : "Job name already exist or created by other.", "ternalMessage" : "Job name already exist or created by other." }
SDK代码示例
SDK代码示例如下。
创建一个源端为Elasticsearch数据连接,目的端为DIS数据连接,作业名为es_css的数据迁移作业。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.cdm.v1.region.cdmRegion; import com.huaweicloud.sdk.cdm.v1.*; import com.huaweicloud.sdk.cdm.v1.model.*; import java.util.List; import java.util.ArrayList; public class CreateJobSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); String projectId = "{project_id}"; ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); cdmClient client = cdmClient.newBuilder() .withCredential(auth) .withRegion(cdmRegion.valueOf("<YOUR REGION>")) .build(); CreateJobRequest request = new CreateJobRequest(); request.withClusterId("{cluster_id}"); CdmCreateJobJsonReq body = new CdmCreateJobJsonReq(); List<Input> listConfigsInputs = new ArrayList<>(); listConfigsInputs.add( new Input() .withName("fromJobConfig.index") .withValue("52est") ); listConfigsInputs.add( new Input() .withName("fromJobConfig.type") .withValue("est_array") ); listConfigsInputs.add( new Input() .withName("fromJobConfig.columnList") .withValue("array_f1_int:long&array_f2_text:string&array_f3_object:nested") ); listConfigsInputs.add( new Input() .withName("fromJobConfig.splitNestedField") .withValue("false") ); List<Configs> listFromConfigValuesConfigs = new ArrayList<>(); listFromConfigValuesConfigs.add( new Configs() .withInputs(listConfigsInputs) .withName("fromJobConfig") ); ConfigValues fromconfigvaluesJobs = new ConfigValues(); fromconfigvaluesJobs.withConfigs(listFromConfigValuesConfigs); List<Input> listConfigsInputs1 = new ArrayList<>(); listConfigsInputs1.add( new Input() .withName("retryJobConfig.retryJobType") .withValue("NONE") ); List<Input> listConfigsInputs2 = new ArrayList<>(); listConfigsInputs2.add( new Input() .withName("schedulerConfig.isSchedulerJob") .withValue("false") ); listConfigsInputs2.add( new Input() .withName("schedulerConfig.disposableType") .withValue("NONE") ); List<Input> listConfigsInputs3 = new ArrayList<>(); listConfigsInputs3.add( new Input() .withName("throttlingConfig.numExtractors") .withValue("1") ); listConfigsInputs3.add( new Input() .withName("throttlingConfig.submitToCluster") .withValue("false") ); listConfigsInputs3.add( new Input() .withName("throttlingConfig.numLoaders") .withValue("1") ); listConfigsInputs3.add( new Input() .withName("throttlingConfig.recordDirtyData") .withValue("false") ); List<Configs> listDriverConfigValuesConfigs = new ArrayList<>(); listDriverConfigValuesConfigs.add( new Configs() .withInputs(listConfigsInputs1) .withName("retryJobConfig") ); ConfigValues driverconfigvaluesJobs = new ConfigValues(); driverconfigvaluesJobs.withConfigs(listDriverConfigValuesConfigs); List<Input> listConfigsInputs4 = new ArrayList<>(); listConfigsInputs4.add( new Input() .withName("toJobConfig.streamName") .withValue("dis-lkGm") ); listConfigsInputs4.add( new Input() .withName("toJobConfig.separator") .withValue("|") ); listConfigsInputs4.add( new Input() .withName("toJobConfig.columnList") .withValue("1&2&3") ); List<Configs> listToConfigValuesConfigs = new ArrayList<>(); listToConfigValuesConfigs.add( new Configs() .withInputs(listConfigsInputs4) .withName("toJobConfig") ); ConfigValues toconfigvaluesJobs = new ConfigValues(); toconfigvaluesJobs.withConfigs(listToConfigValuesConfigs); List<Job> listbodyJobs = new ArrayList<>(); listbodyJobs.add( new Job() .withJobType(Job.JobTypeEnum.fromValue("NORMAL_JOB")) .withFromConnectorName("elasticsearch-connector") .withToConfigValues(toconfigvaluesJobs) .withToLinkName("dis") .withDriverConfigValues(driverconfigvaluesJobs) .withFromConfigValues(fromconfigvaluesJobs) .withToConnectorName("dis-connector") .withName("es_css") .withFromLinkName("css") ); body.withJobs(listbodyJobs); request.withBody(body); try { CreateJobResponse response = client.createJob(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } } |
创建一个源端为Elasticsearch数据连接,目的端为DIS数据连接,作业名为es_css的数据迁移作业。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# coding: utf-8 from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkcdm.v1.region.cdm_region import cdmRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkcdm.v1 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = __import__('os').getenv("CLOUD_SDK_AK") sk = __import__('os').getenv("CLOUD_SDK_SK") projectId = "{project_id}" credentials = BasicCredentials(ak, sk, projectId) \ client = cdmClient.new_builder() \ .with_credentials(credentials) \ .with_region(cdmRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateJobRequest() request.cluster_id = "{cluster_id}" listInputsConfigs = [ Input( name="fromJobConfig.index", value="52est" ), Input( name="fromJobConfig.type", value="est_array" ), Input( name="fromJobConfig.columnList", value="array_f1_int:long&array_f2_text:string&array_f3_object:nested" ), Input( name="fromJobConfig.splitNestedField", value="false" ) ] listConfigsFromconfigvalues = [ Configs( inputs=listInputsConfigs, name="fromJobConfig" ) ] fromconfigvaluesJobs = ConfigValues( configs=listConfigsFromconfigvalues ) listInputsConfigs1 = [ Input( name="retryJobConfig.retryJobType", value="NONE" ) ] listInputsConfigs2 = [ Input( name="schedulerConfig.isSchedulerJob", value="false" ), Input( name="schedulerConfig.disposableType", value="NONE" ) ] listInputsConfigs3 = [ Input( name="throttlingConfig.numExtractors", value="1" ), Input( name="throttlingConfig.submitToCluster", value="false" ), Input( name="throttlingConfig.numLoaders", value="1" ), Input( name="throttlingConfig.recordDirtyData", value="false" ) ] listConfigsDriverconfigvalues = [ Configs( inputs=listInputsConfigs1, name="retryJobConfig" ) ] driverconfigvaluesJobs = ConfigValues( configs=listConfigsDriverconfigvalues ) listInputsConfigs4 = [ Input( name="toJobConfig.streamName", value="dis-lkGm" ), Input( name="toJobConfig.separator", value="|" ), Input( name="toJobConfig.columnList", value="1&2&3" ) ] listConfigsToconfigvalues = [ Configs( inputs=listInputsConfigs4, name="toJobConfig" ) ] toconfigvaluesJobs = ConfigValues( configs=listConfigsToconfigvalues ) listJobsbody = [ Job( job_type="NORMAL_JOB", from_connector_name="elasticsearch-connector", to_config_values=toconfigvaluesJobs, to_link_name="dis", driver_config_values=driverconfigvaluesJobs, from_config_values=fromconfigvaluesJobs, to_connector_name="dis-connector", name="es_css", from_link_name="css" ) ] request.body = CdmCreateJobJsonReq( jobs=listJobsbody ) response = client.create_job(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg) |
创建一个源端为Elasticsearch数据连接,目的端为DIS数据连接,作业名为es_css的数据迁移作业。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" cdm "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/cdm/v1" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/cdm/v1/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/cdm/v1/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") projectId := "{project_id}" auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). WithProjectId(projectId). Build() client := cdm.NewcdmClient( cdm.cdmClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateJobRequest{} request.ClusterId = "{cluster_id}" var listInputsConfigs = []model.Input{ { Name: "fromJobConfig.index", Value: "52est", }, { Name: "fromJobConfig.type", Value: "est_array", }, { Name: "fromJobConfig.columnList", Value: "array_f1_int:long&array_f2_text:string&array_f3_object:nested", }, { Name: "fromJobConfig.splitNestedField", Value: "false", }, } var listConfigsFromConfigValues = []model.Configs{ { Inputs: listInputsConfigs, Name: "fromJobConfig", }, } fromconfigvaluesJobs := &model.ConfigValues{ Configs: listConfigsFromConfigValues, } var listInputsConfigs1 = []model.Input{ { Name: "retryJobConfig.retryJobType", Value: "NONE", }, } var listInputsConfigs2 = []model.Input{ { Name: "schedulerConfig.isSchedulerJob", Value: "false", }, { Name: "schedulerConfig.disposableType", Value: "NONE", }, } var listInputsConfigs3 = []model.Input{ { Name: "throttlingConfig.numExtractors", Value: "1", }, { Name: "throttlingConfig.submitToCluster", Value: "false", }, { Name: "throttlingConfig.numLoaders", Value: "1", }, { Name: "throttlingConfig.recordDirtyData", Value: "false", }, } var listConfigsDriverConfigValues = []model.Configs{ { Inputs: listInputsConfigs1, Name: "retryJobConfig", }, } driverconfigvaluesJobs := &model.ConfigValues{ Configs: listConfigsDriverConfigValues, } var listInputsConfigs4 = []model.Input{ { Name: "toJobConfig.streamName", Value: "dis-lkGm", }, { Name: "toJobConfig.separator", Value: "|", }, { Name: "toJobConfig.columnList", Value: "1&2&3", }, } var listConfigsToConfigValues = []model.Configs{ { Inputs: listInputsConfigs4, Name: "toJobConfig", }, } toconfigvaluesJobs := &model.ConfigValues{ Configs: listConfigsToConfigValues, } var listJobsbody = []model.Job{ { JobType: model.GetJobJobTypeEnum().NORMAL_JOB, FromConnectorName: "elasticsearch-connector", ToConfigValues: toconfigvaluesJobs, ToLinkName: "dis", DriverConfigValues: driverconfigvaluesJobs, FromConfigValues: fromconfigvaluesJobs, ToConnectorName: "dis-connector", Name: "es_css", FromLinkName: "css", }, } request.Body = &model.CdmCreateJobJsonReq{ Jobs: listJobsbody, } response, err := client.CreateJob(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } } |
更多编程语言的SDK代码示例,请参见API Explorer的代码示例页签,可生成自动对应的SDK代码示例。
状态码
状态码 |
描述 |
---|---|
200 |
ok |
400 |
请求报错 |
错误码
请参见错误码。