Scala Example Code

Development description

CloudTable HBase and MRS HBase can be connected to DLI as data sources.

  • Prerequisites

    A datasource connection has been created on the DLI management console. For details, see Data Lake Insight User Guide.

  • Construct dependency information and create a Spark session.
    1. Import dependencies
      Maven dependency involved:
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
      Import statements involved:
      import scala.collection.mutable
      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.types._
      
    2. Create a session
      val sparkSession = SparkSession.builder().getOrCreate()
      
    3. Create a table to connect to the HBase data source
      sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN, 
              'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (
      	'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,
                        cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
                        cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',
      	'TableName'='table_DupRowkey1',
      	'RowKey'='id:5,location:6,city:7',        'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')",
              'krb5conf'='$krb5conf',
              'keytab' = '$keytab',
              'principal' = '$principal'
      )
      
      Table 1 Parameters for creating a table

      ZKHost
        ZooKeeper IP address of the HBase cluster. You need to create a datasource connection first. For details, see Data Lake Insight User Guide.
        • To connect to a CloudTable cluster, enter the ZooKeeper IP address (internal network).
        • To connect to an MRS cluster, enter the IP address of the node where ZooKeeper is located and the ZooKeeper external port number, in the format ZK_IP1:ZK_PORT1,ZK_IP2:ZK_PORT2.
        NOTE: To connect to an MRS cluster, you can create an enhanced datasource connection and configure host information. For details about operations on the management console, see Enhanced Datasource Connections in the Data Lake Insight User Guide. For the related API, see Creating an Enhanced Datasource Connection.

      RowKey
        Row key field of the table connected to DLI. Single and composite row keys are supported. A single row key can be of the numeric or string type, and its length does not need to be specified. A composite row key supports only fixed-length string data, in the format attribute name 1:length,attribute name 2:length.

      Cols
        Mapping between the fields in the DLI table and those in the CloudTable table. The DLI table field is placed before the colon (:) and the CloudTable table field after it. A dot (.) separates the column family from the column name of the CloudTable table.
        For example: DLI table field 1:column family.CloudTable table field 1,DLI table field 2:column family.CloudTable table field 2,DLI table field 3:column family.CloudTable table field 3

      krb5conf
        OBS path of the krb5.conf file, in the format obs://ak:sk@bucket/path.

      keytab
        OBS path of the keytab file, in the format obs://ak:sk@bucket/path.

      principal
        User principal for Kerberos machine-machine authentication. (A placeholder sketch of the three Kerberos options follows this table.)
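      As a minimal, hypothetical sketch (the bucket, paths, and principal below are placeholders rather than values from this guide), the three Kerberos options might be prepared as follows before being substituted into the CREATE TABLE statement:

      // Hypothetical placeholder values for the Kerberos options described above;
      // replace them with the OBS paths and principal of your own cluster.
      val krb5conf  = "obs://ak:sk@your-bucket/krb5/krb5.conf"    // assumed OBS path
      val keytab    = "obs://ak:sk@your-bucket/krb5/user.keytab"  // assumed OBS path
      val principal = "krbtest"                                   // assumed machine-machine principal

      If the values are defined as Scala variables like this, an s-interpolated string (s"...") substitutes them for the $krb5conf, $keytab, and $principal placeholders used in the statement above.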

  • Connecting to datasources through SQL APIs
    1. Insert data
      sparkSession.sql("insert into test_hbase values('12345','abc','guiyang',false,null,3,23,2.3,2.34)")
      
    2. Query data
      sparkSession.sql("select * from test_hbase").show ()
      

      Response:
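
      For reference, show() would print something like the following for the row inserted above (exact column widths and formatting may differ):

      +-----+--------+-------+--------+------+----+-----+------+-------+
      |   id|location|   city|booleanf|shortf|intf|longf|floatf|doublef|
      +-----+--------+-------+--------+------+----+-----+------+-------+
      |12345|     abc|guiyang|   false|  null|   3|   23|   2.3|   2.34|
      +-----+--------+-------+--------+------+----+-----+------+-------+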

  • Connecting to datasources through DataFrame APIs
    1. Construct a schema
      val attrId = new StructField("id",StringType)
      val location = new StructField("location",StringType)
      val city = new StructField("city",StringType)
      val booleanf = new StructField("booleanf",BooleanType)
      val shortf = new StructField("shortf",ShortType)
      val intf = new StructField("intf",IntegerType)
      val longf = new StructField("longf",LongType)
      val floatf = new StructField("floatf",FloatType)
      val doublef = new StructField("doublef",DoubleType)
      val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef)
      
    2. Construct data based on the schema type.
      val mutableRow: Seq[Any] = Seq("12345","abc","guiyang",false,null,3,23,2.3,2.34)
      val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
      
    3. Import data to HBase
      sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase")
      
    4. Read data from HBase
      val map = new mutable.HashMap[String, String]()
      map("TableName") = "table_DupRowkey1"
      map("RowKey") = "id:5,location:6,city:7"
      map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
      map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,
                     cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
                     cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
      sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().show()
      

      Response:

  • Submitting a Spark Job
    1. Generate a JAR package based on the code and upload the package to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.hbase.
      • For details about how to submit a job on the console, see the Dependency Resources parameter description (Table 6) in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in the request parameter description (Table 2) of Creating a Batch Processing Job in the Data Lake Insight API Reference. A hypothetical request-body sketch follows this list.
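
      The exact request fields are defined in the API reference cited above; as a rough, non-authoritative sketch, a request body for creating the batch job might be assembled like this, where only the "modules" value comes from this guide and the file, className, and queue values are hypothetical placeholders:

      // Hypothetical request body for the batch-job API; "sys.datasource.hbase" is the
      // dependency module named above, while the other values are placeholders to adapt
      // and should be checked against the Data Lake Insight API Reference.
      val createBatchBody =
        """{
          |  "file": "obs://your-bucket/jars/dli-hbase-example.jar",
          |  "className": "Test_SparkSql_HBase",
          |  "queue": "your_dli_queue",
          |  "modules": ["sys.datasource.hbase"]
          |}""".stripMargin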

Complete example code

  • Maven dependency
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
  • Connecting to datasources through SQL APIs
    import org.apache.spark.sql.SparkSession
    
    object Test_SparkSql_HBase {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.
        val sparkSession = SparkSession.builder().getOrCreate()
    
        /**
         * Create a DLI table and associate it with the HBase table.
         */
        sparkSession.sql(
          """CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN,
            |'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT, 'doublef' DOUBLE) using hbase OPTIONS (
            |'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',
            |'TableName'='table_DupRowkey1',
            |'RowKey'='id:5,location:6,city:7',
            |'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')""".stripMargin)
    
        //*****************************SQL model***********************************
        sparkSession.sql("insert into test_hbase values('12345','abc','guiyang',false,null,3,23,2.3,2.34)")
        sparkSession.sql("select * from test_hbase").collect()
    
        sparkSession.close()
      }
    }
    
  • Connecting to datasources through DataFrame APIs
    import scala.collection.mutable
    
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types._
    
    object Test_SparkSql_HBase {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.
        val sparkSession = SparkSession.builder().getOrCreate()
    
        // Create a DLI table and associate it with the HBase table.
        sparkSession.sql(
          """CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN,
            |'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT, 'doublef' DOUBLE) using hbase OPTIONS (
            |'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',
            |'TableName'='table_DupRowkey1',
            |'RowKey'='id:5,location:6,city:7',
            |'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')""".stripMargin)
    
        //*****************************DataFrame model***********************************
        // Setting schema
        val attrId = new StructField("id",StringType)
        val location = new StructField("location",StringType)
        val city = new StructField("city",StringType)
        val booleanf = new StructField("booleanf",BooleanType)
        val shortf = new StructField("shortf",ShortType)
        val intf = new StructField("intf",IntegerType)
        val longf = new StructField("longf",LongType)
        val floatf = new StructField("floatf",FloatType)
        val doublef = new StructField("doublef",DoubleType)
        val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef)
    
        // Populate data according to the type of schema
        val mutableRow: Seq[Any] = Seq("12345","abc","guiyang",false,null,3,23,2.3,2.34)
        val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
    
        // Import the constructed data into HBase
        sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase")
    
        // Read data from HBase
        val map = new mutable.HashMap[String, String]()
        map("TableName") = "table_DupRowkey1"
        map("RowKey") = "id:5,location:6,city:7"
        map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
        map("ZKHost") = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181," +
                        "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181," +
                        "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
        sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().collect()
    
        sparkSession.close()
      }
    }