PySpark Example Code

Updated on 2024-06-13 GMT+08:00

Development Description

A Mongo data source can be accessed only through an enhanced datasource connection.

NOTE:

DDS is compatible with the MongoDB protocol.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a queue in packages.

    NOTE:

    Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
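
The note above can be illustrated with a minimal sketch that reads the credentials from environment variables instead of writing them into the script. The variable names MONGO_USER and MONGO_PASSWORD and both helper functions are hypothetical, chosen only for this example. Decrypting an encrypted value is left out because it depends on the key management tool you use.

```python
import os
from urllib.parse import quote_plus


def load_mongo_credentials(env=os.environ):
    """Read the user name and password from environment variables.

    MONGO_USER and MONGO_PASSWORD are hypothetical names for this sketch.
    """
    try:
        return env["MONGO_USER"], env["MONGO_PASSWORD"]
    except KeyError as missing:
        raise RuntimeError("environment variable %s is not set" % missing)


def build_mongo_uri(user, password, hosts, database):
    """Build a mongodb:// URI, percent-encoding the credentials."""
    return "mongodb://%s:%s@%s/%s" % (
        quote_plus(user), quote_plus(password), hosts, database)
```

The returned values can then replace the literal user, password, and uri strings used in the steps that follow. Percent-encoding matters when the password contains characters such as @ or :, which would otherwise be parsed as URI delimiters.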

  • Connecting to data sources through DataFrame APIs
    1. Import dependencies.
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
    2. Create a session.
      sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
      
    3. Set connection parameters.
      url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      uri = "mongodb://username:pwd@host:8635/db"
      user = "rwuser"
      database = "test"
      collection = "test"
      password = "######"
      
      NOTE:

      For details about the parameters, see Table 1.

    4. Create a DataFrame.
      dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)])
      schema = StructType([StructField("id", IntegerType(), False),          
                           StructField("name", StringType(), False),
                           StructField("age", IntegerType(), False)])
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      
    5. Import data to Mongo.
      dataFrame.write.format("mongo")\
        .option("url", url)\
        .option("uri", uri)\
        .option("user",user)\
        .option("password",password)\
        .option("database",database)\
        .option("collection",collection)\
        .mode("Overwrite")\
        .save()
      
    6. Read data from Mongo.
      jdbcDF = sparkSession.read\
        .format("mongo")\
        .option("url", url)\
        .option("uri", uri)\
        .option("user",user)\
        .option("password",password)\
        .option("database",database)\
        .option("collection",collection)\
        .load()
      jdbcDF.show()
      
    7. View the operation result.
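
The write call in step 5 and the read call in step 6 pass the same six options. As an optional alternative, the options can be collected once in a dictionary and passed with options(**opts), which both the PySpark reader and writer accept. The helper name mongo_options is hypothetical. The option keys are the ones used in the steps above.

```python
def mongo_options(url, uri, user, password, database, collection):
    """Collect the connector options used in this section into one dict."""
    return {
        "url": url,
        "uri": uri,
        "user": user,
        "password": password,
        "database": database,
        "collection": collection,
    }


# Usage with the variables from step 3 (needs a running SparkSession):
#   opts = mongo_options(url, uri, user, password, database, collection)
#   dataFrame.write.format("mongo").options(**opts).mode("Overwrite").save()
#   sparkSession.read.format("mongo").options(**opts).load().show()
```

Keeping the options in one place reduces the chance that the read and write calls drift apart when a parameter changes.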

  • Connecting to data sources through SQL APIs
    1. Create a table to connect to a Mongo data source.
      sparkSession.sql(
            "create table test_dds(id string, name string, age int) using mongo options(\
            'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',\
            'uri' = 'mongodb://username:pwd@host:8635/db',\
            'database' = 'test',\
            'collection' = 'test',\
            'user' = 'rwuser',\
            'password' = '######')")
      NOTE:

      For details about the parameters, see Table 1.

    2. Insert data.
      sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
      
    3. Query data.
      sparkSession.sql("select * from test_dds").show()
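
The create table statement in step 1 embeds every connection parameter in one long string literal. A minimal sketch of assembling the same statement from variables is shown here, so that values such as the password can come from configuration rather than from the source file. The function name create_mongo_table_sql is hypothetical. Rejecting values that contain a single quote is a simplification chosen for this example, not connector behavior.

```python
def create_mongo_table_sql(table, columns, options):
    """Build a 'create table ... using mongo options(...)' statement.

    columns: list of (name, type) pairs, for example [("id", "string")].
    options: dict of option name to value, emitted in insertion order.
    """
    for key, value in options.items():
        # Refuse quotes instead of escaping them, to keep the sketch simple.
        if "'" in key or "'" in value:
            raise ValueError("option %r must not contain a single quote" % key)
    cols = ", ".join("%s %s" % (name, dtype) for name, dtype in columns)
    opts = ", ".join("'%s' = '%s'" % (k, v) for k, v in options.items())
    return "create table %s(%s) using mongo options(%s)" % (table, cols, opts)


# Usage (needs a running SparkSession):
#   sparkSession.sql(create_mongo_table_sql("test_dds", columns, options))
```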
      
  • Submitting a Spark job
    1. Upload the Python code file to DLI.

    2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.

      NOTE:
      • If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, set Module to sys.datasource.mongo when you submit a job.
      • If the Spark version is 3.1.1, you do not need to select a module. Instead, configure the following Spark parameters (--conf):

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*

      • For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
      • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
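
The version rule in the note above can be summarized as a small lookup. This is only a sketch: the function name and the shape of the returned dictionary are hypothetical and do not represent the request body of the DLI API. The module name, Spark versions, and classpath values are taken from the note.

```python
MONGO_JAR_PATH = "/usr/share/extension/dli/spark-jar/datasource/mongo/*"


def mongo_job_settings(spark_version):
    """Return the module and --conf settings described for each Spark version."""
    if spark_version in ("2.3.2", "2.4.5"):
        # Older versions load the connector through a dependency module.
        return {"modules": ["sys.datasource.mongo"], "conf": {}}
    if spark_version == "3.1.1":
        # Spark 3.1.1 needs no module, only the two classpath parameters.
        return {
            "modules": [],
            "conf": {
                "spark.driver.extraClassPath": MONGO_JAR_PATH,
                "spark.executor.extraClassPath": MONGO_JAR_PATH,
            },
        }
    raise ValueError("no setting documented for Spark %s" % spark_version)
```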

Complete Example Code

  • Connecting to data sources through DataFrame APIs
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.
      sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
    
      # Create a DataFrame and initialize the DataFrame data.  
      dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)])
      
      # Setting schema  
      schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False), StructField("age", IntegerType(), False)])
    
      # Create a DataFrame from RDD and schema  
      dataFrame = sparkSession.createDataFrame(dataList, schema)
    
      # Setting connection parameters  
      url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      uri = "mongodb://username:pwd@host:8635/db"
      user = "rwuser"
      database = "test"
      collection = "test"
      password = "######"
     
      # Write data to the mongodb table  
      dataFrame.write.format("mongo")\
        .option("url", url)\
        .option("uri", uri)\
        .option("user",user)\
        .option("password",password)\
        .option("database",database)\
        .option("collection",collection)\
        .mode("Overwrite").save()
    
      # Read data  
      jdbcDF = sparkSession.read.format("mongo")\
        .option("url", url)\
        .option("uri", uri)\
        .option("user",user)\
        .option("password",password)\
        .option("database",database)\
        .option("collection",collection)\
        .load()   
      jdbcDF.show()
     
      # close session  
      sparkSession.stop()
  • Connecting to data sources through SQL APIs
    from __future__ import print_function
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.  
      sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
    
      # Create a DLI table associated with the Mongo collection
      sparkSession.sql(
          "create table test_dds(id string, name string, age int) using mongo options(\
          'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',\
          'uri' = 'mongodb://username:pwd@host:8635/db',\
          'database' = 'test',\
          'collection' = 'test', \
          'user' = 'rwuser', \
          'password' = '######')")
    
      # Insert data into the DLI-table  
      sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
    
      # Read data from DLI-table  
      sparkSession.sql("select * from test_dds").show()
    
      # close session  
      sparkSession.stop()