Java Example Code
Development Description
MongoDB can be accessed only through enhanced datasource connections.
Document Database Service (DDS) is compatible with the MongoDB protocol.
- Prerequisites
An enhanced datasource connection has been created on the DLI management console and bound to a queue in packages. For details, see Enhanced Datasource Connections.
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
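As a minimal sketch of the environment-variable approach, the helper below reads the password from an environment map and fails fast when it is missing. The variable name MONGO_PASSWORD and the class name MongoCredentials are hypothetical and not defined by DLI; if the stored value is encrypted, decryption would happen inside this helper before the value is returned.

```java
import java.util.Map;

final class MongoCredentials {
    // Hypothetical variable name chosen for this sketch; use whatever your deployment defines.
    static final String PASSWORD_ENV = "MONGO_PASSWORD";

    private MongoCredentials() {}

    /** Returns the password held in the given environment map, or fails if it is missing or blank. */
    static String passwordFrom(Map<String, String> env) {
        String value = env.get(PASSWORD_ENV);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + PASSWORD_ENV + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            String password = passwordFrom(System.getenv());
            // Never print the password itself; the length is enough to confirm it was loaded.
            System.out.println("Password loaded, length " + password.length());
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

In the job code, the result of MongoCredentials.passwordFrom(System.getenv()) would take the place of a literal password string.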
- Code implementation
- Import dependencies.
- Maven dependency involved
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
- Import dependency packages.
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
- Create a session.
SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("datasource-mongo"));
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
SQLContext sqlContext = new SQLContext(javaSparkContext);
- Connecting to data sources through DataFrame APIs
- Read JSON data as DataFrames.
JavaRDD<String> javaRDD = javaSparkContext.parallelize(
    Arrays.asList("{\"id\":\"5\",\"name\":\"Ann\",\"age\":\"23\"}"));
Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);
- Set connection parameters.
String url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin";
String uri = "mongodb://username:pwd@host:8635/db";
String user = "rwuser";
String database = "test";
String collection = "test";
String password = "######";
For details about the parameters, see Table 1.
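The sample url value lists comma-separated host:port pairs, followed by the database name and an authSource query parameter. As a sketch that only reproduces that layout (the class MongoUrlBuilder and its method are hypothetical helpers, and the layout is read off the sample value rather than taken from Table 1), the string can be assembled like this:

```java
import java.util.Arrays;
import java.util.List;

final class MongoUrlBuilder {
    private MongoUrlBuilder() {}

    /** Joins host:port pairs with commas, then appends /database?authSource=authSource. */
    static String buildUrl(List<String> hostPorts, String database, String authSource) {
        if (hostPorts.isEmpty()) {
            throw new IllegalArgumentException("At least one host:port pair is required");
        }
        return String.join(",", hostPorts) + "/" + database + "?authSource=" + authSource;
    }

    public static void main(String[] args) {
        // Same addresses as the sample value in this section.
        String url = buildUrl(
            Arrays.asList("192.168.4.62:8635", "192.168.5.134:8635"), "test", "admin");
        System.out.println(url); // prints 192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin
    }
}
```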
- Import data to Mongo.
dataFrame.write().format("mongo")
    .option("url", url)
    .option("uri", uri)
    .option("database", database)
    .option("collection", collection)
    .option("user", user)
    .option("password", password)
    .mode(SaveMode.Overwrite)
    .save();
- Read data from Mongo.
sqlContext.read().format("mongo")
    .option("url", url)
    .option("uri", uri)
    .option("database", database)
    .option("collection", collection)
    .option("user", user)
    .option("password", password)
    .load().show();
- View the operation result.
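The write and read calls in this procedure pass the same six options. One way to avoid repeating them, shown here only as a sketch, is to collect them once in a map. The class MongoOptions is a hypothetical helper; the option keys are the ones used in this section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MongoOptions {
    private MongoOptions() {}

    /** Collects the six connection options used by both the write and the read call. */
    static Map<String, String> of(String url, String uri, String database,
                                  String collection, String user, String password) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("url", url);
        options.put("uri", uri);
        options.put("database", database);
        options.put("collection", collection);
        options.put("user", user);
        options.put("password", password);
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> options = of(
            "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin",
            "mongodb://username:pwd@host:8635/db",
            "test", "test", "rwuser", "######");
        // Print only the keys so that the password never reaches the logs.
        System.out.println(options.keySet());
    }
}
```

With Spark on the classpath, the map can be passed through options(...) in place of the chained option(...) calls, for example dataFrame.write().format("mongo").options(options), since both DataFrameWriter and DataFrameReader accept a java.util.Map<String, String>.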
- Submitting a Spark job
- Upload the Java code file to DLI.
For details about console operations, see Creating a Package. For details about API operations, see Uploading a Package Group.
- In the Spark job editor, select the corresponding dependency module and execute the Spark job.
For details about console operations, see Creating a Spark Job. For details about API operations, see Creating a Batch Processing Job.
- If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, set Module to sys.datasource.mongo when you submit a job.
- If the Spark version is 3.1.1, you do not need to select a module. Instead, configure the following Spark parameters (--conf):
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
- For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
- For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
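The version rules above can be summarized in a small lookup. This is only an illustrative sketch: the class MongoJobSettings and its methods are hypothetical, it covers just the three Spark versions named in this section, and any other version yields empty results rather than a guess.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class MongoJobSettings {
    static final String MODULE = "sys.datasource.mongo";
    static final String JAR_PATH = "/usr/share/extension/dli/spark-jar/datasource/mongo/*";

    private MongoJobSettings() {}

    /** Modules to select at submission: only Spark 2.3.2 and 2.4.5 need the Mongo module. */
    static List<String> modulesFor(String sparkVersion) {
        if ("2.3.2".equals(sparkVersion) || "2.4.5".equals(sparkVersion)) {
            return Collections.singletonList(MODULE);
        }
        return Collections.emptyList();
    }

    /** Spark parameters (--conf) to set: only Spark 3.1.1 needs the extra class paths. */
    static List<String> confFor(String sparkVersion) {
        if ("3.1.1".equals(sparkVersion)) {
            return Arrays.asList(
                "spark.driver.extraClassPath=" + JAR_PATH,
                "spark.executor.extraClassPath=" + JAR_PATH);
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        for (String version : Arrays.asList("2.3.2", "2.4.5", "3.1.1")) {
            System.out.println(version + " modules=" + modulesFor(version) + " conf=" + confFor(version));
        }
    }
}
```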
Complete Example Code
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;

public class TestMongoSparkSql {
    public static void main(String[] args) {
        // Create the Spark, Java Spark, and SQL contexts.
        SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("datasource-mongo"));
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
        SQLContext sqlContext = new SQLContext(javaSparkContext);

        // To read a JSON file as a DataFrame instead (CSV and Parquet files are read the same way):
        // Dataset<Row> dataFrame = sqlContext.read().format("json").load("filepath");

        // Create a DataFrame from an RDD of JSON strings.
        JavaRDD<String> javaRDD = javaSparkContext.parallelize(
            Arrays.asList("{\"id\":\"5\",\"name\":\"Ann\",\"age\":\"23\"}"));
        Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);

        // Connection parameters.
        String url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin";
        String uri = "mongodb://username:pwd@host:8635/db";
        String user = "rwuser";
        String database = "test";
        String collection = "test";
        String password = "######";

        // Write the DataFrame to Mongo, overwriting the existing collection.
        dataFrame.write().format("mongo")
            .option("url", url)
            .option("uri", uri)
            .option("database", database)
            .option("collection", collection)
            .option("user", user)
            .option("password", password)
            .mode(SaveMode.Overwrite)
            .save();

        // Read the collection back and print it.
        sqlContext.read().format("mongo")
            .option("url", url)
            .option("uri", uri)
            .option("database", database)
            .option("collection", collection)
            .option("user", user)
            .option("password", password)
            .load().show();

        sparkContext.stop();
        javaSparkContext.close();
    }
}