Updated on 2022-09-14 GMT+08:00

Python Sample Code

Function

In a Spark application, use Structured Streaming to call Kafka APIs to read word records, then group the records by word to count the number of occurrences of each word.

Sample Code

The following code segment is only an example. For details, see SecurityKafkaWordCount.

outputMode specifies what is written to the streaming sink when new data becomes available in the streaming DataFrame/Dataset. The default value is append. To change the output mode, see the outputMode description under Spark2x > Scala.
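To make the difference between output modes concrete, the following is a minimal sketch in plain Python. It is not Spark code and does not call any Spark API. The function name simulate_output_modes and the input batches are hypothetical and exist only for illustration. It mimics a running word count over two micro-batches and records what complete mode (the whole result table on every trigger) and update mode (only the rows changed by the latest batch) would write to the sink. Append mode is left out because, as far as Spark's documented behavior goes, it is not supported for a streaming aggregation without a watermark.

```python
from collections import Counter


def simulate_output_modes(batches):
    """Simulate sink output per micro-batch (hypothetical helper, not a Spark API)."""
    running = Counter()  # Running word counts across all batches seen so far.
    emitted = {"complete": [], "update": []}
    for batch in batches:
        changed = set()  # Words whose count changed in this micro-batch.
        for line in batch:
            for word in line.split(" "):
                running[word] += 1
                changed.add(word)
        # complete: the entire result table is written on every trigger.
        emitted["complete"].append(dict(running))
        # update: only rows changed since the last trigger are written.
        emitted["update"].append({w: running[w] for w in changed})
    return emitted


# Two made-up micro-batches of input lines.
result = simulate_output_modes([["a b a"], ["b c"]])
print(result["complete"])
print(result["update"])
```

After the second batch, the complete output still contains the unchanged row for "a", while the update output contains only the rows for "b" and "c". The Spark sample that follows uses complete mode.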

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    # Three arguments are required, so sys.argv must contain at least four items.
    if len(sys.argv) < 4:
        print("Usage: <bootstrapServers> <subscribeType> <topics>")
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    # Initialize the SparkSession.
    spark = SparkSession.builder.appName("KafkaWordCount").getOrCreate()

    # Create the DataFrame of input lines stream from Kafka.
    # In security mode, configure KafkaClient in spark/conf/jaas.conf and jaas-zk.conf.
    lines = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option(subscribeType, topics)\
    .load()\
    .selectExpr("CAST(value AS STRING)")


    # Split lines into words.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    # Generate the running word count.
    wordCounts = words.groupBy("word").count()

    # Start the query that prints the running counts to the console.
    query = wordCounts.writeStream\
    .outputMode("complete")\
    .format("console")\
    .start()

    query.awaitTermination()
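
The sample passes subscribeType directly to option() as the option name, so a mistyped value is only detected when the stream is loaded. The following is a minimal sketch of validating the command-line arguments up front. The helper parse_args, the constant VALID_SUBSCRIBE_TYPES, and the example host and topic are hypothetical names introduced for illustration. The three subscription option names are the ones documented for the Spark Kafka source, and the exact-case comparison is a simplification of this sketch.

```python
# Subscription option names documented for the Spark Kafka source.
VALID_SUBSCRIBE_TYPES = ("assign", "subscribe", "subscribePattern")


def parse_args(argv):
    """Return (bootstrapServers, subscribeType, topics) or raise ValueError.

    Hypothetical helper: argv is expected in the same layout as sys.argv,
    that is, the script name followed by three arguments.
    """
    if len(argv) < 4:
        raise ValueError("Usage: <bootstrapServers> <subscribeType> <topics>")
    bootstrap_servers, subscribe_type, topics = argv[1:4]
    # Assumption of this sketch: require an exact-case match.
    if subscribe_type not in VALID_SUBSCRIBE_TYPES:
        raise ValueError(
            "subscribeType must be one of: " + ", ".join(VALID_SUBSCRIBE_TYPES)
        )
    return bootstrap_servers, subscribe_type, topics


# Placeholder values for illustration only.
parsed = parse_args(["wordcount.py", "broker1:9092", "subscribe", "topic1"])
print(parsed)
```

In the sample, the returned tuple could replace the three direct sys.argv lookups, with the ValueError caught at the top level to print the usage message and exit.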