Scala Example Code

Development description

DLI can connect to MongoDB only through an enhanced datasource connection, and only yearly/monthly queues can be used.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a queue in yearly/monthly packages. For details, see Data Lake Insight User Guide.

  • Construct dependency information and create a Spark session.
    1. Import dependencies
      Maven dependency
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
      Import the dependencies
      import org.apache.spark.sql.{Row, SaveMode, SparkSession}
      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
      Create a session
      val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()
  • Connecting to datasources through SQL APIs
    1. Create a table to connect to the MongoDB datasource
      sparkSession.sql(
        """create table test_mongo(id string, name string, age int) using mongo options(
          |  'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
          |  'database' = 'test',
          |  'collection' = 'test',
          |  'user' = 'rwuser',
          |  'password' = '######')""".stripMargin)
      Table 1 Parameters for creating a table

      url
        URL format: "IP:PORT[,IP:PORT]/[DATABASE][.COLLECTION][AUTH_PROPERTIES]"
        Example: "192.168.4.62:8635/test?authSource=admin"
        Obtain the URL from the MongoDB (DDS) connection address.
        NOTE: The connection address obtained from the console is in the format
        "Protocol header://Username:Password@Connection address:Port number/Database name?authSource=admin", for example:
        mongodb://rwuser:****@192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin
        As the examples show, the url option value omits the protocol header and the credentials (see the sketch after this table).

      database
        DDS database name. If the database name is specified in the URL, the name in the URL does not take effect.

      collection
        Name of the collection in DDS. If the collection is specified in the URL, the one in the URL does not take effect.
        NOTE: If the collection already exists in DDS, you do not need to specify schema information when creating the table. DLI automatically generates the schema from the data in the collection.

      user
        Username for accessing the DDS cluster.

      password
        Password for accessing the DDS cluster.

      Figure 1 MongoDB connection address
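
      Because the console returns the full connection address while the url option expects only the host list, database, and query string, the user, password, and url values can be derived from the address programmatically. Below is a minimal sketch, assuming the address follows the mongodb://user:password@host:port/db?authSource=admin format shown above; the regex and the Pattern name are illustrative, not part of any DLI API:

      // Split a DDS connection address into the 'url', 'user', and
      // 'password' option values used by the create-table statement.
      val connectionString = "mongodb://rwuser:****@192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      val Pattern = "mongodb://([^:]+):([^@]+)@(.+)".r
      val Pattern(user, password, url) = connectionString
      // user     == "rwuser"
      // password == "****"
      // url      == "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"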
    2. Insert data
      sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)")
    3. Query data
      sparkSession.sql("select * from test_mongo").show()

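      Any standard Spark SQL statement can be run against the mapped table. For instance, a brief sketch of a filtered query (the predicate is illustrative; the columns are those defined above):

      sparkSession.sql("select name, age from test_mongo where age > 20").show()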

  • Connecting to datasources through DataFrame APIs
    1. Set connection parameters
      val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      val user = "rwuser"
      val database = "test"
      val collection = "test"
      val password = "######"
    2. Construct a schema
      val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))
      
    3. Construct a DataFrame
      val rdd = sparkSession.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
      val dataFrame = sparkSession.createDataFrame(rdd, schema)
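
      As an alternative sketch not shown in the original guide, the same DataFrame can be built from a Seq of tuples through Spark's implicits; dataFrame2 is an illustrative name:

      // Alternative construction: toDF infers the string/string/int
      // column types directly from the tuple elements.
      import sparkSession.implicits._
      val dataFrame2 = Seq(("1", "John", 23), ("2", "Bob", 32)).toDF("id", "name", "age")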
    4. Import data to MongoDB
      dataFrame.write.format("mongo")
        .option("url", url)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .mode(SaveMode.Overwrite)
        .save()
      

      The mode option specifies the save type. The options are Overwrite, Append, ErrorIfExists, and Ignore.
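
      For example, to add rows to an existing collection instead of replacing its contents, only the mode changes; this is a minimal variation of the snippet above with the same connection options:

      // Same write as above, but appending to the existing collection.
      dataFrame.write.format("mongo")
        .option("url", url)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .mode(SaveMode.Append)
        .save()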

    5. Read data from MongoDB
      val jdbcDF = sparkSession.read.format("mongo").schema(schema)
        .option("url", url)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .load()
      

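      The loaded DataFrame supports the usual transformations before display; a brief sketch with an illustrative predicate:

      // Filter and project the loaded data, then print it.
      jdbcDF.filter(jdbcDF("age") > 25).select("name", "age").show()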

  • Submitting a Spark job
    1. Generate a JAR package based on the code and upload the package to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.mongo.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.

Complete example code

  • Maven dependency
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
  • Connecting to datasources through SQL APIs
    import org.apache.spark.sql.SparkSession
    
    object TestMongoSql {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().getOrCreate()
        sparkSession.sql(
          """create table test_mongo(id string, name string, age int) using mongo options(
            |  'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
            |  'database' = 'test',
            |  'collection' = 'test',
            |  'user' = 'rwuser',
            |  'password' = '######')""".stripMargin)
        sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)")
        sparkSession.sql("select * from test_mongo").show()
        sparkSession.close()
      }
    }
    
  • Connecting to datasources through DataFrame APIs
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    object Test_Mongo_SparkSql {
      def main(args: Array[String]): Unit = {
      // Create a SparkSession.
      val spark = SparkSession.builder().appName("mongodbTest").getOrCreate()
    
      // Set the connection configuration parameters.  
      val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      val user = "rwuser"
      val database = "test"
      val collection = "test"
      val password = "######"
    
      // Setting up the schema
      val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))
    
      // Setting up the DataFrame
      val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
      val dataFrame = spark.createDataFrame(rdd, schema)
    
    
      // Write data to mongo
      dataFrame.write.format("mongo")
        .option("url", url)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .mode(SaveMode.Overwrite)
        .save()
    
      // Reading data from mongo
      val jdbcDF = spark.read.format("mongo").schema(schema)
        .option("url", url)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .load()
      jdbcDF.show()
    
      spark.close()
     }
    }