更新时间:2025-05-07 GMT+08:00

开发Launcher任务

本章节详细介绍使用Notebook开发Launcher任务的操作指导。

前提条件

已启用Notebook并且已创建出Notebook实例。创建Notebook实例的操作请参见创建Notebook实例

约束限制

  • 目前仅支持DLI、MRS两种数据源。
  • MRS Spark连接仅支持管理中心创建的MRS Spark代理连接方式。

Launcher开发入口

  1. 在数据开发主界面的左侧导航栏,选择“数据开发 > Notebook”。
  2. 选择状态为“运行中”的Notebook实例。
  3. 单击“打开”按钮进入Notebook的开发界面。
  4. 单击,选择Launcher,进入Launcher开发界面。目前仅支持DLI Spark x.x.x(PySpark)DLI Spark x.x.x(Scale)MRS x.x.x(PySpark)python-x.x.x四种语言任务开发。

    x.x.x表示版本号。

    图1 选择Launcher

开发DLI Spark(PySpark)任务

开发DLI任务前,请先为DLI创建一个委托并进行授权,例如:委托名称notebook_agency,授权DLI FullAccess和OBS Administrator。操作方法请参考创建DLI自定义委托
图2 创建委托并授权

同时,在“权限管理 > 权限”里面配置如下自定义策略:

图3 自定义策略
{
    "Version": "1.1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:agencies:pass"
            ]
        }
    ]
}
  1. 单击DLI Spark x.x.x(PySpark),进入DLI Spark(PySpark)开发界面。
  2. 在界面右上角,单击“connect”配置连接信息。
    • 配置连接基本参数Pool和Queue(DLI的资源池和队列名称)。
    • 配置高级参数(Advanced Settings)。
      表1 Spark Config

      参数

      描述

      --conf

      填写DLI任务运行的参数。例如配置DLI任务的代理名称。

      spark.dli.job.agency.name=notebook_agency(必填)

      其他参数根据实际业务需要进行配置。

      --jars

      填写文件的OBS路径,多个路径以Enter键分隔。(可选)

      --py-files

      填写文件的OBS路径,多个路径以Enter键分隔。(可选)

      --files

      填写文件的OBS路径,多个路径以Enter键分隔。(可选)

      表2 Resource Config

      参数

      描述

      Driver Memory

      设置Driver Memory的大小。大小在0-16之间。默认值为1,不能输入0。

      Driver Cores

      设置Driver Cores的大小。大小在0-4之间。默认值为1,不能输入0。

      Executor Memory

      设置Executor Memory的大小。大小在0-16之间。默认值为1,不能输入0。

      Executor Cores

      设置Executor Cores的大小。大小在0-4之间。默认值为1,不能输入0。

      Executors

      设置Executors的大小。大小在0-16之间。默认值为1,不能输入0。

    • 单击“Connect”。配置好以后,会展示DLI队列信息和集群状态“cluster status: connected”。
  3. 单击代码行,输入开发代码并进行调试。
  4. 单击代码行前面的,可以运行该行代码。
    表3 代码示例

    示例代码

    运行结果示意图

    %%spark
    spark.read.parquet('obs://mytestbucket/demo/data.parquet').show()
    图4 示例
    %%sql
    show tables
    图5 示例
    %scala
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().appName("demo").getOrCreate();
    val inputFile = "obs://mytestbucket/demo/test.txt"
    val outputDir = "obs://mytestbucket/demo/test"
    val textFile = spark.read.textFile(inputFile)
    val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
    wordCounts.write.format("csv").save(outputDir)
    wordCounts.show()
    spark.stop()
    图6 示例
    %python
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("PySpark Example").getOrCreate()
    data = [("Alice", 34), ("Bob", 45), ("Charlie", 29), ("David", 35)]
    columns = ["Name", "Age"]
    df = spark.createDataFrame(data, columns)
    df.show()
    filtered_df = df.filter(df.Age > 30)
    filtered_df.show()
    average_age = df.groupBy().avg("Age").collect()[0][0]
    print(f"Average Age: {average_age}")
    spark.stop()
    图7 示例
  5. 代码开发完毕后,进行保存并运行。
  6. 查看代码运行结果。

开发DLI Spark(Scala)任务

开发DLI任务前,请先为DLI创建一个委托并进行授权,例如:委托名称notebook_agency,授权DLI FullAccess和OBS Administrator。操作方法请参考创建DLI自定义委托
图8 创建委托并授权

同时,在“权限管理 > 权限”里面配置如下自定义策略:

