PySpark Example Code

Development description

CloudTable HBase and MRS HBase can be connected to DLI as data sources.

  • Prerequisites

    A datasource connection has been created on the DLI management console. For details, see the Data Lake Insight User Guide.

  • Code implementation
    1. Import the required dependencies
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
      from pyspark.sql import SparkSession
      
    2. Create a session
      sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
      
    3. Create a table to connect to the HBase datasource
      sparkSession.sql(
          "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN,"
          " shortf SHORT, intf INT, longf LONG, floatf FLOAT, doublef DOUBLE) using hbase OPTIONS ("
          "'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
          "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
          "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',"
          "'TableName' = 'table_DupRowkey1',"
          "'RowKey' = 'id:5,location:6,city:7',"
          "'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,"
          "floatf:CF1.floatf,doublef:CF1.doublef')")
      
      • For details about the ZKHost, RowKey, and Cols parameters, see Table 1. A short illustration of the RowKey format follows these notes.
      • TableName: Name of the table in CloudTable. If the table does not exist, it is created automatically.
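
      The RowKey value 'id:5,location:6,city:7' declares a fixed-length composite row key built from the id, location, and city fields in that order. The snippet below is only an illustration of that mapping, reusing the field names and lengths from the example above; the simple concatenation is an assumption about how the composite key is laid out, not connector code.

      # Illustration only: how 'RowKey' = 'id:5,location:6,city:7' maps row fields
      # to one fixed-length composite key (assumed simple concatenation).
      rowkey_spec = [("id", 5), ("location", 6), ("city", 7)]
      row = {"id": "11111", "location": "beijin", "city": "beijing"}
      # Each value is padded or truncated to its declared length before concatenation.
      composite_key = "".join(row[name].ljust(length)[:length] for name, length in rowkey_spec)
      print(composite_key)  # 11111beijinbeijing
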
  • Connecting to Datasources Through DataFrame APIs
    1. Construct a schema
      schema = StructType([StructField("id", StringType()),                     
                           StructField("location", StringType()),                     
                           StructField("city", StringType()),                     
                           StructField("booleanf", BooleanType()),                     
                           StructField("shortf", ShortType()),                     
                           StructField("intf", IntegerType()),                     
                           StructField("longf", LongType()),                     
                           StructField("floatf", FloatType()),                     
                           StructField("doublef", DoubleType())])
      
    2. Configure data
      dataList = sparkSession.sparkContext.parallelize([("11111", "beijin", "beijing", False, 4, 3, 23, 2.3, 2.34)])
      
    3. Create a DataFrame
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      
    4. Import data to HBase
      dataFrame.write.insertInto("test_hbase")
      
    5. Read data from HBase
      # Set cross-source connection parameters
      TableName = "table_DupRowkey1"
      RowKey = "id:5,location:6,city:7"
      Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
      ZKHost = ("cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
                "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
                "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181")
      
      # Read data from the HBase table (select)
      jdbcDF = sparkSession.read.schema(schema)\
                       .format("hbase")\
                       .option("ZKHost",ZKHost)\
                       .option("TableName",TableName)\
                       .option("RowKey",RowKey)\
                       .option("Cols",Cols)\
                       .load()
      jdbcDF.filter("id = '12333' or id='11111'").show()
      

      The lengths of the id, location, and city fields are limited, as declared in the RowKey option. When inserting data, each value must match its declared length; otherwise, an encoding error occurs.
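
      As a minimal sketch (not part of the original example), the row-key fields can be checked against the lengths declared in 'RowKey' = 'id:5,location:6,city:7' before calling insertInto:

      # Illustrative pre-insert check: each row-key field must match its declared length.
      rowkey_lengths = [("id", 5), ("location", 6), ("city", 7)]
      for row in dataList.collect():
          for (name, expected), value in zip(rowkey_lengths, row[:3]):
              assert len(value) == expected, "%s must be exactly %d characters" % (name, expected)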

    6. Operation result

  • Submitting a Spark job
    1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.hbase.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference; an illustrative request body is sketched after this list.
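
      The following is only a sketch of what the request body for Creating a Batch Processing Job might contain. The file and queue values are hypothetical placeholders, and the exact field names must be verified against Table 2 in the Data Lake Insight API Reference.

      # Illustrative request body only; field names and values are assumptions to be
      # verified against the Data Lake Insight API Reference.
      batch_job_body = {
          "file": "dli_pyspark_hbase_example.py",  # hypothetical uploaded Python resource
          "queue": "queue1",                       # hypothetical DLI queue name
          "modules": ["sys.datasource.hbase"]      # dependency module required for HBase
      }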

Complete example code

# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession

if __name__ == "__main__":
  # Create a SparkSession.
  sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
  
  # Create a data table for the DLI-associated CloudTable (HBase)
  sparkSession.sql(
      "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN,"
      " shortf SHORT, intf INT, longf LONG, floatf FLOAT, doublef DOUBLE) using hbase OPTIONS ("
      "'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
      "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
      "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',"
      "'TableName' = 'table_DupRowkey1',"
      "'RowKey' = 'id:5,location:6,city:7',"
      "'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,"
      "floatf:CF1.floatf,doublef:CF1.doublef')")

  # Prepare the data used to initialize the DataFrame
  dataList = sparkSession.sparkContext.parallelize([("11111", "beijin", "beijing", False, 4, 3, 23, 2.3, 2.34)])

  # Setting schema    
  schema = StructType([StructField("id", StringType()), 
                       StructField("location", StringType()), 
                       StructField("city", StringType()),                         
                       StructField("booleanf", BooleanType()),                        
                       StructField("shortf", ShortType()),                     
                       StructField("intf", IntegerType()),                 
                       StructField("longf", LongType()),                   
                       StructField("floatf", FloatType()),              
                       StructField("doublef", DoubleType())])

  # Create a DataFrame from RDD and schema    
  dataFrame = sparkSession.createDataFrame(dataList, schema)

  # Write data to CloudTable HBase
  dataFrame.write.insertInto("test_hbase")

  # Set cross-source connection parameters    
  TableName = "table_DupRowkey1"
  RowKey = "id:5,location:6,city:7"
  Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
  ZKHost = ("cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
            "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
            "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181")
  # Read data on CloudTable-HBase    
  jdbcDF = sparkSession.read.schema(schema)\
                       .format("hbase")\
                       .option("ZKHost", ZKHost)\
                       .option("TableName",TableName)\
                       .option("RowKey", RowKey)\
                       .option("Cols", Cols)\
                       .load()    
  jdbcDF.filter("id = '12333' or id='11111'").show()

  # close session    
  sparkSession.stop()