更新时间:2024-08-03 GMT+08:00

Flink开启Checkpoint样例程序(Scala)

功能介绍

假定用户需要每隔1秒钟统计一次最近4秒钟窗口中的数据量,并做到状态严格一致性。

代码样例

  1. 发送数据形式
    /**
      * Record type that flows through the streaming job.
      *
      * @param id    numeric identifier of the event
      * @param name  name of the event
      * @param info  string information attached to the event
      * @param count integer count carried by the event
      */
    case class SEvent(
        id: Long,
        name: String,
        info: String,
        count: Int
    )
  2. 快照数据

    该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。

    /** User-defined state: a serializable holder for a single Long counter. */
    class UDFState extends Serializable {
        // Counter value held by this state object; starts at zero.
        private var count: Long = 0L

        /** Stores `s` as the current counter value. */
        def setState(s: Long): Unit = {
            count = s
        }

        /** Returns the counter value most recently stored (0 if never set). */
        def getState: Long = count
    }
  3. 带checkpoint的数据源

    source算子的代码,该段代码每发送10000条数据休息1秒钟;制作快照时,将到目前为止已经发送的数据条数保存在UDFState中;从快照中恢复状态时,读取UDFState中的数据条数并重新赋值给count变量。

    import java.util
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    
    /**
      * Source operator with checkpoint support.
      *
      * Emits `SEvent` records in batches of 10000, sleeping one second after each
      * batch. The number of records emitted so far is stored in a `UDFState` when a
      * snapshot is taken and read back from it when state is restored.
      */
    class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFState] {
        // Number of records emitted so far; this is the value that gets checkpointed.
        private var count = 0L
        // cancel() is invoked from a different thread than the one executing run()
        // (see the Flink SourceFunction documentation), so the flag must be volatile
        // for the emitting loop to reliably observe the update.
        @volatile private var isRunning = true
        private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"

        /**
          * Source logic: injects 10000 records into the stream, then sleeps for one
          * second, and repeats until the source is cancelled.
          *
          * @param sourceContext context used to emit records and to obtain the checkpoint lock
          */
        override def run(sourceContext: SourceContext[SEvent]): Unit = {
            val checkpointLock = sourceContext.getCheckpointLock
            while (isRunning) {
                var emitted = 0
                // Re-check the flag per record so cancellation does not wait for a full batch.
                while (isRunning && emitted < 10000) {
                    // Emitting the record and advancing the counter must be atomic with
                    // respect to snapshotState; otherwise a checkpoint could record a
                    // count that does not match the records actually emitted.
                    checkpointLock.synchronized {
                        sourceContext.collect(SEvent(1, s"hello-$count", alphabet, 1))
                        count += 1L
                    }
                    emitted += 1
                }
                Thread.sleep(1000)
            }
        }

        /** Called when the job is cancelled; makes the loop in `run` terminate. */
        override def cancel(): Unit = {
            isRunning = false
        }

        override def close(): Unit = super.close()

        /**
          * Takes a snapshot: wraps the current record count in a `UDFState`.
          *
          * @param l  first argument supplied by the ListCheckpointed interface (unused here)
          * @param l1 second argument supplied by the ListCheckpointed interface (unused here)
          * @return a single-element list holding the current count
          */
        override def snapshotState(l: Long, l1: Long): util.List[UDFState] = {
            val udfState = new UDFState
            udfState.setState(count)
            val udfList = new util.ArrayList[UDFState]
            udfList.add(udfState)
            udfList
        }

        /**
          * Restores the record count from a snapshot.
          *
          * The list may be empty (no state assigned to this instance) or hold several
          * entries (state merged after rescaling), so the entries are summed instead of
          * reading index 0 unconditionally. For a single entry the result equals that
          * entry's value; for an empty list the count is reset to 0.
          *
          * @param list state objects handed back on restore
          */
        override def restoreState(list: util.List[UDFState]): Unit = {
            var restored = 0L
            val it = list.iterator()
            while (it.hasNext) {
                restored += it.next().getState
            }
            count = restored
        }
    }
  4. 带checkpoint的窗口定义

    该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。

    import java.util
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    /**
      * Window operator with checkpoint support.
      *
      * For every window evaluation it emits the number of records in the window and
      * adds that number to a running total; the running total is what gets stored in
      * and restored from snapshots.
      */
    class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState] {
        // Sum of all per-window counts emitted so far; this is the checkpointed value.
        private var total = 0L

        /**
          * Window logic: counts the records in the window and emits that count.
          *
          * @param key    key of the window being evaluated
          * @param window the window being evaluated
          * @param input  records contained in the window
          * @param out    collector receiving the record count
          */
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
            // Fold instead of a manual loop; accumulates as Long like the running total.
            val count = input.foldLeft(0L)((n, _) => n + 1L)
            total += count
            out.collect(count)
        }

        /**
          * Takes a snapshot: wraps the running total in a `UDFState`.
          *
          * @param l  first argument supplied by the ListCheckpointed interface (unused here)
          * @param l1 second argument supplied by the ListCheckpointed interface (unused here)
          * @return a single-element list holding the running total
          */
        override def snapshotState(l: Long, l1: Long): util.List[UDFState] = {
            val udfState = new UDFState
            udfState.setState(total)
            val udfList = new util.ArrayList[UDFState]
            udfList.add(udfState)
            udfList
        }

        /**
          * Restores the running total from a snapshot.
          *
          * The list may be empty (no state assigned to this instance) or hold several
          * entries (state merged after rescaling), so the entries are summed instead of
          * reading index 0 unconditionally. For a single entry the result equals that
          * entry's value; for an empty list the total is reset to 0.
          *
          * @param list state objects handed back on restore
          */
        override def restoreState(list: util.List[UDFState]): Unit = {
            var restored = 0L
            val it = list.iterator()
            while (it.hasNext) {
                restored += it.next().getState
            }
            total = restored
        }
    }
  5. 应用代码

    该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使用了event time。

    import com.huawei.rt.flink.core.{SEvent, SEventSourceWithChk, WindowStatisticWithChk}
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.api.scala._
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    
    /**
      * Application entry point: defines the stream graph. A checkpointed source feeds
      * a 4-second sliding event-time window that slides every second, and the
      * per-window counts are printed.
      */
    object FlinkEventTimeAPIChkMain {
        def main(args: Array[String]): Unit = {
            val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

            // State backend location and time semantics.
            streamEnv.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/"))
            streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
            streamEnv.getConfig.setAutoWatermarkInterval(2000)

            // Checkpointing: exactly-once mode with an interval of 6000.
            val checkpointConfig = streamEnv.getCheckpointConfig
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
            checkpointConfig.setCheckpointInterval(6000)

            // Both the per-record timestamp and the watermark are taken from the
            // wall clock at the moment they are requested.
            val wallClockAssigner = new AssignerWithPeriodicWatermarks[SEvent] {
                // Stamp each record with a timestamp.
                override def extractTimestamp(t: SEvent, l: Long): Long =
                    System.currentTimeMillis()

                // Produce the current watermark.
                override def getCurrentWatermark: Watermark =
                    new Watermark(System.currentTimeMillis())
            }

            // Application logic.
            val events = streamEnv.addSource(new SEventSourceWithChk)
            val timestamped = events.assignTimestampsAndWatermarks(wallClockAssigner)
            val windowCounts = timestamped
                .keyBy(0)
                .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
                .apply(new WindowStatisticWithChk)
            windowCounts.print()

            streamEnv.execute()
        }
    }