更新时间:2024-09-19 GMT+08:00

java样例代码

前提条件

在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。

CSS非安全集群

  • 开发说明
    • 代码实现
      • 构造依赖信息,创建SparkSession
        1. 导入依赖

          涉及到的mvn依赖库

          <dependency>
                      <groupId>org.apache.spark</groupId>
                      <artifactId>spark-sql_2.11</artifactId>
                      <version>2.3.2</version>
          </dependency>
          import相关依赖包
          1
          import org.apache.spark.sql.SparkSession;
          
        2. 创建会话
          1
          SparkSession sparkSession = SparkSession.builder().appName("datasource-css").getOrCreate();
          
    • 通过SQL API访问
      1. 创建DLI跨源访问 CSS关联表。
        sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.9.213:9200', 'es.nodes.wan.only' = 'true','resource' ='/mytest')");
      2. 插入数据。
        sparkSession.sql("insert into css_table values(18, 'John'),(28, 'Bob')");
      3. 查询数据。
        sparkSession.sql("select * from css_table").show();
      4. 删除数据表。
        sparkSession.sql("drop table css_table");
    • 提交Spark作业
      1. 将写好的代码文件生成jar包,上传至DLI中。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

      2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
        • 如果选择Spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.css。
        • 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 “Spark参数(--conf)” 配置

          spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*

          spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*

        • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明
        • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
  • 完整示例代码
    • Maven依赖
      <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.11</artifactId>
                  <version>2.3.2</version>
      </dependency>
    • 通过SQL API 访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      import org.apache.spark.sql.*;
       
      public class java_css_unsecurity {
       
          public static void main(String[] args) {
              SparkSession sparkSession = SparkSession.builder().appName("datasource-css-unsecurity").getOrCreate();
       
              // Create a DLI data table for DLI-associated CSS
              sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.15.34:9200', 'es.nodes.wan.only' = 'true', 'resource' = '/mytest')");
       
              //*****************************SQL model***********************************
              // Insert data into the DLI data table
              sparkSession.sql("insert into css_table values(18, 'John'),(28, 'Bob')");
       
              // Read data from DLI data table
              sparkSession.sql("select * from css_table").show();
       
              // drop table
              sparkSession.sql("drop table css_table");
       
              sparkSession.close();
          }
      }
      

