Updated on 2024-06-13 GMT+08:00

PySpark Example Code

Prerequisites

A datasource connection has been created on the DLI management console.

CSS Non-Security Cluster

  • Development description
    • Code implementation
      1. Import dependency packages.
        1
        2
        3
        from __future__ import print_function
        from pyspark.sql.types import StructType, StructField, IntegerType, StringType, Row
        from pyspark.sql import SparkSession
        
      2. Create a session.
        1
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        
    • Connecting to data sources through DataFrame APIs
      1. Set connection parameters.
        1
        2
        resource = "/mytest"
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
        

        resource indicates the name of the resource associated with the CSS. You can specify the resource location in /index/type format. (The index can be the database and type the table.)

        • In Elasticsearch 6.X, a single index supports only one type, and the type name can be customized.
        • In Elasticsearch 7.X, a single index uses _doc as the type name and cannot be customized. To access Elasticsearch 7.X, set this parameter to index.
      2. Create a schema and add data to it.
        1
        2
        3
        schema = StructType([StructField("id", IntegerType(), False),                  
                             StructField("name", StringType(), False)])
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
        
      3. Construct a DataFrame.
        1
        dataFrame = sparkSession.createDataFrame(rdd, schema)
        
      4. Save data to CSS.
        1
        dataFrame.write.format("css").option("resource", resource).option("es.nodes", nodes).mode("Overwrite").save()
        

        The options of mode can be one of the following:

        • ErrorIfExis: If the data already exists, the system throws an exception.
        • Overwrite: If the data already exists, the original data will be overwritten.
        • Append: If the data already exists, the system saves the new data.
        • Ignore: If the data already exists, no operation is required. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
      5. Read data from CSS.
        1
        2
        jdbcDF = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load()
        jdbcDF.show()
        
      6. View the operation result.

    • Connecting to data sources through SQL APIs
      1. Create a table to connect to a CSS data source.
        1
        2
        3
        4
        5
        sparkSession.sql(
            "create table css_table(id long, name string) using css options(  
            'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',
            'es.nodes.wan.only'='true',
            'resource'='/mytest')")
        

        For details about the parameters for creating a CSS datasource connection table, see Table 1.

      2. Insert data.
        1
        sparkSession.sql("insert into css_table values(3,'tom')")
        
      3. Query data.
        1
        2
        jdbcDF = sparkSession.sql("select * from css_table")
        jdbcDF.show()
        
      4. View the operation result.

    • Submitting a Spark job
      1. Upload the Python code file to DLI.

      2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.

        • If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, specify the Module to sys.datasource.css when you submit a job.
        • If the Spark version is 3.1.1, you do not need to select a module. Configure Spark parameters (--conf).

          spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*

          spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*

        • For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
        • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
  • Complete example code
    • Connecting to data sources through DataFrame APIs
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.   
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
      
        # Setting cross-source connection parameters  
        resource = "/mytest"
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
      
        # Setting schema  
        schema = StructType([StructField("id", IntegerType(), False),       
                             StructField("name", StringType(), False)])
      
        # Construction data 
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
      
        # Create a DataFrame from RDD and schema  
        dataFrame = sparkSession.createDataFrame(rdd, schema)
      
        # Write data to the CSS 
        dataFrame.write.format("css").option("resource", resource).option("es.nodes", nodes).mode("Overwrite").save()
      
        # Read data  
        jdbcDF = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load()
        jdbcDF.show()
      
        # close session  
        sparkSession.stop()
      
    • Connecting to data sources through SQL APIs
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.  
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
      
        # Create a DLI data table for DLI-associated CSS   
        sparkSession.sql(
            "create table css_table(id long, name string) using css options( \
            'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',\
            'es.nodes.wan.only'='true',\
            'resource'='/mytest')")
      
        # Insert data into the DLI data table  
        sparkSession.sql("insert into css_table values(3,'tom')")
      
        # Read data from DLI data table   
        jdbcDF = sparkSession.sql("select * from css_table")   
        jdbcDF.show()
      
        # close session  
        sparkSession.stop()
      

