更新时间:2024-07-04 GMT+08:00

java样例代码

开发说明

本样例只适用于MRS的HBase。

  • 前提条件

    在DLI管理控制台上已完成创建跨源连接并绑定队列。具体操作请参考《数据湖探索用户指南》。

    认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

  • 代码实现
    1. 导入依赖
      • 涉及到的mvn依赖库
        1
        2
        3
        4
        5
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.2</version>
        </dependency>
        
      • import相关依赖包
        1
        import org.apache.spark.sql.SparkSession;
        
    2. 创建会话
      1
      parkSession = SparkSession.builder().appName("datasource-HBase-MRS").getOrCreate();
      
  • 通过SQL API 访问
    • 未开启Kerberos认证
      1. 创建DLI跨源访问MRS HBase的关联表,填写连接参数。
        1
        sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:5','Cols'='location:info.location,city:detail.city') ");
        
      2. 插入数据
        1
        sparkSession.sql("insert into testhbase values('12345','abc','xxx')");
        
      3. 查询数据
        1
        sparkSession.sql("select * from testhbase").show();
        

        插入数据后:

    • 开启Kerberos认证
      1. 创建DLI跨源访问MRS HBase的关联表,填写连接参数。
        1
        sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:5','Cols'='location:info.location,city:detail.city,'krb5conf'='./krb5.conf','keytab'='./user.keytab','principal'='krbtest') ");
        
        与未开启kerberos认证相比,开启了kerberos认证需要多设置三个参数,如表1所示。
        表1 参数说明

        参数名称与参数值

        参数说明

        'krb5conf' = './krb5.conf'

        krb5.conf的地址。

        'keytab'='./user.keytab'

        Keytab的地址。

        'principal' ='krbtest'

        认证用户名。

        krb5.conf和keytab文件获取请具体参考开启Kerberos认证时的相关配置文件操作说明。

      2. 插入数据
        1
        sparkSession.sql("insert into testhbase values('95274','abc','Hongkong')");
        
      3. 查询数据
        1
        sparkSession.sql("select * from testhbase").show();
        
  • 提交Spark作业
    1. 将写好的代码文件生成jar包,上传至DLI中。

      控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

    2. 如果MRS集群开启了Kerberos认证,创建Spark作业时需要将krb5.conf和user.keytab文件添加到作业的依赖文件中,未开启Kerberos认证该步骤忽略。如图1所示:
      图1 添加依赖文件
    3. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。

      控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
      • 如果选择spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.hbase。
      • 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 'Spark参数(--conf)' 配置

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*

      • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明
      • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。

完整示例代码

  • 通过SQL API访问
    • 未开启Kerberos完整代码示例
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      import org.apache.spark.sql.SparkSession;
       
      public class java_mrs_hbase {
       
          public static void main(String[] args) {
              //create a SparkSession session
              SparkSession sparkSession = SparkSession.builder().appName("datasource-HBase-MRS").getOrCreate();
       
              sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:5','Cols'='location:info.location,city:detail.city') ");
       
              //*****************************SQL model***********************************
              sparkSession.sql("insert into testhbase values('95274','abc','Hongkong')");
              sparkSession.sql("select * from testhbase").show();
       
              sparkSession.close();
          }
      }
      
    • 开启Kerberos完整代码示例
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      import org.apache.spark.SparkContext;
      import org.apache.spark.SparkFiles;
      import org.apache.spark.sql.SparkSession;
      import java.io.File;
      import java.io.FileInputStream;
      import java.io.FileOutputStream;
      import java.io.IOException;
      import java.io.InputStream;
      import java.io.OutputStream;
      
      public class Test_HBase_SparkSql_Kerberos {
      
          private static void copyFile(File src,File dst) throws IOException {
              InputStream input  = null;
              OutputStream output = null;
              try {
                  input = new FileInputStream(src);
                  output = new FileOutputStream(dst);
                  byte[] buf = new byte[1024];
                  int bytesRead;
                  while ((bytesRead = input.read(buf)) > 0) {
                      output.write(buf, 0, bytesRead);
                  }
              } finally { 
                  input.close();
                  output.close();
              }
          }
      
          public static void main(String[] args) throws InterruptedException, IOException {
              SparkSession sparkSession = SparkSession.builder().appName("Test_HBase_SparkSql_Kerberos").getOrCreate();
              SparkContext sc = sparkSession.sparkContext();
              sc.addFile("obs://xietest1/lzq/krb5.conf");
              sc.addFile("obs://xietest1/lzq/user.keytab");
              Thread.sleep(20);
      
              File krb5_startfile = new File(SparkFiles.get("krb5.conf"));
              File keytab_startfile = new File(SparkFiles.get("user.keytab"));
              String path_user = System.getProperty("user.dir");
              File keytab_endfile = new File(path_user + "/" + keytab_startfile.getName());
              File krb5_endfile = new File(path_user + "/" + krb5_startfile.getName());
              copyFile(krb5_startfile,krb5_endfile);
              copyFile(keytab_startfile,keytab_endfile);
              Thread.sleep(20);
      
              /**
               * Create an association table for the DLI association Hbase table
               */
              sparkSession.sql("CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +
                      "using hbase OPTIONS(" +
                      "'ZKHost'='10.0.0.146:2181'," +
                      "'TableName'='hbtest'," +
                      "'RowKey'='id:100'," +
                      "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," +
                      "'krb5conf'='" + path_user + "/krb5.conf'," +
                      "'keytab'='" + path_user+ "/user.keytab'," +
                      "'principal'='krbtest') ");
      
              //*****************************SQL model***********************************
              sparkSession.sql("insert into testhbase values('newtest',true,1,2,3,4,5)");
              sparkSession.sql("select * from testhbase").show();
              sparkSession.close();
          }
      }