Updated on 2024-08-10 GMT+08:00

Flink DataStream Sample Program (Java)

Function Description

Collect the information about female netizens who continuously spend more than 2 hours on online shopping and print the result.

DataStream FlinkStreamJavaExample Sample Code

The following code segment is an example. For details, see com.huawei.bigdata.flink.examples.FlinkStreamJavaExample.

    //Parameter description: 
    //<filePath> indicates paths where text is read. Paths are separated by commas (,). 
    //<windowTime> indicates the period covered by statistics window. The unit is minute. 
/**
 * Flink DataStream sample job: reads comma-separated user records from one or more text
 * files, keeps the records whose gender field is {@code female}, sums the shopping time per
 * user inside tumbling event-time windows, and prints every per-window total that exceeds
 * {@link #SHOPPING_TIME_THRESHOLD}.
 *
 * <p>Each input line must have exactly three comma-separated fields:
 * {@code name,gender,shoppingTime}, where {@code shoppingTime} is an integer.
 */
public class FlinkStreamJavaExample {

    /** Value of the gender field that selects the records of interest. */
    private static final String FEMALE = "female";

    /** Number of comma-separated fields expected on every input line. */
    private static final int FIELD_COUNT = 3;

    /**
     * A user is reported only when the shopping time accumulated in one window is strictly
     * greater than this value (120, i.e. two hours if the input unit is minutes).
     */
    private static final int SHOPPING_TIME_THRESHOLD = 120;

    /**
     * Builds and runs the job.
     *
     * @param args command-line options: {@code --filePath} (comma-separated list of text
     *             files, default {@code /opt/log1.txt,/opt/log2.txt}) and
     *             {@code --windowTime} (window size in minutes, default 2)
     * @throws IllegalArgumentException if no file path is given or the window size is not
     *                                  positive
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Print reference command for executing flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2");
        System.out.println("******************************************************************************************");
        System.out.println("<filePath> is for text file to read data, use comma to separate");
        System.out.println("<windowTime> is the width of the window, time as minutes");
        System.out.println("******************************************************************************************");

        // Parse the command line once and reuse the result for every option.
        final ParameterTool params = ParameterTool.fromArgs(args);

        // Paths where text is read. Paths are separated by commas (,).
        // Validate explicitly: `assert` is disabled unless the JVM runs with -ea, and
        // String.split can return an empty array (for example when the value is ",").
        final String[] filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
        if (filePaths.length == 0) {
            throw new IllegalArgumentException("--filePath must contain at least one path");
        }

        // windowTime specifies the size of the time window in minutes (default: 2).
        final int windowTime = params.getInt("windowTime", 2);
        if (windowTime <= 0) {
            throw new IllegalArgumentException("--windowTime must be positive, got " + windowTime);
        }

        // Build the execution environment and run eventTime to process window data.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read every file and merge all of them into a single stream.
        DataStream<String> unionStream = env.readTextFile(filePaths[0]);
        for (int i = 1; i < filePaths.length; i++) {
            unionStream = unionStream.union(env.readTextFile(filePaths[i]));
        }

        // Convert data, build the logic for the entire data process, calculate, and print results.
        unionStream.map(new MapFunction<String, UserRecord>() {
            @Override
            public UserRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                // Constant-first comparison stays safe even if the field is null.
                return FEMALE.equals(value.sexy);
            }
        }).keyBy(
            new UserRecordSelector()
        ).window(
            TumblingEventTimeWindows.of(Time.minutes(windowTime))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2)
                    throws Exception {
                // Return a fresh record instead of mutating an input, so the function
                // has no side effects on its arguments.
                return new UserRecord(
                        value1.name, value1.sexy, value1.shoppingTime + value2.shoppingTime);
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.shoppingTime > SHOPPING_TIME_THRESHOLD;
            }
        }).print();

        // Call execute to trigger the execution.
        env.execute("FemaleInfoCollectionPrint java");
    }

    /** Key selector for keyBy: groups records by the (name, gender) pair. */
    private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> getKey(UserRecord value) throws Exception {
            return Tuple2.of(value.name, value.sexy);
        }
    }

    /**
     * Parses one line of text into a {@link UserRecord}.
     *
     * <p>Surrounding whitespace of each field is removed before it is used.
     *
     * @param line a line in the form {@code name,gender,shoppingTime}
     * @return the parsed record
     * @throws IllegalArgumentException if the line does not have exactly three fields or the
     *                                  third field is not an integer
     */
    private static UserRecord getRecord(String line) {
        String[] elems = line.split(",");
        // Explicit check instead of `assert`, which is skipped unless the JVM runs with -ea.
        if (elems.length != FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "Expected " + FIELD_COUNT + " comma-separated fields but found "
                            + elems.length + " in line: \"" + line + "\"");
        }
        final int shoppingTime;
        try {
            shoppingTime = Integer.parseInt(elems[2].trim());
        } catch (NumberFormatException e) {
            // Re-throw with the offending line so the bad input can be located.
            throw new IllegalArgumentException(
                    "Shopping time is not an integer in line: \"" + line + "\"", e);
        }
        return new UserRecord(elems[0].trim(), elems[1].trim(), shoppingTime);
    }

    /** One user record: name, gender and shopping time, with a printable form. */
    public static class UserRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        /**
         * @param n user name
         * @param s gender field as read from the input
         * @param t shopping time
         */
        public UserRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }

        /** Returns the record in the form printed by the job. */
        @Override
        public String toString() {
            return "name: " + name + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
        }
    }

    /**
     * Assigns timestamps and punctuated watermarks to the records.
     *
     * <p>The input records carry no time field, so the current system time at extraction is
     * used as the element timestamp.
     */
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> {

        /** Tags the element with the current system time in milliseconds. */
        @Override
        public long extractTimestamp(UserRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        /**
         * Emits a watermark one millisecond behind the timestamp just assigned; the
         * watermark is what lets the window operator decide that a window can fire.
         */
        @Override
        public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }
}

The following is the command output:

    name: FangBo  sexy: female  shoppingTime: 320 
    name: CaiXuyu  sexy: female  shoppingTime: 300

Figure 1 shows the process of execution.

Figure 1 Process of execution