
Complete Example Code


Maven Dependency

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
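If the project is built with sbt instead of Maven, the equivalent dependency is a one-line sketch (assuming a Scala 2.11 build, which matches the spark-sql_2.11 artifact above):

// sbt equivalent: %% appends the Scala binary version, yielding spark-sql_2.11
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.2"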

Connecting to Datasources Through SQL APIs

import org.apache.spark.sql.SparkSession

object Test_SparkSql_HBase {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val sparkSession = SparkSession.builder().getOrCreate()

    /**
     * Create a DLI association table that maps to the CloudTable HBase table
     */
    sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN," +
      " 'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT, 'doublef' DOUBLE) using hbase OPTIONS (" +
      "'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181," +
      "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181," +
      "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181'," +
      "'TableName'='table_DupRowkey1'," +
      "'RowKey'='id:5,location:6,city:7'," +
      "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf," +
      "longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')")

    //*****************************SQL model***********************************
    sparkSession.sql("insert into test_hbase values('12345','huawei','guiyang',false,null,3,23,2.3,2.34)")
    sparkSession.sql("select * from test_hbase").collect()

    sparkSession.close()
  }
}
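Because test_hbase behaves like any other DLI table once created, further Spark SQL statements can run against it. The following is a minimal follow-up sketch (the id value matches the row inserted above); whether the filter is pushed down to HBase or evaluated on the Spark side depends on the connector:

// Hypothetical follow-up query: select by the 'id' row key column.
// show() prints the matching rows to the driver's stdout.
sparkSession.sql("select * from test_hbase where id = '12345'").show()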

Connecting to Datasources Through DataFrame APIs

import scala.collection.mutable

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._

object Test_SparkSql_HBase {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val sparkSession = SparkSession.builder().getOrCreate()

    // Create a DLI association table that maps to the CloudTable HBase table
    sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN," +
      " 'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT, 'doublef' DOUBLE) using hbase OPTIONS (" +
      "'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181," +
      "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181," +
      "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181'," +
      "'TableName'='table_DupRowkey1'," +
      "'RowKey'='id:5,location:6,city:7'," +
      "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf," +
      "longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')")

    //*****************************DataFrame model***********************************
    // Define the schema matching the association table columns
    val attrId = new StructField("id",StringType)
    val location = new StructField("location",StringType)
    val city = new StructField("city",StringType)
    val booleanf = new StructField("booleanf",BooleanType)
    val shortf = new StructField("shortf",ShortType)
    val intf = new StructField("intf",IntegerType)
    val longf = new StructField("longf",LongType)
    val floatf = new StructField("floatf",FloatType)
    val doublef = new StructField("doublef",DoubleType)
    val attrs = Array(attrId, location, city, booleanf, shortf, intf, longf, floatf, doublef)

    // Build a row whose values match the schema types (shortf is left null)
    val mutableRow: Seq[Any] = Seq("12345","huawei","guiyang",false,null,3,23,2.3,2.34)
    val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)

    // Write the constructed data into HBase through the association table
    sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase")

    // Read data back from HBase
    val map = new mutable.HashMap[String, String]()
    map("TableName") = "table_DupRowkey1"
    map("RowKey") = "id:5,location:6,city:7"
    map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
    map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,
                   cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
                   cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
    sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().collect()

    sparkSession.close()
  }
}
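For quick inspection during development, it can help to keep the DataFrame returned by load() rather than calling collect() directly. A minimal sketch, assuming it is placed in the same main method so that attrs and map are in scope:

// Keep the loaded DataFrame so it can be printed or filtered further.
val hbaseDf = sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load()
hbaseDf.show()                              // print the rows read from HBase
hbaseDf.filter("city = 'guiyang'").show()   // Spark-side filter on a mapped column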
