更新时间:2024-07-04 GMT+08:00

scala样例代码

开发说明

redis只支持增强型跨源。

  • 前提条件

    在DLI管理控制台上已完成创建增强跨源连接,并绑定队列。具体操作请参考《数据湖探索用户指南》。

    认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

  • 构造依赖信息,创建SparkSession
    1. 导入依赖。
      涉及到mvn依赖
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
      </dependency>
      <dependency>
        <groupId>com.redislabs</groupId>
        <artifactId>spark-redis</artifactId>
        <version>2.4.0</version>
      </dependency>
      
      import相关依赖包
      1
      2
      3
      4
      5
      import org.apache.spark.sql.{Row, SaveMode, SparkSession}
      import org.apache.spark.sql.types._
      import com.redislabs.provider.redis._
      import scala.reflect.runtime.universe._
      import org.apache.spark.{SparkConf, SparkContext}
      
  • 通过DataFrame API访问
    1. 创建session
      1
      val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
      
    2. 构造schema
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      //method one
      var schema = StructType(Seq(StructField("name", StringType, false), StructField("age", IntegerType, false)))
      var rdd = sparkSession.sparkContext.parallelize(Seq(Row("abc",34),Row("Bob",19)))
      var dataFrame = sparkSession.createDataFrame(rdd, schema)
      // //method two
      // var jdbcDF= sparkSession.createDataFrame(Seq(("Jack",23)))
      // val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age")
      // //method three 
      // case class Person(name: String, age: Int)
      // val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))
      

      case class Person(name: String, age: Int) 须写在object之外,可参考•通过DataFrame API访问

    3. 导入数据到Redis
      1
      2
      3
      4
      5
      6
      7
      8
      9
      dataFrame .write
        .format("redis")
        .option("host","192.168.4.199")
        .option("port","6379")
        .option("table","person")
        .option("password","******")
        .option("key.column","name")
        .mode(SaveMode.Overwrite)
        .save()
      
      表1 redis操作参数

      参数

      描述

      host

      需要连接的redis集群的IP。

      获取方式为:登录华为云官网,之后搜索redis,进入“分布式缓存服务”,接着选择“缓存管理”,根据主机名称需要的IP,可选择其中任意一个IP进行复制即可(其中也包含了port信息),请参考图1

      port

      访问端口。

      password

      连接密码。无密码时可以不填写该参数。

      table

      对应Redis中的Key或Hash Key。

      • 插入redis数据时必填。
      • 查询redis数据时与“keys.pattern”参数二选一。

      keys.pattern

      使用正则表达式匹配多个Key或Hash Key。该参数仅用于查询时使用。查询redis数据时与“table”参数二选一。

      key.column

      指定列为key值(非必选)。如果写入数据时指定了key,则查询时必须指定key,否则查询时会异常加载key。

      partitions.number

      读取数据时,并发task数。

      scan.count

      每批次读取的数据记录数,默认为100。如果在读取过程中,redis集群中的CPU使用率还有提升空间,可以调大该参数。

      iterator.grouping.size

      每批次插入的数据记录数,默认为100。如果在插入过程中,redis集群中的CPU使用率还有提升空间,可以调大该参数。

      timeout

      连接redis的超时时间,单位ms,默认值2000(2秒超时)。

      • 保存类型:Overwrite、Append、ErrorIfExis、Ignore 四种
      • 如果需要保存嵌套的DataFrame,则通过“.option("model","binary")”进行保存
      • 指定数据过期时间:“.option("ttl",1000)”;秒为单位
      图1 获取redis的ip及端口
    4. 读取Redis上的数据
      1
      2
      3
      4
      5
      6
      7
      8
      9
      sparkSession.read
        .format("redis")
        .option("host","192.168.4.199")
        .option("port","6379")
        .option("table", "person")
        .option("password","######")
        .option("key.column","name")
        .load()
        .show()
      

      操作结果:

  • RDD操作
    1. 创建连接
      1
      2
      3
      4
      5
      6
      val sparkContext = new SparkContext(new SparkConf()
         .setAppName("datasource_redis")
         .set("spark.redis.host", "192.168.4.199")
         .set("spark.redis.port", "6379")
         .set("spark.redis.auth", "######")
         .set("spark.driver.allowMultipleContexts","true"))
      

      spark.driver.allowMultipleContexts:true 表示在启动多个context时,只使用当前的,防止发生context调用冲突。

    2. 插入数据
      1. String 保存
        1
        2
        val stringRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("high","111"), ("together","333")))
        sparkContext.toRedisKV(stringRedisData)
        
      2. hash 保存
        1
        2
        val hashRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("saprk","123"), ("data","222")))
        sparkContext.toRedisHASH(hashRedisData, "hashRDD")
        
      3. list 保存
        1
        2
        3
        val data = List(("school","112"), ("tom","333"))
        val listRedisData:RDD[String] = sparkContext.parallelize(Seq[(String)](data.toString()))
        sparkContext.toRedisLIST(listRedisData, "listRDD")
        
      4. set 保存
        1
        2
        3
        val setData = Set(("bob","133"),("kity","322"))
        val setRedisData:RDD[(String)] = sparkContext.parallelize(Seq[(String)](setData.mkString))
        sparkContext.toRedisSET(setRedisData, "setRDD")
        
      5. zset 保存
        1
        2
        val zsetRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("whight","234"), ("bobo","343")))
        sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")
        
    3. 查询数据
      1. 通过遍历key查询
        1
        2
        3
        4
        5
        6
        val keysRDD = sparkContext.fromRedisKeys(Array("high","together", "hashRDD", "listRDD", "setRDD","zsetRDD"), 6)
        keysRDD.getKV().collect().foreach(println)
        keysRDD.getHash().collect().foreach(println)
        keysRDD.getList().collect().foreach(println)
        keysRDD.getSet().collect().foreach(println)
        keysRDD.getZSet().collect().foreach(println)
        
      2. string 查询
        1
        sparkContext.fromRedisKV(Array( "high","together")).collect().foreach{println}
        
      3. hash 查询
        1
        sparkContext.fromRedisHash(Array("hashRDD")).collect().foreach{println}
        
      4. list 查询
        1
        sparkContext.fromRedisList(Array("listRDD")).collect().foreach{println}
        
      5. set 查询
        1
        sparkContext.fromRedisSet(Array("setRDD")).collect().foreach{println}
        
      6. zset 查询
        1
        sparkContext.fromRedisZSet(Array("zsetRDD")).collect().foreach{println}
        
  • 通过SQL API 访问
    1. 创建DLI关联跨源访问 Redis的关联表。
      sparkSession.sql(
           "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (
           'host' = '192.168.4.199',
           'port' = '6379',
           'password' = '######',
           table  'person')".stripMargin)
    2. 插入数据
      1
      sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin)
      
    3. 查询数据
      1
      sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println)
      
  • 提交Spark作业
    1. 将写好的代码生成jar包,上传至DLI中。

      控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

    2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。

      控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
      • 如果选择spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.redis。
      • 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 'Spark参数(--conf)' 配置

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*

      • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明
      • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。

