更新时间:2024-08-03 GMT+08:00

Flink异步Checkpoint Java样例代码

代码样例

假定用户需要每隔1秒钟统计一次最近4秒钟窗口内的数据量,并保证状态的严格一致性(exactly-once)。

  • 快照数据

    该数据在算子制作快照时,用于保存到目前为止算子记录的数据条数。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    import java.io.Serializable;
    /**
     * User-defined state that is stored as part of an operator snapshot.
     *
     * <p>Holds a single {@code long} counter. The class is {@link Serializable}
     * so that instances can be written into and read back from a snapshot.
     *
     * <p>{@link #getCount()} and {@link #setCount(long)} are the accessors used
     * by the operators in this sample. The original accessor names
     * {@link #setState(long)} and {@link #geState()} are kept as aliases so that
     * existing callers keep compiling.
     */
    public class UDFState implements Serializable {
        private long count;

        /** Creates a state object whose counter starts at zero. */
        public UDFState() {
            count = 0L;
        }

        /**
         * Stores the counter value.
         *
         * @param count the number of records to remember
         */
        public void setCount(long count) {
            this.count = count;
        }

        /**
         * Returns the stored counter value.
         *
         * @return the number of records remembered by this state object
         */
        public long getCount() {
            return this.count;
        }

        /**
         * Alias of {@link #setCount(long)}, kept for backward compatibility.
         *
         * @param count the number of records to remember
         */
        public void setState(long count) {
            setCount(count);
        }

        /**
         * Correctly spelled alias of {@link #getCount()}.
         *
         * @return the number of records remembered by this state object
         */
        public long getState() {
            return getCount();
        }

        /**
         * Alias of {@link #getCount()} under its original (misspelled) name,
         * kept for backward compatibility.
         *
         * @return the number of records remembered by this state object
         */
        public long geState() {
            return getCount();
        }
    }
    
  • 带checkpoint的数据源

    source算子的代码,该段代码每发送10000条数据休息1秒钟。制作快照时,将到目前为止已经发送的数据条数保存在UDFState中;从快照恢复状态时,读取UDFState中的数据条数并重新赋值给count变量。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    /**
     * Source that emits records in batches and checkpoints how many records it
     * has emitted so far.
     *
     * <p>Each pass emits {@link #BATCH_SIZE} tuples and then sleeps for
     * {@link #PAUSE_MILLIS} milliseconds. The running record count is stored in a
     * {@link UDFState} when a snapshot is taken and read back on restore.
     */
    public class SimpleSourceWithCheckPoint implements SourceFunction<Tuple4<Long, String, String, Integer>>, ListCheckpointed<UDFState> {

        /** Number of records emitted before the source pauses. */
        private static final int BATCH_SIZE = 10000;

        /** Pause between two batches, in milliseconds. */
        private static final long PAUSE_MILLIS = 1000L;

        /** Constant payload placed in the third tuple field. */
        private static final String ALPHABET = "justtest";

        /** Number of records emitted so far; this is the checkpointed value. */
        private long count = 0;

        /**
         * Cleared by {@link #cancel()} to stop {@link #run}. Declared volatile
         * because cancel() may be invoked from a different thread than run().
         */
        private volatile boolean isRunning = true;

        /**
         * Captures the current record count for a snapshot.
         *
         * @param checkpointId id of the checkpoint (unused)
         * @param timestamp    timestamp of the checkpoint (unused)
         * @return a single-element list holding the current count
         */
        @Override
        public List<UDFState> snapshotState(long checkpointId, long timestamp) throws Exception
        {
            UDFState udfState = new UDFState();
            udfState.setCount(count);
            List<UDFState> udfStateList = new ArrayList<>();
            udfStateList.add(udfState);
            return udfStateList;
        }

        /**
         * Restores the record count from a snapshot.
         *
         * @param list the restored state entries; when it is null or empty the
         *             current count is left unchanged
         */
        @Override
        public void restoreState(List<UDFState> list) throws Exception
        {
            // Guard against an empty restore list instead of failing in get(0).
            if (list == null || list.isEmpty()) {
                return;
            }
            count = list.get(0).getCount();
        }

        /**
         * Emits tuples of (random long, "hello" + count, constant string, 1)
         * until the source is cancelled.
         *
         * @param sourceContext context used to emit records
         */
        @Override
        public void run(SourceContext<Tuple4<Long, String, String, Integer>> sourceContext) throws Exception
        {
            Random random = new Random();
            // Emission and the count update happen under the checkpoint lock so
            // that a snapshot never observes a count that disagrees with the
            // records already emitted.
            final Object checkpointLock = sourceContext.getCheckpointLock();
            while (isRunning) {
                for (int i = 0; i < BATCH_SIZE && isRunning; i++) {
                    synchronized (checkpointLock) {
                        sourceContext.collect(Tuple4.of(random.nextLong(), "hello" + count, ALPHABET, 1));
                        count++;
                    }
                }
                // Sleep outside the lock so snapshots are not blocked meanwhile.
                Thread.sleep(PAUSE_MILLIS);
            }
        }

        /** Requests the emit loop in {@link #run} to stop. */
        @Override
        public void cancel()
        {
            isRunning = false;
        }
    }
    
  • 带checkpoint的窗口定义

    该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Window function that counts the tuples of each fired window and emits a
     * running total.
     *
     * <p>The running total is stored in a {@link UDFState} when a snapshot is
     * taken and rebuilt from the restored entries on recovery.
     */
    public class WindowStatisticWithChk implements WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>, ListCheckpointed<UDFState> {

        /** Total number of tuples seen by this instance across all fired windows. */
        private long total = 0;

        /**
         * Captures the current running total for a snapshot.
         *
         * @param checkpointId id of the checkpoint (unused)
         * @param timestamp    timestamp of the checkpoint (unused)
         * @return a single-element list holding the current total
         */
        @Override
        public List<UDFState> snapshotState(long checkpointId, long timestamp) throws Exception
        {
            UDFState udfState = new UDFState();
            udfState.setCount(total);
            List<UDFState> list = new ArrayList<>();
            list.add(udfState);
            return list;
        }

        /**
         * Rebuilds the running total from a snapshot.
         *
         * <p>All restored entries are summed. For a single entry this equals that
         * entry's count; for a null or empty list the total becomes zero instead
         * of failing on {@code get(0)}.
         *
         * @param list the restored state entries, possibly empty
         */
        @Override
        public void restoreState(List<UDFState> list) throws Exception
        {
            long restored = 0L;
            if (list != null) {
                for (UDFState udfState : list) {
                    restored += udfState.getCount();
                }
            }
            total = restored;
        }

        /**
         * Counts the elements of the fired window, adds that to the running
         * total, and emits the new total.
         *
         * @param tuple      key of the window (unused)
         * @param timeWindow the window being evaluated (unused)
         * @param iterable   elements contained in the window
         * @param collector  receives the updated running total
         */
        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple4<Long, String, String, Integer>> iterable, Collector<Long> collector) throws Exception
        {
            long count = 0L;
            // Only the number of elements matters; their contents are not read.
            for (Tuple4<Long, String, String, Integer> ignored : iterable) {
                count++;
            }
            total += count;
            collector.collect(total);
        }
    }
    
  • 应用代码

    该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使用了processing time。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    /**
     * Entry point that wires the checkpointed source and window function into a
     * streaming job and runs it.
     */
    public class FlinkProcessingTimeAPIChkMain {

        /**
         * Builds and executes the job.
         *
         * @param args command-line arguments; {@code chkPath} overrides the
         *             checkpoint directory
         * @throws Exception if job construction or execution fails
         */
        public static void main(String[] args) throws Exception
        {
            // Resolve the checkpoint directory, falling back to the default path.
            final ParameterTool parameters = ParameterTool.fromArgs(args);
            final String chkPath = parameters.get("chkPath", "hdfs://hacluster/flink/checkpoints/");

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Typed as StateBackend so the StateBackend overload is selected.
            final StateBackend backend = new FsStateBackend(chkPath);
            env.setStateBackend(backend);
            env.enableCheckpointing(6000, CheckpointingMode.EXACTLY_ONCE);

            // Sliding processing-time window: 4 seconds long, advancing every second.
            env.addSource(new SimpleSourceWithCheckPoint())
                    .keyBy(0)
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1)))
                    .apply(new WindowStatisticWithChk())
                    .print();

            env.execute();
        }
    }