更新时间:2024-07-04 GMT+08:00

pyspark样例代码

前提条件

在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。

CSS非安全集群

  • 开发说明
    • 代码实现详解
      1. import相关依赖包
        1
        2
        3
        from __future__ import print_function
        from pyspark.sql.types import StructType, StructField, IntegerType, StringType, Row
        from pyspark.sql import SparkSession
        
      2. 创建会话
        1
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        
    • 通过DataFrame API 访问
      1. 连接配置
        1
        2
        resource = "/mytest"
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
        

        resource为指定在CSS关联的资源名。格式可以用"/index/type"指定资源位置(可简单理解index为database,type为table,但绝不等同)。

        • ES 6.X版本中,单个Index只支持唯一type,type名可以自定义。
        • ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支持自定义。若访问ES 7.X版本时,该参数只需要填写index即可。
      2. 构造schema,并添加数据
        1
        2
        3
        schema = StructType([StructField("id", IntegerType(), False),                  
                             StructField("name", StringType(), False)])
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
        
      3. 构造DataFrame
        1
        dataFrame = sparkSession.createDataFrame(rdd, schema)
        
      4. 保存数据到CSS
        1
        dataFrame.write.format("css").option("resource", resource).option("es.nodes", nodes).mode("Overwrite").save()
        

        mode 有四种保存类型:

        • ErrorIfExis:如果已经存在数据,则抛出异常。
        • Overwrite:如果已经存在数据,则覆盖原数据。
        • Append:如果已经存在数据,则追加保存。
        • Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
      5. 读取CSS上的数据
        1
        2
        jdbcDF = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load()
        jdbcDF.show()
        
      6. 操作结果

    • 通过SQL API 访问
      1. 创建DLI跨源访问 CSS的关联表。
        1
        2
        3
        4
        5
        sparkSession.sql(
            "create table css_table(id long, name string) using css options(  
            'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',
            'es.nodes.wan.only'='true',
            'resource'='/mytest')")
        

        创建CSS跨源表的参数详情可参考表1

      2. 插入数据
        1
        sparkSession.sql("insert into css_table values(3,'tom')")
        
      3. 查询数据
        1
        2
        jdbcDF = sparkSession.sql("select * from css_table")
        jdbcDF.show()
        
      4. 操作结果

    • 提交Spark作业
      1. 将写好的python代码文件上传至DLI中。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

      2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。

        控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
        • 如果选择Spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.css。
        • 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 “Spark参数(--conf)” 配置

          spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*

          spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*

        • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明
        • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
  • 完整示例代码
    • 通过DataFrame API 访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.   
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
      
        # Setting cross-source connection parameters  
        resource = "/mytest"
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
      
        # Setting schema  
        schema = StructType([StructField("id", IntegerType(), False),       
                             StructField("name", StringType(), False)])
      
        # Construction data 
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
      
        # Create a DataFrame from RDD and schema  
        dataFrame = sparkSession.createDataFrame(rdd, schema)
      
        # Write data to the CSS 
        dataFrame.write.format("css").option("resource", resource).option("es.nodes", nodes).mode("Overwrite").save()
      
        # Read data  
        jdbcDF = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load()
        jdbcDF.show()
      
        # close session  
        sparkSession.stop()
      
    • 通过SQL API 访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.  
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
      
        # Create a DLI data table for DLI-associated CSS   
        sparkSession.sql(
            "create table css_table(id long, name string) using css options( \
            'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',\
            'es.nodes.wan.only'='true',\
            'resource'='/mytest')")
      
        # Insert data into the DLI data table  
        sparkSession.sql("insert into css_table values(3,'tom')")
      
        # Read data from DLI data table   
        jdbcDF = sparkSession.sql("select * from css_table")   
        jdbcDF.show()
      
        # close session  
        sparkSession.stop()
      