完整示例代码

  • Maven依赖
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>3.1.0</version>
    </dependency>
    <dependency>
      <groupId>com.redislabs</groupId>
      <artifactId>spark-redis</artifactId>
      <version>2.4.0</version>
    </dependency>
    
  • 通过SQL API访问
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import org.apache.spark.sql.{SparkSession};
    
    object Test_Redis_SQL {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.  
        val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate();  
       
        sparkSession.sql(
          "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (
             'host' = '192.168.4.199', 'port' = '6379', 'password' = '******',table 'person')".stripMargin)  
        
        sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin) 
       
        sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println)
      
        sparkSession.close()
      }
    }
    
  • 通过DataFrame API访问
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types._
    
    object Test_Redis_SparkSql {
      def main(args: Array[String]): Unit = {
      // Create a SparkSession session.  
      val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
    
      // Set cross-source connection parameters.  
      val host = "192.168.4.199"
      val port = "6379"
      val table = "person"
      val auth = "######"
      val key_column = "name"
      
      // ******** setting DataFrame ********  
      // method one
      var schema = StructType(Seq(StructField("name", StringType, false),StructField("age", IntegerType, false)))
      var rdd = sparkSession.sparkContext.parallelize(Seq(Row("huawei",34),Row("Bob",19)))
      var dataFrame = sparkSession.createDataFrame(rdd, schema)
    
    // // method two
    // var jdbcDF= sparkSession.createDataFrame(Seq(("Jack",23)))
    // val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age")
    
    // // method three
    // val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))
    
      // Write data to redis  
      dataFrame.write.format("redis").option("host",host).option("port",port).option("table", table).option("password",auth).mode(SaveMode.Overwrite).save()
    
      // Read data from redis  
      sparkSession.read.format("redis").option("host",host).option("port",port).option("table", table).option("password",auth).load().show()
    
      // Close session  
      sparkSession.close() 
      }
    }
    // methoe two
    // case class Person(name: String, age: Int)
    
  • RDD 操作
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    import com.redislabs.provider.redis._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Test_Redis_RDD {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.  
        val sparkContext = new SparkContext(new SparkConf()
              .setAppName("datasource_redis")
              .set("spark.redis.host", "192.168.4.199")
              .set("spark.redis.port", "6379")
              .set("spark.redis.auth", "@@@@@@")
              .set("spark.driver.allowMultipleContexts","true"))
    
        //***************** Write data to redis **********************  
        // Save String type data  
        val stringRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("high","111"), ("together","333")))   
        sparkContext.toRedisKV(stringRedisData)
      
        // Save Hash type data  
        val hashRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("saprk","123"), ("data","222")))  
        sparkContext.toRedisHASH(hashRedisData, "hashRDD")
    
        // Save List type data  
        val data = List(("school","112"), ("tom","333"));
        val listRedisData:RDD[String] = sparkContext.parallelize(Seq[(String)](data.toString()))
        sparkContext.toRedisLIST(listRedisData, "listRDD")
    
        // Save Set type data  
        val setData = Set(("bob","133"),("kity","322"))
        val setRedisData:RDD[(String)] = sparkContext.parallelize(Seq[(String)](setData.mkString))
        sparkContext.toRedisSET(setRedisData, "setRDD")
      
        // Save ZSet type data 
        val zsetRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("whight","234"), ("bobo","343")))  
        sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")
    
        // ***************************** Read data from redis *******************************************  
        // Traverse the specified key and get the value
        val keysRDD = sparkContext.fromRedisKeys(Array("high","together", "hashRDD", "listRDD", "setRDD","zsetRDD"), 6)  
        keysRDD.getKV().collect().foreach(println)
        keysRDD.getHash().collect().foreach(println)
        keysRDD.getList().collect().foreach(println)
        keysRDD.getSet().collect().foreach(println)
        keysRDD.getZSet().collect().foreach(println)
     
        // Read String type data//
        val stringRDD = sparkContext.fromRedisKV("keyPattern *")
        sparkContext.fromRedisKV(Array( "high","together")).collect().foreach{println}
      
        // Read Hash type data//
        val hashRDD = sparkContext.fromRedisHash("keyPattern *")
        sparkContext.fromRedisHash(Array("hashRDD")).collect().foreach{println}
      
        // Read List type data//
        val listRDD = sparkContext.fromRedisList("keyPattern *")
        sparkContext.fromRedisList(Array("listRDD")).collect().foreach{println}
      
        // Read Set type data//
        val setRDD = sparkContext.fromRedisSet("keyPattern *")
        sparkContext.fromRedisSet(Array("setRDD")).collect().foreach{println}
    
        // Read ZSet type data//
        val zsetRDD = sparkContext.fromRedisZSet("keyPattern *")
        sparkContext.fromRedisZSet(Array("zsetRDD")).collect().foreach{println}
      
        // close session
        sparkContext.stop()
      }
    }