Python Sample Code
Function
In a Spark application, use Structured Streaming to call Kafka APIs and read word records, then group the records by word to count the occurrences of each word.
Sample Code
The following code segment is only an example. For details, see SecurityKafkaWordCount.
When new data is available in a streaming DataFrame/Dataset, outputMode determines which data is written to the streaming sink. The default value is append. To change the output mode, see the outputMode description in Spark2x > Scala.
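To make the effect of outputMode concrete, the following is a minimal pure-Python sketch, not Spark code. It models each micro-batch as a list of text lines and shows what a running word count would write under the complete and update modes. The function name run_word_count and the sample batches are hypothetical and exist only for illustration.

```python
from collections import Counter


def run_word_count(batches, output_mode="complete"):
    """Simulate a running word count over micro-batches.

    Hypothetical helper for illustration only; it is not a Spark API.
    Returns one dict per micro-batch, holding the rows that would be
    written to the sink after that batch.
    """
    totals = Counter()
    emitted = []
    for batch in batches:
        changed = set()
        for line in batch:
            for word in line.split(" "):
                totals[word] += 1
                changed.add(word)
        if output_mode == "complete":
            # complete: the whole result table is written every time.
            emitted.append(dict(totals))
        elif output_mode == "update":
            # update: only rows changed by this batch are written.
            emitted.append({w: totals[w] for w in changed})
        else:
            raise ValueError("unsupported output mode: " + output_mode)
    return emitted


# Hypothetical sample input: two micro-batches of lines.
batches = [["apple banana", "apple"], ["banana cherry"]]
complete_output = run_word_count(batches, "complete")
update_output = run_word_count(batches, "update")

print(complete_output)
print(update_output)
```

In this sketch, the second batch under complete contains all three words, while under update it contains only banana and cherry, because the count for apple did not change.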
#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    # The script name plus three arguments are required.
    if len(sys.argv) < 4:
        print("Usage: <bootstrapServers> <subscribeType> <topics>")
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    # Initialize the SparkSession.
    spark = SparkSession.builder.appName("KafkaWordCount").getOrCreate()

    # Create the DataFrame of input lines stream from Kafka.
    # In security mode, set spark/conf/jaas.conf and jaas-zk.conf to KafkaClient.
    lines = spark.readStream.format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    # Split lines into words.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))

    # Generate the running word count.
    wordCounts = words.groupBy("word").count()

    # Start the query that prints the running counts to the console.
    query = wordCounts.writeStream\
        .outputMode("complete")\
        .format("console")\
        .start()

    query.awaitTermination()
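The sample passes subscribeType directly to the Kafka source as an option name, so a mistyped value only fails once the stream is loaded. As a hedged sketch, the arguments could be checked up front. The helper parse_args and the broker address below are hypothetical; the three accepted values are the subscription options of the Spark Kafka source (assign, subscribe, subscribePattern).

```python
# Subscription options accepted by the Spark Structured Streaming Kafka source.
VALID_SUBSCRIBE_TYPES = ("assign", "subscribe", "subscribePattern")


def parse_args(argv):
    """Validate the command line before a SparkSession is created.

    Hypothetical helper for illustration; argv has the same layout as
    sys.argv (script name followed by three arguments).
    """
    if len(argv) != 4:
        raise ValueError("Usage: <bootstrapServers> <subscribeType> <topics>")
    bootstrap_servers, subscribe_type, topics = argv[1:4]
    if subscribe_type not in VALID_SUBSCRIBE_TYPES:
        raise ValueError(
            "subscribeType must be one of: " + ", ".join(VALID_SUBSCRIBE_TYPES)
        )
    return bootstrap_servers, subscribe_type, topics


# Example call with a hypothetical broker address and topic name.
parsed = parse_args(["SecurityKafkaWordCount.py", "broker1:9092", "subscribe", "topic1"])
print(parsed)
```

Raising ValueError rather than calling exit keeps the check usable outside a script entry point; a caller can catch it, print the usage line, and exit.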