Flink开启Checkpoint样例程序(Scala)
功能介绍
假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性。
代码样例
- 发送数据形式
case class SEvent(id: Long, name: String, info: String, count: Int)
- 快照数据
该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。
// 用户自定义状态 class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count }
- 带checkpoint的数据源
source算子的代码,该段代码每发送10000条数据休息1秒钟,制作快照时将到目前为止已经发送的数据的条数保存在UDFState中;从快照中状态恢复时,读取UDFState中的数据条数并重新赋值给count变量。
import java.util import org.apache.flink.streaming.api.checkpoint.ListCheckpointed import org.apache.flink.streaming.api.functions.source.RichSourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext // 该类是带有checkpoint的source算子 class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFState]{ private var count = 0L private var isRunning = true private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" // source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i <- 0 until 10000) { sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) count += 1L } Thread.sleep(1000) } } // 任务取消时调用 override def cancel(): Unit = { isRunning = false; } override def close(): Unit = super.close() // 制作快照 override def snapshotState(l: Long, l1: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(count) udfList.add(udfState) udfList } // 从快照中获取状态 override def restoreState(list: util.List[UDFState]): Unit = { val udfState = list.get(0) count = udfState.getState } }
- 带checkpoint的窗口定义
该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。
import java.util import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.checkpoint.ListCheckpointed import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector // 该类是带checkpoint的window算子 class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L // window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event <- input) { count += 1L } total += count out.collect(count) } // 制作自定义状态快照 override def snapshotState(l: Long, l1: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } // 从自定义快照中恢复状态 override def restoreState(list: util.List[UDFState]): Unit = { val udfState = list.get(0) total = udfState.getState } }
- 应用代码
该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使用了event time。
import com.huawei.rt.flink.core.{SEvent, SEventSourceWithChk, WindowStatisticWithChk} import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.api.scala._ import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/")) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(2000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) // 应用逻辑 env.addSource(new SEventSourceWithChk) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } }