Scala Example Code
Development Description
Mongo can be connected to only through an enhanced datasource connection. DDS is compatible with the MongoDB protocol.
Before you start, ensure that an enhanced datasource connection has been created on the DLI management console and bound to a queue. For details, see Enhanced Datasource Connections.
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
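For example, the password can be read from an environment variable at runtime instead of being hard-coded. A minimal sketch in Scala (the variable name MONGO_PASSWORD is a hypothetical choice, not a DLI convention):
// Read the DDS password from an environment variable rather than hard-coding it.
// MONGO_PASSWORD is a hypothetical name; use whatever your deployment defines.
val password = sys.env.getOrElse("MONGO_PASSWORD",
  throw new IllegalStateException("MONGO_PASSWORD is not set"))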
- Constructing dependency information and creating a Spark session
- Import dependencies.
Maven dependency involved
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
Import dependency packages.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
Create a session.
val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()
- Connecting to data sources through SQL APIs
- Create a table to connect to a Mongo data source.
sparkSession.sql(
  """create table test_dds(id string, name string, age int) using mongo options(
    |'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
    |'uri' = 'mongodb://username:pwd@host:8635/db',
    |'database' = 'test',
    |'collection' = 'test',
    |'user' = 'rwuser',
    |'password' = '######')""".stripMargin)
Table 1 Parameters for creating a table

- url
  URL format: "IP:PORT[,IP:PORT]/[DATABASE][.COLLECTION][AUTH_PROPERTIES]"
  Example: "192.168.4.62:8635/test?authSource=admin"
  Obtain the URL from the Mongo (DDS) connection address, which is in the following format: Protocol header://Username:Password@Connection address:Port number/Database name?authSource=admin
  Example: mongodb://rwuser:****@192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin
- uri
  URI format: mongodb://username:pwd@host:8635/db
  Set the following parameters to the actual values:
  - username: username used for creating the Mongo (DDS) database
  - pwd: password of that username
  - host: IP address of the Mongo (DDS) database instance
  - db: name of the created Mongo (DDS) database
  For details about how to create a Mongo (DDS) database user, see Creating a Database Account Using Commands.
- database
  DDS database name. If a database name is specified in the URL, the name in the URL does not take effect.
- collection
  Collection name in DDS. If a collection is specified in the URL, the collection in the URL does not take effect.
  NOTE: If the collection already exists in DDS, you do not need to specify schema information when creating the table; DLI automatically generates the schema from the data in the collection (see the sketch after this table).
- user
  Username for accessing the DDS cluster.
- password
  Password for accessing the DDS cluster.
Figure 1 Mongo (DDS) connection address
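Based on the NOTE on collection above, when the collection already holds data the column definitions can be omitted and DLI derives the schema from it. A minimal sketch, assuming the test collection above already exists in DDS:
sparkSession.sql(
  """create table test_dds using mongo options(
    |'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
    |'uri' = 'mongodb://username:pwd@host:8635/db',
    |'database' = 'test',
    |'collection' = 'test',
    |'user' = 'rwuser',
    |'password' = '######')""".stripMargin)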
- Insert data.
sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
- Query data.
sparkSession.sql("select * from test_dds").show()
View the operation result.
- Connecting to data sources through DataFrame APIs
- Set connection parameters.
val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
val uri = "mongodb://username:pwd@host:8635/db"
val user = "rwuser"
val database = "test"
val collection = "test"
val password = "######"
- Construct a schema.
val schema = StructType(List(
  StructField("id", StringType),
  StructField("name", StringType),
  StructField("age", IntegerType)))
- Construct a DataFrame.
val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
val dataFrame = spark.createDataFrame(rdd, schema)
- Import data to Mongo.
dataFrame.write.format("mongo")
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .mode(SaveMode.Overwrite)
  .save()
The mode options are Overwrite, Append, ErrorIfExists, and Ignore. Overwrite replaces any existing data, Append adds the new rows to it, ErrorIfExists fails if data already exists, and Ignore skips the write in that case.
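For example, to add the new rows to an existing collection instead of replacing its contents, only the mode changes. A minimal variation of the write above, reusing the connection variables already defined:
// Same write as above, but appending to the existing collection.
dataFrame.write.format("mongo")
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .mode(SaveMode.Append)
  .save()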
- Read data from Mongo.
val jdbcDF = spark.read.format("mongo").schema(schema)
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .load()
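To print the rows that were read, call show() on the returned DataFrame, as the complete example below also does:
jdbcDF.show()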
Operation result
- Submitting a Spark job
- Generate a JAR package based on the code and upload the package to DLI.
For details about console operations, see Creating a Package. For details about API operations, see Uploading a Package Group.
- In the Spark job editor, select the corresponding dependency module and execute the Spark job.
For details about console operations, see Creating a Spark Job. For details about API operations, see Creating a Batch Processing Job.
- If the Spark version is 2.3.2 (to be taken offline soon) or 2.4.5, set Module to sys.datasource.mongo when you submit the job.
- If the Spark version is 3.1.1, you do not need to select a module. Instead, configure the following Spark parameters (--conf):
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
- For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
- For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
Complete Example Code
- Maven dependency
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
- Connecting to data sources through SQL APIs
import org.apache.spark.sql.SparkSession

object TestMongoSql {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().getOrCreate()
    sparkSession.sql(
      """create table test_dds(id string, name string, age int) using mongo options(
        |'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
        |'uri' = 'mongodb://username:pwd@host:8635/db',
        |'database' = 'test',
        |'collection' = 'test',
        |'user' = 'rwuser',
        |'password' = '######')""".stripMargin)
    sparkSession.sql("insert into test_dds values('3', 'Ann', 23)")
    sparkSession.sql("select * from test_dds").show()
    sparkSession.close()
  }
}
- Connecting to data sources through DataFrame APIs
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test_Mongo_SparkSql {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder().appName("mongodbTest").getOrCreate()

    // Set the connection configuration parameters.
    val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
    val uri = "mongodb://username:pwd@host:8635/db"
    val user = "rwuser"
    val database = "test"
    val collection = "test"
    val password = "######"

    // Set up the schema.
    val schema = StructType(List(
      StructField("id", StringType),
      StructField("name", StringType),
      StructField("age", IntegerType)))

    // Set up the DataFrame.
    val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
    val dataFrame = spark.createDataFrame(rdd, schema)

    // Write data to Mongo.
    dataFrame.write.format("mongo")
      .option("url", url)
      .option("uri", uri)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .mode(SaveMode.Overwrite)
      .save()

    // Read data from Mongo.
    val jdbcDF = spark.read.format("mongo").schema(schema)
      .option("url", url)
      .option("uri", uri)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .load()
    jdbcDF.show()

    spark.close()
  }
}