更新时间:2024-07-04 GMT+08:00

scala样例代码

前提条件

在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。

CSS非安全集群

  • 开发说明
    • 构造依赖信息,创建SparkSession
      1. 导入依赖
        涉及到的mvn依赖库
        1
        2
        3
        4
        5
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.2</version>
        </dependency>
        
        import相关依赖包
        1
        2
        import org.apache.spark.sql.{Row, SaveMode, SparkSession}
        import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
        
      2. 创建会话。
        1
        val sparkSession = SparkSession.builder().getOrCreate()
        
    • 通过SQL API访问
      1. 创建DLI跨源访问 CSS的关联表。
        1
        2
        3
        4
        sparkSession.sql("create table css_table(id int, name string) using css options(
        	'es.nodes' 'to-css-1174404221-Y2bKVIqY.datasource.com:9200',
        	'es.nodes.wan.only'='true',
        	'resource' '/mytest/css')")
        
        表1 创建表参数

        参数

        说明

        es.nodes

        CSS的连接地址,需要先创建跨源连接。具体操作请参考《数据湖探索用户指南》。

        创建经典型跨源连接后,使用经典型跨源连接中返回的连接地址。

        创建增强型跨源连接后,使用CSS提供的"内网访问地址",格式为"IP1:PORT1,IP2:PORT2"。

        resource

        指定在CSS关联的资源名,用"/index/type"指定资源位置(可简单理解index为database,type为table,但绝不等同)。

        说明:
        • ES 6.X版本中,单个Index只支持唯一type,type名可以自定义。
        • ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支持自定义。若访问ES 7.X版本时,该参数只需要填写index即可。

        pushdown

        CSS的下压功能是否开启,默认为“true”。包含大量IO传输的表在有where过滤条件的情况下能够开启pushdown降低IO。

        strict

        CSS的下压是否是严格的,默认为“false”。精确匹配的场景下比pushdown降低更多IO。

        batch.size.entries

        单次batch插入entry的条数上限,默认为1000。如果单条数据非常大,在bulk存储设置的数据条数前提前到达了单次batch的总数据量上限,则停止存储数据,以batch.size.bytes为准,提交该批次的数据。

        batch.size.bytes

        单次batch的总数据量上限,默认为1mb。如果单条数据非常小,在bulk存储到总数据量前提前到达了单次batch的条数上限,则停止存储数据,以batch.size.entries为准,提交该批次的数据。

        es.nodes.wan.only

        是否仅通过域名访问es节点,默认为false。使用经典型跨源的连接地址作为es.nodes时,该参数需要配置为true;使用css服务提供的原始内网IP地址作为es.nodes时,不需要填写该参数或者配置为false。

        es.mapping.id

        指定一个字段,其值作为es中Document的id。

        说明:
        • 相同/index/type下的Document id是唯一的。如果作为Document id的字段存在重复值,则在执行插入es时,重复id的Document将会被覆盖。
        • 该特性可以用作容错解决方案。当插入数据执行一半时,DLI作业失败,会有部分数据已经插入到es中,这部分为冗余数据。如果设置了Document id,则在重新执行DLI作业时,会覆盖上一次的冗余数据。

        batch.size.entries和batch.size.bytes分别对数据条数和数据量大小进行限制。

      2. 插入数据。
        1
        sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
        
      3. 查询数据。
        1
        2
        val dataFrame = sparkSession.sql("select * from css_table")
        dataFrame.show()
        

        插入数据前:

        插入数据后:

      4. 删除数据表
        1
        sparkSession.sql("drop table css_table")
        
    • 通过DataFrame API访问
      1. 连接配置。
        1
        2
        val resource = "/mytest/css"
        val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
        
      2. 构造schema,并添加数据。
        1
        2
        val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
        val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
        
      3. 导入数据到CSS。
        1
        2
        3
        4
        5
        6
        7
        val dataFrame_1 = sparkSession.createDataFrame(rdd, schema)
        dataFrame_1.write 
          .format("css") 
          .option("resource", resource) 
          .option("es.nodes", nodes) 
          .mode(SaveMode.Append) 
          .save()
        

        SaveMode 有四种保存类型:

        • ErrorIfExis:如果已经存在数据,则抛出异常。
        • Overwrite:如果已经存在数据,则覆盖原数据。
        • Append:如果已经存在数据,则追加保存。
        • Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
      4. 读取CSS上的数据
        1
        2
        val dataFrameR = sparkSession.read.format("css").option("resource",resource).option("es.nodes", nodes).load()
        dataFrameR.show()
        

        插入数据前:

        插入数据后:

    • 提交Spark作业
      1. 将写好的代码生成jar包,上传至DLI中。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

      2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
        • 如果选择Spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.css。
        • 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 “Spark参数(--conf)” 配置

          spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*

          spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*

        • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明
        • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
  • 完整示例代码
    • Maven依赖
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
    • 通过SQL API访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      import org.apache.spark.sql.SparkSession
      
      object Test_SQL_CSS {
        def main(args: Array[String]): Unit = {
          // Create a SparkSession session.
          val sparkSession = SparkSession.builder().getOrCreate()
      
          // Create a DLI data table for DLI-associated CSS
          sparkSession.sql("create table css_table(id long, name string) using css options(
      	'es.nodes' = 'to-css-1174404217-QG2SwbVV.datasource.com:9200',
      	'es.nodes.wan.only' = 'true',
      	'resource' = '/mytest/css')")
      
          //*****************************SQL model***********************************
          // Insert data into the DLI data table
          sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
         
          // Read data from DLI data table
          val dataFrame = sparkSession.sql("select * from css_table")
          dataFrame.show()
         
          // drop table
          sparkSession.sql("drop table css_table")
      
          sparkSession.close()
        }
      }
      
    • 通过DataFrame API访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      import org.apache.spark.sql.{Row, SaveMode, SparkSession};
      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType};
      
      object Test_SQL_CSS {
        def main(args: Array[String]): Unit = {
          //Create a SparkSession session.
          val sparkSession = SparkSession.builder().getOrCreate()
      
          //*****************************DataFrame model***********************************
          // Setting the /index/type of CSS
          val resource = "/mytest/css"
        
          // Define the cross-origin connection address of the CSS cluster
          val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
      
          //Setting schema
          val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
        
          // Construction data
          val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
        
          // Create a DataFrame from RDD and schema
          val dataFrame_1 = sparkSession.createDataFrame(rdd, schema)
        
         //Write data to the CSS
         dataFrame_1.write.format("css") 
          .option("resource", resource) 
          .option("es.nodes", nodes) 
          .mode(SaveMode.Append) 
          .save()
        
          //Read data
          val dataFrameR = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load()
          dataFrameR.show()
      
          spardSession.close()
        }
      }
      

