Scala Example Code
Function
In a Spark application, use Structured Streaming to read word records from Kafka and count the occurrences of each word.
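To show what the streaming query computes, the following plain-Scala sketch runs the same split-and-count logic on a fixed list of lines. It uses neither Spark nor Kafka, and the object name `WordCountSketch` is hypothetical. It is only a batch analogue of the per-word statistics the streaming job maintains.

```scala
// Hypothetical batch analogue (no Spark, no Kafka) of the streaming word count.
object WordCountSketch {
  // Split each line on a single space, mirroring flatMap(_.split(" ")),
  // then count how often each word occurs, mirroring groupBy("value").count().
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }
}
```

For example, `WordCountSketch.countWords(Seq("a b", "b c"))` returns `Map("a" -> 1, "b" -> 2, "c" -> 1)`. Because `split(" ")` splits on every single space, a line with two consecutive spaces such as `"a  b"` also yields an empty-string token, which is counted as a word. The streaming example uses the same `split(" ")` call, so the same applies to it.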
Code Sample
The following code is an example. For details, see com.huawei.bigdata.spark.examples.KafkaWordCount.
When new data is available in the streaming DataFrame/Dataset, outputMode determines which rows of the result are written to the streaming sink.
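The effect of outputMode on a running aggregation can be illustrated with a small conceptual model. The following plain-Scala code is not Spark API. `OutputModeSketch`, `Step`, and `processBatch` are hypothetical names. The model processes one micro-batch of words at a time and reports what a sink would receive in "complete" mode, which writes the whole result table on every trigger, and in "update" mode, which writes only the rows that changed in that trigger.

```scala
// Hypothetical model (not Spark API) of output modes for a running word count.
object OutputModeSketch {
  // state: running totals; complete/update: rows written to the sink in each mode.
  final case class Step(state: Map[String, Long],
                        complete: Map[String, Long],
                        update: Map[String, Long])

  def processBatch(state: Map[String, Long], batchWords: Seq[String]): Step = {
    // Counts contributed by this micro-batch only.
    val batchCounts = batchWords.groupBy(identity).map { case (w, ws) => w -> ws.size.toLong }
    // Merge the batch counts into the running totals.
    val newState = batchCounts.foldLeft(state) { case (acc, (w, n)) =>
      acc.updated(w, acc.getOrElse(w, 0L) + n)
    }
    // "complete": every row of the result table is written.
    // "update": only rows whose count changed in this micro-batch are written.
    val changedRows = newState.filter { case (w, _) => batchCounts.contains(w) }
    Step(newState, newState, changedRows)
  }
}
```

With a first batch `Seq("a", "b", "a")` both modes emit `a -> 2, b -> 1`. With a second batch `Seq("b", "c")`, complete mode emits `a -> 2, b -> 2, c -> 1`, while update mode emits only `b -> 2, c -> 1`. The third mode, "append", is not supported by Spark for streaming aggregations like this one unless a watermark is defined, which is why the sample uses "complete".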
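package com.huawei.bigdata.spark.examples

import org.apache.spark.sql.SparkSession

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Expect exactly three arguments; the destructuring below fails on any other count
    if (args.length != 3) {
      System.err.println("Usage: KafkaWordCount <bootstrap-servers> " +
        "<subscribe-type> <topics>")
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("KafkaWordCount")
      .getOrCreate()

    import spark.implicits._

    // Create a Dataset representing the stream of input lines from Kafka.
    // subscribeType is used as the Kafka source option name (for example
    // "subscribe") and topics as its value; the record value is read as a string.
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate the running word count: split each line on spaces, then group
    // by the resulting "value" column and count the occurrences of each word
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console;
    // "complete" mode writes the entire result table on every trigger
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}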
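The sample passes `<subscribe-type>` directly to `.option(subscribeType, topics)`. The Spark Kafka source accepts exactly one of the options `assign`, `subscribe`, or `subscribePattern`. The sketch below is a hypothetical helper (`KafkaWordCountArgs` and its members are not part of the sample) that validates the arguments before the query starts. It is one possible way to fail fast on a mistyped subscription type.

```scala
// Hypothetical argument check for KafkaWordCount (not part of the original sample).
object KafkaWordCountArgs {
  // Subscription options accepted by the Spark Kafka source.
  val SubscribeTypes: Set[String] = Set("assign", "subscribe", "subscribePattern")

  final case class Config(bootstrapServers: String, subscribeType: String, topics: String)

  // Returns the parsed configuration, or an error message for the user.
  def parse(args: Array[String]): Either[String, Config] = args match {
    case Array(servers, subscribeType, topics) if SubscribeTypes.contains(subscribeType) =>
      Right(Config(servers, subscribeType, topics))
    case Array(_, subscribeType, _) =>
      Left(s"Unsupported subscribe-type '$subscribeType'; expected one of " +
        SubscribeTypes.mkString(", "))
    case _ =>
      Left("Usage: KafkaWordCount <bootstrap-servers> <subscribe-type> <topics>")
  }
}
```

For example, with a placeholder broker address, `KafkaWordCountArgs.parse(Array("broker1:9092", "subscribe", "topic1"))` returns a `Right(Config(...))`, whereas `"topic"` as the second argument returns a `Left` with an error message. Note that with `assign`, the third argument must be a JSON string of topic partitions rather than a topic list.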