PySpark Example Code
Development Description
Redis supports only enhanced datasource connections.
- Prerequisites
An enhanced datasource connection has been created on the DLI management console and bound to a queue in packages. For details, see Enhanced Datasource Connections.
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
- Connecting to data sources through DataFrame APIs
- Import dependencies.
1 2 3
from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession
- Create a session.
1
sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()
- Set connection parameters.
1 2 3 4
host = "192.168.4.199" port = "6379" table = "person" auth = "@@@@@@"
- Create a DataFrame.
- Method 1:
1 2 3 4 5
dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)]) schema = StructType([StructField("id", IntegerType(), False), StructField("name", StringType(), False), StructField("age", IntegerType(), False)]) dataFrame = sparkSession.createDataFrame(dataList, schema)
- Method 2:
1 2
jdbcDF = sparkSession.createDataFrame([(3,"Jack", 23)]) dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age")
- Method 1:
- Import data to Redis.
1 2 3 4 5 6 7 8
dataFrame.write .format("redis")\ .option("host", host)\ .option("port", port)\ .option("table", table)\ .option("password", auth)\ .mode("Overwrite")\ .save()
- The options of mode are Overwrite, Append, ErrorIfExis, and Ignore.
- To specify a key, use .option("key.column", "name"). name indicates the column name.
- To save nested DataFrames, use .option("model", "binary").
- If you need to specify the data expiration time, use .option("ttl", 1000). The unit is second.
- Read data from Redis.
1
sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show()
- View the operation result.
- Import dependencies.
- Connecting to data sources through SQL APIs
- Create a table to connect to a Redis data source.
sparkSession.sql( "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS ( 'host' = '192.168.4.199', 'port' = '6379', 'password' = '######', table 'person')".stripMargin)
- Insert data.
1
sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin)
- Query data.
1
sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println)
- Create a table to connect to a Redis data source.
- Submitting a Spark job
- Upload the Python code file to DLI.
For details about console operations, see Creating a Package. For details about API operations, see Uploading a Package Group.
- In the Spark job editor, select the corresponding dependency module and execute the Spark job.
For details about console operations, see Creating a Spark Job. For details about API operations, see Creating a Batch Processing Job.
- If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, specify the Module to sys.datasource.redis when you submit a job.
- If the Spark version is 3.1.1, you do not need to select a module. Configure Spark parameters (--conf).
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*
- For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
- For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
- Upload the Python code file to DLI.
Complete Example Code
- Connecting to data sources through DataFrame APIs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate() # Set cross-source connection parameters. host = "192.168.4.199" port = "6379" table = "person" auth = "######" # Create a DataFrame and initialize the DataFrame data. # ******* method noe ********* dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)]) schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False),StructField("age", IntegerType(), False)]) dataFrame_one = sparkSession.createDataFrame(dataList, schema) # ****** method two ****** # jdbcDF = sparkSession.createDataFrame([(3,"Jack", 23)]) # dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age") # Write data to the redis table dataFrame.write.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).mode("Overwrite").save() # Read data sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show() # close session sparkSession.stop()
- Connecting to data sources through SQL APIs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession sparkSession = SparkSession.builder.appName("datasource_redis").getOrCreate() sparkSession.sql( "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (\ 'host' = '192.168.4.199', \ 'port' = '6379',\ 'password' = '######',\ 'table'= 'person')".stripMargin); sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin) sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println) # close session sparkSession.stop()
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot