Scala Sample Code
Function
In this Spark Structured Streaming application, the number of events in each session and the start and end timestamps of each session are collected across batches. In each batch, the application also outputs the sessions whose state was updated in that batch.
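The example models each input word as an event keyed by a session ID, keeps per-session state, and emits an update row for every session changed in a batch. The following is a minimal sketch of the event, state, and output types that the code example below refers to; the field names mirror those used in the example, but the exact definitions live in the complete sample and may differ.
import java.sql.Timestamp

// Input event: one word per event, keyed by session ID.
case class Event(sessionId: String, timestamp: Timestamp)

// Per-session state tracked by mapGroupsWithState.
case class SessionInfo(
    numEvents: Int,
    startTimestampMs: Long,
    endTimestampMs: Long) {
  // Duration of the session derived from its start and end timestamps.
  def durationMs: Long = endTimestampMs - startTimestampMs
}

// Row emitted for every session that is updated (or has expired) in a batch.
case class SessionUpdate(
    id: String,
    durationMs: Long,
    numEvents: Int,
    expired: Boolean)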
Code Example
The following code snippet is an example. For the complete code, see com.huawei.bigdata.spark.examples.kafkaSessionization.
When new data is available in the streaming DataFrame/Dataset, outputMode specifies which result rows are written to the streaming sink.
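As a minimal sketch, the output mode is set on the writeStream call; sessionUpdates here is the Dataset built in the example that follows.
// outputMode controls which result rows are written to the sink on each trigger:
//   "append"   - only rows added to the result table since the last trigger
//   "update"   - only rows updated since the last trigger (used by this example)
//   "complete" - the entire result table
val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("console")
  .start()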
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, StreamingQueryListener}

object kafkaSessionization {

  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println("Usage: kafkaSessionization <bootstrap-servers> " +
        "<subscribe-type> <protocol> <service> <domain> <topics> <checkpointLocation>")
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, protocol, service, domain, topics, checkpointLocation) = args

    val spark = SparkSession
      .builder
      .appName("kafkaSessionization")
      .getOrCreate()

    spark.conf.set("spark.sql.streaming.checkpointLocation", checkpointLocation)

    spark.streams.addListener(new StreamingQueryListener {
      @volatile private var startTime: Long = 0L
      @volatile private var endTime: Long = 0L
      @volatile private var numRecs: Long = 0L

      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
        println("Query started: " + event.id)
        startTime = System.currentTimeMillis
      }

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println("Query made progress: " + event.progress)
        numRecs += event.progress.numInputRows
      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println("Query terminated: " + event.id)
        endTime = System.currentTimeMillis
      }
    })

    import spark.implicits._

    // Event, SessionInfo, and SessionUpdate are case classes defined in the complete sample.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map { x =>
        val splitStr = x.split(",")
        (splitStr(0), Timestamp.valueOf(splitStr(1)))
      }.as[(String, Timestamp)]
      .flatMap { case (line, timestamp) =>
        line.split(" ").map(word => Event(sessionId = word, timestamp))
      }

    // Sessionize the events. Track the number of events and the start and end timestamps
    // of each session, and report session updates.
    val sessionUpdates = df
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>

          // If timed out, remove the session and send a final update.
          if (state.hasTimedOut) {
            val finalUpdate =
              SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
            state.remove()
            finalUpdate
          } else {
            // Update the start and end timestamps of the session.
            val timestamps = events.map(_.timestamp.getTime).toSeq
            val updatedSession = if (state.exists) {
              val oldSession = state.get
              SessionInfo(
                oldSession.numEvents + timestamps.size,
                oldSession.startTimestampMs,
                math.max(oldSession.endTimestampMs, timestamps.max))
            } else {
              SessionInfo(timestamps.size, timestamps.min, timestamps.max)
            }
            state.update(updatedSession)

            // Set the timeout so that the session expires if no data is received for 10 seconds.
            state.setTimeoutDuration("10 seconds")
            SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
          }
      }

    // Start the query that prints the session updates to the console.
    val query = sessionUpdates
      .writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
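The example expects each Kafka record value to be a comma-separated pair of a space-separated list of session IDs and a timestamp. The following sketch walks one illustrative value (not taken from the sample data) through the same split logic as the map and flatMap calls above.
import java.sql.Timestamp

// Illustrative value only; the real topic data is whatever the producer writes.
val value = "sessionA sessionB,2024-01-01 00:00:01"

val splitStr = value.split(",")
val timestamp = Timestamp.valueOf(splitStr(1))   // "2024-01-01 00:00:01"
val events = splitStr(0).split(" ").map(word => Event(word, timestamp))
// Produces Event("sessionA", ...) and Event("sessionB", ...): one event per word,
// each keyed by its session ID.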