Spark Structured Streaming状态操作样例程序(Scala)
功能介绍
在Spark Structured Streaming应用中,跨批次统计每个session期间发生的event次数,以及该session的开始和结束timestamp;同时输出本批次中状态被更新的session。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.kafkaSessionization。
outputMode用于指定当Streaming DataFrame/Dataset中有新的可用数据时,哪些数据会被写入Streaming接收器(sink)。本样例使用update模式,即每次触发只输出自上次触发以来被更新的结果行。
object kafkaSessionization {

  /**
   * Reads text records from Kafka, groups them into sessions and prints per-session updates
   * to the console.
   *
   * Each Kafka value is cast to a string and split on ",": field 0 is a space-separated list
   * of words (every word is used as a session id) and field 1 is parsed with
   * `Timestamp.valueOf`. For every session the query tracks the number of events and the
   * first/last event timestamps across batches, and expires a session after 10 seconds
   * without data.
   *
   * @param args `<bootstrap-servers> <subscribe-type> <protocol> <service> <domain> <topics>
   *             <checkpointLocation>`; at least seven values are required, anything after
   *             the seventh is ignored.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println("Usage: kafkaSessionization <bootstrap-servers> " +
        "<subscribe-type> <protocol> <service> <domain> <topics> <checkpointLocation>")
      System.exit(1)
    }

    // The trailing `_*` lets the pattern accept more than seven arguments. Without it an
    // extra argument passes the length check above but fails here with scala.MatchError.
    val Array(bootstrapServers, subscribeType, protocol, service, domain, topics,
      checkpointLocation, _*) = args

    val spark = SparkSession
      .builder
      .appName("kafkaSessionization")
      .getOrCreate()

    spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointLocation)

    // Log query lifecycle events to stdout.
    spark.streams.addListener(new StreamingQueryListener {
      // Bookkeeping written by the callbacks below; nothing in this excerpt reads it back.
      @volatile private var startTime: Long = 0L
      @volatile private var endTime: Long = 0L
      @volatile private var numRecs: Long = 0L

      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
        println(s"Query started: ${event.id}")
        startTime = System.currentTimeMillis
      }

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(s"Query made progress: ${event.progress}")
        numRecs += event.progress.numInputRows
      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println(s"Query terminated: ${event.id}")
        endTime = System.currentTimeMillis
      }
    })

    import spark.implicits._

    // Turn each Kafka record "<words>,<timestamp>" into one Event per word.
    // NOTE(review): a record with fewer than two comma-separated fields, or with a second
    // field that Timestamp.valueOf rejects, throws inside the map function; no handling for
    // such records is visible here.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map { x =>
        val splitStr = x.split(",")
        (splitStr(0), Timestamp.valueOf(splitStr(1)))
      }.as[(String, Timestamp)].flatMap { case (line, timestamp) =>
        line.split(" ").map(word => Event(sessionId = word, timestamp))
      }

    // Sessionize the events. Track number of events, start and end timestamps of session,
    // and report session updates.
    val sessionUpdates = df
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>

          // If timed out, then remove session and send final update
          if (state.hasTimedOut) {
            val finalUpdate =
              SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
            state.remove()
            finalUpdate
          } else {
            // Update start and end timestamps in session
            val timestamps = events.map(_.timestamp.getTime).toSeq
            val updatedSession = if (state.exists) {
              // Existing session: keep the original start, extend the end if needed.
              val oldSession = state.get
              SessionInfo(
                oldSession.numEvents + timestamps.size,
                oldSession.startTimestampMs,
                math.max(oldSession.endTimestampMs, timestamps.max))
            } else {
              // First batch seen for this session id.
              SessionInfo(timestamps.size, timestamps.min, timestamps.max)
            }
            state.update(updatedSession)

            // Set timeout such that the session will be expired if no data received for 10 seconds
            state.setTimeoutDuration("10 seconds")
            SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
          }
      }

    // Start running the query that prints the session updates to the console
    val query = sessionUpdates
      .writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
  // The closing brace of the enclosing object lies outside this excerpt.