PySpark Example Code

Development description

MongoDB can be accessed only through an enhanced datasource connection, and only yearly/monthly queues can be used.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a queue in yearly/monthly packages. For details, see the Data Lake Insight User Guide.

  • Connecting to datasources through DataFrame APIs
    1. Import dependencies.
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
    2. Create a session.
      sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
      
    3. Configure connection parameters.
      url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      user = "rwuser"
      database = "test"
      collection = "test"
      password = "######"
      
    4. Create a DataFrame.
      dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19), ("2", "Tom", 20)])
      schema = StructType([StructField("id", StringType(), False),
                           StructField("name", StringType(), False),
                           StructField("age", IntegerType(), False)])
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      
    5. Import data to MongoDB.
      dataFrame.write.format("mongo") \
        .option("url", url) \
        .option("user", user) \
        .option("password", password) \
        .option("database", database) \
        .option("collection", collection) \
        .mode("overwrite") \
        .save()
      
    6. Read data from Mongo.
      jdbcDF = sparkSession.read.format("mongo") \
        .option("url", url) \
        .option("user", user) \
        .option("password", password) \
        .option("database", database) \
        .option("collection", collection) \
        .load()
      jdbcDF.show()
    7. Operation result:
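      If both the write in step 5 and the read in step 6 succeed, jdbcDF.show() prints the rows created in step 4. A rough sketch of the expected output (the connector may additionally return a MongoDB-generated _id column, and column order can differ):
      +---+-----+---+
      | id| name|age|
      +---+-----+---+
      |  1|Katie| 19|
      |  2|  Tom| 20|
      +---+-----+---+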

  • Connecting to datasources through SQL APIs
    1. Create a temporary view to connect to the MongoDB datasource.
      sparkSession.sql(
        """CREATE TEMPORARY VIEW person (name STRING, age INT) USING mongo OPTIONS (
             'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
             'database' = 'test',
             'collection' = 'test',
             'user' = 'rwuser',
             'password' = '######')""")
      
    2. Insert data.
      sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin)
      
    3. Query data.
      sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println)
      
  • Submitting a Spark job
    1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.mongo.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.
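      For orientation only, the following is a rough sketch of a batch-job request body in which the dependency module is declared. Apart from the modules value named above, the field names and values here are assumptions; check Creating a Batch Processing Job in the Data Lake Insight API Reference for the actual schema.
        # Illustrative request body for creating a batch processing job.
        # Only "modules" is taken from this guide; the other fields are assumptions.
        batch_job_body = {
            "file": "obs://your-bucket/dli-mongo-example.py",  # uploaded PySpark file (assumed field and path)
            "queue": "your-yearly-monthly-queue",              # queue bound to the enhanced connection (assumed field)
            "modules": ["sys.datasource.mongo"]                # dependency module required by this example
        }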

Complete example code

  • Connecting to datasources through DataFrame APIs
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.
      sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
    
      # Create a DataFrame and initialize the DataFrame data.  
      dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19),("2","Tom",20)])
      
      # Setting schema  
      schema = StructType([StructField("id", StringType(), False), StructField("name", StringType(), False), StructField("age", IntegerType(), False)])
    
      # Create a DataFrame from RDD and schema  
      dataFrame = sparkSession.createDataFrame(dataList, schema)
    
      # Setting connection parameters  
      url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      user = "rwuser"
      database = "test"
      collection = "test"
      password = "######"
     
      # Write data to the mongodb table  
      dataFrame.write.format("mongo") \
        .option("url", url) \
        .option("user", user) \
        .option("password", password) \
        .option("database", database) \
        .option("collection", collection) \
        .mode("overwrite").save()
    
      # Read data  
      jdbcDF = sparkSession.read.format("mongo") \
        .option("url", url) \
        .option("user", user) \
        .option("password", password) \
        .option("database", database) \
        .option("collection", collection) \
        .load()
      jdbcDF.show()
     
      # close session  
      sparkSession.stop()
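
    A note on the url option used above: judging from the example values, it combines the host:port pairs of the DDS/MongoDB instance with the target database and the authSource query parameter (this reading of the format is an assumption based on the example, not an official format description). A minimal sketch of assembling it from its parts:
    # Assemble the "url" option from its parts (same values as in the example above).
    hosts = ["192.168.4.62:8635", "192.168.5.134:8635"]   # instance addresses
    data_db = "test"                                       # database holding the collection
    auth_db = "admin"                                      # database used for authentication
    url = "{}/{}?authSource={}".format(",".join(hosts), data_db, auth_db)
    # -> "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"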
  • Connecting to datasources through SQL APIs
    from __future__ import print_function
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.  
      sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
    
      # Create a DLI table associated with the MongoDB collection
      sparkSession.sql(
        """create table test_mongo(id string, name string, age int) using mongo options(
             'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
             'database' = 'test',
             'collection' = 'test',
             'user' = 'rwuser',
             'password' = '######')""")
    
      # Insert data into the DLI table
      sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)")
    
      # Read data from the DLI table
      sparkSession.sql("select * from test_mongo").show()
    
      # close session  
      sparkSession.stop()