Spark Structured Streaming样例程序(Scala)
功能介绍
在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SecurityKafkaWordCount。
当Streaming DataFrame/Dataset中有新的可用数据时,outputMode用于配置写入Streaming接收器的数据。
object SecurityKafkaWordCount { def main(args: Array[String]): Unit = { if (args.length < 6) { System.err.println("Usage: SecurityKafkaWordCount <bootstrap-servers> " + "<subscribe-type> <topics> <protocol> <service> <domain>") System.exit(1) } val Array(bootstrapServers, subscribeType, topics, protocol, service, domain) = args val spark = SparkSession .builder .appName("SecurityKafkaWordCount") .getOrCreate() import spark.implicits._ //创建表示来自kafka的输入行流的DataSet。 val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option(subscribeType, topics) .option("kafka.security.protocol", protocol) .option("kafka.sasl.kerberos.service.name", service) .option("kafka.kerberos.domain.name", domain) .load() .selectExpr("CAST(value AS STRING)") .as[String] //生成运行字数。 val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() //开始运行将运行计数打印到控制台的查询。 val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination() } }