文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Spark2x开发指南(安全模式)/
开发Spark应用/
Spark Structured Streaming样例程序/
Spark Structured Streaming样例程序(Python)
更新时间:2024-08-03 GMT+08:00
Spark Structured Streaming样例程序(Python)
功能介绍
在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。
代码样例
下面代码片段仅为演示,具体代码参见:SecurityKafkaWordCount。
当Streaming DataFrame/Dataset中有新的可用数据时,outputMode用于配置写入Streaming接收器的数据。
#!/usr/bin/python # -*- coding: utf-8 -*- import sys from pyspark.sql import SparkSession from pyspark.sql.functions import explode, split if __name__ == "__main__": if len(sys.argv) < 6: print("Usage: <bootstrapServers> <subscribeType> <topics> <protocol> <service> <domain>") exit(-1) bootstrapServers = sys.argv[1] subscribeType = sys.argv[2] topics = sys.argv[3] protocol = sys.argv[4] service = sys.argv[5] domain = sys.argv[6] # 初始化sparkSession spark = SparkSession.builder.appName("SecurityKafkaWordCount").getOrCreate() # 创建表示来自kafka的input lines stream的DataFrame # 安全模式要修改spark/conf/jaas.conf和jaas-zk.conf为KafkaClient lines = spark.readStream.format("kafka")\ .option("kafka.bootstrap.servers", bootstrapServers)\ .option(subscribeType, topics)\ .option("kafka.security.protocol", protocol)\ .option("kafka.sasl.kerberos.service.name", service)\ .option("kafka.kerberos.domain.name", domain)\ .load()\ .selectExpr("CAST(value AS STRING)") # 将lines切分为word words = lines.select(explode(split(lines.value, " ")).alias("word")) # 生成正在运行的word count wordCounts = words.groupBy("word").count() # 开始运行将running counts打印到控制台的查询 query = wordCounts.writeStream\ .outputMode("complete")\ .format("console")\ .start() query.awaitTermination()