Updated on 2024-08-10 GMT+08:00

Spark Structured Streaming Sample Project (Scala)

Function

This project uses Spark Structured Streaming to read word records from Kafka, groups the records by word, and counts the number of occurrences of each word.
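As a minimal illustration of the aggregation itself, the following plain-Scala sketch applies the same split-and-count logic to an in-memory collection instead of a Kafka stream. It does not use Spark. The object name WordCountSketch and the function countWords are hypothetical and exist only for this illustration.

```scala
// Hypothetical plain-Scala sketch (no Spark, no Kafka): mirrors what
// lines.flatMap(_.split(" ")).groupBy("value").count() computes, but on a finite Seq.
object WordCountSketch {
  // Split each record on single spaces, group identical words, and count each group.
  def countWords(records: Seq[String]): Map[String, Long] =
    records
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  def main(args: Array[String]): Unit = {
    val records = Seq("hello spark", "hello kafka")
    // Sort by word so that the printed order is deterministic.
    countWords(records).toSeq.sortBy(_._1).foreach { case (word, count) =>
      println(s"$word: $count")
    }
  }
}
```

The streaming job differs in that the input never ends: Spark keeps these counts as state and updates them as new Kafka records arrive.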

Sample Code

The following code snippet is an example. For the complete code, see com.huawei.bigdata.spark.examples.KafkaWordCount.

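When new data becomes available in a streaming DataFrame/Dataset, outputMode specifies which data is written to the streaming sink. This sample uses the complete mode, in which the entire updated result table is written to the sink after every trigger.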
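To illustrate what outputMode controls, the following plain-Scala simulation (not Spark code and not part of KafkaWordCount) treats each inner Seq as one micro-batch and shows which rows would be written per trigger in the "complete" and "update" modes. The object OutputModeSketch and its functions are hypothetical. Spark's third mode, "append", is not modeled because Spark does not support it for a streaming aggregation like this one unless a watermark is defined, which is one reason the sample uses "complete".

```scala
// Hypothetical simulation (no Spark): compares the rows emitted per micro-batch
// by the "complete" and "update" output modes for a running word count.
object OutputModeSketch {
  type Counts = Map[String, Long]

  // Fold one micro-batch of lines into the running word counts.
  def updateCounts(state: Counts, batch: Seq[String]): Counts =
    batch.flatMap(_.split(" ")).foldLeft(state) { (acc, word) =>
      acc.updated(word, acc.getOrElse(word, 0L) + 1L)
    }

  // For each micro-batch, return the rows written to the sink:
  // "complete" writes the whole result table; "update" writes only rows that changed.
  def emitted(batches: Seq[Seq[String]], mode: String): Seq[Counts] = {
    require(mode == "complete" || mode == "update", s"unsupported mode: $mode")
    val (_, outputs) =
      batches.foldLeft((Map.empty[String, Long], Vector.empty[Counts])) {
        case ((state, out), batch) =>
          val next = updateCounts(state, batch)
          val rows =
            if (mode == "complete") next
            else next.filter { case (word, count) => !state.get(word).contains(count) }
          (next, out :+ rows)
      }
    outputs
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("a b"), Seq("b c"))
    // In the second batch, "complete" repeats the unchanged word a; "update" does not.
    println(emitted(batches, "complete"))
    println(emitted(batches, "update"))
  }
}
```

With the console sink, "complete" therefore prints every word seen so far on each trigger, while "update" would print only the words whose counts changed in that trigger.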

package com.huawei.bigdata.spark.examples

import org.apache.spark.sql.SparkSession

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
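    // Exactly three arguments are expected; extra arguments would make the
    // Array destructuring below fail with a MatchError.
    if (args.length != 3) {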
      System.err.println("Usage: KafkaWordCount <bootstrap-servers> " +
        "<subscribe-type> <topics>")
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("KafkaWordCount")
      .getOrCreate()

    import spark.implicits._

    // Create a Dataset that represents the stream of input lines from Kafka.
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

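    // Split each line into words and generate the running word counts.
    // A Dataset[String] has a single column named "value", which holds each word.
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console.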
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
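The sample passes the <subscribe-type> argument directly as the Kafka source option name. The Spark Kafka source accepts exactly one of "assign", "subscribe", or "subscribePattern"; for example, "subscribe" takes a comma-separated topic list. The following sketch shows one way to validate the three arguments up front and build the option map. The object KafkaArgsSketch and the function kafkaSourceOptions are hypothetical helpers, not part of the sample project, and the code does not depend on Spark.

```scala
// Hypothetical helper (not part of the sample project): validates the
// <bootstrap-servers> <subscribe-type> <topics> arguments and builds the
// options that KafkaWordCount sets on the Kafka source.
object KafkaArgsSketch {
  // The Spark Kafka source requires exactly one of these subscription options.
  val SubscribeTypes: Set[String] = Set("assign", "subscribe", "subscribePattern")

  def kafkaSourceOptions(args: Array[String]): Either[String, Map[String, String]] =
    args match {
      case Array(bootstrapServers, subscribeType, topics) =>
        if (!SubscribeTypes.contains(subscribeType))
          Left(s"Unknown subscribe-type '$subscribeType'; expected one of: " +
            SubscribeTypes.mkString(", "))
        else
          Right(Map("kafka.bootstrap.servers" -> bootstrapServers, subscribeType -> topics))
      case _ =>
        Left("Usage: KafkaWordCount <bootstrap-servers> <subscribe-type> <topics>")
    }

  def main(args: Array[String]): Unit =
    kafkaSourceOptions(Array("broker1:9092", "subscribe", "topic1,topic2")) match {
      case Right(options) => println(options)
      case Left(error)    => System.err.println(error)
    }
}
```

In a Spark job, a map built this way could be applied in one call with DataStreamReader.options, for example spark.readStream.format("kafka").options(options).load(), which is equivalent to the individual .option calls in the sample. The broker address and topic names above are placeholders.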