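Flink Asynchronous Checkpoint Java Sample Code
Code Sample
Assume the user needs to count, once every second, the number of records in the most recent 4-second window, with exactly-once state consistency.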
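- Snapshot data
This class is what an operator saves when it takes a snapshot. It holds the number of records the operator has counted so far.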
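    import java.io.Serializable;

    // Part of the snapshot: holds the user-defined state.
    public class UDFState implements Serializable {
        private static final long serialVersionUID = 1L;

        private long count;

        // Initialize the user-defined state.
        public UDFState() {
            count = 0L;
        }

        // Set the user-defined state (called as setCount by the source and window operators).
        public void setCount(long count) {
            this.count = count;
        }

        // Get the user-defined state (called as getCount by the source and window operators).
        public long getCount() {
            return this.count;
        }
    }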
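The state class has to implement Serializable because ListCheckpointed bounds its state type to Serializable. As a rough illustration of what that requirement allows, the sketch below round-trips an equivalent object through plain Java serialization. The names CounterState, SnapshotRoundTrip, and roundTrip are hypothetical, and the in-memory byte array only stands in for checkpoint storage; this is not a description of how Flink writes a checkpoint internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-alone demo; CounterState mirrors the role of UDFState.
class SnapshotRoundTrip {

    static class CounterState implements Serializable {
        private static final long serialVersionUID = 1L;
        private long count;

        long getCount() {
            return count;
        }

        void setCount(long count) {
            this.count = count;
        }
    }

    // Serialize the state to bytes, then rebuild a new object from those bytes.
    static CounterState roundTrip(CounterState state) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(state);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (CounterState) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("state could not be round-tripped", e);
        }
    }

    public static void main(String[] args) {
        CounterState before = new CounterState();
        before.setCount(30000L);
        CounterState after = roundTrip(before);
        System.out.println(after.getCount()); // prints 30000
    }
}
```

The restored object is a separate instance carrying the same count, which is all the sample's restoreState methods rely on.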
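- Data source with checkpoint
Source operator code. The source pauses for 1 second after every 10,000 records it emits. When a snapshot is taken, it saves the number of records sent so far in UDFState. When state is restored from a snapshot, it reads the record count from UDFState and assigns it back to the count variable.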
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class SimpleSourceWithCheckPoint
            implements SourceFunction<Tuple4<Long, String, String, Integer>>, ListCheckpointed<UDFState> {

        // Number of records emitted so far; this is the checkpointed state.
        private long count = 0;
        // volatile because cancel() is called from a different thread than run().
        private volatile boolean isRunning = true;
        private String alphabet = "justtest";

        // Save the number of records emitted so far into the snapshot.
        @Override
        public List<UDFState> snapshotState(long checkpointId, long timestamp) throws Exception {
            UDFState udfState = new UDFState();
            List<UDFState> udfStateList = new ArrayList<UDFState>();
            udfState.setCount(count);
            udfStateList.add(udfState);
            return udfStateList;
        }

        // Read the record count back from the snapshot.
        @Override
        public void restoreState(List<UDFState> list) throws Exception {
            UDFState udfState = list.get(0);
            count = udfState.getCount();
        }

        @Override
        public void run(SourceContext<Tuple4<Long, String, String, Integer>> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                for (int i = 0; i < 10000; i++) {
                    // Emit and update the counter under the checkpoint lock so that a
                    // snapshot never captures a record without its count, or the reverse.
                    synchronized (sourceContext.getCheckpointLock()) {
                        sourceContext.collect(Tuple4.of(random.nextLong(), "hello" + count, alphabet, 1));
                        count++;
                    }
                }
                // Pause for 1 second after every 10,000 records.
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
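The snapshot and restore cycle described for the source can be traced without a cluster. The following is a minimal sketch in plain Java, assuming a hypothetical class CountingSourceSketch that keeps only the counter logic of the source; it uses no Flink classes and simulates a failure simply by creating a fresh instance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a checkpointed counting source; no Flink classes are used.
class CountingSourceSketch {
    private long count = 0;

    // Emit n records, advancing the counter once per record.
    List<String> emit(int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add("hello" + count);
            count++;
        }
        return out;
    }

    // Copy the counter into a one-element list, like snapshotState in the sample.
    List<Long> snapshot() {
        List<Long> state = new ArrayList<>();
        state.add(count);
        return state;
    }

    // Reset the counter from the first list element, like restoreState in the sample.
    void restore(List<Long> state) {
        count = state.get(0);
    }

    public static void main(String[] args) {
        CountingSourceSketch source = new CountingSourceSketch();
        source.emit(3);                            // emits hello0, hello1, hello2
        List<Long> checkpoint = source.snapshot(); // counter is 3 at this point
        source.emit(2);                            // hello3, hello4 come after the snapshot

        // Simulated failure: a fresh instance resumes from the snapshot.
        CountingSourceSketch recovered = new CountingSourceSketch();
        recovered.restore(checkpoint);
        System.out.println(recovered.emit(2));     // prints [hello3, hello4]
    }
}
```

Records emitted after the last snapshot are produced again after recovery, which is why the counter, and not the emitted records themselves, is what the source needs to persist.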
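- Window definition with checkpoint
Window operator code. Each time the window fires, it counts the tuples in the window, adds that count to a running total, and emits the total.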
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.List;

    public class WindowStatisticWithChk
            implements WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>,
                       ListCheckpointed<UDFState> {

        // Running total of tuples counted across all fired windows; this is the checkpointed state.
        private long total = 0;

        // Save the running total into the snapshot.
        @Override
        public List<UDFState> snapshotState(long checkpointId, long timestamp) throws Exception {
            UDFState udfState = new UDFState();
            List<UDFState> list = new ArrayList<UDFState>();
            udfState.setCount(total);
            list.add(udfState);
            return list;
        }

        // Read the running total back from the snapshot.
        @Override
        public void restoreState(List<UDFState> list) throws Exception {
            UDFState udfState = list.get(0);
            total = udfState.getCount();
        }

        // Count the tuples in the fired window, add them to the running total, and emit the total.
        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow,
                          Iterable<Tuple4<Long, String, String, Integer>> iterable,
                          Collector<Long> collector) throws Exception {
            long count = 0L;
            for (Tuple4<Long, String, String, Integer> tuple4 : iterable) {
                count++;
            }
            total += count;
            collector.collect(total);
        }
    }
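Because the window function emits a running total and not the per-window count, a record that appears in several overlapping windows is added to the total once per window it falls in. The sketch below reproduces only that accumulation logic in plain Java; RunningTotalSketch is a hypothetical name, and the window contents are hand-written lists, not the output of a real window assigner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the window function; no Flink classes are used.
class RunningTotalSketch {
    private long total = 0;

    // Mirrors apply(): count the elements of one fired window,
    // add the count to the running total, and return the total.
    long apply(Iterable<String> windowContents) {
        long count = 0L;
        for (String ignored : windowContents) {
            count++;
        }
        total += count;
        return total;
    }

    public static void main(String[] args) {
        RunningTotalSketch function = new RunningTotalSketch();
        // Three consecutive firings of overlapping windows; record "a" is in all three.
        List<List<String>> firings = Arrays.asList(
                Arrays.asList("a"),
                Arrays.asList("a", "b"),
                Arrays.asList("a", "b", "c"));
        List<Long> emitted = new ArrayList<>();
        for (List<String> window : firings) {
            emitted.add(function.apply(window));
        }
        System.out.println(emitted); // prints [1, 3, 6]
    }
}
```

Only three distinct records exist here, yet the last emitted value is 6, so the emitted number should be read as a cumulative count over window firings.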
- Application code
This code defines the streaming dataflow graph and implements the business logic. Note that the windows are triggered on processing time.
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class FlinkProcessingTimeAPIChkMain {

        public static void main(String[] args) throws Exception {
            // Checkpoint directory; can be overridden with the chkPath argument.
            String chkPath = ParameterTool.fromArgs(args).get("chkPath", "hdfs://hacluster/flink/checkpoints/");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Store checkpoint data in the file system at chkPath.
            env.setStateBackend((StateBackend) new FsStateBackend((chkPath)));
            // Take a checkpoint every 6000 ms in exactly-once mode.
            env.enableCheckpointing(6000, CheckpointingMode.EXACTLY_ONCE);

            env.addSource(new SimpleSourceWithCheckPoint())
                .keyBy(0)
                // 4-second windows that slide every 1 second, based on processing time.
                .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1)))
                .apply(new WindowStatisticWithChk())
                .print();

            env.execute();
        }
    }
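A 4-second window that slides every 1 second places each record in four overlapping windows. The sketch below works through that arithmetic in plain Java. SlidingWindowSketch and windowStarts are hypothetical names, and the calculation is a simplified version of sliding-window assignment that assumes a zero offset, non-negative millisecond timestamps, and half-open windows [start, start + size).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; simplified sliding-window assignment with zero offset.
class SlidingWindowSketch {

    // Return the start times (ms) of every window of length sizeMs, sliding by slideMs,
    // that contains the given non-negative timestamp. Latest window first.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record seen at 5500 ms with size = 4000 ms and slide = 1000 ms.
        System.out.println(windowStarts(5500L, 4000L, 1000L)); // prints [5000, 4000, 3000, 2000]
    }
}
```

With these settings a window fires once per second, and each firing covers the records seen in the preceding 4 seconds.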