Updated: 2024-10-22 GMT+08:00

Creating and Executing a Job

Function

Creates and submits a job in an MRS cluster.

Before calling this API, synchronize IAM users: on the Dashboard tab of the cluster details page, click Synchronize on the right of IAM User Sync, and then submit the job through this API.

To use OBS encryption, complete the configuration described in "MRS User Guide > Managing an Existing Cluster > Job Management > Running a Job with Encrypted OBS Data" before calling this API to run a job.

Prepare the OBS paths, sample files, endpoints, and AK/SK used in the examples in advance, and replace them with actual values when submitting a request.

Constraints

None

Calling Method

For details, see Calling APIs.

URI

POST /v2/{project_id}/clusters/{cluster_id}/job-executions

Table 1 Path parameters

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| project_id | Yes | String | Project ID. For how to obtain it, see Obtaining a Project ID. Value range: 1 to 64 characters, consisting of letters and digits only. Constraints: N/A. Default: N/A. |
| cluster_id | Yes | String | Cluster ID. If a cluster ID is specified, the latest version metadata of the cluster after any patch updates is obtained. For how to obtain it, see Obtaining a Cluster ID. Value range: 1 to 64 characters, consisting of letters, digits, underscores (_), and hyphens (-) only. Constraints: N/A. Default: N/A. |

Request Parameters

Table 2 Request body parameters

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| job_type | Yes | String | Job type. Value range: MapReduce; SparkSubmit; SparkPython (jobs of this type are converted to SparkSubmit for submission: the MRS console displays the job type as SparkSubmit, and select SparkSubmit when querying the job list through the API); HiveScript; HiveSql; DistCp (imports and exports data); SparkScript; SparkSql; Flink. Constraints: N/A. Default: N/A. |
| job_name | Yes | String | Job name. Value range: 1 to 64 characters, consisting of letters, digits, underscores (_), and hyphens (-) only. Different jobs may share the same name, although this is not recommended. Constraints: N/A. Default: N/A. |
| arguments | No | Array of strings | Key arguments of the program. They are defined by functions inside the user program; MRS only passes them through. Constraints: at most 150,000 characters; the special characters ;\|&>'<$!"\ are not allowed; may be empty. Note: arguments carrying sensitive information (such as a login password) may be exposed in the job details and logs, so exercise caution. To access files on OBS with paths starting with obs:// in a HiveScript or HiveSql job, search for core.site.customized.configs on the Hive service configuration page and add a configuration item named fs.obs.endpoint whose value is the OBS endpoint; for details, see Endpoints. |
| properties | No | Map<String,String> | Program system parameters. Constraints: at most 2,048 characters; the special characters ><\|'`&!\ are not allowed; may be empty. |
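
For reference, the following is a minimal sketch of submitting a job by calling the REST endpoint above directly, using token authentication as described in Calling APIs. The endpoint host, token, and IDs are placeholders, and the body reuses the MapReduce example from Example Requests below:

    # coding: utf-8
    # Minimal sketch: POST the job directly over REST with token authentication.
    # The endpoint host, token, and IDs below are placeholders, not real values.
    import requests

    endpoint = "https://mrs.example-region.example.com"  # hypothetical MRS endpoint
    project_id = "{project_id}"
    cluster_id = "{cluster_id}"
    token = "<X-Auth-Token obtained from IAM>"

    url = f"{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions"
    body = {
        "job_name": "MapReduceTest",
        "job_type": "MapReduce",
        "arguments": [
            "obs://obs-test/program/hadoop-mapreduce-examples-x.x.x.jar",
            "wordcount",
            "obs://obs-test/input/",
            "obs://obs-test/job/mapreduce/output"
        ]
    }
    resp = requests.post(url, json=body, headers={"X-Auth-Token": token})
    print(resp.status_code)  # expect 202 on success
    print(resp.json())       # job_submit_result with job_id and state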

Response Parameters

Status code: 202

Table 3 Response body parameters

| Parameter | Type | Description |
| --- | --- | --- |
| job_submit_result | JobSubmitResult object | Job submission result. |

Table 4 JobSubmitResult

| Parameter | Type | Description |
| --- | --- | --- |
| job_id | String | Job ID. Value range: N/A. |
| state | String | Job submission state. Value range: COMPLETE (the job submission is complete); FAILED (the job submission failed). |

Status code: 400

Table 5 Response body parameters

| Parameter | Type | Description |
| --- | --- | --- |
| error_code | String | Error code. Value range: N/A. |
| error_msg | String | Error description. Value range: N/A. |

Example Requests

  • Creating a MapReduce job

    POST https://{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions
    
    {
      "job_name" : "MapReduceTest",
      "job_type" : "MapReduce",
      "arguments" : [ "obs://obs-test/program/hadoop-mapreduce-examples-x.x.x.jar", "wordcount", "obs://obs-test/input/", "obs://obs-test/job/mapreduce/output" ],
      "properties" : {
        "fs.obs.endpoint" : "obs endpoint",
        "fs.obs.access.key" : "xxx",
        "fs.obs.secret.key" : "yyy"
      }
    }
  • Creating a SparkSubmit job

    POST https://{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions
    
    {
      "job_name" : "SparkSubmitTest",
      "job_type" : "SparkSubmit",
      "arguments" : [ "--master", "yarn", "--deploy-mode", "cluster", "--py-files", "obs://obs-test/a.py", "--conf", "spark.yarn.appMasterEnv.PYTHONPATH=/tmp:$PYTHONPATH", "--conf", "spark.yarn.appMasterEnv.aaa=aaaa", "--conf", "spark.executorEnv.aaa=executoraaa", "--properties-file", "obs://obs-test/test-spark.conf", "obs://obs-test/pi.py", "100000" ],
      "properties" : {
        "fs.obs.access.key" : "xxx",
        "fs.obs.secret.key" : "yyy"
      }
    }
  • Creating a HiveScript job

    POST https://{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions
    
    {
      "job_name" : "HiveScriptTest",
      "job_type" : "HiveScript",
      "arguments" : [ "obs://obs-test/sql/test_script.sql" ],
      "properties" : {
        "fs.obs.endpoint" : "obs endpoint",
        "fs.obs.access.key" : "xxx",
        "fs.obs.secret.key" : "yyy"
      }
    }
  • Creating a HiveSql job

    POST https://{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions
    
    {
      "job_name" : "HiveSqlTest",
      "job_type" : "HiveSql",
      "arguments" : [ "DROP TABLE IF EXISTS src_wordcount;\ncreate external table src_wordcount(line string) row format delimited fields terminated by \"\\n\" stored as textfile location \"obs://donotdel-gxc/input/\";\ninsert into src_wordcount values(\"v1\")" ],
      "properties" : {
        "fs.obs.endpoint" : "obs endpoint",
        "fs.obs.access.key" : "xxx",
        "fs.obs.secret.key" : "yyy"
      }
    }
  • Creating a DistCp job

    POST https://{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions
    
    {
      "job_name" : "DistCpTest",
      "job_type" : "DistCp",
      "arguments" : [ "obs://obs-test/DistcpJob/", "/user/test/sparksql/" ],
      "properties" : {
        "fs.obs.endpoint" : "obs endpoint",
        "fs.obs.access.key" : "xxx",
        "fs.obs.secret.key" : "yyy"
      }
    }
  • Creating a SparkScript job

    POST https://{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions
    
    {
      "job_type" : "SparkSql",
      "job_name" : "SparkScriptTest",
      "arguments" : [ "op-key1", "op-value1", "op-key2", "op-value2", "obs://obs-test/sql/test_script.sql" ],
      "properties" : {
        "fs.obs.access.key" : "xxx",
        "fs.obs.secret.key" : "yyy"
      }
    }
  • Creating a SparkSql job

    POST https://{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions
    
    {
      "job_type" : "SparkSql",
      "job_name" : "SparkSqlTest",
      "arguments" : [ "op-key1", "op-value1", "op-key2", "op-value2", "create table student_info3 (id string,name string,gender string,age int,addr string);" ],
      "properties" : {
        "fs.obs.access.key" : "xxx",
        "fs.obs.secret.key" : "yyy"
      }
    }
  • Creating a Flink job

    POST https://{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions
    
    {
      "job_name" : "flinkTest",
      "job_type" : "Flink",
      "arguments" : [ "run", "-d", "-ynm", "testExcutorejobhdfsbatch", "-m", "yarn-cluster", "hdfs://test/examples/batch/WordCount.jar" ],
      "properties" : {
        "fs.obs.endpoint" : "obs endpoint",
        "fs.obs.access.key" : "xxx",
        "fs.obs.secret.key" : "yyy"
      }
    }
  • Creating a SparkPython job (jobs of this type are converted to SparkSubmit for submission: the MRS console displays the job type as SparkSubmit, and select SparkSubmit as the job type when querying the job list through the API)

    POST https://{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions
    
    {
      "job_type" : "SparkPython",
      "job_name" : "SparkPythonTest",
      "arguments" : [ "--master", "yarn", "--deploy-mode", "cluster", "--py-files", "obs://obs-test/a.py", "--conf", "spark.yarn.appMasterEnv.PYTHONPATH=/tmp:$PYTHONPATH", "--conf", "spark.yarn.appMasterEnv.aaa=aaaa", "--conf", "spark.executorEnv.aaa=executoraaa", "--properties-file", "obs://obs-test/test-spark.conf", "obs://obs-test/pi.py", 100000 ],
      "properties" : {
        "fs.obs.access.key" : "xxx",
        "fs.obs.secret.key" : "yyy"
      }
    }

Example Responses

Status code: 202

The job is created and executed successfully.

{
  "job_submit_result" : {
    "job_id" : "44b37a20-ffe8-42b1-b42b-78a5978d7e40",
    "state" : "COMPLETE"
  }
}
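
Note that state COMPLETE in the 202 response confirms only that the job was submitted, not that it has finished running. To track execution, you can poll the companion "query a single job" API (GET /v2/{project_id}/clusters/{cluster_id}/job-executions/{job_id}) with the returned job_id. The sketch below assumes that endpoint; the host, token, and polling interval are placeholders, and the exact response schema is described in that API's documentation:

    # coding: utf-8
    # Minimal polling sketch. Host, token, and interval are placeholder assumptions.
    import time
    import requests

    endpoint = "https://mrs.example-region.example.com"  # hypothetical MRS endpoint
    project_id = "{project_id}"
    cluster_id = "{cluster_id}"
    token = "<X-Auth-Token obtained from IAM>"
    job_id = "44b37a20-ffe8-42b1-b42b-78a5978d7e40"      # job_id from the 202 response

    url = f"{endpoint}/v2/{project_id}/clusters/{cluster_id}/job-executions/{job_id}"
    for _ in range(10):  # poll up to 10 times, 30s apart
        resp = requests.get(url, headers={"X-Auth-Token": token})
        print(resp.json())  # job details; see the query API for the schema
        time.sleep(30)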

Status code: 400

Failed to create and execute the job.

{
  "job_submit_result" : {
    "error_msg" : "不能提交Hive相关作业",
    "error_code" : "0168"
  }
}

SDK Code Examples

SDK code examples are as follows.

  • Creating a MapReduce job

    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.mrs.v2.region.MrsRegion;
    import com.huaweicloud.sdk.mrs.v2.*;
    import com.huaweicloud.sdk.mrs.v2.model.*;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    
    public class CreateExecuteJobSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            MrsClient client = MrsClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(MrsRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateExecuteJobRequest request = new CreateExecuteJobRequest();
            request.withClusterId("{cluster_id}");
            JobExecution body = new JobExecution();
            Map<String, String> listbodyProperties = new HashMap<>();
            listbodyProperties.put("fs.obs.endpoint", "obs endpoint");
            listbodyProperties.put("fs.obs.access.key", "xxx");
            listbodyProperties.put("fs.obs.secret.key", "yyy");
            List<String> listbodyArguments = new ArrayList<>();
            listbodyArguments.add("obs://obs-test/program/hadoop-mapreduce-examples-x.x.x.jar");
            listbodyArguments.add("wordcount");
            listbodyArguments.add("obs://obs-test/input/");
            listbodyArguments.add("obs://obs-test/job/mapreduce/output");
            body.withProperties(listbodyProperties);
            body.withArguments(listbodyArguments);
            body.withJobName("MapReduceTest");
            body.withJobType("MapReduce");
            request.withBody(body);
            try {
                CreateExecuteJobResponse response = client.createExecuteJob(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Creating a SparkSubmit job

    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.mrs.v2.region.MrsRegion;
    import com.huaweicloud.sdk.mrs.v2.*;
    import com.huaweicloud.sdk.mrs.v2.model.*;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    
    public class CreateExecuteJobSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            MrsClient client = MrsClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(MrsRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateExecuteJobRequest request = new CreateExecuteJobRequest();
            request.withClusterId("{cluster_id}");
            JobExecution body = new JobExecution();
            Map<String, String> listbodyProperties = new HashMap<>();
            listbodyProperties.put("fs.obs.access.key", "xxx");
            listbodyProperties.put("fs.obs.secret.key", "yyy");
            List<String> listbodyArguments = new ArrayList<>();
            listbodyArguments.add("--master");
            listbodyArguments.add("yarn");
            listbodyArguments.add("--deploy-mode");
            listbodyArguments.add("cluster");
            listbodyArguments.add("--py-files");
            listbodyArguments.add("obs://obs-test/a.py");
            listbodyArguments.add("--conf");
            listbodyArguments.add("spark.yarn.appMasterEnv.PYTHONPATH=/tmp:$PYTHONPATH");
            listbodyArguments.add("--conf");
            listbodyArguments.add("spark.yarn.appMasterEnv.aaa=aaaa");
            listbodyArguments.add("--conf");
            listbodyArguments.add("spark.executorEnv.aaa=executoraaa");
            listbodyArguments.add("--properties-file");
            listbodyArguments.add("obs://obs-test/test-spark.conf");
            listbodyArguments.add("obs://obs-test/pi.py");
            listbodyArguments.add("100000");
            body.withProperties(listbodyProperties);
            body.withArguments(listbodyArguments);
            body.withJobName("SparkSubmitTest");
            body.withJobType("SparkSubmit");
            request.withBody(body);
            try {
                CreateExecuteJobResponse response = client.createExecuteJob(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Creating a HiveScript job

    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.mrs.v2.region.MrsRegion;
    import com.huaweicloud.sdk.mrs.v2.*;
    import com.huaweicloud.sdk.mrs.v2.model.*;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    
    public class CreateExecuteJobSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            MrsClient client = MrsClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(MrsRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateExecuteJobRequest request = new CreateExecuteJobRequest();
            request.withClusterId("{cluster_id}");
            JobExecution body = new JobExecution();
            Map<String, String> listbodyProperties = new HashMap<>();
            listbodyProperties.put("fs.obs.endpoint", "obs endpoint");
            listbodyProperties.put("fs.obs.access.key", "xxx");
            listbodyProperties.put("fs.obs.secret.key", "yyy");
            List<String> listbodyArguments = new ArrayList<>();
            listbodyArguments.add("obs://obs-test/sql/test_script.sql");
            body.withProperties(listbodyProperties);
            body.withArguments(listbodyArguments);
            body.withJobName("HiveScriptTest");
            body.withJobType("HiveScript");
            request.withBody(body);
            try {
                CreateExecuteJobResponse response = client.createExecuteJob(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Creating a HiveSql job

    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.mrs.v2.region.MrsRegion;
    import com.huaweicloud.sdk.mrs.v2.*;
    import com.huaweicloud.sdk.mrs.v2.model.*;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    
    public class CreateExecuteJobSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            MrsClient client = MrsClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(MrsRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateExecuteJobRequest request = new CreateExecuteJobRequest();
            request.withClusterId("{cluster_id}");
            JobExecution body = new JobExecution();
            Map<String, String> listbodyProperties = new HashMap<>();
            listbodyProperties.put("fs.obs.endpoint", "obs endpoint");
            listbodyProperties.put("fs.obs.access.key", "xxx");
            listbodyProperties.put("fs.obs.secret.key", "yyy");
            List<String> listbodyArguments = new ArrayList<>();
            listbodyArguments.add("DROP TABLE IF EXISTS src_wordcount;
    create external table src_wordcount(line string) row format delimited fields terminated by "\n" stored as textfile location "obs://donotdel-gxc/input/";
    insert into src_wordcount values("v1")");
            body.withProperties(listbodyProperties);
            body.withArguments(listbodyArguments);
            body.withJobName("HiveSqlTest");
            body.withJobType("HiveSql");
            request.withBody(body);
            try {
                CreateExecuteJobResponse response = client.createExecuteJob(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Creating a DistCp job

    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.mrs.v2.region.MrsRegion;
    import com.huaweicloud.sdk.mrs.v2.*;
    import com.huaweicloud.sdk.mrs.v2.model.*;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    
    public class CreateExecuteJobSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            MrsClient client = MrsClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(MrsRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateExecuteJobRequest request = new CreateExecuteJobRequest();
            request.withClusterId("{cluster_id}");
            JobExecution body = new JobExecution();
            Map<String, String> listbodyProperties = new HashMap<>();
            listbodyProperties.put("fs.obs.endpoint", "obs endpoint");
            listbodyProperties.put("fs.obs.access.key", "xxx");
            listbodyProperties.put("fs.obs.secret.key", "yyy");
            List<String> listbodyArguments = new ArrayList<>();
            listbodyArguments.add("obs://obs-test/DistcpJob/");
            listbodyArguments.add("/user/test/sparksql/");
            body.withProperties(listbodyProperties);
            body.withArguments(listbodyArguments);
            body.withJobName("DistCpTest");
            body.withJobType("DistCp");
            request.withBody(body);
            try {
                CreateExecuteJobResponse response = client.createExecuteJob(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Creating a SparkScript job

    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.mrs.v2.region.MrsRegion;
    import com.huaweicloud.sdk.mrs.v2.*;
    import com.huaweicloud.sdk.mrs.v2.model.*;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    
    public class CreateExecuteJobSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            MrsClient client = MrsClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(MrsRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateExecuteJobRequest request = new CreateExecuteJobRequest();
            request.withClusterId("{cluster_id}");
            JobExecution body = new JobExecution();
            Map<String, String> listbodyProperties = new HashMap<>();
            listbodyProperties.put("fs.obs.access.key", "xxx");
            listbodyProperties.put("fs.obs.secret.key", "yyy");
            List<String> listbodyArguments = new ArrayList<>();
            listbodyArguments.add("op-key1");
            listbodyArguments.add("op-value1");
            listbodyArguments.add("op-key2");
            listbodyArguments.add("op-value2");
            listbodyArguments.add("obs://obs-test/sql/test_script.sql");
            body.withProperties(listbodyProperties);
            body.withArguments(listbodyArguments);
            body.withJobName("SparkScriptTest");
            body.withJobType("SparkSql");
            request.withBody(body);
            try {
                CreateExecuteJobResponse response = client.createExecuteJob(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Creating a SparkSql job

    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.mrs.v2.region.MrsRegion;
    import com.huaweicloud.sdk.mrs.v2.*;
    import com.huaweicloud.sdk.mrs.v2.model.*;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    
    public class CreateExecuteJobSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            MrsClient client = MrsClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(MrsRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateExecuteJobRequest request = new CreateExecuteJobRequest();
            request.withClusterId("{cluster_id}");
            JobExecution body = new JobExecution();
            Map<String, String> listbodyProperties = new HashMap<>();
            listbodyProperties.put("fs.obs.access.key", "xxx");
            listbodyProperties.put("fs.obs.secret.key", "yyy");
            List<String> listbodyArguments = new ArrayList<>();
            listbodyArguments.add("op-key1");
            listbodyArguments.add("op-value1");
            listbodyArguments.add("op-key2");
            listbodyArguments.add("op-value2");
            listbodyArguments.add("create table student_info3 (id string,name string,gender string,age int,addr string);");
            body.withProperties(listbodyProperties);
            body.withArguments(listbodyArguments);
            body.withJobName("SparkSqlTest");
            body.withJobType("SparkSql");
            request.withBody(body);
            try {
                CreateExecuteJobResponse response = client.createExecuteJob(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Creating a Flink job

    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.mrs.v2.region.MrsRegion;
    import com.huaweicloud.sdk.mrs.v2.*;
    import com.huaweicloud.sdk.mrs.v2.model.*;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    
    public class CreateExecuteJobSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            MrsClient client = MrsClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(MrsRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateExecuteJobRequest request = new CreateExecuteJobRequest();
            request.withClusterId("{cluster_id}");
            JobExecution body = new JobExecution();
            Map<String, String> listbodyProperties = new HashMap<>();
            listbodyProperties.put("fs.obs.endpoint", "obs endpoint");
            listbodyProperties.put("fs.obs.access.key", "xxx");
            listbodyProperties.put("fs.obs.secret.key", "yyy");
            List<String> listbodyArguments = new ArrayList<>();
            listbodyArguments.add("run");
            listbodyArguments.add("-d");
            listbodyArguments.add("-ynm");
            listbodyArguments.add("testExcutorejobhdfsbatch");
            listbodyArguments.add("-m");
            listbodyArguments.add("yarn-cluster");
            listbodyArguments.add("hdfs://test/examples/batch/WordCount.jar");
            body.withProperties(listbodyProperties);
            body.withArguments(listbodyArguments);
            body.withJobName("flinkTest");
            body.withJobType("Flink");
            request.withBody(body);
            try {
                CreateExecuteJobResponse response = client.createExecuteJob(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Creating a SparkPython job (jobs of this type are converted to SparkSubmit for submission: the MRS console displays the job type as SparkSubmit, and select SparkSubmit as the job type when querying the job list through the API)

    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.mrs.v2.region.MrsRegion;
    import com.huaweicloud.sdk.mrs.v2.*;
    import com.huaweicloud.sdk.mrs.v2.model.*;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    
    public class CreateExecuteJobSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            MrsClient client = MrsClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(MrsRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateExecuteJobRequest request = new CreateExecuteJobRequest();
            request.withClusterId("{cluster_id}");
            JobExecution body = new JobExecution();
            Map<String, String> listbodyProperties = new HashMap<>();
            listbodyProperties.put("fs.obs.access.key", "xxx");
            listbodyProperties.put("fs.obs.secret.key", "yyy");
            List<String> listbodyArguments = new ArrayList<>();
            listbodyArguments.add("--master");
            listbodyArguments.add("yarn");
            listbodyArguments.add("--deploy-mode");
            listbodyArguments.add("cluster");
            listbodyArguments.add("--py-files");
            listbodyArguments.add("obs://obs-test/a.py");
            listbodyArguments.add("--conf");
            listbodyArguments.add("spark.yarn.appMasterEnv.PYTHONPATH=/tmp:$PYTHONPATH");
            listbodyArguments.add("--conf");
            listbodyArguments.add("spark.yarn.appMasterEnv.aaa=aaaa");
            listbodyArguments.add("--conf");
            listbodyArguments.add("spark.executorEnv.aaa=executoraaa");
            listbodyArguments.add("--properties-file");
            listbodyArguments.add("obs://obs-test/test-spark.conf");
            listbodyArguments.add("obs://obs-test/pi.py");
            listbodyArguments.add("100000");
            body.withProperties(listbodyProperties);
            body.withArguments(listbodyArguments);
            body.withJobName("SparkPythonTest");
            body.withJobType("SparkPython");
            request.withBody(body);
            try {
                CreateExecuteJobResponse response = client.createExecuteJob(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Creating a MapReduce job

    # coding: utf-8
    
    import os
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkmrs.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId)
    
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = CreateExecuteJobRequest()
            request.cluster_id = "{cluster_id}"
            listPropertiesbody = {
                "fs.obs.endpoint": "obs endpoint",
                "fs.obs.access.key": "xxx",
                "fs.obs.secret.key": "yyy"
            }
            listArgumentsbody = [
                "obs://obs-test/program/hadoop-mapreduce-examples-x.x.x.jar",
                "wordcount",
                "obs://obs-test/input/",
                "obs://obs-test/job/mapreduce/output"
            ]
            request.body = JobExecution(
                properties=listPropertiesbody,
                arguments=listArgumentsbody,
                job_name="MapReduceTest",
                job_type="MapReduce"
            )
            response = client.create_execute_job(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Creating a SparkSubmit job

    # coding: utf-8
    
    import os
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkmrs.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId)
    
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = CreateExecuteJobRequest()
            request.cluster_id = "{cluster_id}"
            listPropertiesbody = {
                "fs.obs.access.key": "xxx",
                "fs.obs.secret.key": "yyy"
            }
            listArgumentsbody = [
                "--master",
                "yarn",
                "--deploy-mode",
                "cluster",
                "--py-files",
                "obs://obs-test/a.py",
                "--conf",
                "spark.yarn.appMasterEnv.PYTHONPATH=/tmp:$PYTHONPATH",
                "--conf",
                "spark.yarn.appMasterEnv.aaa=aaaa",
                "--conf",
                "spark.executorEnv.aaa=executoraaa",
                "--properties-file",
                "obs://obs-test/test-spark.conf",
                "obs://obs-test/pi.py",
                "100000"
            ]
            request.body = JobExecution(
                properties=listPropertiesbody,
                arguments=listArgumentsbody,
                job_name="SparkSubmitTest",
                job_type="SparkSubmit"
            )
            response = client.create_execute_job(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Creating a HiveScript job

    # coding: utf-8
    
    import os
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkmrs.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId)
    
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = CreateExecuteJobRequest()
            request.cluster_id = "{cluster_id}"
            listPropertiesbody = {
                "fs.obs.endpoint": "obs endpoint",
                "fs.obs.access.key": "xxx",
                "fs.obs.secret.key": "yyy"
            }
            listArgumentsbody = [
                "obs://obs-test/sql/test_script.sql"
            ]
            request.body = JobExecution(
                properties=listPropertiesbody,
                arguments=listArgumentsbody,
                job_name="HiveScriptTest",
                job_type="HiveScript"
            )
            response = client.create_execute_job(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Creating a HiveSql job

    # coding: utf-8
    
    import os
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkmrs.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId)
    
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = CreateExecuteJobRequest()
            request.cluster_id = "{cluster_id}"
            listPropertiesbody = {
                "fs.obs.endpoint": "obs endpoint",
                "fs.obs.access.key": "xxx",
                "fs.obs.secret.key": "yyy"
            }
            listArgumentsbody = [
                "DROP TABLE IF EXISTS src_wordcount;
            create external table src_wordcount(line string) row format delimited fields terminated by "\n" stored as textfile location "obs://donotdel-gxc/input/";
            insert into src_wordcount values("v1")"
            ]
            request.body = JobExecution(
                properties=listPropertiesbody,
                arguments=listArgumentsbody,
                job_name="HiveSqlTest",
                job_type="HiveSql"
            )
            response = client.create_execute_job(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Creating a DistCp job

    # coding: utf-8
    
    import os
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkmrs.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId)
    
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = CreateExecuteJobRequest()
            request.cluster_id = "{cluster_id}"
            listPropertiesbody = {
                "fs.obs.endpoint": "obs endpoint",
                "fs.obs.access.key": "xxx",
                "fs.obs.secret.key": "yyy"
            }
            listArgumentsbody = [
                "obs://obs-test/DistcpJob/",
                "/user/test/sparksql/"
            ]
            request.body = JobExecution(
                properties=listPropertiesbody,
                arguments=listArgumentsbody,
                job_name="DistCpTest",
                job_type="DistCp"
            )
            response = client.create_execute_job(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Creating a SparkScript job

    # coding: utf-8
    
    import os
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkmrs.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId)
    
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = CreateExecuteJobRequest()
            request.cluster_id = "{cluster_id}"
            listPropertiesbody = {
                "fs.obs.access.key": "xxx",
                "fs.obs.secret.key": "yyy"
            }
            listArgumentsbody = [
                "op-key1",
                "op-value1",
                "op-key2",
                "op-value2",
                "obs://obs-test/sql/test_script.sql"
            ]
            request.body = JobExecution(
                properties=listPropertiesbody,
                arguments=listArgumentsbody,
                job_name="SparkScriptTest",
                job_type="SparkSql"
            )
            response = client.create_execute_job(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Create a SparkSql job

    # coding: utf-8
    
    import os
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkmrs.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId)
    
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = CreateExecuteJobRequest()
            request.cluster_id = "{cluster_id}"
            listPropertiesbody = {
                "fs.obs.access.key": "xxx",
                "fs.obs.secret.key": "yyy"
            }
            listArgumentsbody = [
                "op-key1",
                "op-value1",
                "op-key2",
                "op-value2",
                "create table student_info3 (id string,name string,gender string,age int,addr string);"
            ]
            request.body = JobExecution(
                properties=listPropertiesbody,
                arguments=listArgumentsbody,
                job_name="SparkSqlTest",
                job_type="SparkSql"
            )
            response = client.create_execute_job(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Create a Flink job

    # coding: utf-8
    
    import os
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkmrs.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId)
    
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = CreateExecuteJobRequest()
            request.cluster_id = "{cluster_id}"
            listPropertiesbody = {
                "fs.obs.endpoint": "obs endpoint",
                "fs.obs.access.key": "xxx",
                "fs.obs.secret.key": "yyy"
            }
            listArgumentsbody = [
                "run",
                "-d",
                "-ynm",
                "testExcutorejobhdfsbatch",
                "-m",
                "yarn-cluster",
                "hdfs://test/examples/batch/WordCount.jar"
            ]
            request.body = JobExecution(
                properties=listPropertiesbody,
                arguments=listArgumentsbody,
                job_name="flinkTest",
                job_type="Flink"
            )
            response = client.create_execute_job(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Create a SparkPython job (jobs of this type are converted to SparkSubmit on submission; the MRS console displays the job type as SparkSubmit, and when querying the job list through the API, use SparkSubmit as the job type; a query sketch follows the last example below.)

    # coding: utf-8
    
    import os
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkmrs.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId)
    
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = CreateExecuteJobRequest()
            request.cluster_id = "{cluster_id}"
            listPropertiesbody = {
                "fs.obs.access.key": "xxx",
                "fs.obs.secret.key": "yyy"
            }
            listArgumentsbody = [
                "--master",
                "yarn",
                "--deploy-mode",
                "cluster",
                "--py-files",
                "obs://obs-test/a.py",
                "--conf",
                "spark.yarn.appMasterEnv.PYTHONPATH=/tmp:$PYTHONPATH",
                "--conf",
                "spark.yarn.appMasterEnv.aaa=aaaa",
                "--conf",
                "spark.executorEnv.aaa=executoraaa",
                "--properties-file",
                "obs://obs-test/test-spark.conf",
                "obs://obs-test/pi.py",
                "100000"
            ]
            request.body = JobExecution(
                properties=listPropertiesbody,
                arguments=listArgumentsbody,
                job_name="SparkPythonTest",
                job_type="SparkPython"
            )
            response = client.create_execute_job(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Create a MapReduce job

    package main
    
    import (
    	"fmt"
    	"os"

    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    	mrs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/model"
    	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := mrs.NewMrsClient(
            mrs.MrsClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.CreateExecuteJobRequest{}
    	request.ClusterId = "{cluster_id}"
    	var listPropertiesbody = map[string]string{
            "fs.obs.endpoint": "obs endpoint",
            "fs.obs.access.key": "xxx",
            "fs.obs.secret.key": "yyy",
        }
    	var listArgumentsbody = []string{
            "obs://obs-test/program/hadoop-mapreduce-examples-x.x.x.jar",
    	    "wordcount",
    	    "obs://obs-test/input/",
    	    "obs://obs-test/job/mapreduce/output",
        }
    	request.Body = &model.JobExecution{
    		Properties: listPropertiesbody,
    		Arguments: &listArgumentsbody,
    		JobName: "MapReduceTest",
    		JobType: "MapReduce",
    	}
    	response, err := client.CreateExecuteJob(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • Create a SparkSubmit job

    package main
    
    import (
    	"fmt"
    	"os"

    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    	mrs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/model"
    	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := mrs.NewMrsClient(
            mrs.MrsClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.CreateExecuteJobRequest{}
    	request.ClusterId = "{cluster_id}"
    	var listPropertiesbody = map[string]string{
            "fs.obs.access.key": "xxx",
            "fs.obs.secret.key": "yyy",
        }
    	var listArgumentsbody = []string{
            "--master",
    	    "yarn",
    	    "--deploy-mode",
    	    "cluster",
    	    "--py-files",
    	    "obs://obs-test/a.py",
    	    "--conf",
    	    "spark.yarn.appMasterEnv.PYTHONPATH=/tmp:$PYTHONPATH",
    	    "--conf",
    	    "spark.yarn.appMasterEnv.aaa=aaaa",
    	    "--conf",
    	    "spark.executorEnv.aaa=executoraaa",
    	    "--properties-file",
    	    "obs://obs-test/test-spark.conf",
    	    "obs://obs-test/pi.py",
    	    "100000",
        }
    	request.Body = &model.JobExecution{
    		Properties: listPropertiesbody,
    		Arguments: &listArgumentsbody,
    		JobName: "SparkSubmitTest",
    		JobType: "SparkSubmit",
    	}
    	response, err := client.CreateExecuteJob(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • Create a HiveScript job

    package main
    
    import (
    	"fmt"
    	"os"

    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    	mrs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/model"
    	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := mrs.NewMrsClient(
            mrs.MrsClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.CreateExecuteJobRequest{}
    	request.ClusterId = "{cluster_id}"
    	var listPropertiesbody = map[string]string{
            "fs.obs.endpoint": "obs endpoint",
            "fs.obs.access.key": "xxx",
            "fs.obs.secret.key": "yyy",
        }
    	var listArgumentsbody = []string{
            "obs://obs-test/sql/test_script.sql",
        }
    	request.Body = &model.JobExecution{
    		Properties: listPropertiesbody,
    		Arguments: &listArgumentsbody,
    		JobName: "HiveScriptTest",
    		JobType: "HiveScript",
    	}
    	response, err := client.CreateExecuteJob(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • Create a HiveSql job

    package main
    
    import (
    	"fmt"
    	"os"

    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    	mrs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/model"
    	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := mrs.NewMrsClient(
            mrs.MrsClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.CreateExecuteJobRequest{}
    	request.ClusterId = "{cluster_id}"
    	var listPropertiesbody = map[string]string{
            "fs.obs.endpoint": "obs endpoint",
            "fs.obs.access.key": "xxx",
            "fs.obs.secret.key": "yyy",
        }
    	var listArgumentsbody = []string{
    	    // The three SQL statements are passed as a single argument string; the
    	    // inner double quotes and the literal \n must be escaped in Go source.
    	    "DROP TABLE IF EXISTS src_wordcount;\ncreate external table src_wordcount(line string) row format delimited fields terminated by \"\\n\" stored as textfile location \"obs://donotdel-gxc/input/\";\ninsert into src_wordcount values(\"v1\")",
        }
    	request.Body = &model.JobExecution{
    		Properties: listPropertiesbody,
    		Arguments: &listArgumentsbody,
    		JobName: "HiveSqlTest",
    		JobType: "HiveSql",
    	}
    	response, err := client.CreateExecuteJob(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
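
    Hand-escaping every inner double quote, as the Go string above requires, is error-prone for multi-statement SQL. The following is a minimal Python sketch (matching the language of the earlier examples) that composes the same single-argument string programmatically; the statement text is taken from the example above, and the variable names are illustrative only.

        # Sketch: compose the multi-statement HiveSql argument without manual escaping.
        statements = [
            'DROP TABLE IF EXISTS src_wordcount',
            'create external table src_wordcount(line string) row format delimited'
            ' fields terminated by "\\n" stored as textfile location "obs://donotdel-gxc/input/"',
            'insert into src_wordcount values("v1")',
        ]
        # Joined with ";\n", the result is passed as the single element of `arguments`.
        hive_sql_argument = ";\n".join(statements)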
    
  • Create a DistCp job

    package main
    
    import (
    	"fmt"
    	"os"

    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    	mrs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/model"
    	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := mrs.NewMrsClient(
            mrs.MrsClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.CreateExecuteJobRequest{}
    	request.ClusterId = "{cluster_id}"
    	var listPropertiesbody = map[string]string{
            "fs.obs.endpoint": "obs endpoint",
            "fs.obs.access.key": "xxx",
            "fs.obs.secret.key": "yyy",
        }
    	var listArgumentsbody = []string{
            "obs://obs-test/DistcpJob/",
    	    "/user/test/sparksql/",
        }
    	request.Body = &model.JobExecution{
    		Properties: listPropertiesbody,
    		Arguments: &listArgumentsbody,
    		JobName: "DistCpTest",
    		JobType: "DistCp",
    	}
    	response, err := client.CreateExecuteJob(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • Create a SparkScript job

    package main
    
    import (
    	"fmt"
    	"os"

    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    	mrs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/model"
    	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := mrs.NewMrsClient(
            mrs.MrsClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.CreateExecuteJobRequest{}
    	request.ClusterId = "{cluster_id}"
    	var listPropertiesbody = map[string]string{
            "fs.obs.access.key": "xxx",
            "fs.obs.secret.key": "yyy",
        }
    	var listArgumentsbody = []string{
            "op-key1",
    	    "op-value1",
    	    "op-key2",
    	    "op-value2",
    	    "obs://obs-test/sql/test_script.sql",
        }
    	request.Body = &model.JobExecution{
    		Properties: listPropertiesbody,
    		Arguments: &listArgumentsbody,
    		JobName: "SparkScriptTest",
    		JobType: "SparkSql",
    	}
    	response, err := client.CreateExecuteJob(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • Create a SparkSql job

    package main
    
    import (
    	"fmt"
    	"os"

    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    	mrs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/model"
    	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := mrs.NewMrsClient(
            mrs.MrsClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.CreateExecuteJobRequest{}
    	request.ClusterId = "{cluster_id}"
    	var listPropertiesbody = map[string]string{
            "fs.obs.access.key": "xxx",
            "fs.obs.secret.key": "yyy",
        }
    	var listArgumentsbody = []string{
            "op-key1",
    	    "op-value1",
    	    "op-key2",
    	    "op-value2",
    	    "create table student_info3 (id string,name string,gender string,age int,addr string);",
        }
    	request.Body = &model.JobExecution{
    		Properties: listPropertiesbody,
    		Arguments: &listArgumentsbody,
    		JobName: "SparkSqlTest",
    		JobType: "SparkSql",
    	}
    	response, err := client.CreateExecuteJob(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • Create a Flink job

    package main
    
    import (
    	"fmt"
    	"os"

    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    	mrs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/model"
    	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := mrs.NewMrsClient(
            mrs.MrsClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.CreateExecuteJobRequest{}
    	request.ClusterId = "{cluster_id}"
    	var listPropertiesbody = map[string]string{
            "fs.obs.endpoint": "obs endpoint",
            "fs.obs.access.key": "xxx",
            "fs.obs.secret.key": "yyy",
        }
    	var listArgumentsbody = []string{
            "run",
    	    "-d",
    	    "-ynm",
    	    "testExcutorejobhdfsbatch",
    	    "-m",
    	    "yarn-cluster",
    	    "hdfs://test/examples/batch/WordCount.jar",
        }
    	request.Body = &model.JobExecution{
    		Properties: listPropertiesbody,
    		Arguments: &listArgumentsbody,
    		JobName: "flinkTest",
    		JobType: "Flink",
    	}
    	response, err := client.CreateExecuteJob(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • Create a SparkPython job (jobs of this type are converted to SparkSubmit on submission; the MRS console displays the job type as SparkSubmit, and when querying the job list through the API, use SparkSubmit as the job type; see the query sketch after this example.)

    package main
    
    import (
    	"fmt"
    	"os"

    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    	mrs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/model"
    	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/mrs/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := mrs.NewMrsClient(
            mrs.MrsClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.CreateExecuteJobRequest{}
    	request.ClusterId = "{cluster_id}"
    	var listPropertiesbody = map[string]string{
            "fs.obs.access.key": "xxx",
            "fs.obs.secret.key": "yyy",
        }
    	var listArgumentsbody = []string{
            "--master",
    	    "yarn",
    	    "--deploy-mode",
    	    "cluster",
    	    "--py-files",
    	    "obs://obs-test/a.py",
    	    "--conf",
    	    "spark.yarn.appMasterEnv.PYTHONPATH=/tmp:$PYTHONPATH",
    	    "--conf",
    	    "spark.yarn.appMasterEnv.aaa=aaaa",
    	    "--conf",
    	    "spark.executorEnv.aaa=executoraaa",
    	    "--properties-file",
    	    "obs://obs-test/test-spark.conf",
    	    "obs://obs-test/pi.py",
    	    "100000",
        }
    	request.Body = &model.JobExecution{
    		Properties: listPropertiesbody,
    		Arguments: &listArgumentsbody,
    		JobName: "SparkPythonTest",
    		JobType: "SparkPython",
    	}
    	response, err := client.CreateExecuteJob(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
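
    Because a SparkPython job is converted to SparkSubmit at submission time, a later job-list query has to match on the converted type. The sketch below shows this in Python. It is a sketch only: the request class and response field names (ShowJobExeListNewRequest, show_job_exe_list_new, job_list, job_type, job_state) are assumptions based on the v2 job-list API and may differ between SDK versions, so verify them against your installed huaweicloudsdkmrs.

        # coding: utf-8
        # Sketch only: list jobs and keep those whose (converted) type is SparkSubmit.
        # ShowJobExeListNewRequest and the job_list/job_type/job_state names are
        # assumptions; check the classes shipped with your huaweicloudsdkmrs version.

        import os
        from huaweicloudsdkcore.auth.credentials import BasicCredentials
        from huaweicloudsdkmrs.v2.region.mrs_region import MrsRegion
        from huaweicloudsdkmrs.v2 import MrsClient, ShowJobExeListNewRequest

        credentials = BasicCredentials(os.environ["CLOUD_SDK_AK"], os.environ["CLOUD_SDK_SK"], "{project_id}")
        client = MrsClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(MrsRegion.value_of("<YOUR REGION>")) \
            .build()

        request = ShowJobExeListNewRequest()
        request.cluster_id = "{cluster_id}"
        response = client.show_job_exe_list_new(request)
        for job in response.job_list:
            if job.job_type == "SparkSubmit":  # SparkPython jobs are listed with this type
                print(job.job_id, job.job_name, job.job_state)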
    

For SDK code examples in more programming languages, see the Code Samples tab in API Explorer, which can automatically generate the corresponding SDK code examples.

Status Codes

Status Code    Description

202            The job is created and executed.

400            Failed to create and execute the job.
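
As noted in the response tables above, a 202 status still carries a job_submit_result whose state is either COMPLETE or FAILED, so the submission outcome should be checked from the response body rather than inferred from the status code alone. A minimal Python sketch, assuming `response` is the value returned by client.create_execute_job(request) in any of the examples above:

    # Minimal sketch: evaluate the documented 202 response body of create_execute_job.
    # `response` is the return value of client.create_execute_job(request) from any
    # of the Python examples above; job_submit_result carries job_id and state.
    def check_submit_result(response):
        result = response.job_submit_result
        if result.state == "COMPLETE":
            print("Job submitted, job_id = %s" % result.job_id)
        else:  # state == "FAILED"
            print("Job submission failed, job_id = %s" % result.job_id)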

Error Codes

See Error Codes.

Related Documents