
Java Example Code

Development Description

Redis supports only enhanced datasource connections.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a queue. For details, see Enhanced Datasource Connections.

    Hard-coded or plaintext passwords pose significant security risks. To keep credentials secure, encrypt your passwords, store them in configuration files or environment variables, and decrypt them only when needed, as in the sketch that follows.
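
    A minimal sketch of supplying the password at run time instead of hard-coding it (the environment variable name REDIS_AUTH and the decryption step are illustrative assumptions):

      // Read the Redis password from an environment variable (hypothetical name).
      String redisAuth = System.getenv("REDIS_AUTH");
      if (redisAuth == null || redisAuth.isEmpty()) {
          throw new IllegalStateException("REDIS_AUTH is not set");
      }
      // If the stored value is encrypted, decrypt it here before use.
      sparkConf.set("spark.redis.auth", redisAuth);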

  • Code implementation
    1. Import dependencies.
      • Maven dependency involved
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.2</version>
        </dependency>
        
      • Import dependency packages.
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.sql.*;
        import org.apache.spark.sql.types.DataTypes;
        import org.apache.spark.sql.types.StructField;
        import org.apache.spark.sql.types.StructType;
        import java.util.*;
        
    2. Create a session.
      SparkConf sparkConf = new SparkConf();
      sparkConf.setAppName("datasource-redis")
              .set("spark.redis.host", "192.168.4.199")
              .set("spark.redis.port", "6379")
              .set("spark.redis.auth", "******")
              .set("spark.driver.allowMultipleContexts","true");
      JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
      SQLContext sqlContext = new SQLContext(javaSparkContext);
      
  • Connecting to data sources through DataFrame APIs
    1. Read JSON data as DataFrames.
      JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
              "{\"id\":\"1\",\"name\":\"Ann\",\"age\":\"18\"}",
              "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
      Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);
      
    2. Construct the Redis connection parameters.
      Map<String, String> map = new HashMap<>();
      map.put("table","person");
      map.put("key.column","id");
      
    3. Save data to Redis.
      dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();
      
    4. Read data from Redis.
      sqlContext.read().format("redis").options(map).load().show();
      
    5. View the operation result.
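
      Assuming both sample rows were written successfully, the show() output of the read-back step looks similar to the following (column order may vary):

      +---+---+----+
      |age| id|name|
      +---+---+----+
      | 18|  1| Ann|
      | 21|  2|lisi|
      +---+---+----+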

  • Submitting a Spark job
    1. Upload the Java code file to DLI.

      For details about console operations, see Creating a Package. For details about API operations, see Uploading a Package Group.

    2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.

      For details about console operations, see Creating a Spark Job. For details about API operations, see Creating a Batch Processing Job.
      • If the Spark version is 2.3.2 (soon to be taken offline) or 2.4.5, set Module to sys.datasource.redis when you submit the job.
      • If the Spark version is 3.1.1, you do not need to select a module. Instead, configure the following Spark parameters (--conf):

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*

      • For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
      • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.

Complete Example Code

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import java.util.*;

public class Test_Redis_DataFrame {
  public static void main(String[] args) {
    // Create the Spark context and SQL context.
    SparkConf sparkConf = new SparkConf();  
    sparkConf.setAppName("datasource-redis")
             .set("spark.redis.host", "192.168.4.199")
             .set("spark.redis.port", "6379")
             .set("spark.redis.auth", "******")
             .set("spark.driver.allowMultipleContexts","true");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(javaSparkContext);

    // Read a JSON-format RDD into a DataFrame.
    JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
            "{\"id\":\"1\",\"name\":\"Ann\",\"age\":\"18\"}",
            "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
    Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);

    // Redis connection parameters: "table" names the target table and
    // "key.column" selects the column used to build the Redis key.
    Map<String, String> map = new HashMap<>();
    map.put("table","person");
    map.put("key.column","id");

    // Write the DataFrame to Redis, then read it back and print it.
    dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();
    sqlContext.read().format("redis").options(map).load().show();

  }
}
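
As a follow-up sketch (not part of the original example), the Redis-backed data can also be queried with Spark SQL by registering the read-back DataFrame as a temporary view. The view name "person" and the query are illustrative assumptions:

// Read the data back from Redis and register it as a temporary view.
Dataset<Row> persons = sqlContext.read().format("redis").options(map).load();
persons.createOrReplaceTempView("person");
// Age was written as a string, so cast it before the numeric comparison.
sqlContext.sql("SELECT name, age FROM person WHERE CAST(age AS INT) > 20").show();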