Scala Example Code

Development description

The CloudTable OpenTSDB and MRS OpenTSDB can be connected to DLI as data sources.

  • Prerequisites

    A datasource connection has been created on the DLI management console. For details, see Data Lake Insight User Guide.

  • Construct dependency information and create a Spark session.
    1. Import dependencies
      Involved Maven dependency
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
      Dependencies related to import
      1
      2
      3
      4
      import scala.collection.mutable
      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.types._
      
    2. Create a session
      1
      val sparkSession = SparkSession.builder().getOrCreate()
      
    3. Create a table to connect to OpenTSDB datasource
      1
      2
      3
      4
      sparkSession.sql("create table opentsdb_test using opentsdb options(
      	'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',	
              'metric'='ctopentsdb',
      	'tags'='city,location')")
      
      Table 1 Parameters for creating a table

      Parameter

      Description

      host

      OpenTSDB IP address.

      • To connect to the CloudTable OpenTSDB, you need to enter the IP address of the OpenTSDB. For details about how to obtain the IP address, see CloudTable OpenTSDB IP address.
      • You can also access the MRS OpenTSDB. If you have created an enhanced datasource connection, enter the IP address and port number of the node where the OpenTSDB is located. The format is IP:PORT. If the OpenTSDB has multiple nodes, enter one of the node IP addresses. For details about how to obtain the IP address, see MRS cluster OpenTSDB IP address and MRS cluster OpenTSDB port number. If you use a basic datasource connection, enter the connection address returned. For details about operations on the management console, see the Data Lake Insight User Guide.

      metric

      Name of the metric in OpenTSDB corresponding to the DLI table to be created.

      tags

      Tags corresponding to the metric, which is used for operations such as classification, filtering, and quick search. A maximum of 8 tags, including all tagk values under the metric, can be added, which are separated by commas (,).

      Figure 1 CloudTable OpenTSDB IP address
      Figure 2 MRS cluster OpenTSDB IP address
      Figure 3 MRS cluster OpenTSDB port number
  • Connecting to datasources through SQL APIs
    1. Insert data
      1
      sparkSession.sql("insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)")
      
    2. Query data
      1
      sparkSession.sql("select * from opentsdb_test").show()
      

      Response

  • Connecting to datasources through DataFrame APIs
    1. Construct a schema
      1
      2
      3
      4
      5
      val attrTag1Location = new StructField("location", StringType)
      val attrTag2Name = new StructField("name", StringType)
      val attrTimestamp = new StructField("timestamp", LongType)
      val attrValue = new StructField("value", DoubleType)
      val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp, attrValue)
      
    2. Construct data based on the schema type.
      1
      2
      val mutableRow: Seq[Any] = Seq("beijing", "abc", 123456L, 30.0)
      val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
      
    3. Import data to the OpenTSDB
      1
      sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test")
      
    4. Read data from the OpenTSDB
      1
      2
      3
      4
      5
      val map = new mutable.HashMap[String, String]()
      map("metric") = "ctopentsdb"
      map("tags") = "city,location"
      map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
      sparkSession.read.format("opentsdb").options(map.toMap).load().show()
      

      Response

  • Submitting a Spark Job
    1. Generate a JAR package based on the code and upload the package to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.opentsdb.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.

Complete example code

  • Maven dependency
    1
    2
    3
    4
    5
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
  • Connecting to datasources through SQL APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import org.apache.spark.sql.SparkSession
    
    object Test_OpenTSDB_CT {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.
        val sparkSession = SparkSession.builder().getOrCreate()
    
        // Create a data table for DLI association OpenTSDB
        sparkSession.sql("create table opentsdb_test using opentsdb options(
    	'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
    	'metric'='ctopentsdb',
    	'tags'='city,location')")
    
        //*****************************SQL module***********************************
        sparkSession.sql("insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)")
        sparkSession.sql("select * from opentsdb_test").show()
    
        sparkSession.close()
      }
    }
    
  • Connecting to datasources through DataFrame APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    import scala.collection.mutable
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types._
    
    object Test_OpenTSDB_CT {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.
        val sparkSession = SparkSession.builder().getOrCreate()
    
        // Create a data table for DLI association OpenTSDB
        sparkSession.sql("create table opentsdb_test using opentsdb options(
    	'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
    	'metric'='ctopentsdb',
    	'tags'='city,location')")
    
        //*****************************DataFrame model***********************************
        // Setting schema
        val attrTag1Location = new StructField("location", StringType)
        val attrTag2Name = new StructField("name", StringType)
        val attrTimestamp = new StructField("timestamp", LongType)
        val attrValue = new StructField("value", DoubleType)
        val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp,attrValue)
    
        // Populate data according to the type of schema
        val mutableRow: Seq[Any] = Seq("beijing", "abc", 123456L, 30.0)
        val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
    
        //Import the constructed data into OpenTSDB
        sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test")
    
        //Read data on OpenTSDB
        val map = new mutable.HashMap[String, String]()
        map("metric") = "ctopentsdb"
        map("tags") = "city,location"
        map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
        sparkSession.read.format("opentsdb").options(map.toMap).load().show()
    
        sparkSession.close()
      }
    }