Spark Structured Streaming对接Kafka样例程序(Scala)
功能介绍
使用Structured Streaming,从Kafka中读取广告请求数据、广告展示数据、广告点击数据,实时获取广告有效展示统计数据和广告有效点击统计数据,将统计结果写入Kafka中。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.KafkaADCount。
/**
 * Structured Streaming job that computes effective ad-show and effective
 * ad-click statistics and writes the results back to Kafka.
 *
 * Input: three Kafka topics carrying '^'-separated records —
 *   request events ("adID^reqTime"), show events ("adID^showID^showTime"),
 *   and click events ("adID^showID^clickTime").
 * Output: Kafka topics "showEventStatics" and "clickEventStatics".
 *
 * Expected arguments (12):
 *   bootstrap-servers maxEventDelay reqTopic showTopic maxShowDelay
 *   clickTopic maxClickDelay triggerInterval checkpointLocation
 *   protocol service domain
 */
object KafkaADCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 12) {
      // Fixed: usage message previously named the wrong program
      // ("KafkaWordCount") and misspelled <triggerInterval>.
      System.err.println("Usage: KafkaADCount <bootstrap-servers> " +
        "<maxEventDelay> <reqTopic> <showTopic> <maxShowDelay> " +
        "<clickTopic> <maxClickDelay> <triggerInterval> " +
        "<checkpointLocation> <protocol> <service> <domain>")
      System.exit(1)
    }

    val Array(bootstrapServers, maxEventDelay, reqTopic, showTopic,
      maxShowDelay, clickTopic, maxClickDelay, triggerInterval,
      checkpointLocation, protocol, service, domain) = args

    // Convert the human-readable time strings (e.g. "10s") to milliseconds.
    val maxEventDelayMills = JavaUtils.timeStringAs(maxEventDelay, TimeUnit.MILLISECONDS)
    val maxShowDelayMills = JavaUtils.timeStringAs(maxShowDelay, TimeUnit.MILLISECONDS)
    val maxClickDelayMills = JavaUtils.timeStringAs(maxClickDelay, TimeUnit.MILLISECONDS)
    val triggerMills = JavaUtils.timeStringAs(triggerInterval, TimeUnit.MILLISECONDS)

    val spark = SparkSession
      .builder
      .appName("KafkaADCount")
      .getOrCreate()

    spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointLocation)

    import spark.implicits._

    // Stream of ad-request events; each Kafka value is "adID^reqTime".
    val reqDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("subscribe", reqTopic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map {
        _.split('^') match {
          case Array(reqAdID, reqTime) =>
            ReqEvent(reqAdID, Timestamp.valueOf(reqTime))
        }
      }
      .as[ReqEvent]
      // Watermark bounds how long request state is kept for the show join.
      .withWatermark("reqTime", maxEventDelayMills +
        maxShowDelayMills + " millisecond")

    // Stream of ad-show events; each Kafka value is "adID^showID^showTime".
    val showDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("subscribe", showTopic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map {
        _.split('^') match {
          case Array(showAdID, showID, showTime) =>
            ShowEvent(showAdID, showID, Timestamp.valueOf(showTime))
        }
      }
      .as[ShowEvent]
      // Show state must additionally outlive the click-delay window.
      .withWatermark("showTime", maxEventDelayMills +
        maxShowDelayMills + maxClickDelayMills + " millisecond")

    // Stream of ad-click events; each Kafka value is "adID^showID^clickTime".
    val clickDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("subscribe", clickTopic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map {
        _.split('^') match {
          case Array(clickAdID, clickShowID, clickTime) =>
            ClickEvent(clickAdID, clickShowID, Timestamp.valueOf(clickTime))
        }
      }
      .as[ClickEvent]
      .withWatermark("clickTime", maxEventDelayMills + " millisecond")

    // Effective shows: show events joined to their originating request,
    // re-serialized as '^'-separated values and written back to Kafka.
    val showStaticsQuery = reqDf.join(showDf,
      expr(s"""
      reqAdID = showAdID
      AND showTime >= reqTime + interval ${maxShowDelayMills} millisecond
      """))
      .selectExpr("concat_ws('^', showAdID, showID, showTime) as value")
      .writeStream
      .queryName("showEventStatics")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(triggerMills.millis))
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("topic", "showEventStatics")
      .start()

    // Effective clicks: click events joined to their show event
    // (right outer so unmatched clicks are retained), de-duplicated per ad.
    val clickStaticsQuery = showDf.join(clickDf,
      expr(s"""
      showAdID = clickAdID AND
      showID = clickShowID AND
      clickTime >= showTime + interval ${maxClickDelayMills} millisecond
      """), joinType = "rightouter")
      .dropDuplicates("showAdID")
      .selectExpr("concat_ws('^', clickAdID, clickShowID, clickTime) as value")
      .writeStream
      .queryName("clickEventStatics")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(triggerMills.millis))
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option("topic", "clickEventStatics")
      .start()

    // Periodically print query status. Run as a daemon thread so this
    // monitoring loop cannot keep the JVM alive after the streams stop
    // (the original non-daemon thread would block process exit).
    val monitor = new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          println("-------------get showStatic progress---------")
          //println(showStaticsQuery.lastProgress)
          println(showStaticsQuery.status)
          println("-------------get clickStatic progress---------")
          //println(clickStaticsQuery.lastProgress)
          println(clickStaticsQuery.status)
          Thread.sleep(10000)
        }
      }
    })
    monitor.setDaemon(true)
    monitor.start()

    // Block until either streaming query terminates.
    spark.streams.awaitAnyTermination()
  }
}