Scala Sample Code
Function Description
Assume that you want to collect data volume in the window covering preceding 4 seconds at the interval of one second, and achieve strict consistency of status.
Sample Code
- Formats of sent data.
case class SEvent(id: Long, name: String, info: String, count: Int)
- Snapshot data
The snapshot data is used to store number of data pieces recorded by operators during creation of snapshots.
//User-defined statuses. class UDFState extends Serializable{ private var count = 0L //Configure user-defined statuses. def setState(s: Long) = count = s //Obtain user-defined statuses. def getState = count }
- Data source with checkpoints
Code of the source operator. The code can be used to send 10000 pieces after each pause of one second. When a snapshot is created, number of sent data pieces is recorded in UDFState. When the snapshot is used for restoration, the number of sent data pieces recorded in UDFState is read and assigned to the count variable.
import java.util import org.apache.flink.streaming.api.checkpoint.ListCheckpointed import org.apache.flink.streaming.api.functions.source.RichSourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext //The class is the source operator with checkpoint. class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFState]{ private var count = 0L private var isRunning = true private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" //The logic of the operator is to inject 10000 tuples to the StreamGraph. override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i <- 0 until 10000) { sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) count += 1L } Thread.sleep(1000) } } //Call this when the task is canceled. override def cancel(): Unit = { isRunning = false; } override def close(): Unit = super.close() //Create a snapshot. override def snapshotState(l: Long, l1: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(count) udfList.add(udfState) udfList } //Obtain status from the snapshot. override def restoreState(list: util.List[UDFState]): Unit = { val udfState = list.get(0) count = udfState.getState } }
- Definition of window with checkpoint.
This code is about the window operator and is used to calculate number or tuples in the window.
import java.util import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.checkpoint.ListCheckpointed import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector //The class is the window operator with checkpoint. class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L //The implementation logic of the window operator, which is used to calculate the number of tuples in a window. override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event <- input) { count += 1L } total += count out.collect(count) } //Customize a snapshot. override def snapshotState(l: Long, l1: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } //Restore data from customized snapshots. override def restoreState(list: util.List[UDFState]): Unit = { val udfState = list.get(0) total = udfState.getState } }
- Application code
The code is about the definition of StreamGraph and is used to implement services. The event time is used as the timestamp for triggering the window.
import com.huawei.rt.flink.core.{SEvent, SEventSourceWithChk, WindowStatisticWithChk} import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.api.scala._ import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/")) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(2000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) //Application logic. env.addSource(new SEventSourceWithChk) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { //Configure watermark. override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } //Add timestamp for each tuple. override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot