PySpark Example Code

Prerequisites

A datasource connection has been created on the DLI management console. For details, see the Data Lake Insight User Guide.

CSS Non-security Cluster

  • Development description
    • Code implementation
      1. Import the required dependencies
        from __future__ import print_function
        from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
        from pyspark.sql import SparkSession
        
      2. Create a session
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        
    • Connecting to datasources through DataFrame APIs
      1. Configure datasource connection
        resource = "/mytest/css"
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
        
      2. Create a schema and construct the data
        schema = StructType([StructField("id", IntegerType(), False),                  
                             StructField("name", StringType(), False)])
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
        
      3. Construct a DataFrame
        dataFrame = sparkSession.createDataFrame(rdd, schema)
        
      4. Save the data to CSS
        dataFrame.write.format("css").option("resource", resource).option("nodes", nodes).mode("Overwrite").save()
        

        The value of mode can be one of the following:

        • ErrorIfExists: If the data already exists, an exception is thrown.
        • Overwrite: If the data already exists, the original data is overwritten.
        • Append: If the data already exists, the new data is appended to it.
        • Ignore: If the data already exists, the save operation is skipped. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
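        The semantics of the four modes can be illustrated with a small standalone sketch (plain Python, no Spark or CSS cluster needed; save_to_index is a hypothetical helper, not part of the DLI API, that mimics DataFrameWriter behavior against an in-memory dict):

```python
# A standalone sketch of the four save modes (plain Python, no Spark needed).
# save_to_index is a hypothetical helper that mimics DataFrameWriter semantics
# against an in-memory "index" (a dict mapping a resource path to a row list).
def save_to_index(index, key, rows, mode):
    exists = key in index
    if mode == "ErrorIfExists" and exists:
        raise ValueError("data already exists at %s" % key)
    if mode == "Ignore" and exists:
        return                      # like CREATE TABLE IF NOT EXISTS: no-op
    if mode == "Append" and exists:
        index[key].extend(rows)     # keep existing rows, add the new ones
        return
    index[key] = list(rows)         # Overwrite, or the first write in any mode

index = {}
save_to_index(index, "/mytest/css", [(1, "John")], "Overwrite")
save_to_index(index, "/mytest/css", [(2, "Bob")], "Append")
print(index["/mytest/css"])         # [(1, 'John'), (2, 'Bob')]
save_to_index(index, "/mytest/css", [(3, "Tom")], "Ignore")
print(len(index["/mytest/css"]))    # still 2
```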
      5. Read data from CSS
        jdbcDF = sparkSession.read.format("css").option("resource", resource).option("nodes", nodes).load()
        jdbcDF.show()
        
      6. Operation result

    • Connecting to datasources through SQL APIs
      1. Create a table to connect to CSS datasource
        sparkSession.sql(
            """create table css_table(id long, name string) using css options(
               'nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',
               'nodes.wan.only'='true',
               'resource'='/mytest/css')""")
        

        For details about the parameters for creating a CSS datasource connection table, see Table 1.

      2. Insert data
        sparkSession.sql("insert into css_table values(3,'tom')")
        
      3. Query data
        jdbcDF = sparkSession.sql("select * from css_table")
        jdbcDF.show()
        
      4. Operation result

    • Submitting a Spark job
      1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
      2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
        • When submitting a job, you need to specify a dependency module named sys.datasource.css.
        • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
        • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.
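        For API-based submission, the dependency module is declared through the modules field of the batch job request body. A minimal sketch (the file path and queue name are placeholders, and all fields other than modules should be checked against Creating a Batch Processing Job in the Data Lake Insight API Reference):

```json
{
  "file": "obs://your-bucket/path/css_demo.py",
  "queue": "your_queue",
  "modules": ["sys.datasource.css"]
}
```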
  • Complete example code
    • Connecting to datasources through DataFrame APIs
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.   
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
      
        # Setting cross-source connection parameters  
        resource = "/mytest/css"
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
      
        # Setting schema  
        schema = StructType([StructField("id", IntegerType(), False),       
                             StructField("name", StringType(), False)])
      
        # Construct the data
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
      
        # Create a DataFrame from RDD and schema  
        dataFrame = sparkSession.createDataFrame(rdd, schema)
      
        # Write data to the CSS 
        dataFrame.write.format("css").option("resource", resource).option("nodes", nodes).mode("Overwrite").save()
      
        # Read data  
        jdbcDF = sparkSession.read.format("css").option("resource", resource).option("nodes", nodes).load()
        jdbcDF.show()
      
        # close session  
        sparkSession.stop()
      
    • Connecting to datasources through SQL APIs
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.  
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
      
        # Create a DLI data table for DLI-associated CSS   
        sparkSession.sql(
            """create table css_table(id long, name string) using css options(
               'nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',
               'nodes.wan.only'='true',
               'resource'='/mytest/css')""")
      
        # Insert data into the DLI data table  
        sparkSession.sql("insert into css_table values(3,'tom')")
      
        # Read data from DLI data table   
        jdbcDF = sparkSession.sql("select * from css_table")   
        jdbcDF.show()
      
        # close session  
        sparkSession.stop()
      

CSS Security Cluster

  • Preparations
    • Elasticsearch 6.5.4 and later versions provided by CSS support the security mode. Once it is enabled, CSS provides identity authentication, authorization, and encryption for users. Before connecting DLI to a CSS security cluster, you need to perform the following preparations.
      1. Select CSS Elasticsearch 6.5.4 or a later cluster version, create a CSS security cluster, and download the security cluster certificate (CloudSearchService.cer).
      2. Create a datasource connection.
      3. Use the keytool utility to generate the keystore and truststore files.
        1. The security certificate (CloudSearchService.cer) of the security cluster is required to generate the files. Run the following commands; other keytool parameters can be set as required.
          keytool -genkeypair -alias certificatekey -keyalg RSA -keystore transport-keystore.jks
          keytool -list -v -keystore transport-keystore.jks
          keytool -import -alias certificatekey -file CloudSearchService.cer -keystore truststore.jks
          keytool -list -v -keystore truststore.jks
        2. Upload the generated keystore and truststore files to an OBS bucket.
  • Development description
    • Code implementation
      1. Import the required dependencies
        from __future__ import print_function
        from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
        from pyspark.sql import SparkSession
        
      2. Create a session and set the access keys.
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        sparkSession.conf.set("fs.obs.access.key", ak)
        sparkSession.conf.set("fs.obs.secret.key", sk)
        sparkSession.conf.set("fs.obs.endpoint", endpoint)
        sparkSession.conf.set("fs.obs.connection.ssl.enabled", "false")
        
    • Connecting to datasources through DataFrame APIs
      1. Configure datasource connection
        resource = "/mytest/css"
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
        
      2. Create a schema and construct the data
        schema = StructType([StructField("id", IntegerType(), False),                  
                             StructField("name", StringType(), False)])
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
        
      3. Construct a DataFrame
        dataFrame = sparkSession.createDataFrame(rdd, schema)
        
      4. Save the data to CSS
        (dataFrame.write.format("css")
            .option("resource", resource)
            .option("nodes", nodes)
            .option("es.net.ssl", "true")
            .option("es.net.ssl.keystore.location", "obs://AK:SK@bucket name/path/transport-keystore.jks")
            .option("es.net.ssl.keystore.pass", "***")
            .option("es.net.ssl.truststore.location", "obs://AK:SK@bucket name/path/truststore.jks")
            .option("es.net.ssl.truststore.pass", "***")
            .option("es.net.http.auth.user", "admin")
            .option("es.net.http.auth.pass", "***")
            .mode("Overwrite")
            .save())
        

        The value of mode can be one of the following:

        • ErrorIfExists: If the data already exists, an exception is thrown.
        • Overwrite: If the data already exists, the original data is overwritten.
        • Append: If the data already exists, the new data is appended to it.
        • Ignore: If the data already exists, the save operation is skipped. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
      5. Read data from CSS
        jdbcDF = (sparkSession.read.format("css")
            .option("resource", resource)
            .option("nodes", nodes)
            .option("es.net.ssl", "true")
            .option("es.net.ssl.keystore.location", "obs://AK:SK@bucket name/path/transport-keystore.jks")
            .option("es.net.ssl.keystore.pass", "***")
            .option("es.net.ssl.truststore.location", "obs://AK:SK@bucket name/path/truststore.jks")
            .option("es.net.ssl.truststore.pass", "***")
            .option("es.net.http.auth.user", "admin")
            .option("es.net.http.auth.pass", "***")
            .load())
        jdbcDF.show()
        
      6. Operation result

    • Submitting a Spark job
      1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
      2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
        • When submitting a job, you need to specify a dependency module named sys.datasource.css.
        • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
        • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.
  • Complete example code
    • Connecting to datasources through DataFrame APIs
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.   
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        sparkSession.conf.set("fs.obs.access.key", ak)
        sparkSession.conf.set("fs.obs.secret.key", sk)
        sparkSession.conf.set("fs.obs.endpoint", endpoint)
        sparkSession.conf.set("fs.obs.connection.ssl.enabled", "false")
      
        # Setting cross-source connection parameters  
        resource = "/mytest/css"
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
      
        # Setting schema  
        schema = StructType([StructField("id", IntegerType(), False),       
                             StructField("name", StringType(), False)])
      
        # Construct the data
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
      
        # Create a DataFrame from RDD and schema  
        dataFrame = sparkSession.createDataFrame(rdd, schema)
      
        # Write data to the CSS 
        (dataFrame.write.format("css")
            .option("resource", resource)
            .option("nodes", nodes)
            .option("es.net.ssl", "true")
            .option("es.net.ssl.keystore.location", "obs://AK:SK@bucket name/path/transport-keystore.jks")
            .option("es.net.ssl.keystore.pass", "***")
            .option("es.net.ssl.truststore.location", "obs://AK:SK@bucket name/path/truststore.jks")
            .option("es.net.ssl.truststore.pass", "***")
            .option("es.net.http.auth.user", "admin")
            .option("es.net.http.auth.pass", "***")
            .mode("Overwrite")
            .save())
      
        # Read data  
        jdbcDF = (sparkSession.read.format("css")
            .option("resource", resource)
            .option("nodes", nodes)
            .option("es.net.ssl", "true")
            .option("es.net.ssl.keystore.location", "obs://AK:SK@bucket name/path/transport-keystore.jks")
            .option("es.net.ssl.keystore.pass", "***")
            .option("es.net.ssl.truststore.location", "obs://AK:SK@bucket name/path/truststore.jks")
            .option("es.net.ssl.truststore.pass", "***")
            .option("es.net.http.auth.user", "admin")
            .option("es.net.http.auth.pass", "***")
            .load())
        jdbcDF.show()
      
        # close session  
        sparkSession.stop()