更新时间:2023-04-12 GMT+08:00
Scala样例代码
功能介绍
在Spark应用中,通过使用StructuredStreaming调用kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SecurityKafkaWordCount。
- 普通集群需要将样例代码中com.huawei.bigdata.spark.examples.SecurityKafkaWordCount.scala中第49行代码“.option("kafka.security.protocol", protocol)”注释掉。
- 当Streaming DataFrame/Dataset中有新的可用数据时,outputMode用于配置写入Streaming接收器的数据。其默认值为“append”。
object SecurityKafkaWordCount {
def main(args: Array[String]): Unit = {
if (args.length < 6) {
System.err.println("Usage: SecurityKafkaWordCount <bootstrap-servers> " +
"<subscribe-type> <topics> <protocol> <service> <domain>")
System.exit(1)
}
val Array(bootstrapServers, subscribeType, topics, protocol, service, domain) = args
val spark = SparkSession
.builder
.appName("SecurityKafkaWordCount")
.getOrCreate()
import spark.implicits._
//创建表示来自kafka的输入行流的DataSet。
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.option("kafka.security.protocol", protocol)
.option("kafka.sasl.kerberos.service.name", service)
.option("kafka.kerberos.domain.name", domain)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
//生成运行字数。
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
//开始运行将运行计数打印到控制台的查询。
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}