图9 自定义策略
{
    "Version": "1.1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:agencies:pass"
            ]
        }
    ]
}
  1. 单击DLI Spark x.x.x(Scale),进入DLI Spark(Scale)开发界面。
  2. 在界面右上角,单击“connect”配置连接信息。
    • 配置连接基本参数Pool和Queue(DLI的资源池和队列名称)。
    • 配置高级参数(Advanced Settings)。
      表4 Spark Config

      参数

      描述

      --conf

      填写DLI任务运行的参数。例如配置DLI任务的代理名称。

      spark.dli.job.agency.name=notebook_agency(必填)

      其他参数根据实际业务需要进行配置。

      --jars

      填写文件的OBS路径,多个路径以Enter键分隔。(可选)

      --py-files

      填写文件的OBS路径,多个路径以Enter键分隔。(可选)

      --files

      填写文件的OBS路径,多个路径以Enter键分隔。(可选)

      表5 Resource Config

      参数

      描述

      Driver Memory

      设置Driver Memory的大小。大小在0-16之间。默认值为1,不能输入0。

      Driver Cores

      设置Driver Cores的大小。大小在0-4之间。默认值为1,不能输入0。

      Executor Memory

      设置Executor Memory的大小。大小在0-16之间。默认值为1,不能输入0。

      Executor Cores

      设置Executor Cores的大小。大小在0-4之间。默认值为1,不能输入0。

      Executors

      设置Executors的大小。大小在0-16之间。默认值为1,不能输入0。

    • 单击“Connect”。配置好以后,会展示DLI队列信息和集群状态“cluster status: connected”。
  3. 单击代码行,输入开发代码并进行调试。
  4. 单击代码行前面的,可以运行该行代码。
    表6 代码示例

    示例代码

    运行结果示意图

    %%spark
    spark.read.parquet('obs://mytestbucket/demo/data.parquet').show()
    图10 示例
    %%sql
    show tables
    图11 示例
    %scala
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().appName("demo").getOrCreate();
    val inputFile = "obs://mytestbucket/demo/test.txt"
    val outputDir = "obs://mytestbucket/demo/test"
    val textFile = spark.read.textFile(inputFile)
    val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
    wordCounts.write.format("csv").save(outputDir)
    wordCounts.show()
    spark.stop()
    图12 示例
    %python
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("PySpark Example").getOrCreate()
    data = [("Alice", 34), ("Bob", 45), ("Charlie", 29), ("David", 35)]
    columns = ["Name", "Age"]
    df = spark.createDataFrame(data, columns)
    df.show()
    filtered_df = df.filter(df.Age > 30)
    filtered_df.show()
    average_age = df.groupBy().avg("Age").collect()[0][0]
    print(f"Average Age: {average_age}")
    spark.stop()
    图13 示例
  5. 代码开发完毕后,进行保存并运行。
  6. 查看代码运行结果。

开发MRS (PySpark)任务

  1. 单击MRS x.x.x(PySpark),进入MRS开发界面。
  2. 在界面右上角,单击“connect”配置连接信息。(未配置前显示cluster status: init)
    • 配置Connect Name。从下拉框选择已创建的MRS Spark连接。

      MRS Spark连接仅支持管理中心创建的MRS Spark代理连接方式。创建连接的操作请参见MRS Spark数据连接参数说明

      如果当前环境未安装MRS集群客户端,界面会提示“No cluster client has been installed in the current environment. Install the client, which will take 10 to 20 minutes.”。

    • (可选)单击“Install”,安装MRS集群客户端。安装中界面显示如下图。如果已安装,请忽略。
      图14 安装
    • 安装完以后,单击“Connect”,完成连接配置。
  3. 单击代码行,输入开发代码并进行调试。
  4. 单击代码行前面的,可以运行该行代码。
    表7 代码示例

    示例代码

    运行结果示意图

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("PySpark Example").getOrCreate()
    data = [("Alice", 34), ("Bob", 45), ("Charlie", 29), ("David", 35)]
    columns = ["Name", "Age"]
    df = spark.createDataFrame(data, columns)
    df.show()
    filtered_df = df.filter(df.Age > 30)
    filtered_df.show()
    average_age = df.groupBy().avg("Age").collect()[0][0]
    print(f"Average Age: {average_age}")
    spark.stop()
    图15 示例
  5. 代码开发完毕后,进行保存并运行。
  6. 查看代码运行结果。

开发Python任务

  1. 单击python-x.x.x,进入Python开发界面。
  2. 单击代码行,输入开发代码并进行调试。
  3. 单击代码行前面的,可以运行该行代码。
  4. 代码开发完毕后,进行保存并运行。
  5. 查看代码运行结果。