Updated on 2022-07-11 GMT+08:00

Scala Sample Code

Function Description

Assume that you want to count, once every second, the number of records received during the preceding 4 seconds, and that the operator state must remain strictly consistent (exactly-once) when the job is restored from a checkpoint.

Sample Code

  1. Format of the sent data.
    /** Record emitted by the sample source: an id, a name, an info payload and a count. */
    case class SEvent(
        id: Long,
        name: String,
        info: String,
        count: Int
    )
  2. Snapshot data

    The snapshot data stores the number of records that an operator has counted at the moment a snapshot is created.

    /** User-defined state: a serializable holder for a single Long counter. */
    class UDFState extends Serializable {
        //Counter value carried by this state object; starts at 0.
        private var count: Long = 0L

        /** Stores `s` as the current counter value. */
        def setState(s: Long): Unit = {
            count = s
        }

        /** Returns the counter value most recently stored (0 if none was stored). */
        def getState: Long = count
    }
  3. Data source with checkpoints

    Code of the source operator. The operator repeatedly sends 10000 records and then pauses for one second. When a snapshot is created, the number of records sent so far is recorded in UDFState. When the job is restored from a snapshot, the number recorded in UDFState is read and assigned to the count variable.

    import java.util 
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed 
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction 
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext 
     
    /**
      * Source operator with checkpoint support.
      *
      * Repeatedly emits a batch of 10000 [[SEvent]] records and then pauses for one second.
      * The number of records emitted so far is stored in a [[UDFState]] when a snapshot is
      * taken and read back when the operator is restored.
      */
    class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFState]{
        //Number of records emitted so far; this is the value that is checkpointed.
        private var count = 0L
        //Written by cancel(), which is called from a different thread than run(),
        //so the flag must be volatile for the emit loop to observe the change.
        @volatile private var isRunning = true
        private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"

        /**
          * Emits 10000 records per batch, pausing one second between batches, until cancelled.
          *
          * @param sourceContext context used to emit records and to obtain the checkpoint lock
          */
        override def run(sourceContext: SourceContext[SEvent]): Unit = {
            val checkpointLock = sourceContext.getCheckpointLock
            while (isRunning) {
                var emitted = 0
                while (isRunning && emitted < 10000) {
                    //Emit the record and advance the counter under the checkpoint lock so that
                    //a snapshot never records a count that disagrees with what was emitted.
                    checkpointLock.synchronized {
                        sourceContext.collect(SEvent(1, s"hello-$count", alphabet, 1))
                        count += 1L
                    }
                    emitted += 1
                }
                //Skip the pause once the source has been cancelled.
                if (isRunning) {
                    Thread.sleep(1000)
                }
            }
        }

        /** Called when the task is cancelled; makes the emit loop in run() terminate. */
        override def cancel(): Unit = {
            isRunning = false
        }

        override def close(): Unit = super.close()

        /**
          * Creates a snapshot holding the number of records emitted so far.
          *
          * @param l  first argument supplied by the checkpointing framework (not used here)
          * @param l1 second argument supplied by the checkpointing framework (not used here)
          * @return a single-element list whose UDFState carries the current count
          */
        override def snapshotState(l: Long, l1: Long): util.List[UDFState] = {
            val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
            val udfState = new UDFState
            udfState.setState(count)
            udfList.add(udfState)
            udfList
        }

        /**
          * Restores the emitted-record count from a snapshot.
          *
          * An empty (or null) list resets the count to 0 instead of throwing. If the list
          * holds several entries, their counts are added together.
          *
          * @param list the states handed back by the checkpointing framework
          */
        override def restoreState(list: util.List[UDFState]): Unit = {
            var restored = 0L
            if (list != null) {
                val states = list.iterator()
                while (states.hasNext) {
                    restored += states.next().getState
                }
            }
            count = restored
        }
    }
  4. Definition of window with checkpoint.

    This code defines the window operator, which counts the number of tuples in each window.

    import java.util 
    import org.apache.flink.api.java.tuple.Tuple 
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed 
    import org.apache.flink.streaming.api.scala.function.WindowFunction 
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow 
    import org.apache.flink.util.Collector 
     
    /**
      * Window operator with checkpoint support.
      *
      * Emits the number of [[SEvent]] records in each window and keeps a running total of
      * all records counted, which is stored in a [[UDFState]] when a snapshot is taken.
      */
    class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{
        //Running total of records counted across all windows; this value is checkpointed.
        private var total = 0L

        /**
          * Counts the records of one window, adds the count to the running total and emits it.
          *
          * @param key    key of the window (not used by the computation)
          * @param window the window being evaluated (not used by the computation)
          * @param input  records contained in the window
          * @param out    collector that receives the per-window record count
          */
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
            //Count in Long, as the running total is a Long.
            val count = input.foldLeft(0L)((seen, _) => seen + 1L)
            total += count
            out.collect(count)
        }

        /**
          * Creates a snapshot holding the running total.
          *
          * @param l  first argument supplied by the checkpointing framework (not used here)
          * @param l1 second argument supplied by the checkpointing framework (not used here)
          * @return a single-element list whose UDFState carries the current total
          */
        override def snapshotState(l: Long, l1: Long): util.List[UDFState] = {
            val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
            val udfState = new UDFState
            udfState.setState(total)
            udfList.add(udfState)
            udfList
        }

        /**
          * Restores the running total from a snapshot.
          *
          * An empty (or null) list resets the total to 0 instead of throwing. If the list
          * holds several entries, their totals are added together.
          *
          * @param list the states handed back by the checkpointing framework
          */
        override def restoreState(list: util.List[UDFState]): Unit = {
            var restored = 0L
            if (list != null) {
                val states = list.iterator()
                while (states.hasNext) {
                    restored += states.next().getState
                }
            }
            total = restored
        }
    }
  5. Application code

    This code defines the StreamGraph and implements the service logic. Event time is used to trigger the windows; in this sample, each record's timestamp and the watermark are both taken from the system clock.

    import com.huawei.rt.flink.core.{SEvent, SEventSourceWithChk, WindowStatisticWithChk} 
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend 
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks 
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} 
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
    import org.apache.flink.streaming.api.watermark.Watermark 
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows 
    import org.apache.flink.streaming.api.windowing.time.Time 
    import org.apache.flink.api.scala._ 
    import org.apache.flink.runtime.state.filesystem.FsStateBackend 
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup 
     
    /**
      * Entry point of the sample job: reads from SEventSourceWithChk, counts the records in
      * 4-second sliding windows that advance every second, and prints each count.
      */
    object FlinkEventTimeAPIChkMain {
        def main(args: Array[String]): Unit = {
            val env = StreamExecutionEnvironment.getExecutionEnvironment

            //State backend, time characteristic and watermark emission interval.
            env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/"))
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
            env.getConfig.setAutoWatermarkInterval(2000)

            //Exactly-once checkpoints with an interval of 6000.
            val checkpointConfig = env.getCheckpointConfig
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
            checkpointConfig.setCheckpointInterval(6000)

            //Both the watermark and the timestamp of each tuple come from the system clock.
            val clockAssigner: AssignerWithPeriodicWatermarks[SEvent] =
                new AssignerWithPeriodicWatermarks[SEvent] {
                    override def getCurrentWatermark: Watermark =
                        new Watermark(System.currentTimeMillis())

                    override def extractTimestamp(t: SEvent, l: Long): Long =
                        System.currentTimeMillis()
                }

            //Windows of 4 seconds that slide by 1 second.
            val slidingWindows = SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))

            //Application logic.
            val events = env.addSource(new SEventSourceWithChk)
            val timestamped = events.assignTimestampsAndWatermarks(clockAssigner)
            val windowCounts = timestamped
                .keyBy(0)
                .window(slidingWindows)
                .apply(new WindowStatisticWithChk)
            windowCounts.print()

            env.execute()
        }
    }