Spark Structured Streaming Sample Program (Python)
Function Description
In the Spark application, Structured Streaming calls the Kafka API to obtain word records, then classifies and counts the records to produce a running count of each word.
Code Sample
The following code snippet is for demonstration only. For the complete code, see SecurityKafkaWordCount.
When new rows become available in the streaming DataFrame/Dataset, outputMode determines which part of the result table is written to the streaming sink.
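As a quick illustration of that setting, here is a minimal, self-contained sketch. It is not part of the SecurityKafkaWordCount sample; the built-in rate source, the value % 10 grouping, and the 30-second run time are illustrative assumptions chosen so the script can run without a Kafka cluster.
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Illustrative sketch only: shows how outputMode affects an aggregating query.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("OutputModeSketch").getOrCreate()

# The built-in rate source emits one row per second: (timestamp, value).
stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# A running aggregation, so rows in the result table keep changing.
counts = stream.groupBy((col("value") % 10).alias("key")).count()

# outputMode choices for this query:
#   "complete" - rewrite the entire result table on every trigger
#   "update"   - write only the rows whose counts changed since the last trigger
#   "append"   - rejected here, because existing counts may still change
query = counts.writeStream\
    .outputMode("update")\
    .format("console")\
    .start()

query.awaitTermination(30)  # run for about 30 seconds
query.stop()
spark.stop()
Here "update" keeps the console output small; the main sample below uses "complete" instead, so the full word-count table is reprinted on every trigger.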
#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    # Six arguments are required, so sys.argv must contain at least seven
    # entries (sys.argv[0] is the script name).
    if len(sys.argv) < 7:
        print("Usage: <bootstrapServers> <subscribeType> <topics> <protocol> <service> <domain>")
        exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]
    protocol = sys.argv[4]
    service = sys.argv[5]
    domain = sys.argv[6]

    # Initialize the SparkSession.
    spark = SparkSession.builder.appName("SecurityKafkaWordCount").getOrCreate()

    # Create a DataFrame representing the stream of input lines from Kafka.
    # In security mode, modify spark/conf/jaas.conf and jaas-zk.conf to
    # configure the KafkaClient entry.
    lines = spark.readStream.format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .option("kafka.security.protocol", protocol)\
        .option("kafka.sasl.kerberos.service.name", service)\
        .option("kafka.kerberos.domain.name", domain)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    # Split each line into words.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))

    # Generate the running word count.
    wordCounts = words.groupBy("word").count()

    # Start a query that prints the running counts to the console.
    query = wordCounts.writeStream\
        .outputMode("complete")\
        .format("console")\
        .start()

    query.awaitTermination()
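The comment in the sample mentions configuring a KafkaClient entry in spark/conf/jaas.conf and jaas-zk.conf for security mode. As a rough sketch only, a Kerberos KafkaClient entry typically has the following shape; the keytab path, principal, and realm below are placeholders, and the authoritative values come from the MRS cluster's client configuration:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="./user.keytab"
    principal="sparkuser@HADOOP.COM"
    useTicketCache=false
    storeKey=true
    debug=false;
};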