Java Example Code

Development description

MongoDB can be accessed only through an enhanced datasource connection, and only yearly/monthly queues can be used.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a yearly/monthly queue. For details, see the Data Lake Insight User Guide.

  • Code implementation
    1. Import dependencies
      import org.apache.spark.SparkConf;
      import org.apache.spark.SparkContext;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SQLContext;
      import org.apache.spark.sql.SaveMode;
      import java.util.Arrays;
    2. Create a session
      SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("datasource-mongo"));
      JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
      SQLContext sqlContext = new SQLContext(javaSparkContext);
      
  • Connecting to datasources through DataFrame APIs
    1. Read JSON data as a DataFrame
      JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList("{\"id\":\"5\",\"name\":\"zhangsan\",\"age\":\"23\"}"));
      Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);
    2. Set connection parameters
      String url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin";
      String user = "rwuser";
      String database = "test";
      String collection = "test";
      String password = "######";
    3. Import data to MongoDB
      dataFrame.write().format("mongo")   
           .option("url",url)   
           .option("database",database)  
           .option("collection",collection)   
           .option("user",user)    
           .option("password",password)  
           .mode(SaveMode.Overwrite)  
           .save();
    4. Read data from MongoDB
      sqlContext.read().format("mongo")  
          .option("url",url)    
          .option("database",database)   
          .option("collection",collection)  
          .option("user",user)   
          .option("password",password)  
          .load().show();
      
    5. Operation result

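The write and read steps above pass the same five options. As a minimal sketch, assuming you prefer to define them once, the helper below collects them into a single map. The class and method names (MongoOptions, build) are hypothetical and not part of Spark or DLI. The code uses only the JDK, so it can be checked without a cluster.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Spark or DLI): gathers the connection
// options used in the steps above into one reusable, read-only map.
final class MongoOptions {
  static Map<String, String> build(String url, String database,
      String collection, String user, String password) {
    // LinkedHashMap keeps the insertion order, which makes logs predictable.
    Map<String, String> options = new LinkedHashMap<>();
    options.put("url", url);
    options.put("database", database);
    options.put("collection", collection);
    options.put("user", user);
    options.put("password", password);
    return Collections.unmodifiableMap(options);
  }

  public static void main(String[] args) {
    Map<String, String> options = build(
        "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin",
        "test", "test", "rwuser", "######");
    // Print only the keys so that the password is not written to the job log.
    System.out.println(options.keySet());
  }
}
```

With such a map, the write step could be shortened to dataFrame.write().format("mongo").options(options).mode(SaveMode.Overwrite).save(); and the read step to sqlContext.read().format("mongo").options(options).load().show();. Both calls rely on the options(Map) overload of the Spark DataFrameWriter and DataFrameReader.
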
  • Submitting a Spark job
    1. Upload the Java code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.mongo.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.
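
To illustrate the modules parameter mentioned above, here is a minimal sketch that assembles a request body for the batch job API using only the JDK. Only modules and the value sys.datasource.mongo come from this page. The other field names (file, className, queue) and all sample values are assumptions for illustration and should be checked against Table 2 in the Data Lake Insight API Reference before use.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds a JSON request body by hand to avoid extra
// dependencies. Field names other than "modules" are assumptions.
final class BatchJobRequest {
  // Wraps a value in double quotes, escaping backslashes and double quotes.
  static String quote(String value) {
    return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  static String body(String file, String className, String queue, List<String> modules) {
    // Render the module names as a JSON array of strings.
    String moduleArray = modules.stream()
        .map(BatchJobRequest::quote)
        .collect(Collectors.joining(",", "[", "]"));
    return "{"
        + quote("file") + ":" + quote(file) + ","
        + quote("className") + ":" + quote(className) + ","
        + quote("queue") + ":" + quote(queue) + ","
        + quote("modules") + ":" + moduleArray
        + "}";
  }

  public static void main(String[] args) {
    // Placeholder values; replace them with your own package, class, and queue.
    System.out.println(body("example/mongo-example.jar", "TestMongoSparkSql",
        "example_queue", Arrays.asList("sys.datasource.mongo")));
  }
}
```

The point of the sketch is only that sys.datasource.mongo is passed as an element of the modules array; the endpoint, authentication, and remaining fields are left to the API reference.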

Complete example code

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;

public class TestMongoSparkSql {
  public static void main(String[] args) {
    SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("datasource-mongo"));
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
    SQLContext sqlContext = new SQLContext(javaSparkContext);

    // Alternatively, read a JSON file as a DataFrame; CSV and Parquet files can be read the same way.
    // Dataset<Row> dataFrame = sqlContext.read().format("json").load("filepath");

    // Create a DataFrame from an RDD of JSON strings
    JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList("{\"id\":\"5\",\"name\":\"zhangsan\",\"age\":\"23\"}"));
    Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);

    String url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin";
    String user = "rwuser";
    String database = "test";
    String collection = "test";
    String password = "######";

    dataFrame.write().format("mongo")
            .option("url",url)
            .option("database",database)
            .option("collection",collection)
            .option("user",user)
            .option("password",password)
            .mode(SaveMode.Overwrite)
            .save();

    sqlContext.read().format("mongo")
            .option("url",url)
            .option("database",database)
            .option("collection",collection)
            .option("user",user)
            .option("password",password)
            .load().show();

    // Closing the JavaSparkContext also stops the underlying SparkContext.
    javaSparkContext.close();
  }
}
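
The complete example hard-codes the password and passes the url value to Spark without checking it. As a hedged sketch using only the JDK, the hypothetical helper below splits a url of the form used above (host:port,host:port/database?options) into its host list and database name, and reads the password from an environment-style map instead of the source code. The class name, the method names, and the MONGO_PASSWORD variable name are assumptions, not DLI conventions, and the parser does not handle IPv6 addresses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight checks for the connection settings.
final class MongoUrlCheck {
  // Returns the validated host:port entries that precede the first '/'.
  static List<String> hosts(String url) {
    int slash = url.indexOf('/');
    String hostPart = slash < 0 ? url : url.substring(0, slash);
    List<String> result = new ArrayList<>();
    for (String entry : hostPart.split(",")) {
      String[] hostAndPort = entry.trim().split(":");
      if (hostAndPort.length != 2 || hostAndPort[0].isEmpty()) {
        throw new IllegalArgumentException("Expected host:port but got: " + entry);
      }
      // Throws NumberFormatException (an IllegalArgumentException) if not numeric.
      int port = Integer.parseInt(hostAndPort[1]);
      if (port < 1 || port > 65535) {
        throw new IllegalArgumentException("Port out of range: " + port);
      }
      result.add(hostAndPort[0] + ":" + port);
    }
    return result;
  }

  // Returns the database name between '/' and '?', or "" if there is none.
  static String database(String url) {
    int slash = url.indexOf('/');
    if (slash < 0) {
      return "";
    }
    int question = url.indexOf('?', slash);
    return question < 0 ? url.substring(slash + 1) : url.substring(slash + 1, question);
  }

  // Looks up the password in the given map (for example, System.getenv()).
  static String password(Map<String, String> env) {
    String value = env.get("MONGO_PASSWORD");
    if (value == null || value.isEmpty()) {
      throw new IllegalStateException("MONGO_PASSWORD is not set");
    }
    return value;
  }

  public static void main(String[] args) {
    String url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin";
    System.out.println(hosts(url));
    System.out.println(database(url));
  }
}
```

In the example class, password(System.getenv()) could replace the hard-coded string. How the value actually reaches the job (an environment variable, a job argument, or another mechanism) depends on your deployment and is not specified on this page.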