Spark Structured Streaming Sample Project (Python)
Function
The project uses Spark Structured Streaming to consume word records from Kafka and counts the number of occurrences of each word.
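The counting step can be illustrated independently of Spark. The following plain-Python sketch (the `word_count` helper is illustrative only, not part of the sample project) mirrors the split/explode/groupBy-count pipeline used below:

```python
from collections import Counter

def word_count(lines):
    """Mimic the sample's pipeline: split each line into words
    (split + explode), then group by word and count occurrences."""
    words = [w for line in lines for w in line.split(" ") if w]
    return dict(Counter(words))

print(word_count(["spark streams", "spark kafka"]))
# → {'spark': 2, 'streams': 1, 'kafka': 1}
```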
Sample Code
The following code snippets are examples. For the complete code, see SecurityKafkaWordCount.
When new data arrives in the streaming DataFrame/Dataset, outputMode specifies how the result is written to the streaming sink.
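The difference between output modes can be sketched without a running cluster. This illustrative (non-Spark) simulation shows the semantics: "complete" emits the entire aggregated counts table after every micro-batch, while "update" emits only the rows whose counts changed in that batch (function and variable names here are assumptions for illustration):

```python
from collections import Counter

def run_batches(batches, mode="complete"):
    """Simulate outputMode semantics for a running word count.
    'complete': emit the whole aggregated table after each batch.
    'update'  : emit only rows whose counts changed in this batch.
    Illustration only; the real semantics are implemented by Spark."""
    totals = Counter()
    emitted = []
    for batch in batches:
        batch_counts = Counter(w for line in batch for w in line.split())
        totals.update(batch_counts)
        if mode == "complete":
            emitted.append(dict(totals))
        else:  # update
            emitted.append({w: totals[w] for w in batch_counts})
    return emitted

print(run_batches([["a b"], ["b c"]], mode="complete"))
# → [{'a': 1, 'b': 1}, {'a': 1, 'b': 2, 'c': 1}]
print(run_batches([["a b"], ["b c"]], mode="update"))
# → [{'a': 1, 'b': 1}, {'b': 2, 'c': 1}]
```

The sample below uses "complete" mode with a console sink, so every micro-batch prints the full running counts table.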
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Usage: <bootstrapServers> <subscribeType> <topics>")
        exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    # Initialize the SparkSession.
    spark = SparkSession.builder.appName("KafkaWordCount").getOrCreate()

    # Create a streaming DataFrame of input lines from Kafka.
    # In security mode, configure the KafkaClient section in
    # spark/conf/jaas.conf and jaas-zk.conf.
    lines = spark.readStream.format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    # Split each line into words.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))

    # Generate the running word count.
    wordCounts = words.groupBy("word").count()

    # Start the query that prints the running counts to the console.
    query = wordCounts.writeStream\
        .outputMode("complete")\
        .format("console")\
        .start()

    query.awaitTermination()