Updated on 2022-07-11 GMT+08:00

Python Example Code

Function

In Spark applications, use StructuredStreaming to invoke Kafka APIs to obtain word records. Classify word records to obtain the number of records of each word.

Example Code

The following code segment is only an example. For details, see SecurityKafkaWordCount

When there is new available data in Streaming DataFrame/Dataset, outputMode indicates data written to the Streaming receiver.

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    if len(sys.argv) < 6:
        print("Usage: <bootstrapServers> <subscribeType> <topics> <protocol> <service> <domain>")
        exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]
    protocol = sys.argv[4]
    service = sys.argv[5]
    domain = sys.argv[6]

    # Initialize the SparkSession. 
    spark = SparkSession.builder.appName("SecurityKafkaWordCount").getOrCreate()

    # Create the DataFrame of input lines stream from Kafka. 
         # In security mode, set spark/conf/jaas.conf and jaas-zk.conf to KafkaClient. 
    lines = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option(subscribeType, topics)\
    .option("kafka.security.protocol", protocol)\
    .option("kafka.sasl.kerberos.service.name", service)\
    .option("kafka.kerberos.domain.name", domain)\
    .load()\
    .selectExpr("CAST(value AS STRING)")


    # Split lines into words. 
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    # Generate the running word count.
    wordCounts = words.groupBy("word").count()

    # Start to query whether the running counts are printed to the console.
    query = wordCounts.writeStream\
    .outputMode("complete")\
    .format("console")\
    .start()

    query.awaitTermination()