Scala Example Code

Development description

DLI can access Redis only through an enhanced datasource connection. Only yearly/monthly queues can be used.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a queue in yearly/monthly packages. For details, see Data Lake Insight User Guide.

  • Construct dependency information and create a Spark session.
    1. Import dependencies
      Maven dependencies involved
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
      </dependency>
      <dependency>
        <groupId>com.redislabs</groupId>
        <artifactId>spark-redis</artifactId>
        <version>2.4.0</version>
      </dependency>
      
      Import statements
      import org.apache.spark.sql.{Row, SaveMode, SparkSession}
      import org.apache.spark.sql.types._
      import com.redislabs.provider.redis._
      import scala.reflect.runtime.universe._
      import org.apache.spark.{SparkConf, SparkContext}
      
  • Connecting to Datasources Through DataFrame APIs
    1. Create a session
      val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
      
    2. Construct a schema
      //method one
      var schema = StructType(Seq(StructField("name", StringType, false), StructField("age", IntegerType, false)))
      var rdd = sparkSession.sparkContext.parallelize(Seq(Row("abc",34),Row("Bob",19)))
      var dataFrame = sparkSession.createDataFrame(rdd, schema)
      // //method two
      // var jdbcDF= sparkSession.createDataFrame(Seq(("Jack",23)))
      // val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age")
      // //method three 
      // case class Person(name: String, age: Int)
      // val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))
      

      The case class Person(name: String, age: Int) must be declared outside the object. For details, see Connecting to Datasources Through DataFrame APIs.
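A minimal sketch of that placement (the object name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// The case class is declared at the top level, outside the object,
// so that Spark can derive an encoder for it via reflection.
case class Person(name: String, age: Int)

object TestRedisDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
    // method three: build the DataFrame directly from case-class instances
    val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))
    dataFrame.show()
    sparkSession.close()
  }
}
```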

    3. Import data to Redis
      dataFrame.write
        .format("redis")
        .option("host","192.168.4.199")
        .option("port","6379")
        .option("table","person")
        .option("password","******")
        .option("key.column","name")
        .mode(SaveMode.Overwrite)
        .save()
      
      Table 1 Redis operation parameters

      • host: IP address of the Redis cluster to be connected.
        To obtain the IP address, log in to the HUAWEI CLOUD official website, search for "redis", go to the Distributed Cache Service page, and click Cache Management. Copy the IP address (including the port information) of the required instance. For details, see Figure 1.
      • port: Access port.
      • password: Password for the connection. This parameter is optional if no password is required.
      • table: The key or hash key in Redis.
        - This parameter is mandatory when Redis data is inserted.
        - Either this parameter or keys.pattern is mandatory when Redis data is queried.
      • keys.pattern: A regular expression that matches multiple keys or hash keys. This parameter is used only for query. Either this parameter or table is mandatory when Redis data is queried.
      • key.column: Specifies a column as the Redis key. This parameter is optional. If a key column is specified when data is written, the same column must be specified during query; otherwise, the key cannot be loaded correctly during query.
      • partitions.number: Number of concurrent tasks during data reading.
      • scan.count: Number of data records read in each batch. The default value is 100. If the CPU usage of the Redis cluster is still low during data reading, increase the value of this parameter.
      • iterator.grouping.size: Number of data records inserted in each batch. The default value is 100. If the CPU usage of the Redis cluster is still low during insertion, increase the value of this parameter.
      • timeout: Timeout interval for connecting to Redis, in milliseconds. The default value is 2000 (2 seconds).

      • Save mode (mode()): the options are Overwrite, Append, ErrorIfExists, and Ignore.
      • To save nested DataFrames, use .option("model", "binary").
      • To specify the data expiration time, use .option("ttl", 1000); the unit is second.
      Figure 1 Obtaining the IP address and port number of Redis
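Several of the options in Table 1 can be combined on a single write or read. The following sketch (host, port, password, and the key pattern are placeholders) writes with a TTL and reads back by keys.pattern instead of table:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object RedisOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("datasource_redis").getOrCreate()

    // Sample data: the "name" column will be used as the Redis key.
    val df = spark.createDataFrame(Seq(("Jack", 23)))
      .withColumnRenamed("_1", "name")
      .withColumnRenamed("_2", "age")

    df.write
      .format("redis")
      .option("host", "192.168.4.199")   // placeholder address
      .option("port", "6379")
      .option("password", "******")
      .option("table", "person")
      .option("key.column", "name")
      .option("ttl", 1000)               // records expire after 1000 seconds
      .mode(SaveMode.Overwrite)          // Overwrite | Append | ErrorIfExists | Ignore
      .save()

    // keys.pattern replaces table when querying by regular expression.
    spark.read
      .format("redis")
      .option("host", "192.168.4.199")
      .option("port", "6379")
      .option("password", "******")
      .option("keys.pattern", "person:*")
      .option("key.column", "name")
      .load()
      .show()

    spark.close()
  }
}
```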
    4. Read data from Redis
      sparkSession.read
        .format("redis")
        .option("host","192.168.4.199")
        .option("port","6379")
        .option("table", "person")
        .option("password","******")
        .option("key.column","name")
        .load()
        .show()
      

      Operation result:
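Assuming the two rows written in the previous step, the show() call prints a table similar to the following (row order may vary):

```
+----+---+
|name|age|
+----+---+
| abc| 34|
| Bob| 19|
+----+---+
```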

  • Connecting to Datasources Using Spark RDD
    1. Create a datasource connection.
      val sparkContext = new SparkContext(new SparkConf()
         .setAppName("datasource_redis")
         .set("spark.redis.host", "192.168.4.199")
         .set("spark.redis.port", "6379")
         .set("spark.redis.auth", "######")
         .set("spark.driver.allowMultipleContexts","true"))
      

      When spark.driver.allowMultipleContexts is set to true, multiple SparkContexts are allowed to be started, and only the current context is used. This prevents conflicts between context invocations.

    2. Insert data
      1. Saving the string
        val stringRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("high","111"), ("together","333")))
        sparkContext.toRedisKV(stringRedisData)
        
      2. Saving the hash
        val hashRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("spark","123"), ("data","222")))
        sparkContext.toRedisHASH(hashRedisData, "hashRDD")
        
      3. Saving the list
        val data = List(("school","112"), ("tom","333"))
        val listRedisData:RDD[String] = sparkContext.parallelize(Seq[(String)](data.toString()))
        sparkContext.toRedisLIST(listRedisData, "listRDD")
        
      4. Saving the set
        val setData = Set(("bob","133"),("kity","322"))
        val setRedisData:RDD[(String)] = sparkContext.parallelize(Seq[(String)](setData.mkString))
        sparkContext.toRedisSET(setRedisData, "setRDD")
        
      5. Saving the zset
        val zsetRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("weight","234"), ("bobo","343")))
        sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")
        
    3. Query data
      1. Query by traversing keys.
        val keysRDD = sparkContext.fromRedisKeys(Array("high","together", "hashRDD", "listRDD", "setRDD","zsetRDD"), 6)
        keysRDD.getKV().collect().foreach(println)
        keysRDD.getHash().collect().foreach(println)
        keysRDD.getList().collect().foreach(println)
        keysRDD.getSet().collect().foreach(println)
        keysRDD.getZSet().collect().foreach(println)
        
      2. Query by string
        sparkContext.fromRedisKV(Array( "high","together")).collect().foreach{println}
        
      3. Query by hash
        sparkContext.fromRedisHash(Array("hashRDD")).collect().foreach{println}
        
      4. Query by list
        sparkContext.fromRedisList(Array("listRDD")).collect().foreach{println}
        
      5. Query by set
        sparkContext.fromRedisSet(Array("setRDD")).collect().foreach{println}
        
      6. Query by zset
        sparkContext.fromRedisZSet(Array("zsetRDD")).collect().foreach{println}
        
  • Connecting to Datasources Through SQL APIs
    1. Create a table to connect to Redis datasource
      sparkSession.sql(
        """CREATE TEMPORARY VIEW person (name STRING, age INT)
          |USING org.apache.spark.sql.redis OPTIONS (
          |  'host' = '192.168.4.199',
          |  'port' = '6379',
          |  'password' = '######',
          |  table 'person')""".stripMargin)
      
    2. Insert data
      sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)")
      
    3. Query data
      sparkSession.sql("SELECT * FROM person").collect().foreach(println)
      
  • Submitting a Spark Job
    1. Generate a JAR package based on the code and upload the package to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.redis.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.

Complete example code

  • Maven dependency
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>3.1.0</version>
    </dependency>
    <dependency>
      <groupId>com.redislabs</groupId>
      <artifactId>spark-redis</artifactId>
      <version>2.4.0</version>
    </dependency>
    
  • Connecting to datasources through SQL APIs
    import org.apache.spark.sql.SparkSession
    
    object Test_Redis_SQL {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.  
        val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
       
        sparkSession.sql(
          """CREATE TEMPORARY VIEW person (name STRING, age INT)
            |USING org.apache.spark.sql.redis OPTIONS (
            |  'host' = '192.168.4.199', 'port' = '6379', 'password' = '******', table 'person')""".stripMargin)
        
        sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)")
       
        sparkSession.sql("SELECT * FROM person").collect().foreach(println)
      
        sparkSession.close()
      }
    }
    
  • Connecting to datasources through DataFrame APIs
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types._
    
    object Test_Redis_SparkSql {
      def main(args: Array[String]): Unit = {
      // Create a SparkSession session.  
      val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
    
      // Set cross-source connection parameters.  
      val host = "192.168.4.199"
      val port = "6379"
      val table = "person"
      val auth = "######"
      val key_column = "name"
      
      // ******** setting DataFrame ********  
      // method one
      var schema = StructType(Seq(StructField("name", StringType, false),StructField("age", IntegerType, false)))
      var rdd = sparkSession.sparkContext.parallelize(Seq(Row("huawei",34),Row("Bob",19)))
      var dataFrame = sparkSession.createDataFrame(rdd, schema)
    
    // // method two
    // var jdbcDF= sparkSession.createDataFrame(Seq(("Jack",23)))
    // val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age")
    
    // // method three
    // val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))
    
      // Write data to redis  
      dataFrame.write.format("redis").option("host",host).option("port",port).option("table", table).option("password",auth).option("key.column",key_column).mode(SaveMode.Overwrite).save()
    
      // Read data from redis  
      sparkSession.read.format("redis").option("host",host).option("port",port).option("table", table).option("password",auth).option("key.column",key_column).load().show()
    
      // Close session  
      sparkSession.close() 
      }
    }
    // method three
    // case class Person(name: String, age: Int)
    
  • Connecting to datasources using Spark RDD
    import com.redislabs.provider.redis._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Test_Redis_RDD {
      def main(args: Array[String]): Unit = {
        // Create a SparkContext.
        val sparkContext = new SparkContext(new SparkConf()
              .setAppName("datasource_redis")
              .set("spark.redis.host", "192.168.4.199")
              .set("spark.redis.port", "6379")
              .set("spark.redis.auth", "@@@@@@")
              .set("spark.driver.allowMultipleContexts","true"))
    
        //***************** Write data to redis **********************  
        // Save String type data  
        val stringRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("high","111"), ("together","333")))   
        sparkContext.toRedisKV(stringRedisData)
      
        // Save Hash type data  
        val hashRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("spark","123"), ("data","222")))  
        sparkContext.toRedisHASH(hashRedisData, "hashRDD")
    
        // Save List type data  
        val data = List(("school","112"), ("tom","333"))
        val listRedisData:RDD[String] = sparkContext.parallelize(Seq[(String)](data.toString()))
        sparkContext.toRedisLIST(listRedisData, "listRDD")
    
        // Save Set type data  
        val setData = Set(("bob","133"),("kity","322"))
        val setRedisData:RDD[(String)] = sparkContext.parallelize(Seq[(String)](setData.mkString))
        sparkContext.toRedisSET(setRedisData, "setRDD")
      
        // Save ZSet type data 
        val zsetRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("weight","234"), ("bobo","343")))  
        sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")
    
        // ***************************** Read data from redis *******************************************  
        // Traverse the specified key and get the value
        val keysRDD = sparkContext.fromRedisKeys(Array("high","together", "hashRDD", "listRDD", "setRDD","zsetRDD"), 6)  
        keysRDD.getKV().collect().foreach(println)
        keysRDD.getHash().collect().foreach(println)
        keysRDD.getList().collect().foreach(println)
        keysRDD.getSet().collect().foreach(println)
        keysRDD.getZSet().collect().foreach(println)
     
        // Read String type data
        val stringRDD = sparkContext.fromRedisKV("keyPattern *")
        sparkContext.fromRedisKV(Array( "high","together")).collect().foreach{println}
      
        // Read Hash type data
        val hashRDD = sparkContext.fromRedisHash("keyPattern *")
        sparkContext.fromRedisHash(Array("hashRDD")).collect().foreach{println}
      
        // Read List type data
        val listRDD = sparkContext.fromRedisList("keyPattern *")
        sparkContext.fromRedisList(Array("listRDD")).collect().foreach{println}
      
        // Read Set type data
        val setRDD = sparkContext.fromRedisSet("keyPattern *")
        sparkContext.fromRedisSet(Array("setRDD")).collect().foreach{println}
    
        // Read ZSet type data
        val zsetRDD = sparkContext.fromRedisZSet("keyPattern *")
        sparkContext.fromRedisZSet(Array("zsetRDD")).collect().foreach{println}
      
        // Stop the context
        sparkContext.stop()
      }
    }