Updated on 2023-04-10 GMT+08:00

Java Sample Code

Function Description

Assume that you want to count, once every second, the data records that arrived in the window covering the preceding 4 seconds, while keeping the operator state strictly consistent.

Sample Code

  1. Snapshot data

    The snapshot data stores the number of data records that an operator has recorded at the time a snapshot is created.

    import java.io.Serializable;

    /**
     * User-defined state that is stored as part of a snapshot.
     *
     * <p>Holds a single counter value that an operator saves when a snapshot
     * is taken and reads back when it is restored.
     */
    public class UDFState implements Serializable {
        // Explicit version id so the serialized form does not rely on the
        // compiler-generated default.
        private static final long serialVersionUID = 1L;

        private long count;

        /** Creates a state whose counter starts at zero. */
        public UDFState() {
            count = 0L;
        }

        /**
         * Sets the user-defined state.
         *
         * @param count the counter value to store
         */
        public void setState(long count) {
            this.count = count;
        }

        /**
         * Returns the user-defined state.
         *
         * @return the stored counter value
         */
        public long getState() {
            return this.count;
        }

        /**
         * Misspelled former name of {@link #getState()}, kept so existing callers still compile.
         *
         * @return the stored counter value
         * @deprecated use {@link #getState()} instead
         */
        @Deprecated
        public long geState() {
            return getState();
        }
    }
  2. Data source with checkpoints

    Code of the source operator. The operator repeatedly sends 10000 data records and then pauses for one second. When a snapshot is created, the number of data records sent so far is recorded in UDFState. When the snapshot is used for restoration, the number recorded in UDFState is read and assigned to the count variable.

    import org.apache.flink.api.java.tuple.Tuple4; 
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; 
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction; 
     
    import java.util.ArrayList; 
    import java.util.List; 
    import java.util.Random; 
     
    //The class is the source operator with checkpoint.
    public class SEventSourceWithChk extends RichSourceFunction<Tuple4<Long, String, String, Integer>> implements ListCheckpointed<UDFState> { 
        private Long count = 0L; 
        private boolean isRunning = true; 
        private String alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"; 
     
        //The main logic of the operator is to inject 10000 tuples to the StreamGraph.
        public void run(SourceContext<Tuple4<Long, String, String, Integer>> ctx) throws Exception { 
            Random random = new Random(); 
            while(isRunning) { 
                for (int i = 0; i < 10000; i++) { 
                    ctx.collect(Tuple4.of(random.nextLong(), "hello-" + count, alphabet, 1)) 
                    count++; 
                } 
                Thread.sleep(1000); 
            } 
        } 
     
        //Call this when the task is canceled.
        public void cancel() { 
            isRunning = false; 
        } 
     
        //Customize a snapshot.
        public List<UDFState> snapshotState(long l, long ll) throws Exception { 
            UDFState udfState = new UDFState(); 
            List<UDFState> listState = new ArrayList<UDFState>(); 
            udfState.setState(count); 
            listState.add(udfState); 
            return listState; 
        } 
     
        //Restore data from customized snapshots.
        public void restoreState(List<UDFState> list) throws Exception { 
            UDFState udfState = list.get(0); 
            count = udfState.getState(); 
        } 
    }
  3. Definition of window with checkpoint.

    This code is the window operator, which calculates the number of tuples in the window.

    import org.apache.flink.api.java.tuple.Tuple; 
    import org.apache.flink.api.java.tuple.Tuple4; 
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; 
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction; 
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow; 
    import org.apache.flink.util.Collector; 
     
    import java.util.ArrayList; 
    import java.util.List; 
     
    //The class is the window operator with checkpoint.
    public class WindowStatisticWithChk implements WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>, ListCheckpointed<UDFState> { 
        private Long total = 0L; 
         
        //The implementation logic of the window operator, which is used to calculate the number of tuples in a window.
        void apply(Tuple key, TimeWindow window, Iterable<Tuple4<Long, String, String, Integer>> input,  
                   Collector<Long> out) throws Exception { 
            long count = 0L; 
            for (Tuple4<Long, String, Stirng, Integer> event : input) { 
                count++;
            } 
            total += count; 
            out.collect(count); 
        } 
     
        //Customize snapshot.
        public List<UDFState> snapshotState(Long l, Long ll) { 
            List<UDFState> listState = new ArrayList<UDFState>(); 
            UDFState udfState = new UDFState(); 
            udfState.setState(total); 
            listState.add(udfState); 
            return listState; 
         } 
     
         //Restore data from customized snapshots.
         public void restoreState(List<UDFState> list) throws Exception { 
             UDFState udfState = list.get(0);  
             total = udfState.getState(); 
         } 
    }
  4. Application code

    The code is about the definition of StreamGraph and is used to implement services. The processing time is used as the timestamp for triggering the window.

    import org.apache.flink.runtime.state.filesystem.FsStateBackend; 
    import org.apache.flink.streaming.api.CheckpointingMode; 
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; 
    import org.apache.flink.streaming.api.windowing.time.Time; 
     
    public class FlinkProcessingTimeAPIChkMain { 
        public static void main(String[] args) throws Exception{ 
     
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     
            //Set configurations and enable checkpoint.
            env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/")); 
            env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
            env.getCheckpointConfig.setCheckpointInterval(6000); 
             
            //Application logic.
            env.addSource(new SEventSourceWithChk()) 
               .keyBy(0) 
             .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1))) 
               .apply(new WindowStatisticWithChk()) 
               .print() 
     
            env.execute(); 
        } 
    }