
Detailed Development Description

Updated at: Mar 17, 2020 GMT+08:00

Mongo can be accessed only through an enhanced datasource connection, and only queues in yearly/monthly packages can be used.

Prerequisites

An enhanced datasource connection has been created on the DLI management console and bound to a queue in yearly/monthly packages.

Connecting to Datasources Through DataFrame APIs

  1. Import dependencies
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
  2. Create a session
    sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
    
  3. Set connection parameters
    url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
    user = "rwuser"
    database = "test"
    collection = "test"
    password = "######"
    
  4. Create a DataFrame
    # The id values are integers so that they match the IntegerType declared in the schema.
    dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19), (2, "Tom", 20)])
    schema = StructType([StructField("id", IntegerType(), False),
                         StructField("name", StringType(), False),
                         StructField("age", IntegerType(), False)])
    dataFrame = sparkSession.createDataFrame(dataList, schema)
    
  5. Import data to MongoDB
    dataFrame.write.format("mongo")
      .option("url", url)
      .option("user",user)
      .option("password",password)
      .option("database",database)
      .option("collection",collection)
      .mode("Overwrite")
      .save()
    
  6. Read data from MongoDB
    jdbcDF = sparkSession.read \
      .format("mongo") \
      .option("url", url) \
      .option("user", user) \
      .option("password", password) \
      .option("database", database) \
      .option("collection", collection) \
      .load()
    jdbcDF.show()
  7. Check the operation result
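    If the job succeeds, jdbcDF.show() prints the inserted rows. The following output is illustrative only: it is derived from the sample data above, the row order may differ, and the connector may add further columns such as the MongoDB _id field.
    +--+-----+---+
    |id| name|age|
    +--+-----+---+
    | 1|Katie| 19|
    | 2|  Tom| 20|
    +--+-----+---+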

Connecting to Datasources Through SQL APIs

PySpark can also access Mongo data through SQL APIs. For details, see Scala Development Description > Development Description > Accessing Data Using SQL APIs, or view the complete code. A minimal sketch is given below.
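For reference only, the SQL approach might look like the following sketch. The option keys mirror the DataFrame example above, and the table name test_mongo and the exact CREATE TABLE syntax are assumptions that should be checked against the Scala development description.

    # A minimal sketch, not the authoritative syntax: the option keys are taken
    # from the DataFrame example above, and the table name test_mongo is illustrative.
    sparkSession.sql(
        "create table test_mongo using mongo options ("
        "'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',"
        "'user' = 'rwuser',"
        "'password' = '######',"
        "'database' = 'test',"
        "'collection' = 'test')")
    sparkSession.sql("select * from test_mongo").show()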

Submitting a Spark Job

  1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
  2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Session (Recommended) and Creating a Batch Processing Job in the Data Lake Insight API Reference.
    • When submitting a job, you need to specify a dependency module named sys.datasource.mongo.
    • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
    • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Session and Creating a Batch Processing Job in the Data Lake Insight API Reference. An illustrative request body is sketched after this list.
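
For illustration only, the body of a batch-processing-job request might carry the dependency module as in the sketch below. Only the modules value comes from this guide; the other field names and values are placeholders (assumptions) that must be verified against the Data Lake Insight API Reference.

    # Illustrative only: "modules" is the parameter described in this guide; the other
    # fields and their values are placeholders to be verified against the
    # Data Lake Insight API Reference.
    batch_job_body = {
        "file": "your_mongo_job.py",            # uploaded Python file (placeholder)
        "queue": "your_yearly_monthly_queue",   # yearly/monthly queue (placeholder)
        "modules": ["sys.datasource.mongo"]     # dependency module required for Mongo
    }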
