PySpark Example Code

Development description

Redis supports only enhanced datasource connections. Only yearly/monthly queues can be used.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a queue in yearly/monthly packages. For details, see Data Lake Insight User Guide.

  • Connecting to datasources through DataFrame APIs
    1. Import dependencies
      1
      2
      3
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
    2. Create a session
      1
      sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()
      
    3. Set connection parameters
      1
      2
      3
      4
      host = "192.168.4.199"
      port = "6379"
      table = "person"
      auth = "@@@@@@"
      
    4. Create a DataFrame
      1. Method 1:
        1
        2
        3
        4
        5
        dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)])
        schema = StructType([StructField("id", IntegerType(), False),          
                             StructField("name", StringType(), False),
                             StructField("age", IntegerType(), False)])
        dataFrame = sparkSession.createDataFrame(dataList, schema)
        
      2. Method 2:
        1
        2
        jdbcDF = sparkSession.createDataFrame([(3,"Jack", 23)])
        dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age")
        
    5. Importing data to Redis
      1
      2
      3
      4
      5
      6
      7
      8
      dataFrame.write
        .format("redis")
        .option("host", host)
        .option("port", port)
        .option("table", table)
        .option("password", auth)
        .mode("Overwrite")
        .save()
      
      • Save type. The options are Overwrite, Append, ErrorIfExis, and Ignore.
      • To specify a key, use .option("key.column", "name"). name indicates the column name.
      • To save nested DataFrames, use .option("model", "binary").
      • If you need to specify the data expiration time, use .option("ttl", 1000). The unit is second.
    6. Read data from Redis
      1
      sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show()
      
    7. Operation result

  • Connect to datasources through SQL APIs
    1. Create a table to connect to Redis datasource
      1
      2
      3
      4
      5
      6
      sparkSession.sql(
        "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (
           'host' = '192.168.4.199',
           'port' = '6379',
           'password' = '######',
           table  'person')".stripMargin)
      
    2. Insert data
      1
      sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin)
      
    3. Query data
      1
      sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println)
      
  • Submitting a Spark job
    1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.redis.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.

Complete example code

  • Connecting to datasources through DataFrame APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
    if __name__ == "__main__":
      # Create a SparkSession session.    
      sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()
      
      # Set cross-source connection parameters.
      host = "192.168.4.199"
      port = "6379"
      table = "person"  
      auth = "######"
         
      # Create a DataFrame and initialize the DataFrame data.    
      # *******   method noe   *********    
      dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)])
      schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False),StructField("age", IntegerType(), False)])
      dataFrame_one = sparkSession.createDataFrame(dataList, schema)
    
      # ****** method two ******  
      # jdbcDF = sparkSession.createDataFrame([(3,"Jack", 23)])
      # dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age")
      
      # Write data to the redis table  
      dataFrame.write.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).mode("Overwrite").save()  
      # Read data  
      sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show()
      
      # close session  
      sparkSession.stop()
    
  • Connecting to datasources through SQL APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession  
      sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
    
      sparkSession.sql("CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (
        'host' = '192.168.4.199', 
        'port' = '6379',
        'password' = '######',
        'table'= 'person')".stripMargin); 
    
      sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin)
      
      sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println)
       
      # close session  
      sparkSession.stop()