Deze pagina is nog niet beschikbaar in uw eigen taal. We werken er hard aan om meer taalversies toe te voegen. Bedankt voor uw steun.

PySpark Example Code

Updated on 2024-06-13 GMT+08:00

Development Description

The CloudTable OpenTSDB and MRS OpenTSDB can be connected to DLI as data sources.

  • Prerequisites

    A datasource connection has been created on the DLI management console.

    NOTE:

    Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.

  • Code implementation
    1. Import dependency packages.
      1
      2
      3
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
      from pyspark.sql import SparkSession
      
    2. Create a session.
      1
      sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()
      
    3. Create a table to connect to an OpenTSDB data source.
      1
      2
      3
      4
      sparkSession.sql("create table opentsdb_test using opentsdb options(
        'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
        'metric'='ct_opentsdb',
        'tags'='city,location')")
      
      NOTE:

      For details about the Host, metric, and tags parameters, see Table 1.

  • Connecting to data sources through SQL APIs
    1. Insert data.
      sparkSession.sql("insert into opentsdb_test values('aaa', 'abc', '2021-06-30 18:00:00', 30.0)")
    2. Query data.
      result = sparkSession.sql("SELECT * FROM opentsdb_test")
  • Connecting to data sources through DataFrame APIs
    1. Construct a schema.
      1
      2
      3
      4
      schema = StructType([StructField("location", StringType()),\                     
                           StructField("name", StringType()), \                   
                           StructField("timestamp", LongType()),\                  
                           StructField("value", DoubleType())])
      
    2. Set data.
      1
      dataList = sparkSession.sparkContext.parallelize([("aaa", "abc", 123456L, 30.0)])
      
    3. Create a DataFrame.
      1
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      
    4. Import data to OpenTSDB.
      1
      dataFrame.write.insertInto("opentsdb_test")
      
    5. Read data from OpenTSDB.
      1
      2
      3
      4
      5
      6
      7
      jdbdDF = sparkSession.read
          .format("opentsdb")\
          .option("Host","opentsdb-3xcl8dir15m58z3.cloudtable.com:4242")\
          .option("metric","ctopentsdb")\
          .option("tags","city,location")\
          .load()
      jdbdDF.show()
      
    6. View the operation result.

  • Submitting a Spark job
    1. Upload the Python code file to DLI.

    2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.

      NOTE:
      • If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, specify the Module to sys.datasource.opentsdb when you submit a job.
      • If the Spark version is 3.1.1, you do not need to select a module. Configure Spark parameters (--conf).

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/opentsdb/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/opentsdb/*

      • For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
      • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.

Complete Example Code

  • Connecting to MRS OpenTSDB through SQL APIs
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.    
      sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()
    
    
      # Create a DLI cross-source association opentsdb data table    
      sparkSession.sql(\
        "create table opentsdb_test using opentsdb options(\
        'Host'='10.0.0.171:4242',\
        'metric'='cts_opentsdb',\
        'tags'='city,location')")
    
      sparkSession.sql("insert into opentsdb_test values('aaa', 'abc', '2021-06-30 18:00:00', 30.0)")
    
      result = sparkSession.sql("SELECT * FROM opentsdb_test")
      result.show()
    
      # close session 
      sparkSession.stop()
  • Connecting to OpenTSDB through DataFrame APIs
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.    
      sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()
    
      # Create a DLI cross-source association opentsdb data table    
      sparkSession.sql(
        "create table opentsdb_test using opentsdb options(\
        'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',\
        'metric'='ct_opentsdb',\
        'tags'='city,location')")
    
      # Create a DataFrame and initialize the DataFrame data.    
      dataList = sparkSession.sparkContext.parallelize([("aaa", "abc", 123456L, 30.0)])
    
      # Setting schema   
      schema = StructType([StructField("location", StringType()),\                 
                           StructField("name", StringType()),\                
                           StructField("timestamp", LongType()),\               
                           StructField("value", DoubleType())])
    
      # Create a DataFrame from RDD and schema   
      dataFrame = sparkSession.createDataFrame(dataList, schema)
    
      # Set cross-source connection parameters   
      metric = "ctopentsdb"   
      tags = "city,location"  
      Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
    
      # Write data to the cloudtable-opentsdb   
      dataFrame.write.insertInto("opentsdb_test")
      # ******* Opentsdb does not currently implement the ctas method to save data, so the save() method cannot be used.*******   
      # dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save()   
    
      # Read data on CloudTable-OpenTSDB  
      jdbdDF = sparkSession.read\       
          .format("opentsdb")\      
          .option("Host",Host)\     
          .option("metric",metric)\  
          .option("tags",tags)\    
          .load()   
      jdbdDF.show()
    
      # close session 
      sparkSession.stop()
Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback