文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Spark2x开发指南(普通模式)/
开发Spark应用/
Spark Structured Streaming状态操作样例程序/
Spark Structured Streaming状态操作样例程序(Scala)
更新时间:2024-08-03 GMT+08:00
Spark Structured Streaming状态操作样例程序(Scala)
功能介绍
在Spark Structured Streaming应用中,跨批次统计每个session期间发生的event数量,以及该session的开始和结束timestamp;同时输出本批次中状态被更新的session。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.kafkaSessionization。
outputMode用于配置:当Streaming DataFrame/Dataset中有新的可用数据时,将哪些数据写入Streaming接收器。本样例使用update模式,即每个批次只输出状态被更新的session。
object kafkaSessionization {
def main(args: Array[String]): Unit = {
  // The destructuring below requires exactly seven arguments; any other count would otherwise
  // surface as a MatchError instead of the usage message.
  if (args.length != 7) {
    System.err.println("Usage: kafkaSessionization <bootstrap-servers> " +
      "<subscribe-type> <protocol> <service> <domain> <topics> <checkpointLocation>")
    System.exit(1)
  }
  val Array(bootstrapServers, subscribeType, protocol, service, domain, topics, checkpointLocation) = args

  val spark = SparkSession
    .builder
    .appName("kafkaSessionization")
    .getOrCreate()
  spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointLocation)

  // Logs query lifecycle events and, when the query terminates, a summary of the number of
  // input rows reported by progress events and the wall-clock run time.
  spark.streams.addListener(new StreamingQueryListener {
    @volatile private var startTime: Long = 0L
    @volatile private var endTime: Long = 0L
    @volatile private var numRecs: Long = 0L

    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
      println("Query started: " + event.id)
      startTime = System.currentTimeMillis
    }

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
      println("Query made progress: " + event.progress)
      numRecs += event.progress.numInputRows
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
      println("Query terminated: " + event.id)
      endTime = System.currentTimeMillis
      println(s"Processed $numRecs input rows in ${endTime - startTime} ms")
    }
  })

  import spark.implicits._
  // Each Kafka record value is expected to look like "<space-separated session ids>,<timestamp>";
  // it is expanded into one Event per session id, all sharing that timestamp.
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("kafka.security.protocol", protocol)
    .option("kafka.sasl.kerberos.service.name", service)
    .option("kafka.kerberos.domain.name", domain)
    .option(subscribeType, topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]
    // Malformed records are dropped instead of throwing and stopping the whole query.
    .flatMap(record => parseRecord(record).toSeq)
    .flatMap { case (line, timestamp) =>
      // Skip empty tokens so repeated/leading spaces do not create a session with an empty id.
      line.split(" ").filter(_.nonEmpty).map(word => Event(sessionId = word, timestamp))
    }

  // Sessionize the events. Track number of events, start and end timestamps of session,
  // and report session updates.
  val sessionUpdates = df
    .groupByKey(event => event.sessionId)
    .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
      case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
        // If timed out, then remove session and send final update
        if (state.hasTimedOut) {
          val finalUpdate =
            SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
          state.remove()
          finalUpdate
        } else {
          // Materialize strictly: the timestamps are traversed several times (size/min/max).
          val timestamps = events.map(_.timestamp.getTime).toVector
          val updatedSession = if (state.exists) {
            val oldSession = state.get
            SessionInfo(
              oldSession.numEvents + timestamps.size,
              // Events may arrive out of order, so the start can move earlier just as the end
              // can move later.
              math.min(oldSession.startTimestampMs, timestamps.min),
              math.max(oldSession.endTimestampMs, timestamps.max))
          } else {
            SessionInfo(timestamps.size, timestamps.min, timestamps.max)
          }
          state.update(updatedSession)
          // Set timeout such that the session will be expired if no data received for 10 seconds
          state.setTimeoutDuration("10 seconds")
          SessionUpdate(sessionId, updatedSession.durationMs, updatedSession.numEvents, expired = false)
        }
    }

  // Start running the query that prints the session updates to the console
  val query = sessionUpdates
    .writeStream
    .outputMode("update")
    .format("console")
    .start()
  query.awaitTermination()
}

/**
 * Parses one Kafka record value of the form "<space-separated session ids>,<timestamp>".
 *
 * Fields after the second comma-separated field are ignored, as before.
 *
 * @param record the record value cast to a string; may be null (e.g. a Kafka record without a value)
 * @return the session-id text and its parsed timestamp, or None when the record is null, has
 *         fewer than two comma-separated fields, or its timestamp is rejected by
 *         `java.sql.Timestamp.valueOf`
 */
private def parseRecord(record: String): Option[(String, Timestamp)] =
  Option(record)
    .map(_.split(","))
    .filter(_.length >= 2)
    .flatMap { fields =>
      scala.util.Try(Timestamp.valueOf(fields(1))).toOption.map(timestamp => (fields(0), timestamp))
    }