Updated on 2024-08-16 GMT+08:00

Flink DataStream Java Sample Code

Function Description

Collect statistics on female netizens whose total online shopping time within a time window exceeds 2 hours, and print the results directly.

Sample Code

The following code snippet is provided as an example. For the complete code, see com.huawei.flink.example.stream.FlinkStreamJavaExample.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
    // Parameter description:
    // <filePath> is the path for reading text files. The paths are separated by commas (,).
    // <windowTime> is the time span of the statistics data. The unit is minute.
/**
 * Flink DataStream sample that sums the online shopping time of every female netizen within a
 * tumbling event-time window and prints the records whose total exceeds the threshold.
 *
 * <p>Program arguments:
 * <ul>
 *   <li>{@code --filePath}: comma-separated list of text files to read.
 *       Defaults to {@code /opt/log1.txt,/opt/log2.txt}.</li>
 *   <li>{@code --windowTime}: width of the window in minutes. Defaults to {@code 2}.</li>
 * </ul>
 *
 * <p>Each input line must consist of exactly three comma-separated fields:
 * {@code name,gender,shoppingTime}, where {@code shoppingTime} is an integer.
 */
public class FlinkStreamJavaExample {
    /** Gender value a record must carry to be included in the statistics. */
    private static final String TARGET_GENDER = "female";

    /** Exclusive lower bound on the summed shopping time of a printed record (120 = 2 hours in minutes). */
    private static final int SHOPPING_TIME_THRESHOLD = 120;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments, see the class description
     * @throws IllegalArgumentException if no usable file path is given or the window width is not positive
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Print the command reference for flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.flink.examples.stream.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2");
        System.out.println("******************************************************************************************");
        System.out.println("<filePath> is for text file to read data, use comma to separate");
        System.out.println("<windowTime> is the width of the window, time as minutes");
        System.out.println("******************************************************************************************");

        // Parse the arguments once and reuse the result for every option.
        final ParameterTool params = ParameterTool.fromArgs(args);

        // Read text path information. The paths are separated by commas (,).
        // Validate explicitly: "assert" is disabled by default at runtime, and a value such as ","
        // splits into an empty array, which would otherwise fail later with an index error.
        final String[] filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
        if (filePaths.length == 0) {
            throw new IllegalArgumentException("filePath must contain at least one path");
        }
        for (String path : filePaths) {
            if (path.trim().isEmpty()) {
                throw new IllegalArgumentException("filePath must not contain an empty path entry");
            }
        }

        // windowTime is used to set the time window. The default value is 2 minutes per time window.
        final int windowTime = params.getInt("windowTime", 2);
        if (windowTime <= 0) {
            throw new IllegalArgumentException("windowTime must be a positive number of minutes, got " + windowTime);
        }

        // Construct an execution environment and use eventTime to process the data obtained in a time window.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read the text data streams and merge them into a single stream.
        DataStream<String> unionStream = env.readTextFile(filePaths[0]);
        for (int i = 1; i < filePaths.length; i++) {
            unionStream = unionStream.union(env.readTextFile(filePaths[i]));
        }

        // Convert the data, construct the entire data processing logic, and calculate and print the results.
        unionStream.map(new MapFunction<String, UserRecord>() {
            @Override
            public UserRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                // Constant-first comparison stays safe even if the gender field is null.
                return TARGET_GENDER.equals(value.sexy);
            }
        }).keyBy(
                new UserRecordSelector()
        ).window(
                TumblingEventTimeWindows.of(Time.minutes(windowTime))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2)
                    throws Exception {
                // Return a fresh record instead of mutating an input object in place.
                return new UserRecord(value1.name, value1.sexy, value1.shoppingTime + value2.shoppingTime);
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.shoppingTime > SHOPPING_TIME_THRESHOLD;
            }
        }).print();

        // Invoke execute to trigger the execution.
        env.execute("FemaleInfoCollectionPrint java");
    }

    /** Key selector that groups records by the (name, gender) pair. */
    private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> getKey(UserRecord value) throws Exception {
            return Tuple2.of(value.name, value.sexy);
        }
    }

    /**
     * Parses one text line into a {@link UserRecord}.
     *
     * @param line a line of the form {@code name,gender,shoppingTime}
     * @return the parsed record, with surrounding whitespace removed from every field
     * @throws IllegalArgumentException if the line does not have exactly three fields or the
     *         shopping time is not an integer
     */
    private static UserRecord getRecord(String line) {
        String[] elems = line.split(",");
        // Explicit check instead of "assert", which is a no-op unless the JVM runs with -ea.
        if (elems.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 comma-separated fields but found " + elems.length + " in line: " + line);
        }
        try {
            return new UserRecord(elems[0].trim(), elems[1].trim(), Integer.parseInt(elems[2].trim()));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid shopping time in line: " + line, e);
        }
    }

    /** Data structure holding one netizen's name, gender and shopping time. */
    public static class UserRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        /**
         * @param n the netizen's name
         * @param s the netizen's gender
         * @param t the shopping time
         */
        public UserRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }

        /** Returns the text that is printed for each result record. */
        @Override
        public String toString() {
            return "name: " + name + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
        }
    }

    /**
     * Timestamp and watermark assigner for {@link UserRecord} elements.
     *
     * <p>The timestamp is the wall-clock time at which the element is processed, not a value taken
     * from the record itself.
     */
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> {
        /** Tags the element with the current system time in milliseconds. */
        @Override
        public long extractTimestamp(UserRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        /** Emits a watermark one millisecond behind the timestamp extracted for the element. */
        @Override
        public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }
}

The following is the command output:

1
2
    name: FangBo  sexy: female  shoppingTime: 320
    name: CaiXuyu  sexy: female  shoppingTime: 300