更新时间:2024-08-03 GMT+08:00

Flink DataStream Java样例代码

功能简介

统计连续网购时间超过2个小时的女性网民信息,将统计结果直接打印。

代码样例

下面代码片段仅为演示,具体代码参见com.huawei.flink.example.stream.FlinkStreamJavaExample:

    // 参数解析:
    // <filePath>为待读取的文本文件路径,多个路径之间用逗号分隔。
    // <windowTime>为统计数据的窗口跨度,时间单位为分钟。
/**
 * Flink DataStream sample job: reads comma-separated {@code name,sexy,shoppingTime} lines
 * from one or more text files, keeps the records whose {@code sexy} field is "female",
 * sums {@code shoppingTime} per (name, sexy) key inside tumbling event-time windows and
 * prints every aggregated record whose total exceeds 120.
 *
 * <p>Command-line options:
 * <ul>
 *   <li>{@code --filePath}: comma-separated list of text files to read
 *       (default {@code /opt/log1.txt,/opt/log2.txt}).</li>
 *   <li>{@code --windowTime}: window size in minutes (default 2).</li>
 * </ul>
 */
public class FlinkStreamJavaExample {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments, parsed with {@code ParameterTool}
     * @throws IllegalArgumentException if {@code --filePath} yields no path or an empty path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Print a reference "flink run" command for this example.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.flink.examples.stream.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2");
        System.out.println("******************************************************************************************");
        System.out.println("<filePath> is for text file to read data, use comma to separate");
        System.out.println("<windowTime> is the width of the window, time as minutes");
        System.out.println("******************************************************************************************");

        // Parse the command line once and reuse the result for both options.
        final ParameterTool params = ParameterTool.fromArgs(args);

        // Read the input paths, separated by commas.
        final String[] filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
        // Validate explicitly instead of with "assert" (asserts are disabled unless the JVM
        // runs with -ea). String.split drops trailing empty strings, so a value such as ","
        // produces an empty array and filePaths[0] would otherwise fail with an index error.
        if (filePaths.length == 0) {
            throw new IllegalArgumentException("--filePath must contain at least one path");
        }
        for (String path : filePaths) {
            if (path.isEmpty()) {
                throw new IllegalArgumentException("--filePath must not contain an empty path");
            }
        }

        // Window size in minutes; the default is 2 (per the original sample, long enough
        // to read all of the data in the input text files).
        final int windowTime = params.getInt("windowTime", 2);

        // Build the execution environment and process windows by event time.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read every input file as a text stream and union them into a single stream.
        DataStream<String> unionStream = env.readTextFile(filePaths[0]);
        for (int i = 1; i < filePaths.length; i++) {
            unionStream = unionStream.union(env.readTextFile(filePaths[i]));
        }

        // Transformation pipeline: parse -> assign timestamps/watermarks -> keep "female"
        // records -> key by (name, sexy) -> tumbling window -> sum shoppingTime -> keep
        // totals above 120 -> print.
        unionStream.map(new MapFunction<String, UserRecord>() {
            @Override
            public UserRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                // Constant-first comparison so a null field is filtered out instead of
                // raising a NullPointerException.
                return "female".equals(value.sexy);
            }
        }).keyBy(
            new UserRecordSelector()
        ).window(
            TumblingEventTimeWindows.of(Time.minutes(windowTime))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2)
                    throws Exception {
                // Accumulate into the first record and return it as the running total.
                value1.shoppingTime += value2.shoppingTime;
                return value1;
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                // 120 is presumably minutes, i.e. the "more than two hours" threshold.
                return value.shoppingTime > 120;
            }
        }).print();

        // Nothing runs until execute() is called.
        env.execute("FemaleInfoCollectionPrint java");
    }

    /** Key selector used by keyBy: groups records by the (name, sexy) pair. */
    private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> getKey(UserRecord value) throws Exception {
            return Tuple2.of(value.name, value.sexy);
        }
    }

    /**
     * Parses one text line of the form {@code name,sexy,shoppingTime} into a {@link UserRecord}.
     * Surrounding whitespace of each field is removed before use.
     *
     * @param line the raw input line
     * @return the parsed record
     * @throws IllegalArgumentException if the line does not have exactly three fields
     * @throws NumberFormatException if the third field is not an integer
     */
    private static UserRecord getRecord(String line) {
        String[] elems = line.split(",");
        // Explicit check instead of "assert", which is a no-op without -ea.
        if (elems.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 comma-separated fields <name,sexy,shoppingTime> but found "
                            + elems.length + " in line: " + line);
        }
        return new UserRecord(elems[0].trim(), elems[1].trim(), Integer.parseInt(elems[2].trim()));
    }

    /** Record type carried through the pipeline; overrides toString for printing. */
    public static class UserRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        /**
         * @param n value stored in {@code name}
         * @param s value stored in {@code sexy} (compared against "female" by the job)
         * @param t value stored in {@code shoppingTime}
         */
        public UserRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }

        @Override
        public String toString() {
            return "name: " + name + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
        }
    }

    /**
     * AssignerWithPunctuatedWatermarks implementation that supplies the event timestamp
     * and the watermark for every element.
     */
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> {
        /** Tags each element with the wall-clock time at which it is processed. */
        @Override
        public long extractTimestamp(UserRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        /**
         * Emits, for every element, a watermark one millisecond behind the element's
         * extracted timestamp; the watermark is what lets windows be triggered.
         */
        @Override
        public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }
}

执行之后打印结果如下所示:

    name: FangBo  sexy: female  shoppingTime: 320
    name: CaiXuyu  sexy: female  shoppingTime: 300