文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Spark2x开发指南(普通模式)/
开发Spark应用/
Spark Structured Streaming对接Kafka样例程序/
Spark Structured Streaming对接Kafka样例程序(Scala)
更新时间:2024-06-27 GMT+08:00
Spark Structured Streaming对接Kafka样例程序(Scala)
功能介绍
使用Structured Streaming,从Kafka中读取广告请求数据、广告展示数据、广告点击数据,实时获取广告有效展示统计数据和广告有效点击统计数据,并将统计结果写入Kafka中。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.KafkaADCount。
/**
 * Runs a Structured Streaming job that reads ad request, ad show and ad click
 * events from Kafka, joins them to produce valid-show and valid-click records,
 * and writes the results back to Kafka.
 *
 * Expected arguments, in order:
 * bootstrap-servers, maxEventDelay, reqTopic, showTopic, maxShowDelay,
 * clickTopic, maxClickDelay, triggerInterval, checkpointLocation, protocol,
 * service, domain. Arguments beyond the twelfth are ignored.
 */
object KafkaADCount {

  /** Number of positional arguments the job needs. */
  private val RequiredArgs = 12

  /** Pause between two status reports of the monitoring thread, in milliseconds. */
  private val StatusIntervalMillis = 10000L

  def main(args: Array[String]): Unit = {
    if (args.length < RequiredArgs) {
      System.err.println("Usage: KafkaADCount <bootstrap-servers> " +
        "<maxEventDelay> <reqTopic> <showTopic> <maxShowDelay> " +
        "<clickTopic> <maxClickDelay> <triggerInterver> " +
        "<checkpointLocation> <protocol> <service> <domain>")
      System.exit(1)
    }

    // The trailing `_*` keeps the destructuring consistent with the `<` guard
    // above: extra arguments are tolerated instead of raising a MatchError.
    val Array(bootstrapServers, maxEventDelay, reqTopic, showTopic, maxShowDelay,
      clickTopic, maxClickDelay, triggerInterval, checkpointLocation,
      protocol, service, domain, _*) = args

    // All durations are given as time strings and normalised to milliseconds.
    val maxEventDelayMills = JavaUtils.timeStringAs(maxEventDelay, TimeUnit.MILLISECONDS)
    val maxShowDelayMills = JavaUtils.timeStringAs(maxShowDelay, TimeUnit.MILLISECONDS)
    val maxClickDelayMills = JavaUtils.timeStringAs(maxClickDelay, TimeUnit.MILLISECONDS)
    val triggerMills = JavaUtils.timeStringAs(triggerInterval, TimeUnit.MILLISECONDS)

    val spark = SparkSession
      .builder
      .appName("KafkaADCount")
      .getOrCreate()

    spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointLocation)

    import spark.implicits._

    // Connection settings shared by every Kafka source and sink of this job.
    val kafkaConnection = Map(
      "kafka.bootstrap.servers" -> bootstrapServers,
      "kafka.security.protocol" -> protocol,
      "kafka.sasl.kerberos.service.name" -> service,
      "kafka.kerberos.domain.name" -> domain)

    // Subscribes to `topic` and exposes each record's value as a string.
    def readTopic(topic: String) =
      spark.readStream
        .format("kafka")
        .options(kafkaConnection)
        .option("subscribe", topic)
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as[String]

    // Starts an append-mode query that writes `result` to the Kafka topic
    // `name`; the query itself is registered under the same name.
    def startKafkaSink(result: org.apache.spark.sql.DataFrame, name: String) =
      result.writeStream
        .queryName(name)
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(triggerMills.millis))
        .format("kafka")
        .options(kafkaConnection)
        .option("topic", name)
        .start()

    // Records are '^'-separated. A record with a different number of fields
    // raises a MatchError and an unparsable timestamp raises an
    // IllegalArgumentException; either fails the query.
    // NOTE(review): confirm that failing on malformed input is intended.
    val reqDf = readTopic(reqTopic)
      .map { line =>
        line.split('^') match {
          case Array(reqAdID, reqTime) =>
            ReqEvent(reqAdID, Timestamp.valueOf(reqTime))
        }
      }
      .withWatermark("reqTime", s"${maxEventDelayMills + maxShowDelayMills} millisecond")

    val showDf = readTopic(showTopic)
      .map { line =>
        line.split('^') match {
          case Array(showAdID, showID, showTime) =>
            ShowEvent(showAdID, showID, Timestamp.valueOf(showTime))
        }
      }
      .withWatermark(
        "showTime",
        s"${maxEventDelayMills + maxShowDelayMills + maxClickDelayMills} millisecond")

    val clickDf = readTopic(clickTopic)
      .map { line =>
        line.split('^') match {
          case Array(clickAdID, clickShowID, clickTime) =>
            ClickEvent(clickAdID, clickShowID, Timestamp.valueOf(clickTime))
        }
      }
      .withWatermark("clickTime", s"$maxEventDelayMills millisecond")

    // NOTE(review): the time predicate only sets a lower bound
    // (showTime >= reqTime + maxShowDelay); confirm whether an upper bound
    // such as "showTime <= reqTime + maxShowDelay" was intended.
    val validShows = reqDf
      .join(showDf, expr(s"""
        reqAdID = showAdID
        AND showTime >= reqTime + interval ${maxShowDelayMills} millisecond
        """))
      .selectExpr("concat_ws('^', showAdID, showID, showTime) as value")
    val showStaticsQuery = startKafkaSink(validShows, "showEventStatics")

    // NOTE(review): same remark as above for the click/show time predicate.
    val validClicks = showDf
      .join(clickDf, expr(s"""
        showAdID = clickAdID
        AND showID = clickShowID
        AND clickTime >= showTime + interval ${maxClickDelayMills} millisecond
        """), joinType = "rightouter")
      .dropDuplicates("showAdID")
      .selectExpr("concat_ws('^', clickAdID, clickShowID, clickTime) as value")
    val clickStaticsQuery = startKafkaSink(validClicks, "clickEventStatics")

    // Periodically prints the status of both queries (`lastProgress` could be
    // printed instead for more detail). The thread is a daemon so that it
    // cannot keep the JVM alive once the main thread leaves
    // awaitAnyTermination().
    val statusMonitor = new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          println("-------------get showStatic progress---------")
          println(showStaticsQuery.status)
          println("-------------get clickStatic progress---------")
          println(clickStaticsQuery.status)
          Thread.sleep(StatusIntervalMillis)
        }
      }
    })
    statusMonitor.setDaemon(true)
    statusMonitor.start()

    spark.streams.awaitAnyTermination()
  }
}