开发Launcher任务
本章节详细介绍使用Notebook开发Launcher任务的操作指导。
前提条件
已启用Notebook并且已创建出Notebook实例。创建Notebook实例的操作请参见创建Notebook实例。
约束限制
- 目前仅支持DLI、MRS两种数据源。
- MRS Spark连接仅支持管理中心创建的MRS Spark代理连接方式。
Launcher开发入口
- 在数据开发主界面的左侧导航栏,选择“数据开发 > Notebook”。
- 选择状态为“运行中”的Notebook实例。
- 单击“打开”按钮进入Notebook的开发界面。
- 单击
,选择Launcher,进入Launcher开发界面。目前仅支持DLI Spark x.x.x(PySpark)、DLI Spark x.x.x(Scale)、MRS x.x.x(PySpark)、python-x.x.x四种语言任务开发。
x.x.x表示版本号。
图1 选择Launcher
开发DLI Spark(PySpark)任务


同时,在“权限管理 > 权限”里面配置如下自定义策略:

{ "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "iam:agencies:pass" ] } ] }
- 单击DLI Spark x.x.x(PySpark),进入DLI Spark(PySpark)开发界面。
- 在界面右上角,单击“connect”配置连接信息。
- 配置连接基本参数Pool和Queue(DLI的资源池和队列名称)。
- 配置高级参数(Advanced Settings)。
表1 Spark Config 参数
描述
--conf
填写DLI任务运行的参数。例如配置DLI任务的代理名称。
spark.dli.job.agency.name=notebook_agency(必填)
其他参数根据实际业务需要进行配置。
--jars
填写文件的OBS路径,多个路径以Enter键分隔。(可选)
--py-files
填写文件的OBS路径,多个路径以Enter键分隔。(可选)
--files
填写文件的OBS路径,多个路径以Enter键分隔。(可选)
表2 Resource Config 参数
描述
Driver Memory
设置Driver Memory的大小。大小在0-16之间。默认值为1,不能输入0。
Driver Cores
设置Driver Cores的大小。大小在0-4之间。默认值为1,不能输入0。
Executor Memory
设置Executor Memory的大小。大小在0-16之间。默认值为1,不能输入0。
Executor Cores
设置Executor Cores的大小。大小在0-4之间。默认值为1,不能输入0。
Executors
设置Executors的大小。大小在0-16之间。默认值为1,不能输入0。
- 单击“Connect”。配置好以后,会展示DLI队列信息和集群状态“cluster status: connected”。
- 单击代码行,输入开发代码并进行调试。
- 单击代码行前面的
,可以运行该行代码。
表3 代码示例 示例代码
运行结果示意图
%%spark spark.read.parquet('obs://mytestbucket/demo/data.parquet').show()
图4 示例%%sql show tables
图5 示例%scala import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().appName("demo").getOrCreate(); val inputFile = "obs://mytestbucket/demo/test.txt" val outputDir = "obs://mytestbucket/demo/test" val textFile = spark.read.textFile(inputFile) val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() wordCounts.write.format("csv").save(outputDir) wordCounts.show() spark.stop()
图6 示例%python from pyspark.sql import SparkSession spark = SparkSession.builder.appName("PySpark Example").getOrCreate() data = [("Alice", 34), ("Bob", 45), ("Charlie", 29), ("David", 35)] columns = ["Name", "Age"] df = spark.createDataFrame(data, columns) df.show() filtered_df = df.filter(df.Age > 30) filtered_df.show() average_age = df.groupBy().avg("Age").collect()[0][0] print(f"Average Age: {average_age}") spark.stop()
图7 示例 - 代码开发完毕后,进行保存并运行。
- 查看代码运行结果。
开发DLI Spark(Scala)任务


同时,在“权限管理 > 权限”里面配置如下自定义策略:

{ "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "iam:agencies:pass" ] } ] }
- 单击DLI Spark x.x.x(Scale),进入DLI Spark(Scale)开发界面。
- 在界面右上角,单击“connect”配置连接信息。
- 配置连接基本参数Pool和Queue(DLI的资源池和队列名称)。
- 配置高级参数(Advanced Settings)。
表4 Spark Config 参数
描述
--conf
填写DLI任务运行的参数。例如配置DLI任务的代理名称。
spark.dli.job.agency.name=notebook_agency(必填)
其他参数根据实际业务需要进行配置。
--jars
填写文件的OBS路径,多个路径以Enter键分隔。(可选)
--py-files
填写文件的OBS路径,多个路径以Enter键分隔。(可选)
--files
填写文件的OBS路径,多个路径以Enter键分隔。(可选)
表5 Resource Config 参数
描述
Driver Memory
设置Driver Memory的大小。大小在0-16之间。默认值为1,不能输入0。
Driver Cores
设置Driver Cores的大小。大小在0-4之间。默认值为1,不能输入0。
Executor Memory
设置Executor Memory的大小。大小在0-16之间。默认值为1,不能输入0。
Executor Cores
设置Executor Cores的大小。大小在0-4之间。默认值为1,不能输入0。
Executors
设置Executors的大小。大小在0-16之间。默认值为1,不能输入0。
- 单击“Connect”。配置好以后,会展示DLI队列信息和集群状态“cluster status: connected”。
- 单击代码行,输入开发代码并进行调试。
- 单击代码行前面的
,可以运行该行代码。
表6 代码示例 示例代码
运行结果示意图
%%spark spark.read.parquet('obs://mytestbucket/demo/data.parquet').show()
图10 示例%%sql show tables
图11 示例%scala import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().appName("demo").getOrCreate(); val inputFile = "obs://mytestbucket/demo/test.txt" val outputDir = "obs://mytestbucket/demo/test" val textFile = spark.read.textFile(inputFile) val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() wordCounts.write.format("csv").save(outputDir) wordCounts.show() spark.stop()
图12 示例%python from pyspark.sql import SparkSession spark = SparkSession.builder.appName("PySpark Example").getOrCreate() data = [("Alice", 34), ("Bob", 45), ("Charlie", 29), ("David", 35)] columns = ["Name", "Age"] df = spark.createDataFrame(data, columns) df.show() filtered_df = df.filter(df.Age > 30) filtered_df.show() average_age = df.groupBy().avg("Age").collect()[0][0] print(f"Average Age: {average_age}") spark.stop()
图13 示例 - 代码开发完毕后,进行保存并运行。
- 查看代码运行结果。
开发MRS (PySpark)任务
- 单击MRS x.x.x(PySpark),进入MRS开发界面。
- 在界面右上角,单击“connect”配置连接信息。(未配置前显示cluster status: init)
- 配置Connect Name。从下拉框选择已创建的MRS Spark连接。
MRS Spark连接仅支持管理中心创建的MRS Spark代理连接方式。创建连接的操作请参见MRS Spark数据连接参数说明。
如果当前环境未安装MRS集群客户端,界面会提示“No cluster client has been installed in the current environment. Install the client, which will take 10 to 20 minutes.”。
- (可选)单击“Install”,安装MRS集群客户端。安装中界面显示如下图。如果已安装,请忽略。
图14 安装
- 安装完以后,单击“Connect”,完成连接配置。
- 配置Connect Name。从下拉框选择已创建的MRS Spark连接。
- 单击代码行,输入开发代码并进行调试。
- 单击代码行前面的
,可以运行该行代码。
表7 代码示例 示例代码
运行结果示意图
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("PySpark Example").getOrCreate() data = [("Alice", 34), ("Bob", 45), ("Charlie", 29), ("David", 35)] columns = ["Name", "Age"] df = spark.createDataFrame(data, columns) df.show() filtered_df = df.filter(df.Age > 30) filtered_df.show() average_age = df.groupBy().avg("Age").collect()[0][0] print(f"Average Age: {average_age}") spark.stop()
图15 示例 - 代码开发完毕后,进行保存并运行。
- 查看代码运行结果。
开发Python任务
- 单击python-x.x.x,进入Python开发界面。
- 单击代码行,输入开发代码并进行调试。
- 单击代码行前面的
,可以运行该行代码。
- 代码开发完毕后,进行保存并运行。
- 查看代码运行结果。