文档首页 > > 开发指南> 使用Spark作业跨源访问数据源> 对接HBase>

scala样例代码

scala样例代码

分享
更新时间:2020/12/21 GMT+08:00

开发说明

支持对接CloudTable的HBase和MRS的HBase。

  • 前提条件

    在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。

  • 构造依赖信息,创建SparkSession
    1. 导入依赖
      涉及到的mvn依赖库
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
      import相关依赖包
      1
      2
      3
      4
      import scala.collection.mutable
      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.types._
      
    2. 创建会话。
      1
      val sparkSession = SparkSession.builder().getOrCreate()
      
    3. 创建DLI跨源访问 HBase的关联表。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN, 
              'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (
      	'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,
                        cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
                        cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',
      	'TableName'='table_DupRowkey1',
      	'RowKey'='id:5,location:6,city:7',        'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')",
              'krb5conf'='$krb5conf',
              'keytab' = '$keytab',
              'principal' = '$principal'
      )
      
      表1 创建表参数

      参数

      说明

      ZKHost

      HBase集群的ZK连接地址。

      获取ZK连接地址需要先创建跨源连接。具体操作请参考《数据湖探索用户指南》。

      • 访问CloudTable集群,填写ZK连接地址(内网)。
      • 访问MRS集群,填写ZK所在节点IP与ZK对外端口,格式为:"ZK_IP1:ZK_PORT1,ZK_IP2:ZK_PORT2"。
      说明:

      访问MRS集群,只支持创建增强型跨源连接并且需要配置主机信息,管理控制台操作请参考《数据湖探索用户指南》中的“增强型跨源连接”,相关API信息请参考创建增强型跨源连接

      RowKey

      指定作为rowkey的dli关联表字段,支持单rowkey与组合rowkey。单rowkey支持数值与String类型,不需要指定长度。组合rowkey仅支持String类型定长数据,格式为:属性名1:长度,属性名2:长度。

      Cols

      定义dli表字段和ct表字段之间的对应关系;其中,“:”前放dli表字段,冒号后放ct表信息,用“.”分隔ct表的列族和列名。

      例如:“dli表字段1:ct表.ct表字段1, dli表字段2:ct表.ct表字段2, dli表字段3:ct表.ct表字段3”。

      krb5conf

      krb5.conf文件obs路径,格式:“obs://ak:sk@bucket/path”

      keytab

      keytab文件obs路径,格式:obs://ak:sk@bucket/path

      principal

      krb机机认证用户principal。

  • 通过SQL API访问
    1. 插入数据
      1
      sparkSession.sql("insert into test_hbase values('12345','abc','guiyang',false,null,3,23,2.3,2.34)")
      
    2. 查询数据
      1
      sparkSession.sql("select * from test_hbase").show ()
      

      返回结果:

  • 通过DataFrame API访问
    1. 构造schema
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      val attrId = new StructField("id",StringType)
      val location = new StructField("location",StringType)
      val city = new StructField("city",StringType)
      val booleanf = new StructField("booleanf",BooleanType)
      val shortf = new StructField("shortf",ShortType)
      val intf = new StructField("intf",IntegerType)
      val longf = new StructField("longf",LongType)
      val floatf = new StructField("floatf",FloatType)
      val doublef = new StructField("doublef",DoubleType)
      val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef)
      
    2. 根据schema的类型构造数据
      1
      2
      val mutableRow: Seq[Any] = Seq("12345","abc","guiyang",false,null,3,23,2.3,2.34)
      val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
      
    3. 导入数据到HBase
      1
      sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase")
      
    4. 读取HBase上的数据
      1
      2
      3
      4
      5
      6
      7
      8
      val map = new mutable.HashMap[String, String]()
      map("TableName") = "table_DupRowkey1"
      map("RowKey") = "id:5,location:6,city:7"
      map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
      map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,
                     cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
                     cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
      sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().show()
      

      返回结果:

  • 提交Spark作业
    1. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
    2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
      • 提交作业时,需要指定Module模块,名称为:sys.datasource.hbase。
      • 通过控制台提交作业请参考《数据湖探索用户指南》中的“表6-选择依赖资源参数说明”。
      • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。

完整示例代码

  • Maven依赖
    1
    2
    3
    4
    5
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
  • 通过SQL API访问
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import org.apache.spark.sql.SparkSession
    
    object Test_SparkSql_HBase {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.
        val sparkSession = SparkSession.builder().getOrCreate()
    
        /**
         * Create an association table for the DLI association Hbase table
         */
        sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN, 
            'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (
    	'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,
    	          cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
    	          cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',
    	'TableName'='table_DupRowkey1',
    	'RowKey'='id:5,location:6,city:7',
    	'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,
    		longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')")
    
        //*****************************SQL model***********************************
        sparkSession.sql("insert into test_hbase values('12345','abc','guiyang',false,null,3,23,2.3,2.34)")
        sparkSession.sql("select * from test_hbase").collect()
    
        sparkSession.close()
      }
    }
    
  • 通过DataFrame API访问
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    import scala.collection.mutable
    
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types._
    
    object Test_SparkSql_HBase {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.
        val sparkSession = SparkSession.builder().getOrCreate()
    
        // Create an association table for the DLI association Hbase table
        sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN, 
            'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (
    	'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,
    	          cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
    	          cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',
    	'TableName'='table_DupRowkey1',
    	'RowKey'='id:5,location:6,city:7',
    	'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')")
    
        //*****************************DataFrame model***********************************
        // Setting schema
        val attrId = new StructField("id",StringType)
        val location = new StructField("location",StringType)
        val city = new StructField("city",StringType)
        val booleanf = new StructField("booleanf",BooleanType)
        val shortf = new StructField("shortf",ShortType)
        val intf = new StructField("intf",IntegerType)
        val longf = new StructField("longf",LongType)
        val floatf = new StructField("floatf",FloatType)
        val doublef = new StructField("doublef",DoubleType)
        val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef)
    
        // Populate data according to the type of schema
        val mutableRow: Seq[Any] = Seq("12345","abc","guiyang",false,null,3,23,2.3,2.34)
        val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
    
        // Import the constructed data into Hbase
        sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase")
    
        // Read data on Hbase
        val map = new mutable.HashMap[String, String]()
        map("TableName") = "table_DupRowkey1"
        map("RowKey") = "id:5,location:6,city:7"
        map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
        map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,
                       cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
                       cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
        sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().collect()
    
        sparkSession.close()
      }
    }
    
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!非常感谢您的反馈,我们会继续努力做到更好!
反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区论坛频道来与我们联系探讨

智能客服提问云社区提问