Scala Example Code
Development Description
DLI can connect to Mongo only through an enhanced datasource connection.
DDS is compatible with the MongoDB protocol.
An enhanced datasource connection has been created on the DLI management console and bound to a queue in packages.
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
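One way to keep the password out of source code is to read it from an environment variable at startup. The following is a minimal sketch, not part of the DLI API: the object name `MongoCredentials`, the method `required`, and the variable name `MONGO_PASSWORD` are all hypothetical, and any decryption step is left to your own key management.

```scala
// Hypothetical helper: the names used here are illustrative assumptions.
object MongoCredentials {
  /** Returns the non-empty value of `name` from `env`, or fails with a clear message. */
  def required(name: String, env: Map[String, String] = sys.env): String =
    env.get(name).filter(_.nonEmpty).getOrElse(
      throw new IllegalStateException(s"Environment variable $name is not set"))
}
```

With this in place, `val password = MongoCredentials.required("MONGO_PASSWORD")` can stand in for a literal password. Failing fast on a missing variable avoids submitting a job that only fails once it tries to authenticate.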
- Constructing dependency information and creating a Spark session
- Import dependencies.
Maven dependency involved
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
Import dependency packages. Row and SaveMode are needed by the DataFrame steps later on this page.
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
Create a session.
val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()
- Connecting to data sources through SQL APIs
- Create a table to connect to a Mongo data source.
sparkSession.sql(
  """create table test_dds(id string, name string, age int) using mongo options(
    |  'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
    |  'uri' = 'mongodb://username:pwd@host:8635/db',
    |  'database' = 'test',
    |  'collection' = 'test',
    |  'user' = 'rwuser',
    |  'password' = '######')""".stripMargin)
Figure 1 Mongo link address
- Insert data.
sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
- Query data.
sparkSession.sql("select * from test_dds").show()
View the operation result.
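The create table statement above embeds its connection options as literals. If the values come from configuration instead, the statement can be assembled from them. The sketch below is illustrative only: `MongoTableSql`, `createTable`, and `quote` are hypothetical names, and the escaping assumes Spark SQL's default handling of backslash escapes inside string literals.

```scala
// Hypothetical helper for assembling the statement; not part of Spark or DLI.
object MongoTableSql {
  // Escape backslashes and single quotes so a value cannot end the SQL literal early.
  private def quote(value: String): String =
    "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

  /** Renders: create table <table>(<columns>) using mongo options('k' = 'v', ...) */
  def createTable(table: String, columns: String, options: Seq[(String, String)]): String = {
    val rendered = options.map { case (k, v) => s"${quote(k)} = ${quote(v)}" }.mkString(", ")
    s"create table $table($columns) using mongo options($rendered)"
  }
}
```

The returned string can be passed to `sparkSession.sql(...)`. The table name and column list are inserted as-is, so they should come from trusted code rather than user input.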
- Connecting to data sources through DataFrame APIs
- Set connection parameters.
val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
val uri = "mongodb://username:pwd@host:8635/db"
val user = "rwuser"
val database = "test"
val collection = "test"
val password = "######"
- Construct a schema.
val schema = StructType(List(
  StructField("id", StringType),
  StructField("name", StringType),
  StructField("age", IntegerType)))
- Construct a DataFrame.
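// Use the session created earlier (sparkSession); Row comes from org.apache.spark.sql.
val rdd = sparkSession.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
val dataFrame = sparkSession.createDataFrame(rdd, schema)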
- Import data to Mongo.
dataFrame.write.format("mongo")
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .mode(SaveMode.Overwrite)
  .save()
The mode options are Overwrite, Append, ErrorIfExists, and Ignore.
- Read data from Mongo.
// Use the session created earlier (sparkSession).
val jdbcDF = sparkSession.read.format("mongo").schema(schema)
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .load()
Operation result
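The write and read steps above pass the same six options. One possible way to keep them in sync is to collect them in a single map. This is a sketch under assumptions: `MongoOptions` and `build` are hypothetical names, and only the option keys are taken from the steps above.

```scala
// Hypothetical helper: gathers the connection settings used by both the writer and the reader.
object MongoOptions {
  def build(url: String, uri: String, database: String,
            collection: String, user: String, password: String): Map[String, String] =
    Map(
      "url" -> url,
      "uri" -> uri,
      "database" -> database,
      "collection" -> collection,
      "user" -> user,
      "password" -> password)
}
```

Both `DataFrameWriter` and `DataFrameReader` accept such a map through their `options` method, so the chained `.option(...)` calls could be replaced with `.options(opts)`, where `opts` is the map returned by `MongoOptions.build`.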
- Submitting a Spark job
- Generate a JAR package based on the code and upload the package to DLI.
- In the Spark job editor, select the corresponding dependency module and execute the Spark job.
- If the Spark version is 2.3.2 (soon to be taken offline) or 2.4.5, set Module to sys.datasource.mongo when you submit the job.
- If the Spark version is 3.1.1, you do not need to select a module. Configure Spark parameters (--conf).
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
- For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
- For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
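The version rules above can be summarized in code, which may help when job submission is scripted. This is only a sketch: `MongoJobSettings` and its types are hypothetical and are not part of any DLI SDK. Versions not listed above return `None` because this page does not describe them.

```scala
// Hypothetical summary of the submission rules described above.
object MongoJobSettings {
  sealed trait Setting
  final case class UseModule(name: String) extends Setting
  final case class UseConf(entries: Map[String, String]) extends Setting

  private val MongoJars = "/usr/share/extension/dli/spark-jar/datasource/mongo/*"

  def forSparkVersion(version: String): Option[Setting] = version match {
    // Spark 2.3.2 and 2.4.5: select the dependency module.
    case "2.3.2" | "2.4.5" => Some(UseModule("sys.datasource.mongo"))
    // Spark 3.1.1: no module; pass the class paths as --conf parameters.
    case "3.1.1" => Some(UseConf(Map(
      "spark.driver.extraClassPath" -> MongoJars,
      "spark.executor.extraClassPath" -> MongoJars)))
    // Any other version is not covered by this page.
    case _ => None
  }
}
```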
Complete Example Code
- Maven dependency
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
- Connecting to data sources through SQL APIs
import org.apache.spark.sql.SparkSession

object TestMongoSql {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().getOrCreate()

    // Create a table that maps to the Mongo collection.
    sparkSession.sql(
      """create table test_dds(id string, name string, age int) using mongo options(
        |  'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
        |  'uri' = 'mongodb://username:pwd@host:8635/db',
        |  'database' = 'test',
        |  'collection' = 'test',
        |  'user' = 'rwuser',
        |  'password' = '######')""".stripMargin)

    // Insert a row, then query the table.
    sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
    sparkSession.sql("select * from test_dds").show()

    sparkSession.close()
  }
}
- Connecting to data sources through DataFrame APIs
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test_Mongo_SparkSql {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder().appName("mongodbTest").getOrCreate()

    // Set the connection configuration parameters.
    val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
    val uri = "mongodb://username:pwd@host:8635/db"
    val user = "rwuser"
    val database = "test"
    val collection = "test"
    val password = "######"

    // Set up the schema.
    val schema = StructType(List(
      StructField("id", StringType),
      StructField("name", StringType),
      StructField("age", IntegerType)))

    // Set up the DataFrame.
    val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
    val dataFrame = spark.createDataFrame(rdd, schema)

    // Write data to Mongo.
    dataFrame.write.format("mongo")
      .option("url", url)
      .option("uri", uri)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .mode(SaveMode.Overwrite)
      .save()

    // Read data from Mongo.
    val jdbcDF = spark.read.format("mongo").schema(schema)
      .option("url", url)
      .option("uri", uri)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .load()
    jdbcDF.show()

    spark.close()
  }
}