
Using Spark on CCE

You can use spark-submit, which relies on the native Kubernetes scheduler added to Spark, to submit Spark applications to Kubernetes clusters. For details, see Running Spark on Kubernetes. The submission mechanism works as follows:

  • spark-submit creates a pod that runs the Spark driver.
  • The driver creates the executor pods and establishes connections with them to run the application code.
  • When the application completes, the executor pods are terminated and cleaned up, but the driver pod remains in the Completed state until it is garbage collected or manually deleted. In the Completed state, the driver pod does not consume any compute or memory resources.
Figure 1 Submission mechanism
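
You can observe this lifecycle with kubectl after submitting a job (see Running SparkPi on CCE below). The following is a minimal sketch: it assumes the default namespace, uses the spark-role label that Spark attaches to the pods it creates, and the driver pod name is a placeholder.

# List the driver and executor pods created by Spark.
kubectl get pods -l spark-role=driver
kubectl get pods -l spark-role=executor

# After the application completes, only the driver pod remains, in the Completed state.
# Its logs stay available until the pod is garbage collected or deleted:
kubectl logs <driver-pod-name>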

Running SparkPi on CCE

  1. Install kubectl on the node where Spark is running. For details, see Connecting to a Cluster Using kubectl.
  2. Run the following commands to grant cluster-level permissions:

    kubectl create serviceaccount spark
    kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default

  3. Submit a SparkPi job to CCE. The following shows an example:

    spark-submit \
      --master k8s://https://**.**.**.**:5443 \
      --deploy-mode cluster \
      --name spark-pi \
      --class org.apache.spark.examples.SparkPi \
      --conf spark.executor.instances=2 \
      --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
      --conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \
      local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar

    Parameters:

    • --master: indicates the API server of the cluster. https://**.**.**.**:5443 is the API server address recorded in ~/.kube/config and can also be obtained by running kubectl cluster-info.
    • --deploy-mode:
      • cluster: the driver runs in a pod on the cluster's worker nodes.
      • client: (default value) the driver runs locally as an external client.
    • --name: indicates the name of the job. It is used to name the driver and executor pods in the cluster.
    • --class: indicates the main class of the application, for example, org.apache.spark.examples.SparkPi.
    • --conf: indicates Spark configuration parameters in key-value format. All parameters that can be specified with --conf are also read from the ~/spark-obs/conf/spark-defaults.conf file by default, so general settings can be written there as defaults in the same way as in Interconnecting Spark with OBS (see the sketch after this list).
      • spark.executor.instances: indicates the number of executor pods.
      • spark.kubernetes.authenticate.driver.serviceAccountName: indicates the service account that grants the driver cluster-level permissions. Use the service account created in 2.
      • spark.kubernetes.container.image: indicates the path of the image pushed to SWR in Pushing an Image to SWR.
    • local: indicates that the application JAR package is loaded from a path that is local to the driver and executor containers, in this example a path inside the container image. The URI scheme can also be file or http. For details, see the Official Document.
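
    As an illustration of the --conf defaults mentioned above, the Kubernetes settings that rarely change can be written to ~/spark-obs/conf/spark-defaults.conf instead of being repeated on every submission. The following minimal sketch simply moves the values from the example above into the defaults file:

    spark.master                                              k8s://https://**.**.**.**:5443
    spark.kubernetes.authenticate.driver.serviceAccountName   spark
    spark.kubernetes.container.image                          swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs
    spark.executor.instances                                  2

    With these defaults in place, the same SparkPi job can be submitted with a shorter command:

    spark-submit \
      --deploy-mode cluster \
      --name spark-pi \
      --class org.apache.spark.examples.SparkPi \
      local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar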

Accessing OBS

Use spark-submit to deliver a job that reads a file from OBS using the HdfsTest example. Replace obs://bucket-name/filename at the end of the command with the tenant's actual file path.

spark-submit \
  --master k8s://https://**.**.**.**:5443 \
  --deploy-mode cluster \
  --name spark-hdfs-test \
  --class org.apache.spark.examples.HdfsTest \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar obs://bucket-name/filename
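
If the OBS credentials are not already configured in the image or in spark-defaults.conf (as set up in Interconnecting Spark with OBS), they can also be passed per job through the spark.hadoop.* prefix, which Spark forwards to the Hadoop configuration used by the OBS connector. The fs.obs.* property names and the placeholder values below are assumptions for illustration; use the keys from Interconnecting Spark with OBS.

spark-submit \
  --master k8s://https://**.**.**.**:5443 \
  --deploy-mode cluster \
  --name spark-hdfs-test \
  --class org.apache.spark.examples.HdfsTest \
  --conf spark.hadoop.fs.obs.access.key=<your-access-key> \
  --conf spark.hadoop.fs.obs.secret.key=<your-secret-key> \
  --conf spark.hadoop.fs.obs.endpoint=<your-obs-endpoint> \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar obs://bucket-name/filename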

Using Spark Shell Commands to Interact with Spark (Scala)

spark-shell \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=swr.ap-southeast-1.myhuaweicloud.com/dev-container/spark:3.1.3-obs \
  --master k8s://https://**.**.**.**:5443

Run the following commands to define the linecount and wordcount algorithms for the Spark computing jobs:

// linecount: counts the non-empty lines in the input dataset
def linecount(input: org.apache.spark.sql.Dataset[String]): Long = input.filter(line => line.length() > 0).count()
// wordcount: counts the distinct words in the input dataset (words are grouped and counted, then the number of groups is returned)
def wordcount(input: org.apache.spark.sql.Dataset[String]): Long = input.flatMap(value => value.split("\\s+")).groupByKey(value => value).count().count()

Run the following commands to define data sources:

var alluxio = spark.read.textFile("alluxio://alluxio-master:19998/sample-1g")
var obs = spark.read.textFile("obs://gene-container-gtest/sample-1g")
var hdfs = spark.read.textFile("hdfs://192.168.1.184:9000/user/hadoop/books/sample-1g")

Run the following commands to start the computing jobs:

spark.time(wordcount(obs))
spark.time(linecount(obs))
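
The same jobs can be timed against the other data sources defined above to compare storage backends, for example:

spark.time(wordcount(hdfs))
spark.time(linecount(alluxio))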