scala样例代码
开发说明
redis只支持增强型跨源。
- 前提条件
在DLI管理控制台上已完成创建增强跨源连接,并绑定队列。具体操作请参考《数据湖探索用户指南》。
认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
- 构造依赖信息,创建SparkSession
- 导入依赖。
涉及到mvn依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>com.redislabs</groupId> <artifactId>spark-redis</artifactId> <version>2.4.0</version> </dependency>
import相关依赖包1 2 3 4 5
import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.types._ import com.redislabs.provider.redis._ import scala.reflect.runtime.universe._ import org.apache.spark.{SparkConf, SparkContext}
- 导入依赖。
- 通过DataFrame API访问
- 创建session
1
val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
- 构造schema
1 2 3 4 5 6 7 8 9 10
//method one var schema = StructType(Seq(StructField("name", StringType, false), StructField("age", IntegerType, false))) var rdd = sparkSession.sparkContext.parallelize(Seq(Row("abc",34),Row("Bob",19))) var dataFrame = sparkSession.createDataFrame(rdd, schema) // //method two // var jdbcDF= sparkSession.createDataFrame(Seq(("Jack",23))) // val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age") // //method three // case class Person(name: String, age: Int) // val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))
case class Person(name: String, age: Int) 须写在object之外,可参考•通过DataFrame API访问。
- 导入数据到Redis
1 2 3 4 5 6 7 8 9
dataFrame .write .format("redis") .option("host","192.168.4.199") .option("port","6379") .option("table","person") .option("password","******") .option("key.column","name") .mode(SaveMode.Overwrite) .save()
表1 redis操作参数 参数
描述
host
需要连接的redis集群的IP。
获取方式为:登录华为云官网,之后搜索redis,进入“分布式缓存服务”,接着选择“缓存管理”,根据主机名称需要的IP,可选择其中任意一个IP进行复制即可(其中也包含了port信息),请参考图1。
port
访问端口。
password
连接密码。无密码时可以不填写该参数。
table
对应Redis中的Key或Hash Key。
- 插入redis数据时必填。
- 查询redis数据时与“keys.pattern”参数二选一。
keys.pattern
使用正则表达式匹配多个Key或Hash Key。该参数仅用于查询时使用。查询redis数据时与“table”参数二选一。
key.column
指定列为key值(非必选)。如果写入数据时指定了key,则查询时必须指定key,否则查询时会异常加载key。
partitions.number
读取数据时,并发task数。
scan.count
每批次读取的数据记录数,默认为100。如果在读取过程中,redis集群中的CPU使用率还有提升空间,可以调大该参数。
iterator.grouping.size
每批次插入的数据记录数,默认为100。如果在插入过程中,redis集群中的CPU使用率还有提升空间,可以调大该参数。
timeout
连接redis的超时时间,单位ms,默认值2000(2秒超时)。
- 保存类型:Overwrite、Append、ErrorIfExis、Ignore 四种
- 如果需要保存嵌套的DataFrame,则通过“.option("model","binary")”进行保存
- 指定数据过期时间:“.option("ttl",1000)”;秒为单位
- 读取Redis上的数据
1 2 3 4 5 6 7 8 9
sparkSession.read .format("redis") .option("host","192.168.4.199") .option("port","6379") .option("table", "person") .option("password","######") .option("key.column","name") .load() .show()
操作结果:
- 创建session
- RDD操作
- 创建连接
1 2 3 4 5 6
val sparkContext = new SparkContext(new SparkConf() .setAppName("datasource_redis") .set("spark.redis.host", "192.168.4.199") .set("spark.redis.port", "6379") .set("spark.redis.auth", "######") .set("spark.driver.allowMultipleContexts","true"))
spark.driver.allowMultipleContexts:true 表示在启动多个context时,只使用当前的,防止发生context调用冲突。
- 插入数据
- String 保存
1 2
val stringRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("high","111"), ("together","333"))) sparkContext.toRedisKV(stringRedisData)
- hash 保存
1 2
val hashRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("saprk","123"), ("data","222"))) sparkContext.toRedisHASH(hashRedisData, "hashRDD")
- list 保存
1 2 3
val data = List(("school","112"), ("tom","333")) val listRedisData:RDD[String] = sparkContext.parallelize(Seq[(String)](data.toString())) sparkContext.toRedisLIST(listRedisData, "listRDD")
- set 保存
1 2 3
val setData = Set(("bob","133"),("kity","322")) val setRedisData:RDD[(String)] = sparkContext.parallelize(Seq[(String)](setData.mkString)) sparkContext.toRedisSET(setRedisData, "setRDD")
- zset 保存
1 2
val zsetRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("whight","234"), ("bobo","343"))) sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")
- String 保存
- 查询数据
- 通过遍历key查询
1 2 3 4 5 6
val keysRDD = sparkContext.fromRedisKeys(Array("high","together", "hashRDD", "listRDD", "setRDD","zsetRDD"), 6) keysRDD.getKV().collect().foreach(println) keysRDD.getHash().collect().foreach(println) keysRDD.getList().collect().foreach(println) keysRDD.getSet().collect().foreach(println) keysRDD.getZSet().collect().foreach(println)
- string 查询
1
sparkContext.fromRedisKV(Array( "high","together")).collect().foreach{println}
- hash 查询
1
sparkContext.fromRedisHash(Array("hashRDD")).collect().foreach{println}
- list 查询
1
sparkContext.fromRedisList(Array("listRDD")).collect().foreach{println}
- set 查询
1
sparkContext.fromRedisSet(Array("setRDD")).collect().foreach{println}
- zset 查询
1
sparkContext.fromRedisZSet(Array("zsetRDD")).collect().foreach{println}
- 通过遍历key查询
- 创建连接
- 通过SQL API 访问
- 创建DLI关联跨源访问 Redis的关联表。
sparkSession.sql( "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS ( 'host' = '192.168.4.199', 'port' = '6379', 'password' = '######', table 'person')".stripMargin)
- 插入数据
1
sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin)
- 查询数据
1
sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println)
- 创建DLI关联跨源访问 Redis的关联表。
- 提交Spark作业
- 将写好的代码生成jar包,上传至DLI中。
- 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。
控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
- 如果选择spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.redis。
- 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 'Spark参数(--conf)' 配置
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*
- 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明
- 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
完整示例代码
- Maven依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>com.redislabs</groupId> <artifactId>spark-redis</artifactId> <version>2.4.0</version> </dependency>
- 通过SQL API访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
import org.apache.spark.sql.{SparkSession}; object Test_Redis_SQL { def main(args: Array[String]): Unit = { // Create a SparkSession session. val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate(); sparkSession.sql( "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS ( 'host' = '192.168.4.199', 'port' = '6379', 'password' = '******',table 'person')".stripMargin) sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin) sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println) sparkSession.close() } }
- 通过DataFrame API访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.types._ object Test_Redis_SparkSql { def main(args: Array[String]): Unit = { // Create a SparkSession session. val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate() // Set cross-source connection parameters. val host = "192.168.4.199" val port = "6379" val table = "person" val auth = "######" val key_column = "name" // ******** setting DataFrame ******** // method one var schema = StructType(Seq(StructField("name", StringType, false),StructField("age", IntegerType, false))) var rdd = sparkSession.sparkContext.parallelize(Seq(Row("huawei",34),Row("Bob",19))) var dataFrame = sparkSession.createDataFrame(rdd, schema) // // method two // var jdbcDF= sparkSession.createDataFrame(Seq(("Jack",23))) // val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age") // // method three // val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45))) // Write data to redis dataFrame.write.format("redis").option("host",host).option("port",port).option("table", table).option("password",auth).mode(SaveMode.Overwrite).save() // Read data from redis sparkSession.read.format("redis").option("host",host).option("port",port).option("table", table).option("password",auth).load().show() // Close session sparkSession.close() } } // methoe two // case class Person(name: String, age: Int)
- RDD 操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
import com.redislabs.provider.redis._ import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Test_Redis_RDD { def main(args: Array[String]): Unit = { // Create a SparkSession session. val sparkContext = new SparkContext(new SparkConf() .setAppName("datasource_redis") .set("spark.redis.host", "192.168.4.199") .set("spark.redis.port", "6379") .set("spark.redis.auth", "@@@@@@") .set("spark.driver.allowMultipleContexts","true")) //***************** Write data to redis ********************** // Save String type data val stringRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("high","111"), ("together","333"))) sparkContext.toRedisKV(stringRedisData) // Save Hash type data val hashRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("saprk","123"), ("data","222"))) sparkContext.toRedisHASH(hashRedisData, "hashRDD") // Save List type data val data = List(("school","112"), ("tom","333")); val listRedisData:RDD[String] = sparkContext.parallelize(Seq[(String)](data.toString())) sparkContext.toRedisLIST(listRedisData, "listRDD") // Save Set type data val setData = Set(("bob","133"),("kity","322")) val setRedisData:RDD[(String)] = sparkContext.parallelize(Seq[(String)](setData.mkString)) sparkContext.toRedisSET(setRedisData, "setRDD") // Save ZSet type data val zsetRedisData:RDD[(String,String)] = sparkContext.parallelize(Seq[(String,String)](("whight","234"), ("bobo","343"))) sparkContext.toRedisZSET(zsetRedisData, "zsetRDD") // ***************************** Read data from redis ******************************************* // Traverse the specified key and get the value val keysRDD = sparkContext.fromRedisKeys(Array("high","together", "hashRDD", "listRDD", "setRDD","zsetRDD"), 6) keysRDD.getKV().collect().foreach(println) keysRDD.getHash().collect().foreach(println) keysRDD.getList().collect().foreach(println) keysRDD.getSet().collect().foreach(println) keysRDD.getZSet().collect().foreach(println) // Read String type data// val stringRDD = sparkContext.fromRedisKV("keyPattern *") sparkContext.fromRedisKV(Array( "high","together")).collect().foreach{println} // Read Hash type data// val hashRDD = sparkContext.fromRedisHash("keyPattern *") sparkContext.fromRedisHash(Array("hashRDD")).collect().foreach{println} // Read List type data// val listRDD = sparkContext.fromRedisList("keyPattern *") sparkContext.fromRedisList(Array("listRDD")).collect().foreach{println} // Read Set type data// val setRDD = sparkContext.fromRedisSet("keyPattern *") sparkContext.fromRedisSet(Array("setRDD")).collect().foreach{println} // Read ZSet type data// val zsetRDD = sparkContext.fromRedisZSet("keyPattern *") sparkContext.fromRedisZSet(Array("zsetRDD")).collect().foreach{println} // close session sparkContext.stop() } }