更新时间:2024-07-04 GMT+08:00
java样例代码
开发说明
本样例只适用于MRS的HBase。
- 前提条件
在DLI管理控制台上已完成创建跨源连接并绑定队列。具体操作请参考《数据湖探索用户指南》。
认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
- 代码实现
- 导入依赖
- 涉及到的mvn依赖库
1 2 3 4 5
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
- import相关依赖包
1
import org.apache.spark.sql.SparkSession;
- 涉及到的mvn依赖库
- 创建会话
1
parkSession = SparkSession.builder().appName("datasource-HBase-MRS").getOrCreate();
- 导入依赖
- 通过SQL API 访问
- 未开启Kerberos认证
- 创建DLI跨源访问MRS HBase的关联表,填写连接参数。
1
sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:5','Cols'='location:info.location,city:detail.city') ");
- 插入数据
1
sparkSession.sql("insert into testhbase values('12345','abc','xxx')");
- 查询数据
1
sparkSession.sql("select * from testhbase").show();
插入数据后:
- 创建DLI跨源访问MRS HBase的关联表,填写连接参数。
- 开启Kerberos认证
- 创建DLI跨源访问MRS HBase的关联表,填写连接参数。
1
sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:5','Cols'='location:info.location,city:detail.city,'krb5conf'='./krb5.conf','keytab'='./user.keytab','principal'='krbtest') ");
与未开启kerberos认证相比,开启了kerberos认证需要多设置三个参数,如表1所示。krb5.conf和keytab文件获取请具体参考开启Kerberos认证时的相关配置文件操作说明。
- 插入数据
1
sparkSession.sql("insert into testhbase values('95274','abc','Hongkong')");
- 查询数据
1
sparkSession.sql("select * from testhbase").show();
- 创建DLI跨源访问MRS HBase的关联表,填写连接参数。
- 未开启Kerberos认证
- 提交Spark作业
- 将写好的代码文件生成jar包,上传至DLI中。
- 如果MRS集群开启了Kerberos认证,创建Spark作业时需要将krb5.conf和user.keytab文件添加到作业的依赖文件中,未开启Kerberos认证该步骤忽略。如图1所示:
- 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。
控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
- 如果选择spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.hbase。
- 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 'Spark参数(--conf)' 配置
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
- 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明
- 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
完整示例代码
- 通过SQL API访问
- 未开启Kerberos完整代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
import org.apache.spark.sql.SparkSession; public class java_mrs_hbase { public static void main(String[] args) { //create a SparkSession session SparkSession sparkSession = SparkSession.builder().appName("datasource-HBase-MRS").getOrCreate(); sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:5','Cols'='location:info.location,city:detail.city') "); //*****************************SQL model*********************************** sparkSession.sql("insert into testhbase values('95274','abc','Hongkong')"); sparkSession.sql("select * from testhbase").show(); sparkSession.close(); } }
- 开启Kerberos完整代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
import org.apache.spark.SparkContext; import org.apache.spark.SparkFiles; import org.apache.spark.sql.SparkSession; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class Test_HBase_SparkSql_Kerberos { private static void copyFile(File src,File dst) throws IOException { InputStream input = null; OutputStream output = null; try { input = new FileInputStream(src); output = new FileOutputStream(dst); byte[] buf = new byte[1024]; int bytesRead; while ((bytesRead = input.read(buf)) > 0) { output.write(buf, 0, bytesRead); } } finally { input.close(); output.close(); } } public static void main(String[] args) throws InterruptedException, IOException { SparkSession sparkSession = SparkSession.builder().appName("Test_HBase_SparkSql_Kerberos").getOrCreate(); SparkContext sc = sparkSession.sparkContext(); sc.addFile("obs://xietest1/lzq/krb5.conf"); sc.addFile("obs://xietest1/lzq/user.keytab"); Thread.sleep(20); File krb5_startfile = new File(SparkFiles.get("krb5.conf")); File keytab_startfile = new File(SparkFiles.get("user.keytab")); String path_user = System.getProperty("user.dir"); File keytab_endfile = new File(path_user + "/" + keytab_startfile.getName()); File krb5_endfile = new File(path_user + "/" + krb5_startfile.getName()); copyFile(krb5_startfile,krb5_endfile); copyFile(keytab_startfile,keytab_endfile); Thread.sleep(20); /** * Create an association table for the DLI association Hbase table */ sparkSession.sql("CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " + "using hbase OPTIONS(" + "'ZKHost'='10.0.0.146:2181'," + "'TableName'='hbtest'," + "'RowKey'='id:100'," + "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," + "'krb5conf'='" + path_user + "/krb5.conf'," + "'keytab'='" + path_user+ "/user.keytab'," + "'principal'='krbtest') "); //*****************************SQL model*********************************** sparkSession.sql("insert into testhbase values('newtest',true,1,2,3,4,5)"); sparkSession.sql("select * from testhbase").show(); sparkSession.close(); } }
- 未开启Kerberos完整代码示例
父主题: 对接HBase