
Spark Application Development

Spark is a distributed batch processing framework. It provides analysis, mining, and iterative in-memory computing capabilities and supports application development in multiple programming languages. It applies to the following scenarios:
  • Data processing: Spark processes data quickly and provides fault tolerance and scalability.
  • Iterative computation: Spark supports iterative computation, which suits multi-step data processing logic.
  • Data mining: Spark can perform complex mining and analysis on massive data and supports multiple data mining and machine learning algorithms.
  • Streaming processing: Spark supports stream processing with second-level latency and supports multiple external data sources.
  • Query analysis: Spark supports standard SQL query analysis, provides a DataFrame-based DSL, and supports multiple external inputs.
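
For example, a minimal Spark SQL and DataFrame query in Java might look like the following. This is a standalone sketch: the input path and column names are hypothetical and unrelated to the sample project described below.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class QuickQuery {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("QuickQuery").getOrCreate();

            //Loads a JSON file into a DataFrame (hypothetical input path).
            Dataset<Row> people = spark.read().json("hdfs:///tmp/people.json");
            people.createOrReplaceTempView("people");

            //Standard SQL query analysis ...
            spark.sql("SELECT name, account FROM people WHERE account > 100").show();

            //... or the equivalent DataFrame DSL.
            people.filter(people.col("account").gt(100)).select("name", "account").show();

            spark.stop();
        }
    }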

MRS provides sample application development projects based on Spark. This practice guides you through obtaining and importing a sample project after creating an MRS cluster, and then building and debugging it locally. In this sample project, the application reads data from a Hive table and writes the processed data to an HBase table.

The guidelines for the sample project in this practice are as follows:

  1. Query data in a specified Hive table.
  2. Query data in a specified HBase table based on the key of the data in the Hive table.
  3. Add the related data records together and write the result to the HBase table.

Creating an MRS Cluster

  1. Create and purchase an MRS cluster that contains Spark. For details, see Buying a Custom Cluster.

    In this practice, an MRS 3.1.5 cluster, with Spark2x, Hive, and HBase installed and with Kerberos authentication enabled, is used as an example.

  2. After the cluster is purchased, install the client on any node in the cluster. For details, see Installing and Using the Cluster Client.

    For example, install the client in the /opt/client directory on the active management node.

Preparing the Cluster Configuration File

  1. After the cluster is created, log in to FusionInsight Manager and create a cluster user for submitting Spark jobs.

    Choose System > Permission > User. In the right pane, click Create. On the displayed page, create a human-machine user, for example, sparkuser.

    Add the user to the supergroup user group and bind the System_administrator role to the user.

  2. Log in to FusionInsight Manager as the new user and change the initial password as prompted.
  3. Choose System > Permission > User. In the Operation column of sparkuser, choose More > Download Authentication Credential. Save the file and decompress it to obtain the user.keytab and krb5.conf files of the user.

Developing the Application

  1. Obtain the sample project from Huawei Mirrors.

    Download the Maven project source code and configuration files of the sample project, and configure related development tools on your local PC. For details, see Obtaining Sample Projects from Huawei Mirrors.

    Select a branch based on the cluster version and download the required MRS sample project.

    For example, the sample project suitable for this practice is SparkHivetoHbase, which can be obtained at https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.5/src/spark-examples/sparksecurity-examples/SparkHivetoHbaseJavaExample.
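
    If you prefer the command line, you can also clone the repository and switch to the matching branch, for example (assuming Git is installed locally):

    git clone -b mrs-3.1.5 https://github.com/huaweicloud/huaweicloud-mrs-example.git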

  2. Use IDEA to import the sample project and wait for the Maven project to download the dependency packages.

    After you configure Maven and SDK parameters on the local PC, the sample project automatically loads related dependency packages. For details, see Configuring and Importing Sample Projects.

    Figure 1 Spark Hive to HBase sample project

    The SparkHivetoHbase class in the sample project uses Spark to call Hive APIs to query a Hive table, obtains the corresponding record from an HBase table based on the key of each row, adds the two values together, and updates the result in the HBase table.

    The code snippet is as follows:

    ...
    public class SparkHivetoHbase {
        public static void main(String[] args) throws Exception {
            //Specifies the cluster user and the paths of the keytab and krb5.conf files used for authentication.
            String userPrincipal = "sparkuser";
            String userKeytabPath = "/opt/client/user.keytab";
            String krb5ConfPath = "/opt/client/krb5.conf";
            Configuration hadoopConf = new Configuration();
            LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf);
            //Calls the Spark API to obtain table data.
            SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc);
            Dataset<Row> dataFrame = sqlContext.sql("select name, account from person");
            //Traverses partitions of the Hive table and updates the corresponding records in the HBase table.
            dataFrame
                    .toJavaRDD()
                    .foreachPartition(
                            new VoidFunction<Iterator<Row>>() {
                                public void call(Iterator<Row> iterator) throws Exception {
                                    hBaseWriter(iterator);
                                }
                            });
            jsc.stop();
        }
        //Updates records in the HBase table on the executor.
        private static void hBaseWriter(Iterator<Row> iterator) throws IOException {
            //Reads the HBase table.
            String tableName = "table2";
            String columnFamily = "cf";
            Configuration conf = HBaseConfiguration.create();
            Connection connection = null;
            Table table = null;
            try {
                //Creates the HBase connection and obtains the target table.
                connection = ConnectionFactory.createConnection(conf);
                table = connection.getTable(TableName.valueOf(tableName));
                List<Row> table1List = new ArrayList<Row>();
                List<Get> rowList = new ArrayList<Get>();
                while (iterator.hasNext()) {
                    Row item = iterator.next();
                    Get get = new Get(item.getString(0).getBytes());
                    table1List.add(item);
                    rowList.add(get);
                }
                //Obtains the records in the HBase table.
                Result[] resultDataBuffer = table.get(rowList);
                //Modifies records in the HBase table.
                List<Put> putList = new ArrayList<Put>();
                for (int i = 0; i < resultDataBuffer.length; i++) {
                    Result resultData = resultDataBuffer[i];
                    if (!resultData.isEmpty()) {
                        int hiveValue = table1List.get(i).getInt(1);
                        String hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));
                        Put put = new Put(table1List.get(i).getString(0).getBytes());
                        //Calculates the new value by adding the Hive value to the HBase value.
                        int resultValue = hiveValue + Integer.valueOf(hbaseValue);
                        put.addColumn(
                                Bytes.toBytes(columnFamily),
                                Bytes.toBytes("cid"),
                                Bytes.toBytes(String.valueOf(resultValue)));
                        putList.add(put);
                    }
                }
                if (putList.size() > 0) {
                    table.put(putList);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (table != null) {
                    try {
                        table.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        //Closes the HBase connection.
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    ...

    For an MRS cluster with Kerberos authentication enabled, the application must authenticate with the cluster before accessing its services. In this sample project, the authentication information is configured in the code: set userPrincipal to the username used for authentication, and change userKeytabPath and krb5ConfPath to the actual paths of the files on the client node.
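
    The LoginUtil helper shipped with the sample project wraps the standard Hadoop Kerberos login. As a rough, minimal sketch of what such a login looks like when written directly against the plain Hadoop API (UserGroupInformation, not the sample's helper), assuming the credential files are at the paths shown above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberosLoginSketch {
        public static void main(String[] args) throws Exception {
            //Points the JVM at the krb5.conf file from the downloaded credential package.
            System.setProperty("java.security.krb5.conf", "/opt/client/krb5.conf");

            //Enables Kerberos authentication in the Hadoop configuration.
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);

            //Logs in with the keytab of the sparkuser human-machine user.
            UserGroupInformation.loginUserFromKeytab("sparkuser", "/opt/client/user.keytab");
            System.out.println("Logged in as " + UserGroupInformation.getLoginUser());
        }
    }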

  3. After confirming that the parameters in the project are correct, build the project and package it into a JAR file.

    In the Maven window, select clean from Lifecycle to clear previous build outputs, and then select package to build the project. Obtain the generated JAR file from the target directory.

    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  02:36 min
    [INFO] Finished at: 2023-06-12T20:46:24+08:00
    [INFO] ------------------------------------------------------------------------

    For example, the JAR file is SparkHivetoHbase-1.0.jar.
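
    Equivalently, you can build from the command line in the root directory of the sample project, assuming Maven and the JDK are configured as described above:

    mvn clean package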

Uploading the JAR Package and Preparing Source Data

  1. Upload the JAR package to a directory, for example, /opt/client/sparkdemo, on the client node.

    If you cannot directly access the client node from your local network to upload files, upload the JAR package or source data to OBS, import it to HDFS on the Files tab of the MRS console, and then run the hdfs dfs -get command on the HDFS client to download it to the client node.
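
    For example, assuming the JAR file was imported to the /tmp directory of HDFS (a hypothetical path used only for illustration), you could download it to the client node as follows:

    hdfs dfs -get /tmp/SparkHivetoHbase-1.0.jar /opt/client/sparkdemo/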

  2. Upload the user.keytab and krb5.conf files used for authentication to the location specified in the code, for example, /opt/client.
  3. Log in to the node where the cluster client is installed as user root, and run the following commands to configure environment variables and authenticate as the new user:

    cd /opt/client

    source bigdata_env

    kinit sparkuser

  4. Create a Hive table and insert data into the table.

    beeline

    In the Hive Beeline, run the following commands to create a table and insert data:

    create table person ( name STRING, account INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' STORED AS TEXTFILE;

    insert into table person(name,account) values("1","100");

    select * from person;

    +--------------+-----------------+
    | person.name  | person.account  |
    +--------------+-----------------+
    | 1            | 100             |
    +--------------+-----------------+

  5. Create an HBase table and insert data into the table.

    Exit the Hive Beeline, run the spark-beeline command, and run the following command to create an HBase table:

    create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid" );

    Exit the Spark Beeline, run the hbase shell command to go to the HBase Shell, and run the following commands to insert data:

    put 'table2', '1', 'cf:cid', '1000'

    scan 'table2'

    ROW                                                 COLUMN+CELL                                                                                                                                           
     1                                                 column=cf:cid, timestamp=2023-06-12T21:12:50.711, value=1000                                                                                           
    1 row(s)

Running the Application and Viewing the Result

  1. On the node where the cluster client is installed, run the following commands to submit the JAR file built from the sample project:

    cd /opt/client

    source bigdata_env

    cd Spark2x/spark

    vi conf/spark-defaults.conf

    Change the value of spark.yarn.security.credentials.hbase.enabled to true (see the configuration example after the submission command below).

    bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/client/sparkdemo/SparkHivetoHbase-1.0.jar
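
    For reference, after the modification the relevant line in conf/spark-defaults.conf looks like the following. Alternatively, instead of editing the file, the same setting can be passed for a single job by adding --conf spark.yarn.security.credentials.hbase.enabled=true to the spark-submit command.

    spark.yarn.security.credentials.hbase.enabled true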

  2. After the task is submitted, log in to FusionInsight Manager as user sparkuser, choose Cluster > Services > Yarn, and click the ResourceManager web UI link. Locate the job of the Spark application and click ApplicationMaster in the last column of the job information to open the Spark UI and view details.

    Figure 2 Viewing Spark task details

  3. After the task is complete, query the content of the HBase table in the HBase shell. You can see that the record has been updated: the Hive value (100) has been added to the original HBase value (1000), giving 1100.

    cd /opt/client

    source bigdata_env

    hbase shell

    scan 'table2'

    ROW                                                 COLUMN+CELL                                                                                                                                           
     1                                                 column=cf:cid, timestamp=2023-06-12T21:22:50.711, value=1100                                                                                           
    1 row(s)