PySpark Example Code
Development description
MongoDB supports only the enhanced datasource connection. Only yearly/monthly queues can be used.
- Prerequisites
An enhanced datasource connection has been created on the DLI management console and bound to a queue in yearly/monthly packages. For details, see Data Lake Insight User Guide.
- Connecting to datasources through DataFrame APIs
- Import dependencies.
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession
- Create a session.
sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
- Configure connection parameters.
url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
user = "rwuser"
database = "test"
collection = "test"
password = "######"
- Create a DataFrame.
dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19), ("2", "Tom", 20)])
schema = StructType([StructField("id", StringType(), False),
                     StructField("name", StringType(), False),
                     StructField("age", IntegerType(), False)])
dataFrame = sparkSession.createDataFrame(dataList, schema)
- Import data to MongoDB.
dataFrame.write.format("mongo") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("database", database) \
    .option("collection", collection) \
    .mode("Overwrite") \
    .save()
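If you need to add documents to an existing collection instead of replacing it, the standard Spark save modes should apply as well. The following is a minimal sketch assuming the connector honors the Append save mode, using the same options as above:
# Sketch: append instead of overwrite, assuming the connector supports
# the standard Append save mode; options are the same as above.
dataFrame.write.format("mongo") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("database", database) \
    .option("collection", collection) \
    .mode("Append") \
    .save()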
- Read data from Mongo.
jdbcDF = sparkSession.read.format("mongo") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("database", database) \
    .option("collection", collection) \
    .load()
jdbcDF.show()
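The loaded result is a regular PySpark DataFrame, so standard transformations apply to it. For example (plain PySpark, not specific to the Mongo connector):
jdbcDF.filter(jdbcDF.age > 19).show()   # keep only rows with age greater than 19
jdbcDF.select("name", "age").show()     # project specific columns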
- Connecting to datasources through SQL APIs
- Create a table to connect to the Mongo datasource.
sparkSession.sql(
    "create table test_mongo(id string, name string, age int) using mongo options("
    "'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',"
    "'database' = 'test',"
    "'collection' = 'test',"
    "'user' = 'rwuser',"
    "'password' = '######')")
- Insert data.
sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin)
- Query data.
sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println)
- Submitting a Spark job
- Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
- In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
- When submitting a job, you need to specify a dependency module named sys.datasource.mongo (see the request sketch after this list).
- For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
- For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.
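For reference, below is a minimal sketch of what an API submission could look like in Python. The POST /v2.0/{project_id}/batches URI and the file, queue, and modules fields follow the Creating a Batch Processing Job API in the Data Lake Insight API Reference; the endpoint URL, project ID, resource name, queue name, and token here are placeholders, so verify every value against the current reference before use.
# Hypothetical sketch: submit a PySpark batch job that depends on sys.datasource.mongo.
# Endpoint, project ID, token, and resource names are placeholders; check the DLI API Reference.
import requests

endpoint = "https://dli.example.com"    # placeholder DLI endpoint
project_id = "your_project_id"          # placeholder project ID
token = "your_iam_token"                # placeholder IAM token

body = {
    "file": "mongo_example.py",          # Python file uploaded as a DLI resource package
    "queue": "queue1",                   # yearly/monthly queue bound to the enhanced connection
    "modules": ["sys.datasource.mongo"]  # dependency module required for the Mongo datasource
}

resp = requests.post(
    endpoint + "/v2.0/" + project_id + "/batches",
    json=body,
    headers={"X-Auth-Token": token}
)
print(resp.status_code, resp.text)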
Complete example code
- Connecting to datasources through DataFrame APIs
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()

    # Create a DataFrame and initialize the DataFrame data.
    dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19), ("2", "Tom", 20)])

    # Set the schema.
    schema = StructType([StructField("id", StringType(), False),
                         StructField("name", StringType(), False),
                         StructField("age", IntegerType(), False)])

    # Create a DataFrame from the RDD and schema.
    dataFrame = sparkSession.createDataFrame(dataList, schema)

    # Set the connection parameters.
    url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
    user = "rwuser"
    database = "test"
    collection = "test"
    password = "######"

    # Write data to the MongoDB table.
    dataFrame.write.format("mongo") \
        .option("url", url) \
        .option("user", user) \
        .option("password", password) \
        .option("database", database) \
        .option("collection", collection) \
        .mode("Overwrite") \
        .save()

    # Read data.
    jdbcDF = sparkSession.read.format("mongo") \
        .option("url", url) \
        .option("user", user) \
        .option("password", password) \
        .option("database", database) \
        .option("collection", collection) \
        .load()
    jdbcDF.show()

    # Close the session.
    sparkSession.stop()
- Connecting to datasources through SQL APIs
from __future__ import print_function
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()

    # Create a DLI table associated with the Mongo datasource.
    sparkSession.sql(
        "create table test_mongo(id string, name string, age int) using mongo options("
        "'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',"
        "'database' = 'test',"
        "'collection' = 'test',"
        "'user' = 'rwuser',"
        "'password' = '######')")

    # Insert data into the DLI table.
    sparkSession.sql("insert into test_mongo values('3', 'zhangsan', 23)")

    # Read data from the DLI table.
    sparkSession.sql("select * from test_mongo").show()

    # Close the session.
    sparkSession.stop()