Scala Example Code
Development description
MongoDB can be connected only through an enhanced datasource connection, and only yearly/monthly queues can be used.
- Prerequisites
An enhanced datasource connection has been created on the DLI management console and bound to a yearly/monthly queue. For details, see the Data Lake Insight User Guide.
- Construct dependency information and create a Spark session.
- Import dependencies.
  Maven dependency involved:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
    </dependency>

  Import statements required:

    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

- Create a session.

    val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()
- Connecting to datasources through SQL APIs
- Create a table to connect to the Mongo datasource
    sparkSession.sql(
      """create table test_mongo(id string, name string, age int) using mongo options(
        |'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
        |'database' = 'test',
        |'collection' = 'test',
        |'user' = 'rwuser',
        |'password' = '######')""".stripMargin)

  Figure 1: MongoDB link address
- Insert data
sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)") - Query data
sparkSession.sql("select * from test_mongo").show()Operation result

- Connecting to datasources through DataFrame APIs
- Set connection parameters
    val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
    val user = "rwuser"
    val database = "test"
    val collection = "test"
    val password = "######"
- Construct a schema
    val schema = StructType(List(
      StructField("id", StringType),
      StructField("name", StringType),
      StructField("age", IntegerType)))
- Construct a DataFrame
    val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
    val dataFrame = spark.createDataFrame(rdd, schema)

- Import data to MongoDB
    dataFrame.write
      .format("mongo")
      .option("url", url)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .mode(SaveMode.Overwrite)
      .save()
  mode() specifies the save type. The options are Overwrite, Append, ErrorIfExists, and Ignore.
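  For example, to add rows to the existing collection instead of replacing it, the same write can use SaveMode.Append. This is a minimal variant of the snippet above, assuming the same dataFrame and connection options:

    // Variant of the write above: append rows to the existing
    // collection instead of overwriting it.
    dataFrame.write
      .format("mongo")
      .option("url", url)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .mode(SaveMode.Append)
      .save()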
- Read data from MongoDB
    val jdbcDF = spark.read.format("mongo").schema(schema)
      .option("url", url)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .load()
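  To inspect the rows that were read, display the returned DataFrame (the complete example below does the same):

    jdbcDF.show()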
  Operation result
- Submitting a Spark job
- Generate a JAR package based on the code and upload the package to DLI (an illustrative build-file sketch follows this list). For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
- In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
- When submitting a job, you need to specify a dependency module named sys.datasource.mongo.
- For details about how to submit a job on the console, see the Dependency Resources parameter description (Table 6) in the Data Lake Insight User Guide.
- For details about how to submit a job through an API, see the modules parameter in the request parameter description (Table 2) of Creating a Batch Processing Job in the Data Lake Insight API Reference.
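The guide shows a Maven dependency; if the JAR is built with sbt instead, a minimal build.sbt along the following lines could be used. This is an illustrative sketch, not part of the official guide: the project name and Scala patch version are assumptions, and spark-sql is marked provided on the common assumption that the cluster supplies Spark at runtime, so it is not bundled into the uploaded JAR.

    // Hypothetical build.sbt sketch; the official guide only shows the Maven dependency.
    name := "dli-mongo-example"   // assumed project name
    scalaVersion := "2.11.12"     // assumed 2.11.x release, matching spark-sql_2.11
    // "provided" assumes the cluster's Spark runtime supplies spark-sql,
    // so it is not packaged into the JAR that is uploaded to DLI.
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.2" % "provided"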
Complete example code
- Maven dependency
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
    </dependency>
- Connecting to datasources through SQL APIs
    import org.apache.spark.sql.SparkSession

    object TestMongoSql {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().getOrCreate()
        sparkSession.sql(
          """create table test_mongo(id string, name string, age int) using mongo options(
            |'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
            |'database' = 'test',
            |'collection' = 'test',
            |'user' = 'rwuser',
            |'password' = '######')""".stripMargin)
        sparkSession.sql("insert into test_mongo values('3', 'zhangsan', 23)")
        sparkSession.sql("select * from test_mongo").show()
        sparkSession.close()
      }
    }
- Connecting to datasources through DataFrame APIs
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object Test_Mongo_SparkSql {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession.
        val spark = SparkSession.builder().appName("mongodbTest").getOrCreate()

        // Set the connection configuration parameters.
        val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
        val user = "rwuser"
        val database = "test"
        val collection = "test"
        val password = "######"

        // Set up the schema.
        val schema = StructType(List(
          StructField("id", StringType),
          StructField("name", StringType),
          StructField("age", IntegerType)))

        // Set up the DataFrame.
        val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
        val dataFrame = spark.createDataFrame(rdd, schema)

        // Write data to MongoDB.
        dataFrame.write
          .format("mongo")
          .option("url", url)
          .option("database", database)
          .option("collection", collection)
          .option("user", user)
          .option("password", password)
          .mode(SaveMode.Overwrite)
          .save()

        // Read data from MongoDB.
        val jdbcDF = spark.read.format("mongo").schema(schema)
          .option("url", url)
          .option("database", database)
          .option("collection", collection)
          .option("user", user)
          .option("password", password)
          .load()
        jdbcDF.show()

        spark.close()
      }
    }