
Complete Example Code


Maven Dependency

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
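
If you build with sbt instead of Maven, the equivalent dependency declaration is a one-liner. This is a sketch assuming the same Scala 2.11 / Spark 2.3.2 versions as the Maven snippet above:

// build.sbt -- sbt equivalent of the Maven dependency above
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.2"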

Connecting to Datasources Through SQL APIs

import org.apache.spark.sql.SparkSession

object TestMongoSql {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().getOrCreate()
    // Create a DLI datasource table backed by the Mongo collection.
    sparkSession.sql(
      """create table test_mongo(id string, name string, age int) using mongo options(
        |  'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
        |  'database' = 'test',
        |  'collection' = 'test',
        |  'user' = 'rwuser',
        |  'password' = '######')""".stripMargin)
    // Insert a row, then read the collection back.
    sparkSession.sql("insert into test_mongo values('3', 'zhangsan', 23)")
    sparkSession.sql("select * from test_mongo").show()
    sparkSession.close()
  }
}
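
Hardcoding credentials in the DDL string is fine for a quick test, but in practice you would usually pass them in when the job is submitted. The following is a minimal sketch of the same SQL example with the connection details taken from program arguments; the argument order (url, user, password) is hypothetical:

import org.apache.spark.sql.SparkSession

// A sketch: read the Mongo connection details from program arguments
// (args(0) = url, args(1) = user, args(2) = password -- hypothetical order)
// instead of hardcoding them in the DDL string.
object TestMongoSqlArgs {
  def main(args: Array[String]): Unit = {
    val Array(url, user, password) = args
    val sparkSession = SparkSession.builder().getOrCreate()
    sparkSession.sql(
      s"""create table test_mongo(id string, name string, age int) using mongo options(
         |  'url' = '$url',
         |  'database' = 'test',
         |  'collection' = 'test',
         |  'user' = '$user',
         |  'password' = '$password')""".stripMargin)
    sparkSession.sql("select * from test_mongo").show()
    sparkSession.close()
  }
}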

Connecting to Datasources Through DataFrame APIs

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test_Mongo_SparkSql {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder().appName("mongodbTest").getOrCreate()

    // Set the connection parameters.
    val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
    val user = "rwuser"
    val database = "test"
    val collection = "test"
    val password = "######"

    // Define the schema.
    val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))

    // Build the DataFrame.
    val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
    val dataFrame = spark.createDataFrame(rdd, schema)

    // Write data to Mongo.
    dataFrame.write.format("mongo")
      .option("url", url)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .mode(SaveMode.Overwrite)
      .save()

    // Read data from Mongo.
    val readDF = spark.read.format("mongo").schema(schema)
      .option("url", url)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .load()
    readDF.show()

    spark.close()
  }
}
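
The example above overwrites the collection on every run. If you instead want to add rows to an existing collection and then read back only a subset, a sketch along the following lines would work, reusing the url, database, collection, user, and password values from the example and assuming the mongo datasource honors SaveMode.Append as the standard DataFrame writer does:

// Append additional rows instead of replacing the collection
// (assumes SaveMode.Append is supported by the mongo datasource).
val moreRows = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row("3", "Alice", 28))), schema)
moreRows.write.format("mongo")
  .option("url", url)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .mode(SaveMode.Append)
  .save()

// Read back only the rows matching a predicate.
val adults = spark.read.format("mongo").schema(schema)
  .option("url", url)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .load()
  .filter("age > 25")
adults.show()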
