更新时间:2024-12-28 GMT+08:00

使用Spark on CCE

使用Spark的Kubernetes调度程序spark-submit,可以将Spark应用程序提交到Kubernetes集群中运行,详情请参见在Kubernetes上运行Spark。使用spark-submit提交Spark应用程序的工作原理如下:

  • 创建一个Pod,用于运行Spark的驱动程序。
  • 驱动程序在集群中创建执行程序的Pod并与其建立连接,用于执行应用程序代码。
  • 应用程序完成后,执行程序的Pod将终止并清理,但驱动程序Pod仍然存在并保持在“已停止”状态,直到最终进行垃圾回收或手动清理。在“已停止”状态下,驱动程序Pod不会使用任何计算或内存资源。
图1 提交机制的工作原理

在CCE上运行SparkPi例子

  1. 在执行Spark的机器上安装kubectl,详情请参见通过kubectl连接集群
  2. kubectl安装成功后,执行如下命令授予集群权限。

    # 创建服务账号
    kubectl create serviceaccount spark
    # 将集群角色spark-role和上一步创建服务账号绑定,并指定default命名空间授予edit的clusterrole权限
    kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default

  3. 以提交Spark-Pi的作业到CCE为例:

    spark-submit \
      --master k8s://https://**.**.**.**:5443 \
      --deploy-mode cluster \
      --name spark-pi \
      --class org.apache.spark.examples.SparkPi \
      --conf spark.executor.instances=2 \
      --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
      --conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \
      local:///root/spark-obs/examples/jars/spark-examples_2.12-3.1.1.jar

    配置说明:

    • --master:集群的API Server,其中https://**.**.**.**:5443为 ~/.kube/config中使用的master地址,可通过kubectl cluster-info获取。
    • --deploy-mode:
      • cluster:在集群的工作节点上部署驱动程序。
      • client:(默认值)作为外部客户端在本地部署驱动程序。
    • --name:作业名称,集群中的Pod将以此开头。
    • --class:应用程序,例如org.apache.spark.examples.SparkPi。
    • --conf:Spark配置参数,使用键值格式。值得一提的是,所有能使用--conf指定的参数均会默认从文件~/spark-obs/conf/spark-defaults.conf中读取,所以通用配置可以如配置Spark对接OBS一样,直接写入作为默认值。
      • spark.executor.instances:执行程序的Pod数量。
      • spark.kubernetes.authenticate.driver.serviceAccountName:驱动程序的集群权限,选择2中创建的serviceaccount。
      • spark.kubernetes.container.image:预置镜像到SWR步骤中上传至SWR的镜像地址。
    • local:使用本地的jar包路径。本例中使用本地文件存放jar包,因此使用local类型。根据实际情况,该参数可采用多种类型(file/http/local等),详情请参见官方文档

访问对象存储服务OBS

使用spark-submit下发hdfs任务。请修改命令最后的参数为租户内实际的文件obs://bucket-name/filename。

spark-submit \
  --master k8s://https://**.**.**.**:5443 \
  --deploy-mode cluster \
  --name spark-hdfs-test \
  --class org.apache.spark.examples.HdfsTest \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \
  local:///root/spark-obs/examples/jars/spark-examples_2.12-3.1.1.jar obs://bucket-name/filename

Spark-shell交互式scala命令支持

spark-shell \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \
  --master k8s://https://**.**.**.**:5443

下述命令定义linecount及wordcount两个spark计算任务的算法。

def linecount(input:org.apache.spark.sql.Dataset[String]):Long=input.filter(line => line.length()>0).count()
def wordcount(input:org.apache.spark.sql.Dataset[String]):Long=input.flatMap(value => value.split("\\s+")).groupByKey(value => value).count().count()

下述命令定义了各种数据来源:

var alluxio = spark.read.textFile("alluxio://alluxio-master:19998/sample-1g")
var obs = spark.read.textFile("obs://gene-container-gtest/sample-1g")
var hdfs = spark.read.textFile("hdfs://192.168.1.184:9000/user/hadoop/books/sample-1g")

下述命令开始正式计算:

spark.time(wordcount(obs))
spark.time(linecount(obs))