
Complete Example Code

Updated at: Mar 17, 2020 GMT+08:00

Connecting to Datasources Through DataFrame APIs

# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession
if __name__ == "__main__":
  # Create a SparkSession.
  sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()
  
  # Set cross-source connection parameters.
  host = "192.168.4.199"
  port = "6379"
  table = "person"  
  auth = "######"
     
  # Create a DataFrame and initialize the DataFrame data.
  # ****** method one ******
  dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19), (2, "Tom", 20)])
  schema = StructType([StructField("id", IntegerType(), False), StructField("name", StringType(), False), StructField("age", IntegerType(), False)])
  dataFrame = sparkSession.createDataFrame(dataList, schema)

  # ****** method two ******  
  # jdbcDF = sparkSession.createDataFrame([(3,"Jack", 23)])
  # dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age")
  
  # Write data to the Redis table
  dataFrame.write.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).mode("Overwrite").save()  
  # Read data  
  sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show()
  
  # Close the session
  sparkSession.stop()
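
To verify the written data outside Spark, you can connect with any Redis client. The following is a minimal sketch using the open-source redis-py client (pip install redis); it assumes the connector's default layout, in which each row of the person table is stored as a Redis hash whose key carries the table-name prefix (for example, person:<id>). The host, port, and password are the same placeholders used above.

# Verification sketch with redis-py (not part of the DLI example).
# Assumption: each row is stored as a Redis hash under a "person:"-prefixed key.
import redis

client = redis.Redis(host="192.168.4.199", port=6379, password="######")
for key in client.scan_iter(match="person:*"):
  # Each matching key should hold one row as a hash of column -> value.
  print(key, client.hgetall(key))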

Connecting to Datasources Using Spark RDD

# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
  # Create a SparkContext with the Redis connection parameters.
  sparkContext = SparkContext(conf=SparkConf()
                              .setAppName("datasource_redis")
                              .set("spark.redis.host", "192.168.4.199")
                              .set("spark.redis.port", "6379")
                              .set("spark.redis.auth", "######")
                              .set("spark.driver.allowMultipleContexts", "true"))

  # ***************** Write data to Redis **********************
  # Save String type data
  stringRedisData = sparkContext.parallelize([("high", "111"), ("together", "333")])
  sparkContext.toRedisKV(stringRedisData)

  # Save Hash type data
  hashRedisData = sparkContext.parallelize([("high", "111"), ("together", "333")])
  sparkContext.toRedisHASH(hashRedisData, "hashRDD")

  # Save List type data
  listRedisData = sparkContext.parallelize(["high", "111", "tom", "333"])
  sparkContext.toRedisLIST(listRedisData, "listRDD")
  # sparkContext.toRedisFixedLIST(listRedisData, "bar", 40)

  # Save Set type data
  setRedisData = sparkContext.parallelize(["bob", "123", "kity", "222"])
  sparkContext.toRedisSET(setRedisData, "setRDD")

  # Save ZSet type data
  zsetRedisData = sparkContext.parallelize([("high", "111"), ("together", "333")])
  sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")

  # ***************** Read data from Redis *****************
  # Traverse the specified keys and get their values
  keysRDD = sparkContext.fromRedisKeys(["high", "together", "hashRDD", "listRDD", "setRDD", "zsetRDD"], 6)
  print(keysRDD.getKV().collect())
  print(keysRDD.getHash().collect())
  print(keysRDD.getList().collect())
  print(keysRDD.getSet().collect())
  print(keysRDD.getZSet().collect())

  # Read String type data
  print(sparkContext.fromRedisKV(["high", "together"]).collect())

  # Read Hash type data
  print(sparkContext.fromRedisHash(["hashRDD"]).collect())

  # Read List type data
  print(sparkContext.fromRedisList(["listRDD"]).collect())

  # Read Set type data
  print(sparkContext.fromRedisSet(["setRDD"]).collect())

  # Read ZSet type data
  print(sparkContext.fromRedisZSet(["zsetRDD"]).collect())

  # Stop the SparkContext
  sparkContext.stop()
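
The toRedisKV/fromRedisKV family of methods shown above is provided by the spark-redis integration in the DLI environment. In a plain PySpark environment without these helpers, a common fallback is to talk to Redis directly from each partition with the redis-py client, as in the following sketch (the host, port, and password placeholders match the example above, and redis-py must be installed on the executors):

# Fallback sketch without spark-redis: write key-value pairs from each
# partition using redis-py. Assumption: `pip install redis` on the executors.
import redis
from pyspark import SparkContext

def save_partition(rows):
  # One client per partition, reused for all rows in that partition.
  client = redis.Redis(host="192.168.4.199", port=6379, password="######")
  for key, value in rows:
    client.set(key, value)  # String type; use hset/rpush/sadd/zadd for other types

sc = SparkContext(appName="redis_rdd_fallback")
sc.parallelize([("high", "111"), ("together", "333")]).foreachPartition(save_partition)
sc.stop()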

Connecting to Datasources Through SQL APIs

# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql import SparkSession

if __name__ == "__main__":
  # Create a SparkSession.
  sparkSession = SparkSession.builder.appName("datasource_redis").getOrCreate()

  # Create a temporary view backed by the Redis table.
  sparkSession.sql(
    "CREATE TEMPORARY VIEW person (name STRING, age INT) "
    "USING org.apache.spark.sql.redis OPTIONS ("
    "  'host' = '192.168.4.199',"
    "  'port' = '6379',"
    "  'password' = '######',"
    "  'table' = 'person')")

  # Write data
  sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30), ('Peter', 45)")

  # Read data
  for row in sparkSession.sql("SELECT * FROM person").collect():
    print(row)

  # Close the session
  sparkSession.stop()
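
Rows inserted through the view are stored under keys generated by the connector. The open-source spark-redis connector additionally accepts a key.column option that makes a chosen column part of the Redis key; whether the DLI connector exposes this option is an assumption here, so treat the following variant as a sketch:

# Hedged variant: with 'key.column' (as in open-source spark-redis), the row
# for John would be stored under a predictable key such as person:John.
# Assumption: the DLI connector supports this option.
sparkSession.sql(
  "CREATE TEMPORARY VIEW person_by_name (name STRING, age INT) "
  "USING org.apache.spark.sql.redis OPTIONS ("
  "  'host' = '192.168.4.199',"
  "  'port' = '6379',"
  "  'password' = '######',"
  "  'table' = 'person',"
  "  'key.column' = 'name')")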
