更新时间:2022-09-08 GMT+08:00
分享

Scala样例代码

功能介绍

在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.KafkaWordCount。

当Streaming DataFrame/Dataset中有新的可用数据时,outputMode用于配置写入Streaming接收器的数据。

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: KafkaWordCount <bootstrap-servers> " +
        "<subscribe-type> <topics>")
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("KafkaWordCount")
      .getOrCreate()

    import spark.implicits._

    //创建表示来自kafka的输入行流的DataSet。
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    //生成运行字数。
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    //开始运行将运行计数打印到控制台的查询。
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
分享:

    相关文档

    相关产品