Updated on 2024-08-10 GMT+08:00

Spark Structured Streaming Sample Project (Python)

Function

The project uses Structured Streaming in a Spark application to consume word records from Kafka and count the number of occurrences of each word.
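Conceptually, the per-word counting performed by the sample is equivalent to the following plain-Python sketch (Kafka and Spark are omitted here; the input lines are hypothetical):

```python
from collections import Counter

def count_words(lines):
    """Split each line on spaces and count occurrences of each word,
    mirroring the explode/split/groupBy-count pipeline in the sample."""
    counts = Counter()
    for line in lines:
        counts.update(line.split(" "))
    return dict(counts)

# Hypothetical batch of records read from Kafka.
print(count_words(["spark streaming", "spark kafka"]))
# → {'spark': 2, 'streaming': 1, 'kafka': 1}
```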

Sample Code

The following code snippets are used as an example. For the complete code, see SecurityKafkaWordCount.

When new data arrives in the streaming DataFrame/Dataset, outputMode determines which data is written to the streaming sink.
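The sample below uses outputMode("complete"), which writes the entire updated result table to the sink after every trigger; "update" would emit only the rows that changed since the last trigger, and "append" only newly added rows. A minimal plain-Python sketch of the difference between the complete and update modes (the micro-batches here are hypothetical):

```python
from collections import Counter

def run_batches(batches):
    """Simulate running word counts over micro-batches, showing what a
    'complete' sink and an 'update' sink would each receive per trigger."""
    totals = Counter()
    for batch in batches:
        before = dict(totals)
        for line in batch:
            totals.update(line.split(" "))
        complete_output = dict(totals)                      # entire result table
        update_output = {w: c for w, c in totals.items()
                         if before.get(w) != c}             # only changed rows
        yield complete_output, update_output

batches = [["spark kafka"], ["spark"]]
for complete_out, update_out in run_batches(batches):
    print("complete:", complete_out, "| update:", update_out)
```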

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    if len(sys.argv) < 7:
        print("Usage: <bootstrapServers> <subscribeType> <topics> <protocol> <service> <domain>")
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]
    protocol = sys.argv[4]
    service = sys.argv[5]
    domain = sys.argv[6]

    # Initialize SparkSession.
    spark = SparkSession.builder.appName("SecurityKafkaWordCount").getOrCreate()

    # Create the DataFrame representing the stream of input lines from Kafka.
    # In security mode, configure the KafkaClient section in spark/conf/jaas.conf and jaas-zk.conf.
    lines = spark.readStream.format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .option("kafka.security.protocol", protocol)\
        .option("kafka.sasl.kerberos.service.name", service)\
        .option("kafka.kerberos.domain.name", domain)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    # Split lines into words.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    # Generate the running word count.
    wordCounts = words.groupBy("word").count()

    # Start the query that prints the running counts to the console.
    query = wordCounts.writeStream\
        .outputMode("complete")\
        .format("console")\
        .start()

    query.awaitTermination()
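The script can typically be submitted to a cluster with spark-submit. The exact options (Kafka client package version, master, security settings) depend on your environment; the values below are placeholders, not part of the sample:

```shell
# Placeholder values; adjust the master, Kafka package version, and arguments for your cluster.
spark-submit --master yarn --deploy-mode client \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 \
  SecurityKafkaWordCount.py \
  <bootstrapServers> subscribe <topics> SASL_PLAINTEXT kafka <domain>
```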