Interconnecting Spark Structured Streaming with Kafka (Scala)
Function
Structured Streaming reads advertisement request, display, and click data from Kafka, computes valid display and click statistics in real time, and writes the statistics back to Kafka.
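Each record on the three topics is a caret-separated ('^') string, as assumed by the parsing logic in the sample code below. The field layouts are inferred from that code; the concrete values here are illustrative only, and the timestamps must use a format accepted by java.sql.Timestamp.valueOf (yyyy-MM-dd HH:mm:ss):

Request event (reqTopic):  adID^reqTime            for example: adID01^2024-06-12 10:00:00
Display event (showTopic): adID^showID^showTime    for example: adID01^showID01^2024-06-12 10:00:01
Click event (clickTopic):  adID^showID^clickTime   for example: adID01^showID01^2024-06-12 10:00:05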
Sample Code
The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.KafkaADCount.
import java.sql.Timestamp
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.Trigger

// Event types parsed from the '^'-separated Kafka records
// (also defined in the complete example).
case class ReqEvent(reqAdID: String, reqTime: Timestamp)
case class ShowEvent(showAdID: String, showID: String, showTime: Timestamp)
case class ClickEvent(clickAdID: String, clickShowID: String, clickTime: Timestamp)

/**
 * Run the Structured Streaming task to collect statistics on valid advertisement
 * display and click data. The result is written into Kafka.
 */
object KafkaADCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 12) {
      System.err.println("Usage: KafkaADCount <bootstrap-servers> " +
        "<maxEventDelay> <reqTopic> <showTopic> <maxShowDelay> " +
        "<clickTopic> <maxClickDelay> <triggerInterval> " +
        "<checkpointLocation> <protocol> <service> <domain>")
      System.exit(1)
    }

    val Array(bootstrapServers, maxEventDelay, reqTopic, showTopic,
      maxShowDelay, clickTopic, maxClickDelay, triggerInterval,
      checkpointLocation, protocol, service, domain) = args

    // Convert the human-readable time strings (for example "10m") to milliseconds.
    val maxEventDelayMills = JavaUtils.timeStringAs(maxEventDelay, TimeUnit.MILLISECONDS)
    val maxShowDelayMills = JavaUtils.timeStringAs(maxShowDelay, TimeUnit.MILLISECONDS)
    val maxClickDelayMills = JavaUtils.timeStringAs(maxClickDelay, TimeUnit.MILLISECONDS)
    val triggerMills = JavaUtils.timeStringAs(triggerInterval, TimeUnit.MILLISECONDS)

    val spark = SparkSession
      .builder
      .appName("KafkaADCount")
      .getOrCreate()

    spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointLocation)

    import spark.implicits._

    // Create a DataSet representing the stream of input lines from Kafka.
    val reqDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("subscribe", reqTopic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map {
        _.split('^') match {
          case Array(reqAdID, reqTime) =>
            ReqEvent(reqAdID, Timestamp.valueOf(reqTime))
        }
      }
      .as[ReqEvent]
      .withWatermark("reqTime",
        maxEventDelayMills + maxShowDelayMills + " millisecond")

    val showDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("subscribe", showTopic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map {
        _.split('^') match {
          case Array(showAdID, showID, showTime) =>
            ShowEvent(showAdID, showID, Timestamp.valueOf(showTime))
        }
      }
      .as[ShowEvent]
      .withWatermark("showTime",
        maxEventDelayMills + maxShowDelayMills + maxClickDelayMills + " millisecond")

    val clickDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("subscribe", clickTopic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map {
        _.split('^') match {
          case Array(clickAdID, clickShowID, clickTime) =>
            ClickEvent(clickAdID, clickShowID, Timestamp.valueOf(clickTime))
        }
      }
      .as[ClickEvent]
      .withWatermark("clickTime", maxEventDelayMills + " millisecond")

    // Join requests with displays to obtain valid display events and write them to Kafka.
    val showStaticsQuery = reqDf.join(showDf,
        expr(s"""
        reqAdID = showAdID
        AND showTime >= reqTime + interval ${maxShowDelayMills} millisecond
        """))
      .selectExpr("concat_ws('^', showAdID, showID, showTime) as value")
      .writeStream
      .queryName("showEventStatics")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(triggerMills.millis))
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("topic", "showEventStatics")
      .start()

    // Join displays with clicks to obtain valid click events and write them to Kafka.
    val clickStaticsQuery = showDf.join(clickDf,
        expr(s"""
        showAdID = clickAdID
        AND showID = clickShowID
        AND clickTime >= showTime + interval ${maxClickDelayMills} millisecond
        """),
        joinType = "rightouter")
      .dropDuplicates("showAdID")
      .selectExpr("concat_ws('^', clickAdID, clickShowID, clickTime) as value")
      .writeStream
      .queryName("clickEventStatics")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(triggerMills.millis))
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("topic", "clickEventStatics")
      .start()

    // Periodically print the status of both streaming queries.
    new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          println("-------------get showStatic progress---------")
          // println(showStaticsQuery.lastProgress)
          println(showStaticsQuery.status)
          println("-------------get clickStatic progress---------")
          // println(clickStaticsQuery.lastProgress)
          println(clickStaticsQuery.status)
          Thread.sleep(10000)
        }
      }
    }).start()

    spark.streams.awaitAnyTermination()
  }
}
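Both queries are watermark-bounded stream-stream joins: each input stream declares through withWatermark how late its events may arrive, which lets Spark bound the join state and emit results in append mode. The trigger interval controls how often each micro-batch runs.

A minimal submission sketch is shown below. The jar name, broker address, topic names, delay values, checkpoint path, and domain are all illustrative assumptions to be replaced with values for your environment; the 12 arguments match the usage string in main:

spark-submit --master yarn --deploy-mode client \
  --class com.huawei.bigdata.spark.examples.KafkaADCount \
  SparkStructuredStreamingKafkaExample.jar \
  <broker-host>:<broker-port> 10m req show 5m click 5m 5s \
  hdfs://hacluster/tmp/kafka-ad-count-checkpoint \
  SASL_PLAINTEXT kafka hadoop.<system-domain-name>.com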