文档首页 > > 开发指南> 使用Spark作业跨源访问数据源> 对接CSS>

scala样例代码

scala样例代码

分享
更新时间:2020/12/21 GMT+08:00

前提条件

在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。

CSS非安全集群

  • 开发说明
    • 构造依赖信息,创建SparkSession
      1. 导入依赖
        涉及到的mvn依赖库
        1
        2
        3
        4
        5
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.2</version>
        </dependency>
        
        import相关依赖包
        1
        2
        import org.apache.spark.sql.{Row, SaveMode, SparkSession}
        import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
        
      2. 创建会话。
        1
        val sparkSession = SparkSession.builder().getOrCreate()
        
    • 通过SQL API访问
      1. 创建DLI跨源访问 CSS的关联表。
        1
        2
        3
        4
        sparkSession.sql("create table css_table(id int, name string) using css options(
        	'nodes' 'to-css-1174404221-Y2bKVIqY.datasource.com:9200',
        	'nodes.wan.only'='true',
        	'resource' '/mytest/css')")
        
        表1 创建表参数

        参数

        说明

        es.nodes

        CSS的连接地址,需要先创建跨源连接。具体操作请参考《数据湖探索用户指南》。

        创建经典型跨源连接后,使用经典型跨源连接中返回的连接地址。

        创建增强型跨源连接后,使用CSS提供的"内网访问地址",格式为"IP1:PORT1,IP2:PORT2"。

        resource

        指定在CSS关联的资源名,用"/index/type"指定资源位置(可简单理解index为database,type为table,但绝不等同)。

        说明:
        • ES 6.X版本中,单个Index只支持唯一type,type名可以自定义。
        • ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支持自定义。若访问ES 7.X版本时,该参数只需要填写index即可。

        pushdown

        CSS的下压功能是否开启,默认为“true”。包含大量IO传输的表在有where过滤条件的情况下能够开启pushdown降低IO。

        strict

        CSS的下压是否是严格的,默认为“false”。精确匹配的场景下比pushdown降低更多IO。

        batch.size.entries

        单次batch插入entry的条数上限,默认为1000。如果单条数据非常大,在bulk存储设置的数据条数前提前到达了单次batch的总数据量上限,则停止存储数据,以batch.size.bytes为准,提交该批次的数据。

        batch.size.bytes

        单次batch的总数据量上限,默认为1mb。如果单条数据非常小,在bulk存储到总数据量前提前到达了单次batch的条数上限,则停止存储数据,以batch.size.entries为准,提交该批次的数据。

        es.nodes.wan.only

        是否仅通过域名访问es节点,默认为false。使用经典型跨源的连接地址作为es.nodes时,该参数需要配置为true;使用css服务提供的原始内网IP地址作为es.nodes时,不需要填写该参数或者配置为false。

        es.mapping.id

        指定一个字段,其值作为es中Document的id。

        说明:
        • 相同/index/type下的Document id是唯一的。如果作为Document id的字段存在重复值,则在执行插入es时,重复id的Document将会被覆盖。
        • 该特性可以用作容错解决方案。当插入数据执行一半时,DLI作业失败,会有部分数据已经插入到es中,这部分为冗余数据。如果设置了Document id,则在重新执行DLI作业时,会覆盖上一次的冗余数据。

        batch.size.entries和batch.size.bytes分别对数据条数和数据量大小进行限制。

      2. 插入数据。
        1
        sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
        
      3. 查询数据。
        1
        2
        val dataFrame = sparkSession.sql("select * from css_table")
        dataFrame.show()
        

        插入数据前:

        插入数据后:

      4. 删除数据表
        1
        sparkSession.sql("drop table css_table")
        
    • 通过DataFrame API访问
      1. 连接配置。
        1
        2
        val resource = "/mytest/css"
        val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
        
      2. 构造schema,并添加数据。
        1
        2
        val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
        val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
        
      3. 导入数据到CSS。
        1
        2
        3
        4
        5
        6
        7
        val dataFrame_1 = sparkSession.createDataFrame(rdd, schema)
        dataFrame_1.write 
          .format("css") 
          .option("resource", resource) 
          .option("nodes", nodes) 
          .mode(SaveMode.Append) 
          .save()
        

        SaveMode 有四种保存类型:

        • ErrorIfExis:如果已经存在数据,则抛出异常。
        • Overwrite:如果已经存在数据,则覆盖原数据。
        • Append:如果已经存在数据,则追加保存。
        • Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
      4. 读取CSS上的数据
        1
        2
        val dataFrameR = sparkSession.read.format("css").option("resource",resource).option("nodes", nodes).load()
        dataFrameR.show()
        

        插入数据前:

        插入数据后:

    • 提交Spark作业
      1. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
      2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
        • 提交作业时,需要指定Module模块,名称为:sys.datasource.css。
        • 通过控制台提交作业请参考《数据湖探索用户指南》中的“表6-选择依赖资源参数说明”。
        • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
  • 完整示例代码
    • Maven依赖
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
    • 通过SQL API访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      import org.apache.spark.sql.SparkSession
      
      object Test_SQL_CSS {
        def main(args: Array[String]): Unit = {
          // Create a SparkSession session.
          val sparkSession = SparkSession.builder().getOrCreate()
      
          // Create a DLI data table for DLI-associated CSS
          sparkSession.sql("create table css_table(id long, name string) using css options(
      	'nodes' = 'to-css-1174404217-QG2SwbVV.datasource.com:9200',
      	'nodes.wan.only' = 'true',
      	'resource' = '/mytest/css')")
      
          //*****************************SQL model***********************************
          // Insert data into the DLI data table
          sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
         
          // Read data from DLI data table
          val dataFrame = sparkSession.sql("select * from css_table")
          dataFrame.show()
         
          // drop table
          sparkSession.sql("drop table css_table")
      
          sparkSession.close()
        }
      }
      
    • 通过DataFrame API访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      import org.apache.spark.sql.{Row, SaveMode, SparkSession};
      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType};
      
      object Test_SQL_CSS {
        def main(args: Array[String]): Unit = {
          //Create a SparkSession session.
          val sparkSession = SparkSession.builder().getOrCreate()
      
          //*****************************DataFrame model***********************************
          // Setting the /index/type of CSS
          val resource = "/mytest/css"
        
          // Define the cross-origin connection address of the CSS cluster
          val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
      
          //Setting schema
          val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
        
          // Construction data
          val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
        
          // Create a DataFrame from RDD and schema
          val dataFrame_1 = sparkSession.createDataFrame(rdd, schema)
        
         //Write data to the CSS
         dataFrame_1.write .format("css") 
          .option("resource", resource) 
          .option("nodes", nodes) 
          .mode(SaveMode.Append) 
          .save()
        
          //Read data
          val dataFrameR = sparkSession.read.format("css").option("resource", resource).option("nodes", nodes).load()
          dataFrameR.show()
      
          spardSession.close()
        }
      }
      

