PySpark Example Code
Development description
CloudTable OpenTSDB and MRS OpenTSDB can be connected to DLI as data sources.
- Prerequisites
A datasource connection has been created on the DLI management console. For details, see Data Lake Insight User Guide.
- Code implementation
- Dependencies related to import
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import SparkSession
- Create a session
sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()
- Create a table to connect to the OpenTSDB data source

sparkSession.sql("""create table opentsdb_test using opentsdb options(
    'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
    'metric'='ct_opentsdb',
    'tags'='city,location')""")
- Connecting to datasources through DataFrame APIs
- Construct a schema
schema = StructType([StructField("location", StringType()),
                     StructField("name", StringType()),
                     StructField("timestamp", LongType()),
                     StructField("value", DoubleType())])
- Configure data
dataList = sparkSession.sparkContext.parallelize([("beijing", "abc", 123456, 30.0)])
- Create a DataFrame
dataFrame = sparkSession.createDataFrame(dataList, schema)
- Import data to OpenTSDB
dataFrame.write.insertInto("opentsdb_test")
- Read data from OpenTSDB

jdbdDF = sparkSession.read\
    .format("opentsdb")\
    .option("Host", "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242")\
    .option("metric", "ct_opentsdb")\
    .option("tags", "city,location")\
    .load()
jdbdDF.show()
- Operation result
- Submitting a Spark Job
- Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
- In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
- When submitting a job, you need to specify a dependency module named sys.datasource.opentsdb.
- For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
- For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.
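As a sketch of the API route, the request body for Creating a Batch Processing Job could be assembled as follows. Only the `modules` entry (`sys.datasource.opentsdb`) is prescribed above; the other field names and values are illustrative assumptions — consult the Data Lake Insight API Reference for the exact request schema.

```python
import json

# Hypothetical request body for the "Creating a Batch Processing Job" API.
# Only the "modules" entry is required by this document; the other fields
# and their names are illustrative assumptions.
payload = {
    "file": "your_python_file.py",          # Python code file uploaded to DLI (assumed field)
    "queue": "your_queue_name",             # target queue for the job (assumed field)
    "modules": ["sys.datasource.opentsdb"]  # dependency module required for OpenTSDB
}

print(json.dumps(payload, indent=2))
```

The key point is that omitting the `modules` parameter causes the job to fail to resolve the OpenTSDB data source classes at runtime.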
Complete example code
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()

    # Create a DLI cross-source association opentsdb data table
    sparkSession.sql("""create table opentsdb_test using opentsdb options(
        'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
        'metric'='ct_opentsdb',
        'tags'='city,location')""")

    # Create a DataFrame and initialize the DataFrame data.
    dataList = sparkSession.sparkContext.parallelize([("beijing", "abc", 123456, 30.0)])

    # Setting schema
    schema = StructType([StructField("location", StringType()),
                         StructField("name", StringType()),
                         StructField("timestamp", LongType()),
                         StructField("value", DoubleType())])

    # Create a DataFrame from RDD and schema
    dataFrame = sparkSession.createDataFrame(dataList, schema)

    # Set cross-source connection parameters
    metric = "ct_opentsdb"
    tags = "city,location"
    Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"

    # Write data to the cloudtable-opentsdb
    dataFrame.write.insertInto("opentsdb_test")
    # OpenTSDB does not currently implement the CTAS method to save data, so the save() method cannot be used.
    # dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save()

    # Read data from CloudTable-OpenTSDB
    jdbdDF = sparkSession.read\
        .format("opentsdb")\
        .option("Host", Host)\
        .option("metric", metric)\
        .option("tags", tags)\
        .load()
    jdbdDF.show()

    # Close the session.
    sparkSession.stop()