
Complete Example Code

# -*- coding: utf-8 -*-
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession.
    sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()

    # Create a DLI table associated with the CloudTable HBase table.
    sparkSession.sql(
        "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, "
        "shortf SHORT, intf INT, longf LONG, floatf FLOAT, doublef DOUBLE) using hbase OPTIONS ("
        "'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
        "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
        "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',"
        "'TableName' = 'table_DupRowkey1',"
        "'RowKey' = 'id:5,location:6,city:7',"
        "'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,"
        "floatf:CF1.floatf,doublef:CF1.doublef')")
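    # The OPTIONS above follow the DLI HBase datasource syntax:
    #   ZKHost    - ZooKeeper quorum of the CloudTable HBase cluster
    #   TableName - name of the HBase table to associate with
    #   RowKey    - fields that compose the HBase row key, with each field's length
    #   Cols      - mapping of DLI column to HBase ColumnFamily.Column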

    # Create an RDD and initialize the data to be written.
    dataList = sparkSession.sparkContext.parallelize([("11111", "beijin", "beijing", False, 4, 3, 23, 2.3, 2.34)])
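    # Note: the sample values match the RowKey field lengths declared above
    # (len("11111") == 5, len("beijin") == 6, len("beijing") == 7), so the
    # row key can be composed from these fields exactly.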

    # Set the schema; it must match the columns of the associated table.
    schema = StructType([StructField("id", StringType()),
                         StructField("location", StringType()),
                         StructField("city", StringType()),
                         StructField("booleanf", BooleanType()),
                         StructField("shortf", ShortType()),
                         StructField("intf", IntegerType()),
                         StructField("longf", LongType()),
                         StructField("floatf", FloatType()),
                         StructField("doublef", DoubleType())])

    # Create a DataFrame from the RDD and the schema.
    dataFrame = sparkSession.createDataFrame(dataList, schema)

    # Write the data to the CloudTable HBase table.
    dataFrame.write.insertInto("test_hbase")

    # Set the cross-source connection parameters.
    TableName = "table_DupRowkey1"
    RowKey = "id:5,location:6,city:7"
    Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
    ZKHost = ("cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
              "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
              "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181")

    # Read data from CloudTable HBase.
    jdbcDF = sparkSession.read.schema(schema)\
                         .format("hbase")\
                         .option("ZKHost", ZKHost)\
                         .option("TableName", TableName)\
                         .option("RowKey", RowKey)\
                         .option("Cols", Cols)\
                         .load()
    jdbcDF.filter("id = '12333' or id = '11111'").show()

    # Close the session.
    sparkSession.stop()
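Because the CREATE TABLE statement registers test_hbase as a datasource table, the data written above can also be read back with plain Spark SQL instead of the option-based read. A minimal sketch, reusing the sparkSession from the example before it is stopped:

    # Equivalent SQL-based read of the associated table
    sparkSession.sql("SELECT * FROM test_hbase WHERE id = '11111'").show()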