CSS安全集群

  • 准备工作
    • 当前CSS服务提供的Elasticsearch 6.5.4集群版本为用户增加了安全模式功能,开启安全模式后,将会为用户提供身份验证、授权以及加密等功能。DLI服务对接CSS安全集群时,需要先进行以下准备工作。
      1. 选择CSS Elasticsearch 6.5.4或以上集群版本,创建CSS安全集群,并下载安全集群证书(CloudSearchService.cer)。
      2. 创建跨源连接。
      3. 使用keytools工具生成keystore和truststore文件
        1. 使用keytools工具生成keystore和truststore文件,其中需要使用到安全集群的安全证书(CloudSearchService.cer),具体命令如下所示,keytools工具还有其他参数,可根据需求设置。
          keytool -genkeypair -alias certificatekey -keyalg RSA -keystore transport-keystore.jks
          keytool -list -v -keystore transport-keystore.jks
          keytool -import -alias certificatekey -file CloudSearchService.cer  -keystore truststore.jks
          keytool -list -v -keystore truststore
        2. 将生成的keystore和truststore文件上传到OBS桶中。
  • 开发说明
    • 构造依赖信息,创建SparkSession
      1. 导入依赖
        涉及到的mvn依赖库
        1
        2
        3
        4
        5
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.2</version>
        </dependency>
        
        import相关依赖包
        1
        2
        import org.apache.spark.sql.{Row, SaveMode, SparkSession}
        import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
        
      2. 创建会话,并设置AK/SK。
        1
        2
        3
        4
        5
        val sparkSession = SparkSession.builder().getOrCreate()
        sparkSession.conf.set("fs.obs.access.key", ak)
        sparkSession.conf.set("fs.obs.secret.key", sk)
        sparkSession.conf.set("fs.obs.endpoint", enpoint)
        sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")
        
    • 通过DataFrame API访问
      1. 连接配置。
        1
        2
        val resource = "/mytest/css"
        val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
        
      2. 构造schema,并添加数据。
        1
        2
        val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
        val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
        
      3. 导入数据到CSS。
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        val dataFrame_1 = sparkSession.createDataFrame(rdd, schema)
        dataFrame_1.write 
          .format("css") 
          .option("resource", resource) 
          .option("nodes", nodes) 
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .mode(SaveMode.Append) 
          .save()
        

        SaveMode 有四种保存类型:

        • ErrorIfExis:如果已经存在数据,则抛出异常。
        • Overwrite:如果已经存在数据,则覆盖原数据。
        • Append:如果已经存在数据,则追加保存。
        • Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
      4. 读取CSS上的数据
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        val dataFrameR = sparkSession.read.format("css")
                .option("resource",resource)
                .option("nodes", nodes)
                .option("es.net.ssl", "true")
                .option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks")
                .option("es.net.ssl.keystore.pass", "***")
                .option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks")
                .option("es.net.ssl.truststore.pass", "***")
                .option("es.net.http.auth.user", "admin")
                .option("es.net.http.auth.pass", "***")
                .load()
        dataFrameR.show()
        

        插入数据前:

        插入数据后:

    • 提交Spark作业
      1. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
      2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
        • 提交作业时,需要指定Module模块,名称为:sys.datasource.css。
        • 通过控制台提交作业请参考《数据湖探索用户指南》中的“表6-选择依赖资源参数说明”。
        • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
  • 完整示例代码
    • Maven依赖
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
    • 通过DataFrame API访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      import org.apache.spark.sql.{Row, SaveMode, SparkSession};
      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType};
      
      object Test_SQL_CSS {
        def main(args: Array[String]): Unit = {
          //Create a SparkSession session.
          val sparkSession = SparkSession.builder().getOrCreate()
          sparkSession.conf.set("fs.obs.access.key", ak)
          sparkSession.conf.set("fs.obs.secret.key", sk)
      
          //*****************************DataFrame model***********************************
          // Setting the /index/type of CSS
          val resource = "/mytest/css"
        
          // Define the cross-origin connection address of the CSS cluster
          val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
      
          //Setting schema
          val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
        
          // Construction data
          val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
        
          // Create a DataFrame from RDD and schema
          val dataFrame_1 = sparkSession.createDataFrame(rdd, schema)
        
         //Write data to the CSS
         dataFrame_1.write .format("css") 
          .option("resource", resource) 
          .option("nodes", nodes) 
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .mode(SaveMode.Append) 
          .save();
        
          //Read data
          val dataFrameR = sparkSession.read.format("css")
          .option("resource", resource)
          .option("nodes", nodes)
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .load()
          dataFrameR.show()
      
          spardSession.close()
        }
      }
      
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!非常感谢您的反馈,我们会继续努力做到更好!
反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区论坛频道来与我们联系探讨

智能客服提问云社区提问