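Flink Asynchronous Checkpoint Scala Sample Code
Code Sample
Suppose a user needs to count, once per second, the records that fall in the most recent 4-second window, with strict (exactly-once) state consistency.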
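A 4-second window evaluated every second is a sliding window, so each record belongs to several overlapping windows. The following is a minimal, dependency-free sketch of that assignment arithmetic. The names `SlidingWindows` and `windowStarts` are hypothetical, and the sketch assumes non-negative millisecond timestamps and a zero window offset.

```scala
object SlidingWindows {
  // Returns the start timestamps (ms) of every sliding window that contains `timestamp`.
  // Assumes timestamp >= 0 and that windows are aligned to multiples of slideMs.
  def windowStarts(timestamp: Long, sizeMs: Long, slideMs: Long): List[Long] = {
    // Start of the most recent window that begins at or before the timestamp.
    val lastStart = timestamp - (timestamp % slideMs)
    // Step back one slide at a time while the window [start, start + sizeMs) still covers the timestamp.
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > timestamp - sizeMs)
      .toList
  }
}
```

With a 4000 ms size and a 1000 ms slide, a record stamped 5500 ms falls into the four windows starting at 5000, 4000, 3000, and 2000 ms.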
- Format of the data sent
case class SEvent(id: Long, name: String, info: String, count: Int)
- Snapshot data
When an operator takes a snapshot, this class stores the number of records the operator has counted so far.
// User-defined state
class UDFStateScala extends Serializable {
  private var count = 0L

  // Set the user-defined state
  def setState(s: Long): Unit = { count = s }

  // Get the user-defined state
  def getState: Long = count
}
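The state class extends `Serializable` because the `ListCheckpointed` interface used by the operators requires serializable state elements. The sketch below round-trips such an object through plain Java serialization to show that the count survives. It is only an illustration: `StateRoundTrip`, `toBytes`, and `fromBytes` are hypothetical helpers, and the state class is repeated so the block is self-contained.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Same shape as the state class above, repeated so the sketch stands alone.
class UDFStateScala extends Serializable {
  private var count = 0L
  def setState(s: Long): Unit = { count = s }
  def getState: Long = count
}

object StateRoundTrip {
  // Serializes the state object into a byte array.
  def toBytes(state: UDFStateScala): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(state) finally out.close()
    buffer.toByteArray
  }

  // Rebuilds a state object from the bytes produced by toBytes.
  def fromBytes(bytes: Array[Byte]): UDFStateScala = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[UDFStateScala] finally in.close()
  }
}
```

A state object whose count was set to 12345 and then passed through `toBytes` and `fromBytes` should report the same count from `getState`.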
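- Data source with checkpointing
This is the code of the source operator. It pauses for 1 second after every 10000 records it sends. When a snapshot is taken, the number of records sent so far is saved in UDFStateScala; when state is restored from a snapshot, the record count is read from UDFStateScala and assigned back to the count variable.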
import java.util

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

case class SEvent(id: Long, name: String, info: String, count: Int)

// Source operator with checkpointing
class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFStateScala] {
  private var count = 0L
  // cancel() is called from a different thread than run(), so the flag is volatile
  @volatile private var isRunning = true
  private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"

  // Source logic: inject 10000 tuples into the stream graph, then sleep for 1 second
  override def run(sourceContext: SourceContext[SEvent]): Unit = {
    while (isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
        count += 1L
      }
      Thread.sleep(1000)
    }
  }

  // Called when the job is cancelled
  override def cancel(): Unit = {
    isRunning = false
  }

  override def close(): Unit = super.close()

  // Take a snapshot: store the number of records sent so far
  override def snapshotState(l: Long, l1: Long): util.List[UDFStateScala] = {
    val udfList: util.ArrayList[UDFStateScala] = new util.ArrayList[UDFStateScala]
    val udfState = new UDFStateScala
    udfState.setState(count)
    udfList.add(udfState)
    udfList
  }

  // Restore the state from a snapshot
  override def restoreState(list: util.List[UDFStateScala]): Unit = {
    val udfState = list.get(0)
    count = udfState.getState
  }
}
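For the saved count to match exactly what has been emitted, emitting a record and updating the counter should not be interleaved with a snapshot. Flink's `SourceContext` exposes `getCheckpointLock` for this purpose. The dependency-free sketch below imitates that idea with an ordinary lock; `CheckpointLockSketch` and `CountingSource` are hypothetical names, and the `emitted` field merely stands in for records handed to `sourceContext.collect`.

```scala
object CheckpointLockSketch {
  // Hypothetical stand-in for a checkpointed source.
  class CountingSource {
    private val lock = new Object // plays the role of sourceContext.getCheckpointLock
    private var count = 0L        // operator state that goes into the snapshot
    private var emitted = 0L      // stands in for records already sent downstream

    // Emit one record and update the state as a single atomic step.
    def emitOne(): Unit = lock.synchronized {
      emitted += 1L
      count += 1L
    }

    // Take a snapshot under the same lock; returns (state, emitted).
    def snapshot(): (Long, Long) = lock.synchronized {
      (count, emitted)
    }
  }
}
```

Because both methods synchronize on the same object, a snapshot taken while another thread is emitting always sees a state that equals the number of records emitted so far.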
- Window definition with checkpointing
This is the code of the window operator. Each time a window fires, it counts the tuples in that window.
import java.util

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Window operator with checkpointing
class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFStateScala] {
  private var total = 0L

  // Window logic: count the tuples in the window
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
    var count = 0L
    for (event <- input) {
      count += 1L
    }
    total += count
    out.collect(count)
  }

  // Take a snapshot of the user-defined state
  override def snapshotState(l: Long, l1: Long): util.List[UDFStateScala] = {
    val udfList: util.ArrayList[UDFStateScala] = new util.ArrayList[UDFStateScala]
    val udfState = new UDFStateScala
    udfState.setState(total)
    udfList.add(udfState)
    udfList
  }

  // Restore the state from the user-defined snapshot
  override def restoreState(list: util.List[UDFStateScala]): Unit = {
    val udfState = list.get(0)
    total = udfState.getState
  }
}
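`snapshotState` returns a list because list state can be redistributed across parallel instances when a job is restored with a different parallelism, so a restore may hand an instance no elements or more than one. The sketch below shows one possible way to tolerate that, by summing every restored partial count. `RestoreMerge` and `mergeCounts` are hypothetical names, the list holds plain `java.lang.Long` values instead of the state class to stay self-contained, and summing is an assumption that suits a running total rather than something the sample above does.

```scala
import java.util

object RestoreMerge {
  // Sums every restored partial count; an empty list yields 0.
  def mergeCounts(restored: util.List[java.lang.Long]): Long = {
    var total = 0L
    val it = restored.iterator()
    while (it.hasNext) {
      total += it.next().longValue()
    }
    total
  }
}
```

Unlike `list.get(0)`, this does not throw on an empty list and does not ignore additional elements.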
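- Application code
This code defines the stream graph and implements the business flow. The windows are triggered on event time; in this sample, both the timestamps and the watermarks are taken from System.currentTimeMillis().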
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
// The wildcard import brings in StreamExecutionEnvironment and the implicit TypeInformation needed by the Scala API
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkEventTimeAPIChkMain {
  def main(args: Array[String]): Unit = {
    // Checkpoint directory, overridable with --chkPath
    val chkPath = ParameterTool.fromArgs(args).get("chkPath", "hdfs://hacluster/flink/checkpoint/checkpoint/")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend(chkPath))
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Emit a watermark every 2000 ms
    env.getConfig.setAutoWatermarkInterval(2000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Trigger a checkpoint every 6000 ms
    env.getCheckpointConfig.setCheckpointInterval(6000)

    // Application logic
    env.addSource(new SEventSourceWithChk)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
        // Set the watermark
        override def getCurrentWatermark: Watermark = {
          new Watermark(System.currentTimeMillis())
        }

        // Attach a timestamp to each tuple
        override def extractTimestamp(t: SEvent, l: Long): Long = {
          System.currentTimeMillis()
        }
      })
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
      .apply(new WindowStatisticWithChk)
      .print()

    env.execute()
  }
}
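To get a rough idea of the counts this job prints, the sketch below simulates the sliding windows without Flink. It is idealized: it assumes that batch k of 10000 records is stamped at exactly k * 1000 ms, which the real job only approximates because its timestamps come from the wall clock. `WindowCountSimulation` and `countForWindowEnding` are hypothetical names.

```scala
object WindowCountSimulation {
  val batchSize = 10000L // records per batch, as in the source operator
  val sizeMs = 4000L     // window size
  val batchGapMs = 1000L // assumed gap between batches

  // Number of records in the window [windowEndMs - sizeMs, windowEndMs),
  // assuming `batches` batches stamped at 0, 1000, 2000, ... ms.
  def countForWindowEnding(windowEndMs: Long, batches: Int): Long = {
    val windowStartMs = windowEndMs - sizeMs
    val batchTimestamps = (0 until batches).map(_ * batchGapMs)
    batchTimestamps.count(ts => ts >= windowStartMs && ts < windowEndMs) * batchSize
  }
}
```

Under these assumptions the first windows ramp up through 10000, 20000, and 30000 records, and every later window holds 40000 records, that is, four batches.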