
Detailed Development Description


Prerequisites

A datasource connection has been created on the DLI management console.

Code Implementation

  1. Import the dependencies.
    from __future__ import print_function
    # Row is used below when building the test data.
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
  2. Create a session and set the access keys.
    # ak, sk, and endpoint are placeholders for your access key, secret key,
    # and OBS endpoint; one way to supply them is sketched after this step.
    sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
    sparkSession.conf.set("fs.s3a.access.key", ak)
    sparkSession.conf.set("fs.s3a.secret.key", sk)
    sparkSession.conf.set("fs.s3a.endpoint", endpoint)
    sparkSession.conf.set("fs.s3a.connection.ssl.enabled", "false")
    

Connecting to Datasources Through DataFrame APIs

  1. Configure the datasource connection.
    resource = "/mytest/css"
    nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
  2. Create a schema and add data to it.
    # Both columns are declared non-nullable (the third StructField argument).
    schema = StructType([StructField("id", IntegerType(), False),
                         StructField("name", StringType(), False)])
    rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
    
  3. Construct a DataFrame.
    dataFrame = sparkSession.createDataFrame(rdd, schema)
    
  4. Save the data to CSS.
    # The chained calls are wrapped in parentheses so that the multi-line
    # statement is valid Python.
    (dataFrame.write.format("css")
      .option("resource", resource)
      .option("nodes", nodes)
      .option("es.net.ssl", "true")
      .option("es.net.ssl.keystore.location", "s3a://AK:SK@bucket name/path/transport-keystore.jks")
      .option("es.net.ssl.keystore.pass", "***")
      .option("es.net.ssl.truststore.location", "s3a://AK:SK@bucket name/path/truststore.jks")
      .option("es.net.ssl.truststore.pass", "***")
      .option("es.net.http.auth.user", "admin")
      .option("es.net.http.auth.pass", "***")
      .mode("Overwrite")
      .save())
    

    The value of mode can be one of the following (a short example of switching the save mode follows this list):

    • ErrorIfExists: If the data already exists, the system throws an exception.
    • Overwrite: If the data already exists, the original data is overwritten.
    • Append: If the data already exists, the new data is appended.
    • Ignore: If the data already exists, no operation is performed. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
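
    For instance, appending instead of overwriting only changes the mode call. A minimal sketch reusing the resource and nodes variables defined above; the SSL and authentication options from step 4 are omitted here for brevity but are still required against a security cluster:

    # Append to the existing index rather than replacing its contents.
    (dataFrame.write.format("css")
      .option("resource", resource)
      .option("nodes", nodes)
      .mode("Append")
      .save())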
  5. Read data from CSS.
    # As above, parentheses make the chained multi-line call valid Python.
    jdbcDF = (sparkSession.read.format("css")
      .option("resource", resource)
      .option("nodes", nodes)
      .option("es.net.ssl", "true")
      .option("es.net.ssl.keystore.location", "s3a://AK:SK@bucket name/path/transport-keystore.jks")
      .option("es.net.ssl.keystore.pass", "***")
      .option("es.net.ssl.truststore.location", "s3a://AK:SK@bucket name/path/truststore.jks")
      .option("es.net.ssl.truststore.pass", "***")
      .option("es.net.http.auth.user", "admin")
      .option("es.net.http.auth.pass", "***")
      .load())
    jdbcDF.show()
    
  6. View the operation result. An illustrative output is shown below.
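
    Assuming the index holds only the two rows written in step 4 (Overwrite mode), jdbcDF.show() would print something like:

    +---+----+
    | id|name|
    +---+----+
    |  1|John|
    |  2| Bob|
    +---+----+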

Connecting to Datasources Through SQL APIs

  1. Create a table to connect to the CSS datasource.
    sparkSession.sql("""
        create table css_table(id long, name string) using css options(
            'nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',
            'nodes.wan.only'='true',
            'resource'='/mytest/css',
            'es.net.ssl'='true',
            'es.net.ssl.keystore.location'='s3a://AK:SK@bucket name/path/transport-keystore.jks',
            'es.net.ssl.keystore.pass'='***',
            'es.net.ssl.truststore.location'='s3a://AK:SK@bucket name/path/truststore.jks',
            'es.net.ssl.truststore.pass'='***',
            'es.net.http.auth.user'='admin',
            'es.net.http.auth.pass'='***')""")
    

    For details about the parameters for creating a CSS datasource connection table, see Table 1.

  2. Insert data.
    sparkSession.sql("insert into css_table values(3, 'tom')")
    
  3. Query data.
    jdbcDF = sparkSession.sql("select * from css_table")
    jdbcDF.show()
    
  4. View the operation result. An illustrative output is shown below.
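
    Assuming the table is backed by the index written in the previous section (rows 1/John and 2/Bob) plus the row inserted above, the query would print something like:

    +---+----+
    | id|name|
    +---+----+
    |  1|John|
    |  2| Bob|
    |  3| tom|
    +---+----+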

Submitting a Spark Job

  1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
  2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Session (Recommended) and Creating a Batch Processing Job in the Data Lake Insight API Reference.
    • When submitting the job, specify the dependency module sys.datasource.css.
    • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
    • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Session and Creating a Batch Processing Job in the Data Lake Insight API Reference. An illustrative request is sketched below.
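
    As an illustration only, submitting the batch job through the API might look like the following. The endpoint path, the field names other than modules, and all values here are assumptions to be verified against the Data Lake Insight API Reference:

    import requests

    # All values below are placeholders; consult the Data Lake Insight API
    # Reference for the authoritative endpoint and request schema.
    project_id = "your-project-id"
    token = "your-iam-token"  # IAM token obtained separately
    url = f"https://dli.example-region.myhuaweicloud.com/v2.0/{project_id}/batches"
    body = {
        "file": "s3a://mybucket/path/css_example.py",  # the uploaded Python file
        "queue": "my_queue",                           # DLI queue to run the job on
        "modules": ["sys.datasource.css"],             # dependency module named above
    }
    resp = requests.post(url, json=body, headers={"X-Auth-Token": token})
    print(resp.status_code, resp.text)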
