Updated on 2024-08-10 GMT+08:00

Interconnecting Spark Structured Streaming with Kafka (Scala)

Function

Structured Streaming reads advertisement request, display, and click data from Kafka, computes statistics on valid displays and valid clicks in real time, and writes the results back to Kafka.

Sample Code

The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.KafkaADCount.

/**
  * Run the Structured Streaming task to collect statistics on valid advertisement display and click data. The result is written into Kafka.
  */

object KafkaADCount {
   def main(args: Array[String]): Unit = {
     if (args.length < 12) {
       System.err.println("Usage: KafkaWordCount <bootstrap-servers> " +
         "<maxEventDelay> <reqTopic> <showTopic> <maxShowDelay> " +
         "<clickTopic> <maxClickDelay> <triggerInterver> " +
         "<checkpointLocation> <protocol> <service> <domain>")
       System.exit(1)
     }

     val Array(bootstrapServers, maxEventDelay, reqTopic, showTopic,
     maxShowDelay, clickTopic, maxClickDelay, triggerInterver, checkpointLocation,
     protocol, service, domain) = args

     val maxEventDelayMills = JavaUtils.timeStringAs(maxEventDelay, TimeUnit.MILLISECONDS)
     val maxShowDelayMills = JavaUtils.timeStringAs(maxShowDelay, TimeUnit.MILLISECONDS)
     val maxClickDelayMills = JavaUtils.timeStringAs(maxClickDelay, TimeUnit.MILLISECONDS)
     val triggerMills = JavaUtils.timeStringAs(triggerInterver, TimeUnit.MILLISECONDS)

     val spark = SparkSession
       .builder
       .appName("KafkaADCount")
       .getOrCreate()

     spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointLocation)

     import spark.implicits._

     // Create DataSet representing the stream of input lines from kafka
     val reqDf = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("subscribe", reqTopic)
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
       .map{
         _.split('^') match {
           case Array(reqAdID, reqTime) => ReqEvent(reqAdID,
             Timestamp.valueOf(reqTime))
         }
       }
       .as[ReqEvent]
       .withWatermark("reqTime", maxEventDelayMills +
         maxShowDelayMills + " millisecond")

     val showDf = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("subscribe", showTopic)
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
       .map{
         _.split('^') match {
           case Array(showAdID, showID, showTime) => ShowEvent(showAdID,
             showID, Timestamp.valueOf(showTime))
         }
       }
       .as[ShowEvent]
       .withWatermark("showTime", maxEventDelayMills +
         maxShowDelayMills + maxClickDelayMills + " millisecond")

     val clickDf = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("subscribe", clickTopic)
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
       .map{
         _.split('^') match {
           case Array(clickAdID, clickShowID, clickTime) => ClickEvent(clickAdID,
             clickShowID, Timestamp.valueOf(clickTime))
         }
       }
       .as[ClickEvent]
       .withWatermark("clickTime", maxEventDelayMills + " millisecond")

     val showStaticsQuery = reqDf.join(showDf,
       expr(s"""
       reqAdID = showAdID
       AND showTime >= reqTime + interval ${maxShowDelayMills} millisecond
       """))
       .selectExpr("concat_ws('^', showAdID, showID, showTime) as value")
       .writeStream
       .queryName("showEventStatics")
       .outputMode("append")
       .trigger(Trigger.ProcessingTime(triggerMills.millis))
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("topic", "showEventStatics")
       .start()

     val clickStaticsQuery = showDf.join(clickDf,
       expr(s"""
       showAdID = clickAdID AND
       showID = clickShowID AND
       clickTime >= showTime + interval ${maxClickDelayMills} millisecond
       """), joinType = "rightouter")
       .dropDuplicates("showAdID")
       .selectExpr("concat_ws('^', clickAdID, clickShowID, clickTime) as value")
       .writeStream
       .queryName("clickEventStatics")
       .outputMode("append")
       .trigger(Trigger.ProcessingTime(triggerMills.millis))
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option("kafka.security.protocol", protocol)
       .option("kafka.sasl.kerberos.service.name", service)
       .option("kafka.kerberos.domain.name", domain)
       .option("topic", "clickEventStatics")
       .start()

     new Thread(new Runnable {
       override def run(): Unit = {
         while(true) {
           println("-------------get showStatic progress---------")
           //println(showStaticsQuery.lastProgress)
           println(showStaticsQuery.status)
           println("-------------get clickStatic progress---------")
           //println(clickStaticsQuery.lastProgress)
           println(clickStaticsQuery.status)
           Thread.sleep(10000)
         }
       }
     }).start

     spark.streams.awaitAnyTermination()

   }
 }