PySpark Example Code
Development description
CloudTable HBase and MRS HBase can be connected to DLI as data sources.
- Prerequisites
A datasource connection has been created on the DLI management console. For details, see the Data Lake Insight User Guide.
- Code implementation
- Import the dependencies
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession
- Create a session
sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
- Create a table to connect to the HBase data source
sparkSession.sql(
    "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, "
    "shortf SHORT, intf INT, longf LONG, floatf FLOAT, doublef DOUBLE) using hbase OPTIONS ("
    "'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
    "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
    "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',"
    "'TableName' = 'table_DupRowkey1',"
    "'RowKey' = 'id:5,location:6,city:7',"
    "'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,"
    "floatf:CF1.floatf,doublef:CF1.doublef')")
- For details about the ZKHost, RowKey, and Cols parameters, see Table 1.
- TableName: name of the table in the CloudTable cluster. If the table does not exist, it is created automatically.
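The RowKey option above declares fixed-length key fields: id occupies 5 characters, location 6, and city 7, and the connector composes the row key from them in that order. The following plain-Python sketch (illustrative only, not connector internals) shows how such a composite key is assembled and why each value must match its declared length:

# Illustrative sketch only: compose a row key from fixed-length fields
# as declared in 'RowKey' = 'id:5,location:6,city:7'.
ROWKEY_SPEC = [("id", 5), ("location", 6), ("city", 7)]

def compose_rowkey(row):
    parts = []
    for field, length in ROWKEY_SPEC:
        value = row[field]
        if len(value) != length:
            raise ValueError("%s must be exactly %d characters, got %r" % (field, length, value))
        parts.append(value)
    return "".join(parts)

# "11111" (5 chars) + "beijin" (6 chars) + "beijing" (7 chars) match the declared lengths.
print(compose_rowkey({"id": "11111", "location": "beijin", "city": "beijing"}))
# -> 11111beijinbeijing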
- Connecting to Datasources Through DataFrame APIs
- Construct a schema
schema = StructType([StructField("id", StringType()),
                     StructField("location", StringType()),
                     StructField("city", StringType()),
                     StructField("booleanf", BooleanType()),
                     StructField("shortf", ShortType()),
                     StructField("intf", IntegerType()),
                     StructField("longf", LongType()),
                     StructField("floatf", FloatType()),
                     StructField("doublef", DoubleType())])
- Configure data
dataList = sparkSession.sparkContext.parallelize([("11111", "beijin", "beijing", False, 4, 3, 23, 2.3, 2.34)])
- Create a DataFrame
dataFrame = sparkSession.createDataFrame(dataList, schema)
- Import data to HBase
dataFrame.write.insertInto("test_hbase")
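To verify the write, you can query the associated table back through SQL (a simple check using the table created above):

sparkSession.sql("SELECT * FROM test_hbase").show()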
- Read data from HBase
# Set cross-source connection parameters.
TableName = "table_DupRowkey1"
RowKey = "id:5,location:6,city:7"
Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181," \
         "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
# Select data.
jdbcDF = sparkSession.read.schema(schema)\
    .format("hbase")\
    .option("ZKHost", ZKHost)\
    .option("TableName", TableName)\
    .option("RowKey", RowKey)\
    .option("Cols", Cols)\
    .load()
jdbcDF.filter("id = '12333' or id = '11111'").show()
The lengths of the id, location, and city fields are limited by the RowKey definition ('id:5,location:6,city:7'). When inserting data, each value must match its declared length; otherwise, an encoding error occurs.
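If a source value is shorter than its declared length, it must be brought to that length before insertion. A hypothetical helper (the padding policy is an assumption of this sketch, and the pad characters become part of the stored value):

# Hypothetical helper (not part of the connector API): left-justify a value
# to the fixed length declared in RowKey; padding characters are stored as data.
def pad_to_length(value, length):
    if len(value) > length:
        raise ValueError("value %r exceeds declared length %d" % (value, length))
    return value.ljust(length)

# e.g. pad a 4-character id to the declared length of 5
padded_id = pad_to_length("1234", 5)   # -> "1234 "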
- Operation result: the show() call above prints the row inserted earlier (id 11111).
- Submitting a Spark job
- Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
- In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
- When submitting a job, you need to specify a dependency module named sys.datasource.hbase (see the request-body sketch after this list).
- For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
- For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.
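For illustration, when calling the batch-processing job API, the module goes into the modules field of the request body. A minimal sketch; apart from modules (named in this guide), the field values below are placeholders, not taken from the API reference:

import json

# Sketch of a batch job request body; only "modules" is prescribed by this guide.
batch_job_body = {
    "file": "dli_hbase_example.py",       # uploaded Python file (placeholder name)
    "queue": "your_queue",                # DLI queue name (placeholder)
    "modules": ["sys.datasource.hbase"],  # dependency module required for HBase
}
print(json.dumps(batch_job_body, indent=2))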
Complete example code
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession
if __name__ == "__main__":
    # Create a SparkSession.
    sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()

    # Create a data table for the DLI-associated CloudTable (HBase).
    sparkSession.sql(
        "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, "
        "shortf SHORT, intf INT, longf LONG, floatf FLOAT, doublef DOUBLE) using hbase OPTIONS ("
        "'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
        "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
        "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',"
        "'TableName' = 'table_DupRowkey1',"
        "'RowKey' = 'id:5,location:6,city:7',"
        "'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,"
        "floatf:CF1.floatf,doublef:CF1.doublef')")

    # Create an RDD holding the row to insert.
    dataList = sparkSession.sparkContext.parallelize([("11111", "beijin", "beijing", False, 4, 3, 23, 2.3, 2.34)])

    # Set the schema.
    schema = StructType([StructField("id", StringType()),
                         StructField("location", StringType()),
                         StructField("city", StringType()),
                         StructField("booleanf", BooleanType()),
                         StructField("shortf", ShortType()),
                         StructField("intf", IntegerType()),
                         StructField("longf", LongType()),
                         StructField("floatf", FloatType()),
                         StructField("doublef", DoubleType())])

    # Create a DataFrame from the RDD and schema.
    dataFrame = sparkSession.createDataFrame(dataList, schema)

    # Write data to CloudTable-HBase.
    dataFrame.write.insertInto("test_hbase")

    # Set cross-source connection parameters.
    TableName = "table_DupRowkey1"
    RowKey = "id:5,location:6,city:7"
    Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
    ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181," \
             "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"

    # Read data from CloudTable-HBase.
    jdbcDF = sparkSession.read.schema(schema)\
        .format("hbase")\
        .option("ZKHost", ZKHost)\
        .option("TableName", TableName)\
        .option("RowKey", RowKey)\
        .option("Cols", Cols)\
        .load()
    jdbcDF.filter("id = '12333' or id = '11111'").show()

    # Close the session.
    sparkSession.stop()