更新时间:2022-09-08 GMT+08:00
分享

Python样例代码

功能介绍

在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。

代码样例

下面代码片段仅为演示,具体代码参见:SecurityKafkaWordCount。

当Streaming DataFrame/Dataset中有新的可用数据时,outputMode用于配置写入Streaming接收器的数据。

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    if len(sys.argv) < 6:
        print("Usage: <bootstrapServers> <subscribeType> <topics> <protocol> <service> <domain>")
        exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]
    protocol = sys.argv[4]
    service = sys.argv[5]
    domain = sys.argv[6]

    # 初始化sparkSession
    spark = SparkSession.builder.appName("SecurityKafkaWordCount").getOrCreate()

    # 创建表示来自kafka的input lines stream的DataFrame
	# 安全模式要修改spark/conf/jaas.conf和jaas-zk.conf为KafkaClient
    lines = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option(subscribeType, topics)\
    .option("kafka.security.protocol", protocol)\
    .option("kafka.sasl.kerberos.service.name", service)\
    .option("kafka.kerberos.domain.name", domain)\
    .load()\
    .selectExpr("CAST(value AS STRING)")


    # 将lines切分为word
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    # 生成正在运行的word count
    wordCounts = words.groupBy("word").count()

    # 开始运行将running counts打印到控制台的查询
    query = wordCounts.writeStream\
    .outputMode("complete")\
    .format("console")\
    .start()

    query.awaitTermination()
分享:

    相关文档

    相关产品