PySpark Example Code

Development description

Both CloudTable OpenTSDB and MRS OpenTSDB can be connected to DLI as data sources.

  • Prerequisites

    A datasource connection has been created on the DLI management console. For details, see Data Lake Insight User Guide.

  • Code implementation
    1. Import the required dependencies
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
      from pyspark.sql import SparkSession
      
    2. Create a session
      sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()
      
    3. Create a table to connect to the OpenTSDB datasource
      sparkSession.sql("create table opentsdb_test using opentsdb options(
        'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
        'metric'='ct_opentsdb',
        'tags'='city,location')")
      

      For details about the Host, metric, and tags parameters, see Table 1.
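
      To sanity-check the mapping, you can query the new table directly with Spark SQL. This is a minimal sketch, assuming the datasource connection is active; it returns rows only if the metric already contains data:

      sparkSession.sql("select * from opentsdb_test").show()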

  • Connecting to datasources through DataFrame APIs
    1. Construct a schema
      schema = StructType([StructField("location", StringType()),                     
                           StructField("name", StringType()),                    
                           StructField("timestamp", LongType()),                  
                           StructField("value", DoubleType())])
      
    2. Configure data
      dataList = sparkSession.sparkContext.parallelize([("beijing", "abc", 123456, 30.0)])
      
    3. Create a DataFrame
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      
    4. Import data to OpenTSDB
      dataFrame.write.insertInto("opentsdb_test")
      
    5. Read data from OpenTSDB
      jdbdDF = sparkSession.read\
          .format("opentsdb")\
          .option("Host","opentsdb-3xcl8dir15m58z3.cloudtable.com:4242")\
          .option("metric","ct_opentsdb")\
          .option("tags","city,location")\
          .load()
      jdbdDF.show()
      
    6. Operation result
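      With the single row written in step 4, jdbdDF.show() should produce output along the following lines (illustrative only; actual values and row ordering depend on what the metric already contains in OpenTSDB):

      +--------+----+---------+-----+
      |location|name|timestamp|value|
      +--------+----+---------+-----+
      | beijing| abc|   123456| 30.0|
      +--------+----+---------+-----+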

  • Submitting a Spark Job
    1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.opentsdb.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.
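
      As a rough orientation, the body of the batch-job request might look like the sketch below. The "file" and "queue" values are hypothetical placeholders, and every field name should be verified against Table 2 of Creating a Batch Processing Job; only the sys.datasource.opentsdb module name comes from this guide.

      # Hypothetical request body for Creating a Batch Processing Job; verify
      # all field names against the Data Lake Insight API Reference.
      batch_job_body = {
          "file": "your-bucket/datasource_opentsdb.py",  # uploaded Python file (placeholder)
          "queue": "your-queue",                         # DLI queue name (placeholder)
          "modules": ["sys.datasource.opentsdb"]         # dependency module named above
      }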

Complete example code

# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import SparkSession

if __name__ == "__main__":
  # Create a SparkSession.
  sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()

  # Create a DLI cross-source (datasource) OpenTSDB table.
  sparkSession.sql("create table opentsdb_test using opentsdb options("
      "'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',"
      "'metric'='ct_opentsdb',"
      "'tags'='city,location')")

  # Create an RDD holding the rows to be written.
  dataList = sparkSession.sparkContext.parallelize([("beijing", "abc", 123456, 30.0)])

  # Set the schema.
  schema = StructType([StructField("location", StringType()),
                       StructField("name", StringType()),
                       StructField("timestamp", LongType()),
                       StructField("value", DoubleType())])

  # Create a DataFrame from RDD and schema   
  dataFrame = sparkSession.createDataFrame(dataList, schema)

  # Set cross-source connection parameters.
  metric = "ct_opentsdb"
  tags = "city,location"
  Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"

  # Write data to CloudTable-OpenTSDB.
  dataFrame.write.insertInto("opentsdb_test")
  # Note: OpenTSDB does not currently implement CTAS to save data, so the save() method cannot be used:
  # dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save()

  # Read data from CloudTable-OpenTSDB.
  jdbdDF = sparkSession.read\
      .format("opentsdb")\
      .option("Host", Host)\
      .option("metric", metric)\
      .option("tags", tags)\
      .load()
  jdbdDF.show()

  # Close the SparkSession.
  sparkSession.stop()