更新时间:2024-12-28 GMT+08:00
使用Spark on CCE
使用Spark的Kubernetes调度程序spark-submit,可以将Spark应用程序提交到Kubernetes集群中运行,详情请参见在Kubernetes上运行Spark。使用spark-submit提交Spark应用程序的工作原理如下:
- 创建一个Pod,用于运行Spark的驱动程序。
- 驱动程序在集群中创建执行程序的Pod并与其建立连接,用于执行应用程序代码。
- 应用程序完成后,执行程序的Pod将终止并清理,但驱动程序Pod仍然存在并保持在“已停止”状态,直到最终进行垃圾回收或手动清理。在“已停止”状态下,驱动程序Pod不会使用任何计算或内存资源。
图1 提交机制的工作原理
在CCE上运行SparkPi例子
- 在执行Spark的机器上安装kubectl,详情请参见通过kubectl连接集群。
- kubectl安装成功后,执行如下命令授予集群权限。
# 创建服务账号 kubectl create serviceaccount spark # 将集群角色spark-role和上一步创建服务账号绑定,并指定default命名空间授予edit的clusterrole权限 kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
- 以提交Spark-Pi的作业到CCE为例:
spark-submit \ --master k8s://https://**.**.**.**:5443 \ --deploy-mode cluster \ --name spark-pi \ --class org.apache.spark.examples.SparkPi \ --conf spark.executor.instances=2 \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ --conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \ local:///root/spark-obs/examples/jars/spark-examples_2.12-3.1.1.jar
配置说明:
- --master:集群的API Server,其中https://**.**.**.**:5443为 ~/.kube/config中使用的master地址,可通过kubectl cluster-info获取。
- --deploy-mode:
- cluster:在集群的工作节点上部署驱动程序。
- client:(默认值)作为外部客户端在本地部署驱动程序。
- --name:作业名称,集群中的Pod将以此开头。
- --class:应用程序,例如org.apache.spark.examples.SparkPi。
- --conf:Spark配置参数,使用键值格式。值得一提的是,所有能使用--conf指定的参数均会默认从文件~/spark-obs/conf/spark-defaults.conf中读取,所以通用配置可以如配置Spark对接OBS一样,直接写入作为默认值。
- local:使用本地的jar包路径。本例中使用本地文件存放jar包,因此使用local类型。根据实际情况,该参数可采用多种类型(file/http/local等),详情请参见官方文档。
访问对象存储服务OBS
使用spark-submit下发hdfs任务。请修改命令最后的参数为租户内实际的文件obs://bucket-name/filename。
spark-submit \
--master k8s://https://**.**.**.**:5443 \
--deploy-mode cluster \
--name spark-hdfs-test \
--class org.apache.spark.examples.HdfsTest \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \
local:///root/spark-obs/examples/jars/spark-examples_2.12-3.1.1.jar obs://bucket-name/filename
Spark-shell交互式scala命令支持
spark-shell \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ --conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \ --master k8s://https://**.**.**.**:5443
下述命令定义linecount及wordcount两个spark计算任务的算法。
def linecount(input:org.apache.spark.sql.Dataset[String]):Long=input.filter(line => line.length()>0).count() def wordcount(input:org.apache.spark.sql.Dataset[String]):Long=input.flatMap(value => value.split("\\s+")).groupByKey(value => value).count().count()
下述命令定义了各种数据来源:
var alluxio = spark.read.textFile("alluxio://alluxio-master:19998/sample-1g") var obs = spark.read.textFile("obs://gene-container-gtest/sample-1g") var hdfs = spark.read.textFile("hdfs://192.168.1.184:9000/user/hadoop/books/sample-1g")
下述命令开始正式计算:
spark.time(wordcount(obs)) spark.time(linecount(obs))