CSS Security Cluster

  • Development description
    • Code implementation
      1. Import dependency packages.
        1
        2
        3
        from __future__ import print_function
        from pyspark.sql.types import StructType, StructField, IntegerType, StringType, Row
        from pyspark.sql import SparkSession
        
      2. Create a session and set the AKs and SKs.

        Hard-coded or plaintext AK and SK pose significant security risks. To ensure security, encrypt your AK and SK, store them in configuration files or environment variables, and decrypt them when needed.

        1
        2
        3
        4
        5
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        sparkSession.conf.set("fs.obs.access.key", ak)
        sparkSession.conf.set("fs.obs.secret.key", sk)
        sparkSession.conf.set("fs.obs.endpoint", enpoint)
        sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")
        
    • Connecting to data sources through DataFrame APIs
      1. Set connection parameters.
        1
        2
        resource = "/mytest";
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
        

        resource indicates the name of the resource associated with the CSS. You can specify the resource location in /index/type format. (The index can be the database and type the table.)

        • In Elasticsearch 6.X, a single index supports only one type, and the type name can be customized.
        • In Elasticsearch 7.X, a single index uses _doc as the type name and cannot be customized. To access Elasticsearch 7.X, set this parameter to index.
      2. Create a schema and add data to it.
        1
        2
        3
        schema = StructType([StructField("id", IntegerType(), False),                  
                             StructField("name", StringType(), False)])
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
        
      3. Construct a DataFrame.
        1
        dataFrame = sparkSession.createDataFrame(rdd, schema)
        
      4. Save data to CSS.
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        dataFrame.write.format("css")
          .option("resource", resource)
          .option("es.nodes", nodes)
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://Bucket name/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://Bucket name/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .mode("Overwrite")
          .save()
        

        The options of mode can be one of the following:

        • ErrorIfExis: If the data already exists, the system throws an exception.
        • Overwrite: If the data already exists, the original data will be overwritten.
        • Append: If the data already exists, the system saves the new data.
        • Ignore: If the data already exists, no operation is required. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
      5. Read data from CSS.
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        jdbcDF = sparkSession.read.format("css")\
          .option("resource", resource)\
          .option("es.nodes", nodes)\
          .option("es.net.ssl", "true")\
          .option("es.net.ssl.keystore.location", "obs://Bucket name/path/transport-keystore.jks")\
          .option("es.net.ssl.keystore.pass", "***")\
          .option("es.net.ssl.truststore.location", "obs://Bucket name/path/truststore.jks")\
          .option("es.net.ssl.truststore.pass", "***")\
          .option("es.net.http.auth.user", "admin")\
          .option("es.net.http.auth.pass", "***")\
          .load()
        jdbcDF.show()
        
      6. View the operation result.

    • Connecting to data sources through SQL APIs
      1. Create a table to connect to a CSS data source.
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        sparkSession.sql(
                "create table css_table(id long, name string) using css options(\  
                'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',\
                'es.nodes.wan.only'='true',\
                'resource'='/mytest',\
        	'es.net.ssl'='true',\
        	'es.net.ssl.keystore.location'='obs://Bucket name/path/transport-keystore.jks',\
        	'es.net.ssl.keystore.pass'='***',\
        	'es.net.ssl.truststore.location'='obs://Bucket name/path/truststore.jks',\
        	'es.net.ssl.truststore.pass'='***',\
        	'es.net.http.auth.user'='admin',\
        	'es.net.http.auth.pass'='***')")
        

        For details about the parameters for creating a CSS datasource connection table, see Table 1.

      2. Insert data.
        1
        sparkSession.sql("insert into css_table values(3,'tom')")
        
      3. Query data.
        1
        2
        jdbcDF = sparkSession.sql("select * from css_table")
        jdbcDF.show()
        
      4. View the operation result.

    • Submitting a Spark job
      1. Upload the Python code file to DLI.
      2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.
        • When submitting a job, you need to specify a dependency module named sys.datasource.css.
        • For details about how to submit a job on the DLI console, see
        • For details about how to submit a job through an API, see the modules parameter in
  • Complete example code
    • Connecting to data sources through DataFrame APIs

      Hard-coded or plaintext AK and SK pose significant security risks. To ensure security, encrypt your AK and SK, store them in configuration files or environment variables, and decrypt them when needed.

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.   
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        sparkSession.conf.set("fs.obs.access.key", ak)
        sparkSession.conf.set("fs.obs.secret.key", sk)
        sparkSession.conf.set("fs.obs.endpoint", enpoint)
        sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")
      
        # Setting cross-source connection parameters  
        resource = "/mytest";
        nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
      
        # Setting schema  
        schema = StructType([StructField("id", IntegerType(), False),       
                             StructField("name", StringType(), False)])
      
        # Construction data 
        rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
      
        # Create a DataFrame from RDD and schema  
        dataFrame = sparkSession.createDataFrame(rdd, schema)
      
        # Write data to the CSS 
        dataFrame.write.format("css")
          .option("resource", resource)
          .option("es.nodes", nodes)
          .option("es.net.ssl", "true")
          .option("es.net.ssl.keystore.location", "obs://Bucket name/path/transport-keystore.jks")
          .option("es.net.ssl.keystore.pass", "***")
          .option("es.net.ssl.truststore.location", "obs://Bucket name/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")
          .option("es.net.http.auth.user", "admin")
          .option("es.net.http.auth.pass", "***")
          .mode("Overwrite")
          .save()
      
        # Read data  
        jdbcDF = sparkSession.read.format("css")\
          .option("resource", resource)\
          .option("es.nodes", nodes)\
          .option("es.net.ssl", "true")\
          .option("es.net.ssl.keystore.location", "obs://Bucket name/path/transport-keystore.jks")\
          .option("es.net.ssl.keystore.pass", "***")\
          .option("es.net.ssl.truststore.location", "obs://Bucket name/path/truststore.jks")
          .option("es.net.ssl.truststore.pass", "***")\
          .option("es.net.http.auth.user", "admin")\
          .option("es.net.http.auth.pass", "***")\
          .load()
        jdbcDF.show()
      
        # close session  
        sparkSession.stop()
      
    • Connecting to data sources through SQL APIs
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql import SparkSession
      import os               
       
      if __name__ == "__main__":
        
        # Create a SparkSession session.   
        sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
        # Create a DLI data table for DLI-associated CSS   
        sparkSession.sql("create table css_table(id int, name string) using css options(\
                          'es.nodes'='192.168.6.204:9200',\
                          'es.nodes.wan.only'='true',\
                          'resource'='/mytest',\
                          'es.net.ssl'='true',\
                          'es.net.ssl.keystore.location' = 'obs://xietest1/lzq/keystore.jks',\
                          'es.net.ssl.keystore.pass' = '**',\
                          'es.net.ssl.truststore.location'='obs://xietest1/lzq/truststore.jks',\
                          'es.net.ssl.truststore.pass'='**',\
                          'es.net.http.auth.user'='admin',\
                          'es.net.http.auth.pass'='**')")
       
        # Insert data into the DLI data table  
        sparkSession.sql("insert into css_table values(3,'tom')")
       
        # Read data from DLI data table   
        jdbcDF = sparkSession.sql("select * from css_table")   
        jdbcDF.show()
       
        # close session  
        sparkSession.stop()