Updated on 2024-07-04 GMT+08:00

Scala Example Code

Development Description

Mongo can be connected only through enhanced datasource connections.

DDS is compatible with the MongoDB protocol.

An enhanced datasource connection has been created on the DLI management console and bound to a queue in packages. For details, see Enhanced Datasource Connections.

Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.

  • Constructing dependency information and creating a Spark session
    1. Import dependencies.
      Maven dependency involved
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
      Import dependency packages.
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
      Create a session.
      val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()
  • Connecting to data sources through SQL APIs
    1. Create a table to connect to a Mongo data source.
      sparkSession.sql(
        "create table test_dds(id string, name string, age int) using mongo options(
          'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
          'uri' = 'mongodb://username:pwd@host:8635/db',
          'database' = 'test',
          'collection' = 'test',
          'user' = 'rwuser',
          'password' = '######')")
      Table 1 Parameters for creating a table

      Parameter

      Description

      url

      • URL format:

        "IP:PORT[,IP:PORT]/[DATABASE][.COLLECTION][AUTH_PROPERTIES]"

        Example:

        "192.168.4.62:8635/test?authSource=admin"
      • The URL needs to be obtained from the Mongo (DDS) connection address..

        The obtained Mongo connection address is in the following format: Protocol header://Username:Password@Connection address:Port number/Database name?authSource=admin

        Example:

        mongodb://rwuser:****@192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin

      uri

      URI format: mongodb://username:pwd@host:8635/db

      Set the following parameters to the actual values:

      • username: username used for creating the Mongo (DDS) database
      • pwd: password of the username for the Mongo (DDS) database
      • host: IP address of the Mongo (DDS) database instance
      • db: name of the created Mongo (DDS) database

      For details about how to create a Mongo (DDS) database user, see Creating a Database Account Using Commands.

      database

      DDS database name. If the database name is specified in the URL, the database name in the URL does not take effect.

      collection

      Collection name in the DDS. If the collection is specified in the URL, the collection in the URL does not take effect.

      NOTE:

      If a collection already exists in DDS, you do not need to specify schema information when creating a table. DLI automatically generates schema information based on data in the collection.

      user

      Username for accessing the DDS cluster.

      password

      Password for accessing the DDS cluster.

      Figure 1 Mongo link address
    2. Insert data.
      sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
    3. Query data.
      sparkSession.sql("select * from test_dds").show()

      View the operation result.

  • Connecting to data sources through DataFrame APIs
    1. Set connection parameters.
      val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      val uri = "mongodb://username:pwd@host:8635/db"
      val user = "rwuser"
      val database = "test"
      val collection = "test"
      val password = "######"
    2. Construct a schema.
      1
      val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))
      
    3. Construct a DataFrame.
      val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
      val dataFrame = spark.createDataFrame(rdd, schema)
    4. Import data to Mongo.
      1
      2
      3
      4
      5
      6
      7
      8
      9
      dataFrame.write.format("mongo")
        .option("url", url)
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .mode(SaveMode.Overwrite)
        .save()
      

      The options of mode are Overwrite, Append, ErrorIfExis, and Ignore.

    5. Read data from Mongo.
      1
      2
      3
      4
      5
      6
      7
      8
      val jdbcDF = spark.read.format("mongo").schema(schema)
        .option("url", url)
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .load()
      

      Operation result

  • Submitting a Spark job
    1. Generate a JAR package based on the code and upload the package to DLI.

      For details about console operations, see Creating a Package. For details about API operations, see Uploading a Package Group.

    2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.

      For details about console operations, see Creating a Spark Job. For details about API operations, see Creating a Batch Processing Job.
      • If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, specify the Module to sys.datasource.mongo when you submit a job.
      • If the Spark version is 3.1.1, you do not need to select a module. Configure Spark parameters (--conf).

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*

      • For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
      • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.

Complete Example Code

  • Maven dependency
    1
    2
    3
    4
    5
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
  • Connecting to data sources through SQL APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import org.apache.spark.sql.SparkSession
    
    object TestMongoSql {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().getOrCreate()
        sparkSession.sql(
          "create table test_dds(id string, name string, age int) using mongo options(
            'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
            'uri' = 'mongodb://username:pwd@host:8635/db',
            'database' = 'test',
            'collection' = 'test',
            'user' = 'rwuser',
            'password' = '######')")
        sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
        sparkSession.sql("select * from test_dds").show()
        sparkSession.close()
      }
    }
    
  • Connecting to data sources through DataFrame APIs
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    object Test_Mongo_SparkSql {
      def main(args: Array[String]): Unit = {
      //  Create a SparkSession session.  
      val spark = SparkSession.builder().appName("mongodbTest").getOrCreate()
    
      // Set the connection configuration parameters.  
      val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      val uri = "mongodb://username:pwd@host:8635/db"
      val user = "rwuser"
      val database = "test"
      val collection = "test"
      val password = "######"
    
      // Setting up the schema
      val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))
    
      // Setting up the DataFrame
      val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
      val dataFrame = spark.createDataFrame(rdd, schema)
    
    
      // Write data to mongo
      dataFrame.write.format("mongo")
        .option("url", url)
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .mode(SaveMode.Overwrite)
        .save()
    
      // Reading data from mongo
      val jdbcDF = spark.read.format("mongo").schema(schema)
        .option("url", url)
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .load()
      jdbcDF.show()
    
      spark.close()
     }
    }