
Detailed Development Description

Updated at: Mar 17, 2020 GMT+08:00

Both CloudTable OpenTSDB and MRS OpenTSDB can be connected to DLI as data sources.

Prerequisites

A datasource connection has been created on the DLI management console.

Constructing Dependency Information and Creating a Spark Session

  1. Import dependencies
    Maven dependency involved
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
    Import statements required in the code
    import scala.collection.mutable
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types._
    
  2. Create a session
    val sparkSession = SparkSession.builder().getOrCreate()
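
    When the job runs on DLI, getOrCreate() reuses the configuration supplied by the environment. For local debugging you may want to set an application name and master explicitly; the sketch below assumes local execution, and both values are placeholders:

    val sparkSession = SparkSession
      .builder()
      .appName("dli-opentsdb-example") // placeholder application name, for local debugging only
      .master("local[*]")              // local mode for debugging; omit when the job runs on DLI
      .getOrCreate()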
    
  3. Create a table to connect to the OpenTSDB datasource
    sparkSession.sql("create table opentsdb_test using opentsdb options(" +
      "'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242'," +
      "'metric'='ctopentsdb'," +
      "'tags'='city,location')")
    
    Table 1 Parameters for creating a table

    host: OpenTSDB IP address.
    • To connect to CloudTable OpenTSDB, enter the IP address of the OpenTSDB. For details about how to obtain the IP address, see CloudTable OpenTSDB IP address.
    • You can also access MRS OpenTSDB. If you have created an enhanced datasource connection, enter the IP address and port number of the node where the OpenTSDB is located, in the format IP:PORT. If the OpenTSDB has multiple nodes, enter the IP address of any one of them. For details about how to obtain the IP address and port number, see MRS cluster OpenTSDB IP address and MRS cluster OpenTSDB port number. If you use a basic datasource connection, enter the returned connection address. For details about operations on the management console, see Classic Datasource Connections in the Data Lake Insight User Guide. (A sample create-table statement for an MRS OpenTSDB node is shown after the figures below.)

    metric: Name of the metric in OpenTSDB that corresponds to the DLI table to be created.

    tags: Tags corresponding to the metric, used for operations such as classification, filtering, and quick search. A maximum of 8 tags, covering all tagk values under the metric, can be added, separated by commas (,).

    Figure 1 CloudTable OpenTSDB IP address
    Figure 2 MRS cluster OpenTSDB IP address
    Figure 3 MRS cluster OpenTSDB port number
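
    For MRS OpenTSDB, the Host option uses the IP:PORT form described in Table 1. The statement below is only a sketch; the node address 192.168.0.27:4242 and the table name opentsdb_mrs_test are placeholders for illustration:

    sparkSession.sql("create table opentsdb_mrs_test using opentsdb options(" +
      "'Host'='192.168.0.27:4242'," + // placeholder MRS OpenTSDB node IP address and port
      "'metric'='ctopentsdb'," +
      "'tags'='city,location')")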

Connecting to Datasources Through SQL APIs

  1. Insert data
    sparkSession.sql("insert into opentsdb_test values('futian', 'huawei', '1970-01-02 18:17:36', 30.0)")
    
  2. Query data
    sparkSession.sql("select * from opentsdb_test").show()
    

    Response
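
    In addition to the basic insert and select statements above, standard Spark SQL syntax can be applied to the table. The query below is a sketch that filters on the city tag, assuming the tag columns (city, location) are exposed as table columns, as the insert statement suggests; whether the predicate is pushed down to OpenTSDB or evaluated in Spark depends on the connector:

    // Sketch: filter by a tag column defined in the 'tags' option of the table.
    sparkSession.sql("select * from opentsdb_test where city = 'futian'").show()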

Connecting to Datasources Through DataFrame APIs

  1. Construct a schema
    val attrTag1Location = new StructField("location", StringType)
    val attrTag2Name = new StructField("name", StringType)
    val attrTimestamp = new StructField("timestamp", LongType)
    val attrValue = new StructField("value", DoubleType)
    val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp, attrValue)
    
  2. Construct data based on the schema type.
    val mutableRow: Seq[Any] = Seq("beijing", "huawei", 123456L, 30.0)
    val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
    
  3. Import data to the OpenTSDB
    sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test")
    
  4. Read data from the OpenTSDB
    val map = new mutable.HashMap[String, String]()
    map("metric") = "ctopentsdb"
    map("tags") = "city,location"
    map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
    sparkSession.read.format("opentsdb").options(map.toMap).load().show()
    

    Response
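
    Because load() returns an ordinary DataFrame, it can be inspected and processed with standard DataFrame operations before any further analysis. A minimal sketch using only generic Spark APIs:

    // Sketch: inspect what the connector returns for the configured metric.
    val df = sparkSession.read.format("opentsdb").options(map.toMap).load()
    df.printSchema()     // columns exposed by the connector
    println(df.count())  // number of rows returned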

Submitting a Spark Job

  1. Generate a JAR package based on the code and upload the package to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
  2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Session (Recommended) and Creating a Batch Processing Job in the Data Lake Insight API Reference.
    • When submitting a job, you need to specify a dependency module named sys.datasource.opentsdb.
    • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
    • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Session and Creating a Batch Processing Job in the Data Lake Insight API Reference.
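
For reference, the snippets in this section can be assembled into one object and packaged into the JAR that is uploaded in step 1. The sketch below only strings together the SQL API code shown earlier; the object name OpenTSDBExample is a placeholder.

  import org.apache.spark.sql.SparkSession

  object OpenTSDBExample { // placeholder object name
    def main(args: Array[String]): Unit = {
      val sparkSession = SparkSession.builder().getOrCreate()

      // Create the DLI table that maps to the OpenTSDB metric (same options as above).
      sparkSession.sql("create table opentsdb_test using opentsdb options(" +
        "'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242'," +
        "'metric'='ctopentsdb'," +
        "'tags'='city,location')")

      // Write one data point and read it back through the SQL API.
      sparkSession.sql("insert into opentsdb_test values('futian', 'huawei', '1970-01-02 18:17:36', 30.0)")
      sparkSession.sql("select * from opentsdb_test").show()

      sparkSession.stop()
    }
  }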
