Last updated: 2024-08-03 (GMT+08:00)

Sample Program for Interconnecting Spark Structured Streaming with Kafka (Scala)

Function

This sample uses Structured Streaming to read ad request, ad impression, and ad click events from Kafka, compute valid-impression and valid-click statistics in real time, and write the results back to Kafka.
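Each event arrives as a single '^'-delimited string (the exact layouts are visible in the map steps of the code below). As a point of reference, here is a minimal sketch of publishing one ad request event with the standard Kafka producer API; the object name, broker address, and topic name are placeholders, and a secured cluster would additionally need the security properties shown in the sample below:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper for feeding one test record into the request topic.
object ReqEventProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Request records have the form <adID>^<reqTime>; reqTime must be parsable by
    // java.sql.Timestamp.valueOf, e.g. "2024-08-03 12:00:00".
    producer.send(new ProducerRecord[String, String]("reqTopic", "ad_1^2024-08-03 12:00:00"))
    producer.close()
  }
}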

Code Sample

The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.KafkaADCount.
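The snippet also relies on three event case classes defined in the full sample. The sketch below reconstructs them from the field names and types used in the parsing code; the authoritative definitions are in com.huawei.bigdata.spark.examples.KafkaADCount:

import java.sql.Timestamp

// Reconstructed from the map steps below; see the full sample for the originals.
case class ReqEvent(reqAdID: String, reqTime: Timestamp)
case class ShowEvent(showAdID: String, showID: String, showTime: Timestamp)
case class ClickEvent(clickAdID: String, clickShowID: String, clickTime: Timestamp)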

import java.sql.Timestamp
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.Trigger

/**
  * Run a Structured Streaming job that counts valid ad impressions and valid ad
  * clicks and writes the statistics to Kafka.
  */
object KafkaADCount {
   def main(args: Array[String]): Unit = {
     if (args.length < 12) {
       System.err.println("Usage: KafkaADCount <bootstrap-servers> " +
         "<maxEventDelay> <reqTopic> <showTopic> <maxShowDelay> " +
         "<clickTopic> <maxClickDelay> <triggerInterval> " +
         "<checkpointLocation> <protocol> <service> <domain>")
       System.exit(1)
     }

     val Array(bootstrapServers, maxEventDelay, reqTopic, showTopic,
     maxShowDelay, clickTopic, maxClickDelay, triggerInterval, checkpointLocation,
     protocol, service, domain) = args

     val maxEventDelayMills = JavaUtils.timeStringAs(maxEventDelay, TimeUnit.MILLISECONDS)
     val maxShowDelayMills = JavaUtils.timeStringAs(maxShowDelay, TimeUnit.MILLISECONDS)
     val maxClickDelayMills = JavaUtils.timeStringAs(maxClickDelay, TimeUnit.MILLISECONDS)
     val triggerMills = JavaUtils.timeStringAs(triggerInterval, TimeUnit.MILLISECONDS)

     val spark = SparkSession
       .builder
       .appName("KafkaADCount")
       .getOrCreate()

     spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointLocation)

     import spark.implicits._

     // Create a Dataset representing the stream of input lines from Kafka
     val reqDf = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("subscribe", reqTopic)
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
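       // Parse each '^'-delimited record of the form <adID>^<reqTime> into a ReqEvent.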
       .map{
         _.split('^') match {
           case Array(reqAdID, reqTime) => ReqEvent(reqAdID,
             Timestamp.valueOf(reqTime))
         }
       }
       .as[ReqEvent]
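       // Watermark sized to maxEventDelay + maxShowDelay, so request events are
       // retained long enough for late impression events to join against them.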
       .withWatermark("reqTime", maxEventDelayMills +
         maxShowDelayMills + " millisecond")

     val showDf = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("subscribe", showTopic)
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
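       // Parse records of the form <adID>^<showID>^<showTime> into ShowEvent.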
       .map{
         _.split('^') match {
           case Array(showAdID, showID, showTime) => ShowEvent(showAdID,
             showID, Timestamp.valueOf(showTime))
         }
       }
       .as[ShowEvent]
       .withWatermark("showTime", maxEventDelayMills +
         maxShowDelayMills + maxClickDelayMills + " millisecond")

     val clickDf = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("subscribe", clickTopic)
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
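       // Parse records of the form <adID>^<showID>^<clickTime> into ClickEvent.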
       .map{
         _.split('^') match {
           case Array(clickAdID, clickShowID, clickTime) => ClickEvent(clickAdID,
             clickShowID, Timestamp.valueOf(clickTime))
         }
       }
       .as[ClickEvent]
       .withWatermark("clickTime", maxEventDelayMills + " millisecond")

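     // Join requests with impressions on adID under the event-time condition below,
     // and stream the matched impressions to the showEventStatics topic in append mode.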
     val showStaticsQuery = reqDf.join(showDf,
       expr(s"""
       reqAdID = showAdID
       AND showTime >= reqTime + interval ${maxShowDelayMills} millisecond
       """))
       .selectExpr("concat_ws('^', showAdID, showID, showTime) as value")
       .writeStream
       .queryName("showEventStatics")
       .outputMode("append")
       .trigger(Trigger.ProcessingTime(triggerMills.millis))
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("topic", "showEventStatics")
       .start()

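     // Right outer join impressions with clicks (matching adID and showID) under the
     // event-time condition, de-duplicate by showAdID, and stream the result to the
     // clickEventStatics topic.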
     val clickStaticsQuery = showDf.join(clickDf,
       expr(s"""
       showAdID = clickAdID AND
       showID = clickShowID AND
       clickTime >= showTime + interval ${maxClickDelayMills} millisecond
       """), joinType = "rightouter")
       .dropDuplicates("showAdID")
       .selectExpr("concat_ws('^', clickAdID, clickShowID, clickTime) as value")
       .writeStream
       .queryName("clickEventStatics")
       .outputMode("append")
       .trigger(Trigger.ProcessingTime(triggerMills.millis))
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("topic", "clickEventStatics")
       .start()

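     // Background thread: print each query's status every 10 seconds for monitoring.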
     new Thread(new Runnable {
       override def run(): Unit = {
         while(true) {
           println("-------------get showStatic progress---------")
           //println(showStaticsQuery.lastProgress)
           println(showStaticsQuery.status)
           println("-------------get clickStatic progress---------")
           //println(clickStaticsQuery.lastProgress)
           println(clickStaticsQuery.status)
           Thread.sleep(10000)
         }
       }
     }).start()

     spark.streams.awaitAnyTermination()

   }
 }
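To spot-check the results, the two output topics can be read back with a plain batch query. A minimal sketch, assuming the same SparkSession and the security-related variables from the sample above:

// Assumes spark, bootstrapServers, protocol, service, and domain from the sample above.
val results = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("kafka.security.protocol", protocol)
  .option("kafka.sasl.kerberos.service.name", service)
  .option("kafka.kerberos.domain.name", domain)
  .option("subscribe", "showEventStatics,clickEventStatics")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("topic", "CAST(value AS STRING)")

results.show(20, truncate = false)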