Updated on 2022-11-18 GMT+08:00

Scala Example Code

Function

Structured Streaming is used to read advertisement request data, display data, and click data from Kafka, obtain effective display statistics and click statistics in real time, and write the statistics to Kafka.

Example code

The following code snippets are used as an example. For complete codes, see com.huawei.bigdata.spark.examples.KafkaADCount.

/** 
  * Run the Structured Streaming task to collect statistics on valid advertisement display and click data. The result is written into Kafka. 
  */ 
 
object KafkaADCount { 
   def main(args: Array[String]): Unit = { 
     if (args.length < 12) { 
       System.err.println("Usage: KafkaWordCount <bootstrap-servers> " + 
         "<maxEventDelay> <reqTopic> <showTopic> <maxShowDelay> " + 
         "<clickTopic> <maxClickDelay> <triggerInterver> " + 
         "<checkpointLocation> <protocol> <service> <domain>") 
       System.exit(1) 
     } 
 
     val Array(bootstrapServers, maxEventDelay, reqTopic, showTopic, 
     maxShowDelay, clickTopic, maxClickDelay, triggerInterver, checkpointLocation, 
     protocol, service, domain) = args 
 
     val maxEventDelayMills = JavaUtils.timeStringAs(maxEventDelay, TimeUnit.MILLISECONDS) 
     val maxShowDelayMills = JavaUtils.timeStringAs(maxShowDelay, TimeUnit.MILLISECONDS) 
     val maxClickDelayMills = JavaUtils.timeStringAs(maxClickDelay, TimeUnit.MILLISECONDS) 
     val triggerMills = JavaUtils.timeStringAs(triggerInterver, TimeUnit.MILLISECONDS) 
 
     val spark = SparkSession 
       .builder 
       .appName("KafkaADCount") 
       .getOrCreate() 
 
     spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointLocation) 
 
     import spark.implicits._ 
 
     // Create DataSet representing the stream of input lines from kafka 
val reqDf = spark 
       .readStream 
       .format("kafka") 
       .option("kafka.bootstrap.servers", bootstrapServers) 
       .option("kafka.security.protocol", protocol) 
       .option("kafka.sasl.kerberos.service.name", service) 
       .option("kafka.kerberos.domain.name", domain) 
       .option("subscribe", reqTopic) 
       .load() 
       .selectExpr("CAST(value AS STRING)") 
       .as[String] 
       .map{ 
         _.split('^') match { 
           case Array(reqAdID, reqTime) => ReqEvent(reqAdID, 
             Timestamp.valueOf(reqTime)) 
         } 
       } 
       .as[ReqEvent] 
       .withWatermark("reqTime", maxEventDelayMills + 
         maxShowDelayMills + " millisecond") 
 
     val showDf = spark 
       .readStream 
       .format("kafka") 
       .option("kafka.bootstrap.servers", bootstrapServers) 
       .option("kafka.security.protocol", protocol) 
       .option("kafka.sasl.kerberos.service.name", service) 
       .option("kafka.kerberos.domain.name", domain) 
       .option("subscribe", showTopic) 
       .load() 
       .selectExpr("CAST(value AS STRING)") 
       .as[String] 
       .map{ 
         _.split('^') match { 
           case Array(showAdID, showID, showTime) => ShowEvent(showAdID, 
             showID, Timestamp.valueOf(showTime)) 
         } 
       } 
       .as[ShowEvent] 
       .withWatermark("showTime", maxEventDelayMills + 
         maxShowDelayMills + maxClickDelayMills + " millisecond") 
 
     val clickDf = spark 
       .readStream 
       .format("kafka") 
       .option("kafka.bootstrap.servers", bootstrapServers) 
       .option("kafka.security.protocol", protocol) 
       .option("kafka.sasl.kerberos.service.name", service) 
       .option("kafka.kerberos.domain.name", domain) 
       .option("subscribe", clickTopic) 
       .load() 
       .selectExpr("CAST(value AS STRING)") 
       .as[String] 
       .map{ 
         _.split('^') match { 
           case Array(clickAdID, clickShowID, clickTime) => ClickEvent(clickAdID, 
             clickShowID, Timestamp.valueOf(clickTime)) 
         } 
       } 
       .as[ClickEvent] 
       .withWatermark("clickTime", maxEventDelayMills + " millisecond") 
 
     val showStaticsQuery = reqDf.join(showDf, 
       expr(s""" 
       reqAdID = showAdID 
       AND showTime >= reqTime + interval ${maxShowDelayMills} millisecond 
       """)) 
       .selectExpr("concat_ws('^', showAdID, showID, showTime) as value") 
       .writeStream 
       .queryName("showEventStatics") 
       .outputMode("append") 
       .trigger(Trigger.ProcessingTime(triggerMills.millis)) 
       .format("kafka") 
       .option("kafka.bootstrap.servers", bootstrapServers) 
       .option("kafka.security.protocol", protocol) 
       .option("kafka.sasl.kerberos.service.name", service) 
       .option("kafka.kerberos.domain.name", domain) 
       .option("topic", "showEventStatics") 
       .start() 
 
     val clickStaticsQuery = showDf.join(clickDf, 
       expr(s""" 
       showAdID = clickAdID AND 
       showID = clickShowID AND 
       clickTime >= showTime + interval ${maxClickDelayMills} millisecond 
       """), joinType = "rightouter") 
       .dropDuplicates("showAdID") 
       .selectExpr("concat_ws('^', clickAdID, clickShowID, clickTime) as value") 
       .writeStream 
       .queryName("clickEventStatics") 
       .outputMode("append") 
       .trigger(Trigger.ProcessingTime(triggerMills.millis)) 
       .format("kafka") 
       .option("kafka.bootstrap.servers", bootstrapServers) 
       .option("kafka.security.protocol", protocol) 
       .option("kafka.sasl.kerberos.service.name", service) 
       .option("kafka.kerberos.domain.name", domain) 
       .option("topic", "clickEventStatics") 
       .start() 
 
     new Thread(new Runnable { 
       override def run(): Unit = { 
         while(true) { 
           println("-------------get showStatic progress---------") 
           //println(showStaticsQuery.lastProgress) 
println(showStaticsQuery.status) 
           println("-------------get clickStatic progress---------") 
           //println(clickStaticsQuery.lastProgress) 
println(clickStaticsQuery.status) 
           Thread.sleep(10000) 
         } 
       } 
     }).start 
 
     spark.streams.awaitAnyTermination() 
 
   } 
 }