Java Example Code

Development description

Redis can be accessed only through enhanced datasource connections, and only yearly/monthly queues can be used.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a yearly/monthly queue. For details, see the Data Lake Insight User Guide.

  • Code implementation
    1. Import dependencies
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.sql.*;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      import java.util.*;
      
    2. Create a session
      SparkConf sparkConf = new SparkConf();
      sparkConf.setAppName("datasource-redis")
              .set("spark.redis.host", "192.168.4.199")
              .set("spark.redis.port", "6379")
              .set("spark.redis.auth", "******")
              .set("spark.driver.allowMultipleContexts","true");
      JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
      SQLContext sqlContext = new SQLContext(javaSparkContext);
      
  • Connecting to Datasources Through DataFrame APIs
    1. Read JSON data as a DataFrame.
      JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
              "{\"id\":\"1\",\"name\":\"zhangsan\",\"age\":\"18\"}",
              "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
      Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);
      
    2. Construct the Redis connection configuration parameters.
      Map<String, String> map = new HashMap<>();
      map.put("table","person");
      map.put("key.column","id");
      
    3. Save data to Redis
      dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();
      
    4. Read data from Redis
      sqlContext.read().format("redis").options(map).load().show();
      
    5. Operation result

  • Submitting a Spark job
    1. Upload the Java code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.redis.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.
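
When the job is submitted through the API, the module named above goes into the modules parameter of the request body. The following is a minimal sketch of assembling such a body in plain Java. Only the modules parameter and the value sys.datasource.redis come from this page. The class BatchJobBodySketch, the other field names (file, class_name, queue), and their placeholder values are assumptions for illustration; check them against the request parameter table of Creating a Batch Processing Job in the Data Lake Insight API Reference.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of any DLI SDK: it only assembles a JSON string.
class BatchJobBodySketch {

    // Builds a flat JSON object from the caller's fields and appends "modules".
    static String buildBody(Map<String, String> otherFields, String... modules) {
        StringJoiner fields = new StringJoiner(", ", "{", "}");
        for (Map.Entry<String, String> e : otherFields.entrySet()) {
            fields.add(quote(e.getKey()) + ": " + quote(e.getValue()));
        }
        StringJoiner moduleList = new StringJoiner(", ", "[", "]");
        for (String module : modules) {
            moduleList.add(quote(module));
        }
        fields.add(quote("modules") + ": " + moduleList);
        return fields.toString();
    }

    // Wraps a value in double quotes, escaping backslashes and quotes.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // Field names and values below are placeholders (assumptions), not confirmed API fields.
        Map<String, String> otherFields = new LinkedHashMap<>();
        otherFields.put("file", "<path of the uploaded package>");
        otherFields.put("class_name", "<main class>");
        otherFields.put("queue", "<yearly/monthly queue name>");

        // The module name is the one this page requires for Redis access.
        System.out.println(buildBody(otherFields, "sys.datasource.redis"));
    }
}
```

The point of the sketch is only that sys.datasource.redis is passed as an element of the modules array; a real client would use a JSON library and the authentication described in the API reference.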

Complete example code

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import java.util.*;

public class Test_Redis_DaraFrame {
  public static void main(String[] args) {
    // Create the Spark context and SQL context with the Redis connection settings
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("datasource-redis")
             .set("spark.redis.host", "192.168.4.199")
             .set("spark.redis.port", "6379")
             .set("spark.redis.auth", "******")
             .set("spark.driver.allowMultipleContexts","true");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(javaSparkContext);

    // Read an RDD of JSON strings to create a DataFrame
    JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
            "{\"id\":\"1\",\"name\":\"zhangsan\",\"age\":\"18\"}",
            "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
    Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);

    // Redis options: target table name and the column used as the key
    Map<String, String> map = new HashMap<>();
    map.put("table","person");
    map.put("key.column","id");

    // Save the DataFrame to Redis, then read it back and print it
    dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();
    sqlContext.read().format("redis").options(map).load().show();
  }
}
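
The table and key.column options used in the example decide how each row is keyed in Redis. The sketch below models one plausible layout in plain Java, assuming the DLI connector follows the convention of the open-source spark-redis project, where a row is stored under the key "<table>:<value of key.column>". This page does not confirm that convention, and RedisKeyLayoutSketch is a hypothetical helper rather than part of DLI or Spark, so verify the actual keys in your Redis instance.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of DLI or Spark: it models the key name only.
class RedisKeyLayoutSketch {

    // Assumed convention (open-source spark-redis): key = "<table>:<value of key.column>".
    static String redisKey(Map<String, String> options, Map<String, String> row) {
        String table = options.get("table");
        String keyColumn = options.get("key.column");
        if (table == null || keyColumn == null) {
            throw new IllegalArgumentException("options must contain table and key.column");
        }
        String keyValue = row.get(keyColumn);
        if (keyValue == null) {
            throw new IllegalArgumentException("row has no value for column " + keyColumn);
        }
        return table + ":" + keyValue;
    }

    public static void main(String[] args) {
        // Same options as the example on this page.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("table", "person");
        options.put("key.column", "id");

        // First record from the example's JSON data.
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("name", "zhangsan");
        row.put("age", "18");

        System.out.println(redisKey(options, row)); // prints person:1
    }
}
```

Under this assumption, choosing a column with unique values for key.column matters: two rows with the same id would map to the same key, and the later write would replace the earlier one.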