PySpark Example Code
Development Description
CloudTable OpenTSDB and MRS OpenTSDB can be connected to DLI as data sources.
- Prerequisites
A datasource connection has been created on the DLI management console. For details, see Enhanced Datasource Connections.
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
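A minimal sketch of the environment-variable approach (the variable name OPENTSDB_PASSWORD and the decryption step are illustrative, not part of the DLI API):

import os

# Hypothetical example: read the credential from an environment variable
# instead of hard-coding it in the script. Replace OPENTSDB_PASSWORD with
# whatever variable your deployment defines, and decrypt the value here
# if it is stored in encrypted form.
password = os.environ.get("OPENTSDB_PASSWORD")
if password is None:
    raise RuntimeError("OPENTSDB_PASSWORD is not set")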
- Code implementation
- Import dependency packages.
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import SparkSession
- Create a session.
sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()
- Create a table to connect to an OpenTSDB data source.
sparkSession.sql("create table opentsdb_test using opentsdb options( 'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242', 'metric'='ct_opentsdb', 'tags'='city,location')")
For details about the Host, metric, and tags parameters, see Table 1.
- Connecting to data sources through SQL APIs
- Insert data.
sparkSession.sql("insert into opentsdb_test values('aaa', 'abc', '2021-06-30 18:00:00', 30.0)")
- Query data.
result = sparkSession.sql("SELECT * FROM opentsdb_test")
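To display the returned rows, call show() on the result DataFrame, as in the complete example at the end of this page:

result.show()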
- Connecting to data sources through DataFrame APIs
- Construct a schema.
schema = StructType([StructField("location", StringType()),\
                     StructField("name", StringType()),\
                     StructField("timestamp", LongType()),\
                     StructField("value", DoubleType())])
- Set data.
dataList = sparkSession.sparkContext.parallelize([("aaa", "abc", 123456, 30.0)])
- Create a DataFrame.
dataFrame = sparkSession.createDataFrame(dataList, schema)
- Import data to OpenTSDB.
dataFrame.write.insertInto("opentsdb_test")
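Note that OpenTSDB does not currently implement the CTAS method needed to save data, so dataFrame.write.format("opentsdb")...save() cannot be used; write data with insertInto() as shown above.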
- Read data from OpenTSDB.
jdbdDF = sparkSession.read\
    .format("opentsdb")\
    .option("Host","opentsdb-3xcl8dir15m58z3.cloudtable.com:4242")\
    .option("metric","ct_opentsdb")\
    .option("tags","city,location")\
    .load()
jdbdDF.show()
- View the operation result.
- Submitting a Spark job
- Upload the Python code file to DLI.
For details about console operations, see Creating a Package. For details about API operations, see Uploading a Package Group.
- In the Spark job editor, select the corresponding dependency module and execute the Spark job.
For details about console operations, see Creating a Spark Job. For details about API operations, see Creating a Batch Processing Job.
- If the Spark version is 2.3.2 (soon to be taken offline) or 2.4.5, set Module to sys.datasource.opentsdb when you submit the job.
- If the Spark version is 3.1.1, you do not need to select a module. Instead, configure the following Spark parameters (--conf):
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/opentsdb/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/opentsdb/*
- For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
- For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
Complete Example Code
- Connecting to MRS OpenTSDB through SQL APIs
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()

    # Create a DLI cross-source association opentsdb data table
    sparkSession.sql(
        "create table opentsdb_test using opentsdb options(\
        'Host'='10.0.0.171:4242',\
        'metric'='cts_opentsdb',\
        'tags'='city,location')")

    sparkSession.sql("insert into opentsdb_test values('aaa', 'abc', '2021-06-30 18:00:00', 30.0)")

    result = sparkSession.sql("SELECT * FROM opentsdb_test")
    result.show()

    # close session
    sparkSession.stop()
- Connecting to OpenTSDB through DataFrame APIs
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()

    # Create a DLI cross-source association opentsdb data table
    sparkSession.sql(
        "create table opentsdb_test using opentsdb options(\
        'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',\
        'metric'='ct_opentsdb',\
        'tags'='city,location')")

    # Create a DataFrame and initialize the DataFrame data.
    dataList = sparkSession.sparkContext.parallelize([("aaa", "abc", 123456, 30.0)])

    # Setting schema
    schema = StructType([StructField("location", StringType()),\
                         StructField("name", StringType()),\
                         StructField("timestamp", LongType()),\
                         StructField("value", DoubleType())])

    # Create a DataFrame from RDD and schema
    dataFrame = sparkSession.createDataFrame(dataList, schema)

    # Set cross-source connection parameters
    metric = "ct_opentsdb"
    tags = "city,location"
    Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"

    # Write data to the cloudtable-opentsdb
    dataFrame.write.insertInto("opentsdb_test")
    # ******* Opentsdb does not currently implement the ctas method to save data, so the save() method cannot be used.*******
    # dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save()

    # Read data on CloudTable-OpenTSDB
    jdbdDF = sparkSession.read\
        .format("opentsdb")\
        .option("Host",Host)\
        .option("metric",metric)\
        .option("tags",tags)\
        .load()
    jdbdDF.show()

    # close session
    sparkSession.stop()