CSS安全集群

  • 开发说明
    • 构造依赖信息,创建SparkSession
      1. 导入依赖
        涉及到的mvn依赖库
        1
        2
        3
        4
        5
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.2</version>
        </dependency>
        
        import相关依赖包
        1
        2
        import org.apache.spark.sql.{Row, SaveMode, SparkSession}
        import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
        
      2. 创建会话,并设置AK/SK。

        认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

        1
        2
        3
        4
        5
        val sparkSession = SparkSession.builder().getOrCreate()
        sparkSession.conf.set("fs.obs.access.key", ak)
        sparkSession.conf.set("fs.obs.secret.key", sk)
        sparkSession.conf.set("fs.obs.endpoint", enpoint)
        sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")
        
    • 通过SQL API访问
      1. 创建DLI跨源访问 CSS的关联表。
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        sparkSession.sql("create table css_table(id int, name string) using css options(
        	'es.nodes' 'to-css-1174404221-Y2bKVIqY.datasource.com:9200',
        	'es.nodes.wan.only'='true',
        	'resource'='/mytest/css',
         	'es.net.ssl'='true',
        	'es.net.ssl.keystore.location'='obs://桶名/path/transport-keystore.jks',
        	'es.net.ssl.keystore.pass'='***',
        	'es.net.ssl.truststore.location'='obs://桶名/path/truststore.jks',
        	'es.net.ssl.truststore.pass'='***',
        	'es.net.http.auth.user'='admin',
        	'es.net.http.auth.pass'='***')")
        
        表2 创建表参数

        参数

        说明

        es.nodes

        CSS的连接地址,需要先创建跨源连接。具体操作请参考《数据湖探索用户指南》。

        创建经典型跨源连接后,使用经典型跨源连接中返回的连接地址。

        创建增强型跨源连接后,使用CSS提供的"内网访问地址",格式为"IP1:PORT1,IP2:PORT2"。

        resource

        指定在CSS关联的资源名,用"/index/type"指定资源位置(可简单理解index为database,type为table,但绝不等同)。

        说明:

        1. ES 6.X版本中,单个Index只支持唯一type,type名可以自定义。

        2. ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支持自定义。若访问ES 7.X版本时,该参数只需要填写index即可。

        pushdown

        CSS的下压功能是否开启,默认为“true”。包含大量IO传输的表在有where过滤条件的情况下能够开启pushdown降低IO。

        strict

        CSS的下压是否是严格的,默认为“false”。精确匹配的场景下比pushdown降低更多IO。

        batch.size.entries

        单次batch插入entry的条数上限,默认为1000。如果单条数据非常大,在bulk存储设置的数据条数前提前到达了单次batch的总数据量上限,则停止存储数据,以batch.size.bytes为准,提交该批次的数据。

        batch.size.bytes

        单次batch的总数据量上限,默认为1mb。如果单条数据非常小,在bulk存储到总数据量前提前到达了单次batch的条数上限,则停止存储数据,以batch.size.entries为准,提交该批次的数据。

        es.nodes.wan.only

        是否仅通过域名访问es节点,默认为“false”。使用经典型跨源的连接地址作为es.nodes时,该参数需要配置为“true”;使用CSS服务提供的原始内网IP地址作为es.nodes时,不需要填写该参数或者配置为“false”。

        es.mapping.id

        指定一个字段,其值作为es中Document的id。

        说明:
        • 相同/index/type下的Document id是唯一的。如果作为Document id的字段存在重复值,则在执行插入es时,重复id的Document将会被覆盖。
        • 该特性可以用作容错解决方案。当插入数据执行一半时,DLI作业失败,会有部分数据已经插入到es中,这部分为冗余数据。如果设置了Document id,则在重新执行DLI作业时,会覆盖上一次的冗余数据。

        es.net.ssl

        连接安全CSS集群,默认值为“false”。

        es.net.ssl.keystore.location

        安全CSS集群的证书,生成的keystore文件在OBS上的地址。

        es.net.ssl.keystore.pass

        安全CSS集群的证书,生成的keystore文件时的密码。

        es.net.ssl.truststore.location

        安全CSS集群的证书,生成的truststore文件在OBS上的地址。

        es.net.ssl.truststore.pass

        安全CSS集群的证书,生成的truststore文件时的密码。

        es.net.http.auth.user

        安全CSS集群的用户名。

        es.net.http.auth.pass

        安全CSS集群的密码。

        “batch.size.entries”“batch.size.bytes”分别对数据条数和数据量大小进行限制。

      2. 插入数据。
        1
        sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
        
      3. 查询数据。
        1
        2
        val dataFrame = sparkSession.sql("select * from css_table")
        dataFrame.show()
        

        插入数据前:

        插入数据后:

      4. 删除数据表
        1
        sparkSession.sql("drop table css_table")
        
    • 通过DataFrame API访问
      1. 连接配置。
        1
        2
        val resource = "/mytest/css"
        val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
        
      2. 构造schema,并添加数据。
        1
        2
        val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
        val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
        
      3. 导入数据到CSS。
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        val dataFrame_1 = sparkSession.createDataFrame(rdd, schema)
        dataFrame_1.write 
          .format("css") 
          .option("resource", resource) 
          .option("es.nodes", nodes) 
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .mode(SaveMode.Append) 
          .save()
        

        SaveMode 有四种保存类型:

        • ErrorIfExis:如果已经存在数据,则抛出异常。
        • Overwrite:如果已经存在数据,则覆盖原数据。
        • Append:如果已经存在数据,则追加保存。
        • Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
      4. 读取CSS上的数据
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        val dataFrameR = sparkSession.read.format("css")
                .option("resource",resource)
                .option("es.nodes", nodes)
                .option("es.net.ssl", "true")
                .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")
                .option("es.net.ssl.keystore.pass", "***")
                .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks")
                .option("es.net.ssl.truststore.pass", "***")
                .option("es.net.http.auth.user", "admin")
                .option("es.net.http.auth.pass", "***")
                .load()
        dataFrameR.show()
        

        插入数据前:

        插入数据后:

    • 提交Spark作业
      1. 将写好的代码生成jar包,上传至DLI中。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

      2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
        • 提交作业时,需要指定Module模块,名称为:sys.datasource.css。
        • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明。
        • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
  • 完整示例代码
    • Maven依赖
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
    • 通过SQL API访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      import org.apache.spark.sql.SparkSession
       
      object csshttpstest {
        def main(args: Array[String]): Unit = {
          //Create a SparkSession session.
          val sparkSession = SparkSession.builder().getOrCreate()
          // Create a DLI data table for DLI-associated CSS
          sparkSession.sql("create table css_table(id long, name string) using css options('es.nodes' = '192.168.6.204:9200','es.nodes.wan.only' = 'false','resource' = '/mytest','es.net.ssl'='true','es.net.ssl.keystore.location' = 'obs://xietest1/lzq/keystore.jks','es.net.ssl.keystore.pass' = '**','es.net.ssl.truststore.location'='obs://xietest1/lzq/truststore.jks','es.net.ssl.truststore.pass'='**','es.net.http.auth.user'='admin','es.net.http.auth.pass'='**')")
       
          //*****************************SQL model***********************************
          // Insert data into the DLI data table
          sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
       
          // Read data from DLI data table
          val dataFrame = sparkSession.sql("select * from css_table")
          dataFrame.show()
       
          // drop table
          sparkSession.sql("drop table css_table")
       
          sparkSession.close()
        }
      }
      
    • 通过DataFrame API访问

      认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      import org.apache.spark.sql.{Row, SaveMode, SparkSession};
      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType};
      
      object Test_SQL_CSS {
        def main(args: Array[String]): Unit = {
          //Create a SparkSession session.
          val sparkSession = SparkSession.builder().getOrCreate()
          sparkSession.conf.set("fs.obs.access.key", ak)
          sparkSession.conf.set("fs.obs.secret.key", sk)
      
          //*****************************DataFrame model***********************************
          // Setting the /index/type of CSS
          val resource = "/mytest/css"
        
          // Define the cross-origin connection address of the CSS cluster
          val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
      
          //Setting schema
          val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
        
          // Construction data
          val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
        
          // Create a DataFrame from RDD and schema
          val dataFrame_1 = sparkSession.createDataFrame(rdd, schema)
        
         //Write data to the CSS
         dataFrame_1.write .format("css") 
          .option("resource", resource) 
          .option("es.nodes", nodes) 
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .mode(SaveMode.Append) 
          .save();
        
          //Read data
          val dataFrameR = sparkSession.read.format("css")
          .option("resource", resource)
          .option("es.nodes", nodes)
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .load()
          dataFrameR.show()
      
          spardSession.close()
        }
      }