Flink开启Checkpoint样例程序(Java)
功能介绍
假定用户需要每隔1秒钟统计一次最近4秒钟窗口中的数据量,并做到状态严格一致性。
代码样例
- 快照数据
该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。
import java.io.Serializable;

/**
 * User-defined state that is stored as part of an operator snapshot.
 *
 * <p>It carries a single {@code long} counter that is written with
 * {@link #setState(long)} and read back with {@link #getState()}.
 */
public class UDFState implements Serializable {
    private static final long serialVersionUID = 1L;

    private long count;

    /** Creates a state whose counter starts at zero. */
    public UDFState() {
        count = 0L;
    }

    /**
     * Stores the counter value.
     *
     * @param count the value to keep in this state object
     */
    public void setState(long count) {
        this.count = count;
    }

    /**
     * Returns the stored counter value.
     *
     * @return the value last passed to {@link #setState(long)}, or zero if none was set
     */
    public long getState() {
        return this.count;
    }
}
- 带checkpoint的数据源
source算子的代码,该段代码每发送10000条数据休息1秒钟,制作快照时将到目前为止已经发送的数据的条数保存在UDFState中;从快照中状态恢复时,读取UDFState中的数据条数并重新赋值给count变量。
import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.util.ArrayList; import java.util.List; import java.util.Random; // 该类是带checkpoint的source算子 public class SEventSourceWithChk extends RichSourceFunction<Tuple4<Long, String, String, Integer>> implements ListCheckpointed<UDFState> { private Long count = 0L; private boolean isRunning = true; private String alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"; // 算子的主要逻辑,每秒钟向流图中注入10000个元组 public void run(SourceContext<Tuple4<Long, String, String, Integer>> ctx) throws Exception { Random random = new Random(); while(isRunning) { for (int i = 0; i < 10000; i++) { ctx.collect(Tuple4.of(random.nextLong(), "hello-" + count, alphabet, 1)) count++; } Thread.sleep(1000); } } // 任务取消时调用 public void cancel() { isRunning = false; } // 制作自定义快照 public List<UDFState> snapshotState(long l, long ll) throws Exception { UDFState udfState = new UDFState(); List<UDFState> listState = new ArrayList<UDFState>(); udfState.setState(count); listState.add(udfState); return listState; } // 从自定义快照中恢复数据 public void restoreState(List<UDFState> list) throws Exception { UDFState udfState = list.get(0); count = udfState.getState(); } }
- 带checkpoint的窗口定义
该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; // 该类是带checkpoint的window算子 public class WindowStatisticWithChk implements WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>, ListCheckpointed<UDFState> { private Long total = 0L; // window算子实现逻辑,统计window中元组的个数 void apply(Tuple key, TimeWindow window, Iterable<Tuple4<Long, String, String, Integer>> input, Collector<Long> out) throws Exception { long count = 0L; for (Tuple4<Long, String, String, Integer> event : input) { count++; } total += count; out.collect(count); } // 制作自定义快照 public List<UDFState> snapshotState(Long l, Long ll) { List<UDFState> listState = new ArrayList<UDFState>(); UDFState udfState = new UDFState(); udfState.setState(total); listState.add(udfState); return listState; } // 从自定义快照中恢复状态 public void restoreState(List<UDFState> list) throws Exception { UDFState udfState = list.get(0); total = udfState.getState(); } }
- 应用代码
该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使用了processing time。
import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class FlinkProcessingTimeAPIChkMain { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置相关配置,并开启checkpoint功能 env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/")); env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig.setCheckpointInterval(6000); // 应用逻辑 env.addSource(new SEventSourceWithChk()) .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk()) .print() env.execute(); } }