Scala Sample Code
Development Description
Both HBase on CloudTable and HBase on MRS can be connected.
- Prerequisites
A datasource connection has been created on the DLI management console. For details, see the Data Lake Insight User Guide.
Hard-coding the authentication password in the code or storing it in plaintext poses serious security risks. You are advised to store it encrypted in a configuration file or an environment variable and decrypt it only when it is used; a minimal sketch follows.
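As an illustration only, the following sketch reads the password from an environment variable at runtime instead of hard-coding it. The variable name HBASE_AUTH_PASSWORD is an assumed example, not a name defined by DLI.

// Minimal sketch (assumption): obtain the password from an environment variable at runtime.
// HBASE_AUTH_PASSWORD is an example name chosen for illustration; use whatever your deployment defines.
val password: String = sys.env.getOrElse("HBASE_AUTH_PASSWORD",
  throw new IllegalArgumentException("HBASE_AUTH_PASSWORD is not set"))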
- Constructing dependency information and creating a SparkSession
- Importing dependencies
Maven dependency involved:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
Import the required dependency packages:
import scala.collection.mutable
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
- Creating a session
val sparkSession = SparkSession.builder().getOrCreate()
- Creating a DLI table associated with HBase
- If Kerberos authentication is not enabled for the HBase cluster, refer to the following sample code.
sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN, 'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS ( 'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1', 'RowKey'='id:5,location:6,city:7', 'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')" )
- If Kerberos authentication is enabled for the HBase cluster, refer to the following sample code.
sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN, 'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS ( 'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1', 'RowKey'='id:5,location:6,city:7', 'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef', 'krb5conf'='./krb5.conf', 'keytab' = './user.keytab', 'principal' = 'krbtest')")
Table 1 Parameters for creating the table

| Parameter | Description |
| --- | --- |
| ZKHost | ZooKeeper connection address of the HBase cluster. A datasource connection must be created before the address can be obtained; for details, see the Data Lake Insight User Guide. For a CloudTable cluster, enter the internal ZooKeeper connection address. For an MRS cluster, enter the IP addresses of the ZooKeeper nodes and the external ZooKeeper port, in the format "ZK_IP1:ZK_PORT1,ZK_IP2:ZK_PORT2". |
| RowKey | Field(s) of the DLI associated table used as the row key. Both a single row key and a composite row key are supported. A single row key can be of a numeric or String type and does not require a length. A composite row key supports only fixed-length String fields, in the format "attribute1:length,attribute2:length". |
| Cols | Mapping between DLI table fields and CloudTable (HBase) table fields. The DLI field is placed before the colon and the CloudTable column after it; the column family and column name of the CloudTable table are separated by a dot. For example: "DLIField1:CF.Column1,DLIField2:CF.Column2,DLIField3:CF.Column3". See also the worked example after this table. |
| krb5conf | Path of the krb5.conf file when Kerberos authentication is enabled, in the format './krb5.conf'. For details, see the configuration files required when Kerberos authentication is enabled. |
| keytab | Path of the keytab file when Kerberos authentication is enabled, in the format './user.keytab'. For details, see the configuration files required when Kerberos authentication is enabled. |
| principal | Name of the user created when Kerberos authentication is enabled. |
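As an illustration of how RowKey and Cols work together (the values below are examples chosen for this explanation, not taken from the sample tables): with 'RowKey'='id:5,location:6,city:7', the HBase row key is built by concatenating the id, location, and city values in that order, so a row with id '12345', location 'abc123', and city 'city001' would presumably be stored under the row key '12345abc123city001', while the remaining fields are written to the columns of column family CF1 listed in Cols.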
Accessing the Data Source Through the SQL API
- Inserting data
sparkSession.sql("insert into test_hbase values('12345','abc','guiyang',false,null,3,23,2.3,2.34)")
- Querying data
sparkSession.sql("select * from test_hbase").show ()
Returned result: the row inserted above is displayed.
Accessing the Data Source Through the DataFrame API
- Constructing a schema
val attrId = new StructField("id", StringType)
val location = new StructField("location", StringType)
val city = new StructField("city", StringType)
val booleanf = new StructField("booleanf", BooleanType)
val shortf = new StructField("shortf", ShortType)
val intf = new StructField("intf", IntegerType)
val longf = new StructField("longf", LongType)
val floatf = new StructField("floatf", FloatType)
val doublef = new StructField("doublef", DoubleType)
val attrs = Array(attrId, location, city, booleanf, shortf, intf, longf, floatf, doublef)
- Constructing data according to the schema types
val mutableRow: Seq[Any] = Seq("12345", "abc", "city1", false, null, 3, 23, 2.3, 2.34)
val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
- Importing data to HBase
sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase")
- Reading data from HBase
val map = new mutable.HashMap[String, String]()
map("TableName") = "table_DupRowkey1"
map("RowKey") = "id:5,location:6,city:7"
map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
map("ZKHost") = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().show()
Returned result: the row written in the previous step is displayed. The loaded data can also be processed further, as shown in the sketch below.
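Because load() returns an ordinary Spark DataFrame, standard DataFrame operations can be applied to the data read from HBase as well. A minimal sketch, reusing the schema and option map defined above (the chosen columns and filter are only an example):

// Sketch: further process the DataFrame read from HBase with standard DataFrame operations.
val df = sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load()
df.filter(df("doublef") > 1.0)       // standard column filter
  .select("id", "city", "doublef")   // keep only a few columns
  .show()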
Submitting a Spark Job
- Generate a JAR package from the compiled code and upload it to DLI.
- If Kerberos authentication is enabled for the MRS cluster, add the krb5.conf and user.keytab files to the job's other dependency files when creating the Spark job; skip this step if Kerberos authentication is not enabled. As shown in Figure 1.
- In the Spark job editor, select the corresponding module and run the Spark job.
For console operations, see the Data Lake Insight User Guide. For API operations, see "Creating a Batch Processing Job" in the Data Lake Insight API Reference.
- If Spark 2.3.2 (to be taken offline soon) or 2.4.5 is selected when submitting the job, the module must be specified; the module name is sys.datasource.hbase.
- If Spark 3.1.1 is selected, no module needs to be selected. Instead, add the following settings to 'Spark Arguments (--conf)':
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
- For jobs submitted through the console, see the "Parameters for selecting dependency resources" table in the Data Lake Insight User Guide.
- For jobs submitted through an API, see the description of the "modules" parameter in "Table 2 Request parameters" of "Creating a Batch Processing Job" in the Data Lake Insight API Reference.
Complete Sample Code
- Maven dependency
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
- Accessing through the SQL API
- Sample code when Kerberos authentication is not enabled
import org.apache.spark.sql.SparkSession

object Test_SparkSql_HBase {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val sparkSession = SparkSession.builder().getOrCreate()

    /**
     * Create an association table for the DLI association Hbase table
     */
    sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN, " +
      "'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT, 'doublef' DOUBLE) using hbase OPTIONS (" +
      "'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, " +
      "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, " +
      "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181'," +
      "'TableName'='table_DupRowkey1'," +
      "'RowKey'='id:5,location:6,city:7'," +
      "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')")

    //*****************************SQL model***********************************
    sparkSession.sql("insert into test_hbase values('12345','abc','city1',false,null,3,23,2.3,2.34)")
    sparkSession.sql("select * from test_hbase").collect()

    sparkSession.close()
  }
}
- Sample code when Kerberos authentication is enabled
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession
import java.io.{File, FileInputStream, FileOutputStream}

object Test_SparkSql_HBase_Kerberos {

  def copyFile2(Input: String)(OutPut: String): Unit = {
    val fis = new FileInputStream(Input)
    val fos = new FileOutputStream(OutPut)
    val buf = new Array[Byte](1024)
    var len = 0
    while ({len = fis.read(buf); len} != -1) {
      fos.write(buf, 0, len)
    }
    fos.close()
    fis.close()
  }

  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val sparkSession = SparkSession.builder().getOrCreate()
    val sc = sparkSession.sparkContext
    sc.addFile("OBS path of krb5.conf")
    sc.addFile("OBS path of user.keytab")
    Thread.sleep(10)

    val krb5_startfile = new File(SparkFiles.get("krb5.conf"))
    val keytab_startfile = new File(SparkFiles.get("user.keytab"))
    val path_user = System.getProperty("user.dir")
    val keytab_endfile = new File(path_user + "/" + keytab_startfile.getName)
    val krb5_endfile = new File(path_user + "/" + krb5_startfile.getName)
    println(keytab_endfile)
    println(krb5_endfile)

    var krbinput = SparkFiles.get("krb5.conf")
    var krboutput = path_user + "/krb5.conf"
    copyFile2(krbinput)(krboutput)

    var keytabinput = SparkFiles.get("user.keytab")
    var keytaboutput = path_user + "/user.keytab"
    copyFile2(keytabinput)(keytaboutput)
    Thread.sleep(10)

    /**
     * Create an association table for the DLI association Hbase table
     */
    sparkSession.sql("CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +
      "using hbase OPTIONS(" +
      "'ZKHost'='10.0.0.146:2181'," +
      "'TableName'='hbtest'," +
      "'RowKey'='id:100'," +
      "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," +
      "'krb5conf'='" + path_user + "/krb5.conf'," +
      "'keytab'='" + path_user + "/user.keytab'," +
      "'principal'='krbtest') ")

    //*****************************SQL model***********************************
    sparkSession.sql("insert into testhbase values('newtest',true,1,2,3,4,5)")
    val result = sparkSession.sql("select * from testhbase")
    result.show()

    sparkSession.close()
  }
}
- Accessing through the DataFrame API
- Sample code when Kerberos authentication is not enabled
import scala.collection.mutable
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._

object Test_SparkSql_HBase {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val sparkSession = SparkSession.builder().getOrCreate()

    // Create an association table for the DLI association Hbase table
    sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN, " +
      "'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT, 'doublef' DOUBLE) using hbase OPTIONS (" +
      "'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, " +
      "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, " +
      "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181'," +
      "'TableName'='table_DupRowkey1'," +
      "'RowKey'='id:5,location:6,city:7'," +
      "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')")

    //*****************************DataFrame model***********************************
    // Setting schema
    val attrId = new StructField("id", StringType)
    val location = new StructField("location", StringType)
    val city = new StructField("city", StringType)
    val booleanf = new StructField("booleanf", BooleanType)
    val shortf = new StructField("shortf", ShortType)
    val intf = new StructField("intf", IntegerType)
    val longf = new StructField("longf", LongType)
    val floatf = new StructField("floatf", FloatType)
    val doublef = new StructField("doublef", DoubleType)
    val attrs = Array(attrId, location, city, booleanf, shortf, intf, longf, floatf, doublef)

    // Populate data according to the type of schema
    val mutableRow: Seq[Any] = Seq("12345", "abc", "city1", false, null, 3, 23, 2.3, 2.34)
    val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)

    // Import the constructed data into Hbase
    sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase")

    // Read data on Hbase
    val map = new mutable.HashMap[String, String]()
    map("TableName") = "table_DupRowkey1"
    map("RowKey") = "id:5,location:6,city:7"
    map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
    map("ZKHost") = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
    sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().collect()

    sparkSession.close()
  }
}