CSS安全集群

  • 开发说明
    • 代码实现详解
      1. import相关依赖包
        1
        2
        3
        from __future__ import print_function
        from pyspark.sql.types import StructType, StructField, IntegerType, StringType, Row
        from pyspark.sql import SparkSession
        
      2. 创建会话并设置AK/SK

        认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

        1
        2
        3
        4
        5
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        sparkSession.conf.set("fs.obs.access.key", ak)
        sparkSession.conf.set("fs.obs.secret.key", sk)
        sparkSession.conf.set("fs.obs.endpoint", enpoint)
        sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")
        
    • 通过DataFrame API 访问
      1. 连接配置
        1
        2
        resource = "/mytest";
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
        

        resource为指定在CSS关联的资源名。格式可以用"/index/type"指定资源位置(可简单理解index为database,type为table,但绝不等同)。

        • ES 6.X版本中,单个Index只支持唯一type,type名可以自定义。
        • ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支持自定义。若访问ES 7.X版本时,该参数只需要填写index即可。
      2. 构造schema,并添加数据
        1
        2
        3
        schema = StructType([StructField("id", IntegerType(), False),                  
                             StructField("name", StringType(), False)])
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
        
      3. 构造DataFrame
        1
        dataFrame = sparkSession.createDataFrame(rdd, schema)
        
      4. 保存数据到CSS
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        dataFrame.write.format("css")
          .option("resource", resource)
          .option("es.nodes", nodes)
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .mode("Overwrite")
          .save()
        

        mode 有四种保存类型:

        • ErrorIfExis:如果已经存在数据,则抛出异常。
        • Overwrite:如果已经存在数据,则覆盖原数据。
        • Append:如果已经存在数据,则追加保存。
        • Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
      5. 读取CSS上的数据
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        jdbcDF = sparkSession.read.format("css")\
          .option("resource", resource)\
          .option("es.nodes", nodes)\
          .option("es.net.ssl", "true")\
          .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")\
          .option("es.net.ssl.keystore.pass", "***")\
          .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks")\
          .option("es.net.ssl.truststore.pass", "***")\
          .option("es.net.http.auth.user", "admin")\
          .option("es.net.http.auth.pass", "***")\
          .load()
        jdbcDF.show()
        
      6. 操作结果

    • 通过SQL API 访问
      1. 创建DLI跨源访问 CSS的关联表。
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        sparkSession.sql(
                "create table css_table(id long, name string) using css options(\  
                'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',\
                'es.nodes.wan.only'='true',\
                'resource'='/mytest',\
        	'es.net.ssl'='true',\
        	'es.net.ssl.keystore.location'='obs://桶名/path/transport-keystore.jks',\
        	'es.net.ssl.keystore.pass'='***',\
        	'es.net.ssl.truststore.location'='obs://桶名/path/truststore.jks',\
        	'es.net.ssl.truststore.pass'='***',\
        	'es.net.http.auth.user'='admin',\
        	'es.net.http.auth.pass'='***')")
        

        创建CSS跨源表的参数详情可参考表1

      2. 插入数据
        1
        sparkSession.sql("insert into css_table values(3,'tom')")
        
      3. 查询数据
        1
        2
        jdbcDF = sparkSession.sql("select * from css_table")
        jdbcDF.show()
        
      4. 操作结果

    • 提交Spark作业
      1. 将写好的python代码文件上传至DLI中。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
      2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
        • 提交作业时,需要指定Module模块,名称为:sys.datasource.css。
        • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明。
        • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
  • 完整示例代码
    • 通过DataFrame API 访问

      认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.   
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        sparkSession.conf.set("fs.obs.access.key", ak)
        sparkSession.conf.set("fs.obs.secret.key", sk)
        sparkSession.conf.set("fs.obs.endpoint", enpoint)
        sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")
      
        # Setting cross-source connection parameters  
        resource = "/mytest";
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
      
        # Setting schema  
        schema = StructType([StructField("id", IntegerType(), False),       
                             StructField("name", StringType(), False)])
      
        # Construction data 
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
      
        # Create a DataFrame from RDD and schema  
        dataFrame = sparkSession.createDataFrame(rdd, schema)
      
        # Write data to the CSS 
        dataFrame.write.format("css")
          .option("resource", resource)
          .option("es.nodes", nodes)
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .mode("Overwrite")
          .save()
      
        # Read data  
        jdbcDF = sparkSession.read.format("css")\
          .option("resource", resource)\
          .option("es.nodes", nodes)\
          .option("es.net.ssl", "true")\
          .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")\
          .option("es.net.ssl.keystore.pass", "***")\
          .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")\
          .option("es.net.http.auth.user", "admin")\
          .option("es.net.http.auth.pass", "***")\
          .load()
        jdbcDF.show()
      
        # close session  
        sparkSession.stop()
      
    • 通过SQL API 访问
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql import SparkSession
      import os               
       
      if __name__ == "__main__":
        
        # Create a SparkSession session.   
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        # Create a DLI data table for DLI-associated CSS   
        sparkSession.sql("create table css_table(id int, name string) using css options(\
                          'es.nodes'='192.168.6.204:9200',\
                          'es.nodes.wan.only'='true',\
                          'resource'='/mytest',\
                          'es.net.ssl'='true',\
                          'es.net.ssl.keystore.location' = 'obs://xietest1/lzq/keystore.jks',\
                          'es.net.ssl.keystore.pass' = '**',\
                          'es.net.ssl.truststore.location'='obs://xietest1/lzq/truststore.jks',\
                          'es.net.ssl.truststore.pass'='**',\
                          'es.net.http.auth.user'='admin',\
                          'es.net.http.auth.pass'='**')")
       
        # Insert data into the DLI data table  
        sparkSession.sql("insert into css_table values(3,'tom')")
       
        # Read data from DLI data table   
        jdbcDF = sparkSession.sql("select * from css_table")   
        jdbcDF.show()
       
        # close session  
        sparkSession.stop()