CSS安全集群

  • 准备工作

    请参考CSS安全集群配置,准备工作的主要目的是为了生成keystore.jks文件和truststore.jks文件,并将其上传至OBS桶中。

  • 开发说明-https off
    如果没有开启https访问的话,不需要去生成keystore.jks和truststore.jks文件的,只需要设置好ssl访问和账号密码参数即可。
    • 构造依赖信息,创建SparkSession
      1. 导入依赖。

        涉及到的mvn依赖库:

        <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.11</artifactId>
                    <version>2.3.2</version>
        </dependency>
        import相关依赖包:
        1
        import org.apache.spark.sql.SparkSession;
        
      2. 创建会话。
        1
        SparkSession sparkSession = SparkSession.builder().appName("datasource-css").getOrCreate();
        
    • 通过SQL API 访问
      1. 创建DLI跨源访问 CSS的关联表。
        1
        sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.9.213:9200', 'es.nodes.wan.only' = 'true', 'resource' = '/mytest','es.net.ssl'='false','es.net.http.auth.user'='admin','es.net.http.auth.pass'='*******')");
        
        • 创建CSS跨源表的参数详情可参考表1
        • 上述示例中,因为CSS安全集群关闭了https访问,所以“es.net.ssl”参数要设置为“false”“es.net.http.auth.user”以及“es.net.http.auth.pass”为创建集群时设置的账号和密码。
      2. 插入数据
        1
        sparkSession.sql("insert into css_table values(18, 'John'),(28, 'Bob')");
        
      3. 查询数据
        1
        sparkSession.sql("select * from css_table").show();
        
      4. 删除数据表
        sparkSession.sql("drop table css_table");
    • 提交Spark作业
      1. 将写好的代码文件生成jar包,上传至DLI中。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

      2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
        • 提交作业时,需要指定Module模块,名称为:sys.datasource.css。
        • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明。
        • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
    • 完整示例代码
      • Maven依赖
        <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.11</artifactId>
                    <version>2.3.2</version>
        </dependency>
  • 开发说明-https on
    • 构造依赖信息,创建SparkSession
      1. 导入依赖。

        涉及到的mvn依赖库:

        <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.11</artifactId>
                    <version>2.3.2</version>
        </dependency>

        import相关依赖包:

        1
        2
        3
        4
        5
        import org.apache.spark.SparkFiles;
        import org.apache.spark.sql.SparkSession;
        import java.io.IOException;
        import java.nio.file.Files;
        import java.nio.file.Paths;
        
      2. 创建会话。
        1
        SparkSession sparkSession = SparkSession.builder().appName("datasource-css").getOrCreate();
        
      3. 拷贝证书。
              sparkSession.sparkContext().addFile("obs://桶名/地址/transport-keystore.jks");
              sparkSession.sparkContext().addFile("obs://桶名/地址/truststore.jks");
        
                // 获取当前工作目录的路径
                String pathUser = System.getProperty("user.dir");
                System.out.println("path_user is " + pathUser);
        
                // 获取文件名
                String esTransportKeystoreFileName = SparkFiles.get("transport-keystore.jks");
                String esTruststoreFileName = SparkFiles.get("truststore.jks");
        
                System.out.println("esTransportKeystoreFileName is " + esTransportKeystoreFileName);
                System.out.println("esTruststoreFileName is " + esTruststoreFileName);
                // 拼接文件完整路径
                String esTransportKeystoreLocalPath = pathUser + "/" + "transport-keystore.jks";
                String esTruststoreLocalPath = pathUser + "/" + "truststore.jks";
        
                System.out.println("esTransportKeystoreLocalPath is " + esTransportKeystoreLocalPath);
                System.out.println("esTruststoreLocalPath is " + esTruststoreLocalPath);
                try {
                    // 拷贝 keystore 文件
                    copyFile(esTransportKeystoreFileName, esTransportKeystoreLocalPath);
                    // 拷贝 truststore 文件
                    copyFile(esTruststoreFileName, esTruststoreLocalPath);
                    // 等待一段时间
                    Thread.sleep(2000);
        
                    System.out.println("Files copied successfully:");
                    System.out.println("es_transport-keystore.jks: " + esTransportKeystoreLocalPath);
                    System.out.println("es_truststore.jks: " + esTruststoreLocalPath);
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
        
    • 通过SQL API 访问
      1. 创建DLI跨源访问 CSS的关联表。
        1
        2
        3
        sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.13.189:9200', 'es.nodes.wan.only' = 'true', 'resource' = '/mytest','es.net.ssl'='true','es.net.ssl.keystore.location' = 'file://" + esTransportKeystoreLocalPath + "','es.net.ssl.keystore.pass' = '**', 
        'es.net.ssl.truststore.location'='file://" + esTruststoreLocalPath + "', 
        'es.net.ssl.truststore.pass'='***','es.net.http.auth.user'='admin','es.net.http.auth.pass'='**')");
        

        创建CSS跨源表的参数详情可参考表1

      2. 插入数据
        1
        sparkSession.sql("insert into css_table values(18, 'John'),(28, 'Bob')");
        
      3. 查询数据
        1
        sparkSession.sql("select * from css_table").show();
        
      4. 删除数据表
        sparkSession.sql("drop table css_table");
    • 提交Spark作业
      1. 将写好的代码文件生成jar包,上传至DLI中。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

      2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
        • 提交作业时,需要指定Module模块,名称为:sys.datasource.css。
        • 通过控制台提交作业请参考《数据湖探索用户指南》中的“表6-选择依赖资源参数说明”。
        • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
    • 完整示例代码
      • Maven依赖
        <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.11</artifactId>
                    <version>2.3.2</version>
        </dependency>
      • 通过SQL API 访问
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        import org.apache.spark.SparkFiles;
        import org.apache.spark.sql.SparkSession;
        import java.io.IOException;
        import java.nio.file.Files;
        import java.nio.file.Paths;
        
        public class java_css_security_httpson { 
            public static void main(String[] args) { 
                SparkSession sparkSession = SparkSession.builder().appName("datasource-css").getOrCreate(); 
        
                sparkSession.sparkContext().addFile("obs://桶名/地址/transport-keystore.jks");
                sparkSession.sparkContext().addFile("obs://桶名/地址/css/truststore.jks");
        
                // 获取当前工作目录的路径
                String pathUser = System.getProperty("user.dir");
                System.out.println("path_user is " + pathUser);
        
                // 获取文件名
                String esTransportKeystoreFileName = SparkFiles.get("transport-keystore.jks");
                String esTruststoreFileName = SparkFiles.get("truststore.jks");
        
                System.out.println("esTransportKeystoreFileName is " + esTransportKeystoreFileName);
                System.out.println("esTruststoreFileName is " + esTruststoreFileName);
                // 拼接文件完整路径
                String esTransportKeystoreLocalPath = pathUser + "/" + "transport-keystore.jks";
                String esTruststoreLocalPath = pathUser + "/" + "truststore.jks";
        
                System.out.println("esTransportKeystoreLocalPath is " + esTransportKeystoreLocalPath);
                System.out.println("esTruststoreLocalPath is " + esTruststoreLocalPath);
                try {
                    // 拷贝 keystore 文件
                    copyFile(esTransportKeystoreFileName, esTransportKeystoreLocalPath);
                    // 拷贝 truststore 文件
                    copyFile(esTruststoreFileName, esTruststoreLocalPath);
                    // 等待一段时间
                    Thread.sleep(2000);
        
                    System.out.println("Files copied successfully:");
                    System.out.println("es_transport-keystore.jks: " + esTransportKeystoreLocalPath);
                    System.out.println("es_truststore.jks: " + esTruststoreLocalPath);
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
        
                // Create a DLI data table for DLI-associated CSS 
                sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.13.189:9200', 'es.nodes.wan.only' = 'true', 'resource' = '/mytest','es.net.ssl'='true','es.net.ssl.keystore.location' = 'file://" + esTransportKeystoreLocalPath + "','es.net.ssl.keystore.pass' = '**','es.net.ssl.truststore.location'='file://" + esTruststoreLocalPath + "','es.net.ssl.truststore.pass'='**','es.net.http.auth.user'='admin','es.net.http.auth.pass'='**')");
        
                //*****************************SQL model*********************************** 
                // Insert data into the DLI data table 
                sparkSession.sql("insert into css_table values(34, 'Yuan'),(28, 'Kids')"); 
        
                // Read data from DLI data table 
                sparkSession.sql("select * from css_table").show(); 
        
                // drop table 
                sparkSession.sql("drop table css_table"); 
        
                sparkSession.close(); 
            } 
            private static void copyFile(String sourcePath, String destinationPath) throws IOException {
                 // 从远程存储复制文件到本地的操作
                 byte[] fileContent = Files.readAllBytes(Paths.get(sourcePath));
                 Files.write(Paths.get(destinationPath), fileContent);
            }
        }