Complete Example Code

Updated at: Mar 17, 2020 GMT+08:00
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import SparkSession

if __name__ == "__main__":
  # Create a SparkSession session.    
  sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()

  # Create a DLI datasource connection table backed by OpenTSDB.
  # Triple quotes are required because the statement spans several lines.
  sparkSession.sql("""create table opentsdb_test using opentsdb options(
    'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
    'metric'='ct_opentsdb',
    'tags'='city,location')""")

  # Build an RDD holding the sample row (a plain integer works in both Python 2 and Python 3).
  dataList = sparkSession.sparkContext.parallelize([("beijing", "huawei", 123456, 30.0)])

  # Setting schema   
  schema = StructType([StructField("location", StringType()),                  
                       StructField("name", StringType()),                  
                       StructField("timestamp", LongType()),               
                       StructField("value", DoubleType())])

  # Create a DataFrame from RDD and schema   
  dataFrame = sparkSession.createDataFrame(dataList, schema)

  # Set datasource connection parameters (these must match the options of the opentsdb_test table)
  metric = "ct_opentsdb"
  tags = "city,location"
  Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"

  # Write data to CloudTable OpenTSDB through the opentsdb_test table
  dataFrame.write.insertInto("opentsdb_test")
  # The opentsdb datasource does not currently implement CTAS, so save() cannot be used to write data:
  # dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save()

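  # Read the data back from CloudTable OpenTSDB.
  # Parentheses replace the trailing backslashes, which break if followed by whitespace.
  jdbdDF = (sparkSession.read
            .format("opentsdb")
            .option("Host", Host)
            .option("metric", metric)
            .option("tags", tags)
            .load())
  jdbdDF.show()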

  # close session 
  sparkSession.stop()
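
The example above states the connection parameters twice: once in the create table statement and once as variables for the reader. The following is a minimal sketch, not part of the original example, of keeping them in one place so the two cannot drift apart. The helper name build_create_table_sql and the dictionary opentsdb_options are hypothetical, and the sketch assumes Python 3.7 or later (for ordered dictionaries) and option values that contain no single quotes.

```python
def build_create_table_sql(table_name, options):
    """Render a 'create table ... using opentsdb options(...)' statement.

    Hypothetical helper for illustration only. It does no escaping, so
    option values must not contain single quotes.
    """
    rendered = ", ".join(
        "'{0}'='{1}'".format(key, value) for key, value in options.items()
    )
    return "create table {0} using opentsdb options({1})".format(table_name, rendered)


# Connection parameters defined once (values copied from the example above).
opentsdb_options = {
    "Host": "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242",
    "metric": "ct_opentsdb",
    "tags": "city,location",
}

create_sql = build_create_table_sql("opentsdb_test", opentsdb_options)
print(create_sql)
```

Under these assumptions, the statement could be passed to sparkSession.sql(create_sql), and the same dictionary could be handed to the reader with sparkSession.read.format("opentsdb").options(**opentsdb_options).load().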
