更新时间:2024-08-03 GMT+08:00

Flink异步Checkpoint Scala样例代码

代码样例

假定用户需要每隔1秒钟统计一次最近4秒窗口内的数据量,并保证状态的严格一致性(exactly-once)。

  • 发送数据形式
    /**
     * Shape of each record sent into the stream: an id, a name,
     * a free-form info payload and a per-record count.
     */
    case class SEvent(
        id: Long,
        name: String,
        info: String,
        count: Int)
    
  • 快照数据

    该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。

    /**
     * User-defined checkpoint state: a serializable holder for one Long counter.
     *
     * Because instances are checkpointed as Serializable objects, the private field
     * name `count` is kept stable so that previously written state stays readable.
     */
    class UDFStateScala extends Serializable {
      private var count: Long = 0L

      /** Overwrites the stored counter with `s`. */
      def setState(s: Long): Unit = {
        this.count = s
      }

      /** Returns the stored counter (0 until `setState` has been called). */
      def getState: Long = this.count
    }
    
  • 带checkpoint的数据源

    该段代码是source算子的代码:每发送10000条数据后休息1秒钟;制作快照时,将到目前为止已发送的数据条数保存在UDFStateScala中;从快照中恢复状态时,读取UDFStateScala中的数据条数并重新赋值给count变量。

    import java.util
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    
    /**
     * Record emitted by the source below: id, name, info payload and count.
     */
    case class SEvent(
        id: Long,
        name: String,
        info: String,
        count: Int)
    
    /**
     * Checkpointed source operator.
     *
     * Emits batches of 10000 [[SEvent]] records followed by a one-second pause until
     * cancelled. The number of records emitted so far is kept in operator state via
     * [[ListCheckpointed]] so the counter survives a restore.
     */
    class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFStateScala]{
      // Records emitted so far; snapshotted and restored through UDFStateScala.
      private var count = 0L
      // Cleared by cancel(), which Flink invokes from a different thread than run();
      // @volatile guarantees the emit loop observes the update and terminates.
      @volatile private var isRunning = true
      private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"

      /** Source loop: emit 10000 records, sleep one second, repeat until cancelled. */
      override def run(sourceContext: SourceContext[SEvent]): Unit = {
        val lock = sourceContext.getCheckpointLock
        while (isRunning) {
          for (_ <- 0 until 10000) {
            // Emit and advance the counter atomically with respect to snapshotState,
            // so a checkpoint never records a count that disagrees with the records
            // already sent downstream (required for exactly-once state).
            lock.synchronized {
              sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
              count += 1L
            }
          }
          Thread.sleep(1000)
        }
      }

      /** Called when the job is cancelled; makes run() leave its loop. */
      override def cancel(): Unit = {
        isRunning = false
      }

      override def close(): Unit = super.close()

      /**
       * Takes a snapshot of the emitted-record count.
       *
       * @param l  checkpoint id
       * @param l1 checkpoint timestamp
       * @return a single-element list holding the current count
       */
      override def snapshotState(l: Long, l1: Long): util.List[UDFStateScala] = {
        val udfState = new UDFStateScala
        udfState.setState(count)
        val udfList = new util.ArrayList[UDFStateScala](1)
        udfList.add(udfState)
        udfList
      }

      /**
       * Restores the emitted-record count from a snapshot.
       *
       * Entries are summed rather than reading only the first one: an empty list
       * (no prior state for this subtask) yields 0 instead of throwing, and multiple
       * redistributed entries are combined instead of silently dropped.
       */
      override def restoreState(list: util.List[UDFStateScala]): Unit = {
        var restored = 0L
        val it = list.iterator()
        while (it.hasNext) {
          restored += it.next().getState
        }
        count = restored
      }
    }
    
  • 带checkpoint的窗口定义

    该段代码是window算子的代码:每当窗口触发计算时,统计窗口中的元组数量并输出,同时将其累加到total中;制作快照时将total保存在UDFStateScala中,从快照中恢复时再读回total。

    import java.util
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    /**
     * Checkpointed window function.
     *
     * For every fired window, outputs the number of records in that window and adds
     * it to a running total that is kept in operator state via [[ListCheckpointed]].
     */
    class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFStateScala]{
      // Running total of records counted by this function instance across all windows.
      private var total = 0L

      /**
       * Counts the records of one window, emits the count and accumulates it.
       *
       * @param key    key of the window (positional Tuple key from keyBy(0))
       * @param window the fired time window
       * @param input  records assigned to the window
       * @param out    collector receiving the per-window count
       */
      override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
        // Count as Long (Iterable.size would return an Int).
        val count = input.foldLeft(0L)((n, _) => n + 1L)
        total += count
        out.collect(count)
      }

      /**
       * Takes a snapshot of the running total.
       *
       * @param l  checkpoint id
       * @param l1 checkpoint timestamp
       * @return a single-element list holding the current total
       */
      override def snapshotState(l: Long, l1: Long): util.List[UDFStateScala] = {
        val udfState = new UDFStateScala
        udfState.setState(total)
        val udfList = new util.ArrayList[UDFStateScala](1)
        udfList.add(udfState)
        udfList
      }

      /**
       * Restores the running total from a snapshot.
       *
       * Entries are summed rather than reading only the first one: after a
       * parallelism change a subtask may receive no entry (restores to 0 instead of
       * throwing) or several entries (combined instead of silently dropped).
       */
      override def restoreState(list: util.List[UDFStateScala]): Unit = {
        var restored = 0L
        val it = list.iterator()
        while (it.hasNext) {
          restored += it.next().getState
        }
        total = restored
      }
    }
    
  • 应用代码

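    该段代码是流图定义代码,实现具体的业务流程。窗口的触发基于event time,其中事件时间戳和watermark均取自系统当前时间(System.currentTimeMillis())。checkpoint路径可通过--chkPath参数指定,默认为hdfs://hacluster/flink/checkpoint/checkpoint/。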

    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /**
     * Entry point of the sample job.
     *
     * Builds an event-time pipeline: checkpointed source -> timestamp/watermark
     * assignment -> keyBy(0) -> 4 s sliding window advancing every 1 s -> checkpointed
     * window count -> print. Checkpoints are exactly-once and written to `--chkPath`.
     */
    object FlinkEventTimeAPIChkMain {
      def main(args: Array[String]): Unit = {
        // Imported locally: ParameterTool is not among this snippet's top-level imports.
        import org.apache.flink.api.java.utils.ParameterTool

        // Checkpoint directory, overridable with --chkPath <uri>.
        val chkPath = ParameterTool.fromArgs(args).get("chkPath", "hdfs://hacluster/flink/checkpoint/checkpoint/")
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStateBackend(new FsStateBackend(chkPath))
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // Periodic watermarks are requested every 2000 ms.
        env.getConfig.setAutoWatermarkInterval(2000)
        // Checkpoint every 6000 ms with exactly-once semantics
        // (same effect as setting the interval and mode on CheckpointConfig).
        env.enableCheckpointing(6000, CheckpointingMode.EXACTLY_ONCE)

        // Application logic
        env.addSource(new SEventSourceWithChk)
          .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
            // Watermark: the current wall-clock time.
            override def getCurrentWatermark: Watermark = {
              new Watermark(System.currentTimeMillis())
            }
            // Event timestamp: the wall-clock time at which the record is seen here.
            override def extractTimestamp(t: SEvent, l: Long): Long = {
              System.currentTimeMillis()
            }
          })
          .keyBy(0)
          .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
          .apply(new WindowStatisticWithChk)
          .print()
        env.execute()
      }
    }