Help Center > Developer Guide > Developing a DLI Datasource Connection Using a Spark Job > Interconnecting with OpenTSDB (By Scala) > Complete Example Code

Complete Example Code

Updated at: Mar 17, 2020 GMT+08:00

Maven Dependency

1
2
3
4
5
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>

Connecting to Datasources Through SQL APIs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.spark.sql.SparkSession

object Test_OpenTSDB_CT {
  /**
   * Entry point: creates a DLI table backed by the OpenTSDB datasource,
   * writes one data point through the SQL API, then reads it back.
   *
   * @param args unused command-line arguments
   */
  def main(args: Array[String]): Unit = {
    // Create (or reuse) the SparkSession for this job.
    val sparkSession = SparkSession.builder().getOrCreate()

    // Create a DLI table associated with the OpenTSDB datasource.
    // NOTE: the original sample used a plain double-quoted string spanning
    // several lines, which does not compile in Scala; a triple-quoted string
    // with stripMargin is used instead so the SQL can stay multi-line.
    sparkSession.sql(
      """create table opentsdb_test using opentsdb options(
        |'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
        |'metric'='ctopentsdb',
        |'tags'='city,location')""".stripMargin)

    //*****************************SQL module***********************************
    // Insert one data point: tag values ('futian', 'huawei'), a timestamp, and the metric value.
    sparkSession.sql("insert into opentsdb_test values('futian', 'huawei', '1970-01-02 18:17:36', 30.0)")
    // Read back and display the metric's data points.
    sparkSession.sql("select * from opentsdb_test").show()

    // Release the session and its underlying resources.
    sparkSession.close()
  }
}

Connecting to Datasources Through DataFrame APIs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import scala.collection.mutable
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._

object Test_OpenTSDB_CT {
  /**
   * Entry point: creates a DLI table backed by the OpenTSDB datasource,
   * writes one row through the DataFrame API, then reads it back with
   * the datasource reader.
   *
   * @param args unused command-line arguments
   */
  def main(args: Array[String]): Unit = {
    // Create (or reuse) the SparkSession for this job.
    val sparkSession = SparkSession.builder().getOrCreate()

    // Create a DLI table associated with the OpenTSDB datasource.
    // NOTE: the original sample used a plain double-quoted string spanning
    // several lines, which does not compile in Scala; a triple-quoted string
    // with stripMargin is used instead so the SQL can stay multi-line.
    sparkSession.sql(
      """create table opentsdb_test using opentsdb options(
        |'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
        |'metric'='ctopentsdb',
        |'tags'='city,location')""".stripMargin)

    //*****************************DataFrame model***********************************
    // Schema for one OpenTSDB data point: two tag columns, a timestamp,
    // and the metric value. (Timestamp is a Long here — presumably epoch
    // seconds/milliseconds; confirm against the OpenTSDB connector docs.)
    val attrTag1Location = new StructField("location", StringType)
    val attrTag2Name = new StructField("name", StringType)
    val attrTimestamp = new StructField("timestamp", LongType)
    val attrValue = new StructField("value", DoubleType)
    val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp, attrValue)

    // Populate one row matching the schema above.
    val mutableRow: Seq[Any] = Seq("beijing", "huawei", 123456L, 30.0)
    val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)

    // Import the constructed data into OpenTSDB via the associated table.
    sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test")

    // Read data back from OpenTSDB through the datasource reader;
    // the options mirror those used when the table was created.
    val map = new mutable.HashMap[String, String]()
    map("metric") = "ctopentsdb"
    map("tags") = "city,location"
    map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
    sparkSession.read.format("opentsdb").options(map.toMap).load().show()

    // Release the session and its underlying resources.
    sparkSession.close()
  }
}

Did you find this page helpful?

Submitted successfully!

Thank you for your feedback. Your feedback helps make our documentation better.

Failed to submit the feedback. Please try again later.

Which of the following issues have you encountered?







Please complete at least one feedback item.

Content can contain a maximum of 200 characters.

Content is empty.

OK Cancel