更新时间:2024-08-05 GMT+08:00

Flink开启Checkpoint样例程序(Java)

功能介绍

假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性。

代码样例

  1. 快照数据

    该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。

    下面代码片段仅为演示,完整代码参见FlinkCheckpointJavaExample样例工程下的com.huawei.bigdata.flink.examples.UDFState:
    import java.io.Seriablizale;
    
    // 该类作为快照的一部分,保存用户自定义状态
    public class UDFState implements Serializable {
        private long count;
        
        // 初始化用户自定义状态
        public UDFState() {
            count = 0L;
        }
    
        // 设置用户自定义状态
        public void setState(long count) {
           this.count = count;
        }
    
        // 获取用户自定义状态
        public long geState() {
            return this.count;
        }
    }
  2. 带checkpoint的数据源

    source算子的代码,该段代码每发送10000条数据休息1秒钟,制作快照时将到目前为止已经发送的数据的条数保存在UDFState中;从快照中状态恢复时,读取UDFState中的数据条数并重新赋值给count变量。

    下面代码片段仅为演示,完整代码参见FlinkCheckpointJavaExample样例工程下的com.huawei.bigdata.flink.examples.SEventSourceWithChk:

    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    // 该类是带checkpoint的source算子
    public class SEventSourceWithChk extends RichSourceFunction<Tuple4<Long, String, String, Integer>> implements ListCheckpointed<UDFState> {
        private Long count = 0L;
        private boolean isRunning = true;
        private String alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321";
    
        // 算子的主要逻辑,每秒钟向流图中注入10000个元组
        public void run(SourceContext<Tuple4<Long, String, String, Integer>> ctx) throws Exception {
            Random random = new Random();
            while(isRunning) {
                for (int i = 0; i < 10000; i++) {
                    ctx.collect(Tuple4.of(random.nextLong(), "hello-" + count, alphabet, 1))
                    count++;
                }
                Thread.sleep(1000);
            }
        }
    
        // 任务取消时调用
        public void cancel() {
            isRunning = false;
        }
    
        // 制作自定义快照 
        public List<UDFState> snapshotState(long l, long ll) throws Exception {
            UDFState udfState = new UDFState();
            List<UDFState> listState = new ArrayList<UDFState>();
            udfState.setState(count);
            listState.add(udfState);
            return listState;
        }
    
        // 从自定义快照中恢复数据
        public void restoreState(List<UDFState> list) throws Exception {
            UDFState udfState = list.get(0);
            count = udfState.getState();
        }
    }
  3. 带checkpoint的窗口定义

    该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。

    下面代码片段仅为演示,完整代码参见FlinkCheckpointJavaExample样例工程下的com.huawei.bigdata.flink.examples.WindowStatisticWithChk:

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    
    // 该类是带checkpoint的window算子
    public class WindowStatisticWithChk implements WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>, ListCheckpointed<UDFState> {
        private Long total = 0L;
        
        // window算子实现逻辑,统计window中元组的个数
        void apply(Tuple key, TimeWindow window, Iterable<Tuple4<Long, String, String, Integer>> input, 
                   Collector<Long> out) throws Exception {
            long count = 0L;
            for (Tuple4<Long, String, String, Integer> event : input) {
                count++;
            }
            total += count;
            out.collect(count);
        }
    
        // 制作自定义快照
        public List<UDFState> snapshotState(Long l, Long ll) {
            List<UDFState> listState = new ArrayList<UDFState>();
            UDFState udfState = new UDFState();
            udfState.setState(total);
            listState.add(udfState);
            return listState;
         }
    
         // 从自定义快照中恢复状态
         public void restoreState(List<UDFState> list) throws Exception {
             UDFState udfState = list.get(0); 
             total = udfState.getState();
         }
    }
  4. 应用代码

    该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使用了processing time。

    下面代码片段仅为演示,完整代码参见FlinkCheckpointJavaExample样例工程下的com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    public class FlinkProcessingTimeAPIChkMain {
        public static void main(String[] args) throws Exception{
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 设置相关配置,并开启checkpoint功能
            env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/"));
            env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig.setCheckpointInterval(6000);
            
            // 应用逻辑
            env.addSource(new SEventSourceWithChk())
               .keyBy(0)
               .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1)))
               .apply(new WindowStatisticWithChk())
               .print()
    
            env.execute();
        }
    }