Deze pagina is nog niet beschikbaar in uw eigen taal. We werken er hard aan om meer taalversies toe te voegen. Bedankt voor uw steun.

Java Example Code

Updated on 2024-06-13 GMT+08:00

Development Description

Redis supports only enhanced datasource connections.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a queue in packages.

    NOTE:

    Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.

  • Code implementation
    1. Import dependencies.
      • Maven dependency involved
        1
        2
        3
        4
        5
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.2</version>
        </dependency>
        
      • Import dependency packages.
        1
        2
        3
        4
        5
        6
        7
        8
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.sql.*;
        import org.apache.spark.sql.types.DataTypes;
        import org.apache.spark.sql.types.StructField;
        import org.apache.spark.sql.types.StructType;
        import java.util.*;
        
    2. Create a session.
      1
      2
      3
      4
      5
      6
      7
      8
      SparkConf sparkConf = new SparkConf();
      sparkConf.setAppName("datasource-redis")
              .set("spark.redis.host", "192.168.4.199")
              .set("spark.redis.port", "6379")
              .set("spark.redis.auth", "******")
              .set("spark.driver.allowMultipleContexts","true");
      JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
      SQLContext sqlContext = new SQLContext(javaSparkContext);
      
  • Connecting to data sources through DataFrame APIs
    1. Read JSON data as DataFrames.
      1
      2
      3
      4
      JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
              "{\"id\":\"1\",\"name\":\"Ann\",\"age\":\"18\"}",
              "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
      Dataset dataFrame = sqlContext.read().json(javaRDD);
      
    2. Construct the Redis connection parameters.
      1
      2
      3
      Map map = new HashMap<String, String>();
      map.put("table","person");
      map.put("key.column","id");
      
    3. Save data to Redis.
      1
      dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();
      
    4. Read data from Redis.
      1
      sqlContext.read().format("redis").options(map).load().show();
      
    5. View the operation result.

  • Submitting a Spark job
    1. Upload the Java code file to DLI.

    2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.

      NOTE:
      • If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, specify the Module to sys.datasource.redis when you submit a job.
      • If the Spark version is 3.1.1, you do not need to select the Module module. You need to configure the 'Spark parameter (--conf) '.

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*

      • For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
      • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.

Complete Example Code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Test_Redis_DaraFrame {
  public static void main(String[] args) {
    //create a SparkSession session  
    SparkConf sparkConf = new SparkConf();  
    sparkConf.setAppName("datasource-redis")
             .set("spark.redis.host", "192.168.4.199")
             .set("spark.redis.port", "6379")
             .set("spark.redis.auth", "******")
             .set("spark.driver.allowMultipleContexts","true");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(javaSparkContext);

    //Read RDD in JSON format to create DataFrame
    JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
            "{\"id\":\"1\",\"name\":\"Ann\",\"age\":\"18\"}",
            "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
    Dataset dataFrame = sqlContext.read().json(javaRDD);

    Map map = new HashMap<String, String>();
    map.put("table","person");
    map.put("key.column","id");
    dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();      
    sqlContext.read().format("redis").options(map).load().show();

  }
}
Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback