PySpark Example Code

Updated on 2024-06-13 GMT+08:00

Development Description

Redis supports only enhanced datasource connections.

  • Prerequisites

    An enhanced datasource connection has been created on the DLI management console and bound to a queue in packages.

    NOTE:

    Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
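
As a minimal sketch of the advice above, the snippet below reads the Redis password from an environment variable instead of embedding it in the script. The variable name REDIS_AUTH and the helper load_redis_password are hypothetical and chosen only for illustration. Decrypting an encrypted value depends on the key management in use and is not shown.

```python
import os


def load_redis_password(env_var="REDIS_AUTH"):
    """Return the Redis password stored in an environment variable.

    REDIS_AUTH is a hypothetical variable name used for illustration only.
    """
    password = os.environ.get(env_var)
    if not password:
        # Fail early rather than try to connect with an empty password.
        raise RuntimeError("Environment variable %s is not set" % env_var)
    return password
```

With such a helper, the connection parameters could use auth = load_redis_password() in place of a literal string.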

  • Connecting to data sources through DataFrame APIs
    1. Import dependencies.
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
    2. Create a session.
      sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()
      
    3. Set connection parameters.
      host = "192.168.4.199"
      port = "6379"
      table = "person"
      auth = "@@@@@@"
      
    4. Create a DataFrame.
      1. Method 1:
        dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)])
        schema = StructType([StructField("id", IntegerType(), False),          
                             StructField("name", StringType(), False),
                             StructField("age", IntegerType(), False)])
        dataFrame = sparkSession.createDataFrame(dataList, schema)
        
      2. Method 2:
        jdbcDF = sparkSession.createDataFrame([(3,"Jack", 23)])
        dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age")
        
    5. Import data to Redis.
      dataFrame.write\
        .format("redis")\
        .option("host", host)\
        .option("port", port)\
        .option("table", table)\
        .option("password", auth)\
        .mode("Overwrite")\
        .save()
      
      NOTE:
      • The mode can be Overwrite, Append, ErrorIfExists, or Ignore.
      • To specify a key, use .option("key.column", "name"). name indicates the column name.
      • To save nested DataFrames, use .option("model", "binary").
      • To set a data expiration time, use .option("ttl", 1000). The unit is seconds.
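
The options in the notes above can be collected in one place before they are passed to the writer. The following is only a sketch: redis_write_options is a hypothetical helper, not part of the Redis datasource, and it uses only the option names listed in this section (host, port, table, password, key.column, model, and ttl).

```python
def redis_write_options(host, port, table, password,
                        key_column=None, ttl_seconds=None, binary_model=False):
    """Build the option dictionary for a Redis DataFrame write (hypothetical helper)."""
    options = {
        "host": host,
        "port": port,
        "table": table,
        "password": password,
    }
    if key_column is not None:
        # Column whose value is used as the key.
        options["key.column"] = key_column
    if binary_model:
        # Needed when saving nested DataFrames.
        options["model"] = "binary"
    if ttl_seconds is not None:
        # Expiration time, in seconds.
        options["ttl"] = ttl_seconds
    return options


options = redis_write_options("192.168.4.199", "6379", "person", "######",
                              key_column="name", ttl_seconds=1000)

# With a DataFrame available, the dictionary can be passed to the writer:
# dataFrame.write.format("redis").options(**options).mode("Append").save()
```

Keeping the options in a dictionary makes it easy to reuse the same connection settings for both the write and the later read.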
    6. Read data from Redis.
      sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show()
      
    7. View the operation result.

  • Connecting to data sources through SQL APIs
    1. Create a table to connect to a Redis data source.
      sparkSession.sql(
           "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (\
           'host' = '192.168.4.199',\
           'port' = '6379',\
           'password' = '######',\
           'table' = 'person')")
    2. Insert data.
      sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)")
      
    3. Query data.
      for row in sparkSession.sql("SELECT * FROM person").collect():
        print(row)
      
  • Submitting a Spark job
    1. Upload the Python code file to DLI.

    2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.

      NOTE:
      • If the Spark version is 2.3.2 (to be taken offline soon) or 2.4.5, set Module to sys.datasource.redis when you submit the job.
      • If the Spark version is 3.1.1, you do not need to select a module. Instead, configure the following Spark parameters (--conf):

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/redis/*

      • For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
      • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
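
The version rule in the note above can be written down as a small lookup. This is a sketch under assumptions: redis_job_settings and as_conf_args are hypothetical helpers, and the returned dictionary layout (modules, conf) is illustrative rather than the request format of the DLI API. Only the module name and the two class path settings come from this page.

```python
REDIS_JAR_PATH = "/usr/share/extension/dli/spark-jar/datasource/redis/*"


def redis_job_settings(spark_version):
    """Return the module or Spark parameters needed for the Redis datasource."""
    if spark_version in ("2.3.2", "2.4.5"):
        # Older versions: select the dependency module.
        return {"modules": ["sys.datasource.redis"], "conf": {}}
    if spark_version == "3.1.1":
        # Spark 3.1.1: no module, set the class paths instead.
        return {
            "modules": [],
            "conf": {
                "spark.driver.extraClassPath": REDIS_JAR_PATH,
                "spark.executor.extraClassPath": REDIS_JAR_PATH,
            },
        }
    raise ValueError("Spark version not covered on this page: %s" % spark_version)


def as_conf_args(settings):
    """Format the conf entries as key=value strings, one per --conf parameter."""
    return ["%s=%s" % (key, value) for key, value in sorted(settings["conf"].items())]
```

For example, as_conf_args(redis_job_settings("3.1.1")) yields the two key=value strings shown in the note, ready to be entered as Spark parameters.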

Complete Example Code

  • Connecting to data sources through DataFrame APIs
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
      # Create a SparkSession.
      sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()

      # Set the datasource connection parameters.
      host = "192.168.4.199"
      port = "6379"
      table = "person"
      auth = "######"

      # Create a DataFrame and initialize its data.
      # ******* Method 1 *******
      dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19), (2, "Tom", 20)])
      schema = StructType([StructField("id", IntegerType(), False),
                           StructField("name", StringType(), False),
                           StructField("age", IntegerType(), False)])
      dataFrame = sparkSession.createDataFrame(dataList, schema)

      # ******* Method 2 *******
      # jdbcDF = sparkSession.createDataFrame([(3, "Jack", 23)])
      # dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age")

      # Write data to the Redis table.
      dataFrame.write.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).mode("Overwrite").save()

      # Read data from Redis.
      sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show()

      # Close the session.
      sparkSession.stop()
    
  • Connecting to data sources through SQL APIs
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
      # Create a SparkSession.
      sparkSession = SparkSession.builder.appName("datasource_redis").getOrCreate()

      # Create a temporary view that maps to the Redis table.
      sparkSession.sql(
        "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (\
        'host' = '192.168.4.199',\
        'port' = '6379',\
        'password' = '######',\
        'table' = 'person')")

      # Insert data.
      sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)")

      # Query data and print each row.
      for row in sparkSession.sql("SELECT * FROM person").collect():
        print(row)

      # Close the session.
      sparkSession.stop()
    