文档首页 > > 开发指南> Flink应用开发> 开发程序> DataStream程序> Java样例代码

Java样例代码

分享
更新时间: 2020/01/20 GMT+08:00

功能简介

统计连续网购时间超过2个小时的女性网民信息,将统计结果直接打印。

代码样例

下面代码片段仅为演示,具体代码参见com.huawei.flink.example.stream.FlinkStreamJavaExample:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
    // 参数解析:
    // <filePath>为文本读取路径,用逗号分隔。
    // <windowTime>为统计数据的窗口跨度,时间单位为分钟。
/**
 * Sample Flink DataStream job: reads comma-separated text records
 * ({@code name,sex,shoppingTime}) from one or more files, keeps the records whose
 * sex field is {@code "female"}, sums the shopping time per (name, sex) key inside a
 * tumbling event-time window, and prints every aggregated record whose total
 * shopping time exceeds {@link #SHOPPING_TIME_THRESHOLD}.
 *
 * <p>Command-line parameters:
 * <ul>
 *   <li>{@code --filePath}: comma-separated list of text files to read
 *       (default {@code /opt/log1.txt,/opt/log2.txt}).</li>
 *   <li>{@code --windowTime}: window width in minutes (default {@code 2}).</li>
 * </ul>
 */
public class FlinkStreamJavaExample {
    /**
     * Aggregated shopping time a record must exceed to be printed.
     * NOTE(review): presumably minutes (the page describes "more than 2 hours") — confirm
     * against the input data.
     */
    private static final int SHOPPING_TIME_THRESHOLD = 120;

    /** Number of comma-separated fields expected in every input line. */
    private static final int RECORD_FIELD_COUNT = 3;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments, see the class documentation
     * @throws IllegalArgumentException if {@code --filePath} contains no usable path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Print a reference command for submitting this job with "flink run".
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.flink.example.stream.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2");
        System.out.println("******************************************************************************************");
        System.out.println("<filePath> is for text file to read data, use comma to separate");
        System.out.println("<windowTime> is the width of the window, time as minutes");
        System.out.println("******************************************************************************************");

        // Parse the arguments once and reuse the result for every parameter.
        final ParameterTool params = ParameterTool.fromArgs(args);

        // Input paths, separated by commas. An explicit check is used instead of "assert"
        // because assertions are disabled unless the JVM is started with -ea.
        final String[] filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
        if (filePaths.length == 0 || filePaths[0].isEmpty()) {
            throw new IllegalArgumentException("--filePath must contain at least one file path");
        }

        // Window width in minutes.
        final int windowTime = params.getInt("windowTime", 2);

        // Build the execution environment; windows are evaluated on event time.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read every input file and union them into a single stream.
        DataStream<String> unionStream = env.readTextFile(filePaths[0]);
        for (int i = 1; i < filePaths.length; i++) {
            unionStream = unionStream.union(env.readTextFile(filePaths[i]));
        }

        // Parse -> assign timestamps/watermarks -> keep females -> key by (name, sex)
        // -> sum shopping time per window -> keep totals above the threshold -> print.
        unionStream.map(new MapFunction<String, UserRecord>() {
            @Override
            public UserRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                // Constant-first comparison stays safe if the field is ever null.
                return "female".equals(value.sexy);
            }
        }).keyBy(
            new UserRecordSelector()
        ).window(
            TumblingEventTimeWindows.of(Time.minutes(windowTime))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2)
                    throws Exception {
                value1.shoppingTime += value2.shoppingTime;
                return value1;
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.shoppingTime > SHOPPING_TIME_THRESHOLD;
            }
        }).print();

        // Nothing runs until execute() is called.
        env.execute("FemaleInfoCollectionPrint java");
    }

    /** Key selector used by keyBy: groups records by (name, sex). */
    private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> getKey(UserRecord value) throws Exception {
            return Tuple2.of(value.name, value.sexy);
        }
    }

    /**
     * Parses one text line of the form {@code name,sex,shoppingTime} into a {@link UserRecord}.
     *
     * @param line the raw input line
     * @return the parsed record
     * @throws IllegalArgumentException if the line does not have exactly three fields
     * @throws NumberFormatException if the third field is not an integer
     */
    private static UserRecord getRecord(String line) {
        String[] elems = line.split(",");
        if (elems.length != RECORD_FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "Expected " + RECORD_FIELD_COUNT + " comma-separated fields but got "
                            + elems.length + " in line: " + line);
        }
        return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
    }

    /** Record type flowing through the job; {@link #toString()} defines the printed format. */
    public static class UserRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }

        @Override
        public String toString() {
            return "name: " + name + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
        }
    }

    /**
     * Punctuated watermark assigner that sets the event time of each element and emits
     * a watermark after every element.
     */
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> {
        /** Uses the wall-clock time at extraction as the element's event timestamp. */
        @Override
        public long extractTimestamp(UserRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        /** Emits a watermark one millisecond behind the timestamp just assigned. */
        @Override
        public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }
}
/**
 * Sample Flink DataStream job with logging at each stage: reads comma-separated text
 * records ({@code name,sex,shoppingTime}) from one or more files, keeps the records whose
 * sex field is {@code "female"}, sums the shopping time per (name, sex) key inside a
 * tumbling event-time window, and prints every aggregated record whose total shopping
 * time exceeds the configured threshold.
 *
 * <p>Command-line parameters:
 * <ul>
 *   <li>{@code --filePath}: comma-separated list of text files to read
 *       (default {@code /opt/log1.txt,/opt/log2.txt}).</li>
 *   <li>{@code --windowTime}: window width in minutes (default {@code 2}).</li>
 *   <li>{@code --shoppingTime}: threshold the summed shopping time must exceed
 *       (default {@code 120}).</li>
 * </ul>
 */
public class FlinkStreamJavaExample {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamJavaExample.class);

    /** Number of comma-separated fields expected in every input line. */
    private static final int RECORD_FIELD_COUNT = 3;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments, see the class documentation
     * @throws IllegalArgumentException if {@code --filePath} contains no usable path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Print a reference command for submitting this job with "flink run".
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.flink.example.stream.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2");
        System.out.println("******************************************************************************************");
        System.out.println("<filePath> is for text file to read data, use comma to separate");
        System.out.println("<windowTime> is the width of the window, time as minutes");
        System.out.println("******************************************************************************************");

        // Parse the arguments once and reuse the result for every parameter.
        final ParameterTool params = ParameterTool.fromArgs(args);

        // Input files, separated by commas.
        final String[] filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
        if (filePaths.length == 0 || filePaths[0].isEmpty()) {
            throw new IllegalArgumentException("--filePath must contain at least one file path");
        }

        // Window width in minutes, as stated by the usage text printed above.
        final int windowTime = params.getInt("windowTime", 2);

        // Threshold the aggregated shopping time must exceed for a record to be printed.
        final int shoppingTimeTh = params.getInt("shoppingTime", 120);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Union all input files. Iterating by index (rather than skipping entries equal to
        // the first path) keeps a path that is intentionally listed more than once.
        DataStream<String> text = env.readTextFile(filePaths[0]);
        for (int i = 1; i < filePaths.length; i++) {
            text = text.union(env.readTextFile(filePaths[i]));
        }

        // Parse -> assign timestamps/watermarks -> keep females -> key by (name, sex)
        // -> sum shopping time per window -> keep totals above the threshold -> print.
        text.map(new MapFunction<String, UserRecord>() {
                    @Override
                    public UserRecord map(String s) throws Exception {
                        return getRecord(s);
                    }
                })
                .assignTimestampsAndWatermarks(new Record2TimestampExtractor())
                .filter(new FilterFunction<UserRecord>() {
                    @Override
                    public boolean filter(UserRecord userRecord) throws Exception {
                        LOG.info("the first filter input is: {}", userRecord);
                        // Constant-first comparison stays safe if the field is ever null.
                        return "female".equals(userRecord.sexy);
                    }
                })
                .keyBy(new UserRecordSelector())
                .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
                .reduce(new ReduceFunction<UserRecord>() {
                    @Override
                    public UserRecord reduce(UserRecord userRecord1, UserRecord userRecord2) throws Exception {
                        userRecord1.shoppingTime += userRecord2.shoppingTime;
                        LOG.info("after reduce output is: {}", userRecord1);
                        return userRecord1;
                    }
                })
                .filter(new FilterFunction<UserRecord>() {
                    @Override
                    public boolean filter(UserRecord userRecord) throws Exception {
                        LOG.info("the Last filter input is: {}", userRecord);
                        return userRecord.shoppingTime > shoppingTimeTh;
                    }
                })
                .print();

        // Nothing runs until execute() is called.
        env.execute("FemaleInfoCollectionPrint java");
    }

    /**
     * Parses one text line of the form {@code name,sex,shoppingTime} into a {@link UserRecord}
     * and logs the result.
     *
     * @param line the raw input line
     * @return the parsed record
     * @throws IllegalArgumentException if the line does not have exactly three fields
     * @throws NumberFormatException if the third field is not an integer
     */
    private static UserRecord getRecord(String line) {
        String[] elems = line.split(",");
        // Explicit check instead of "assert": assertions are disabled unless the JVM runs with -ea.
        if (elems.length != RECORD_FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "Expected " + RECORD_FIELD_COUNT + " comma-separated fields but got "
                            + elems.length + " in line: " + line);
        }
        UserRecord userRecord = new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
        LOG.info("userRecord is: {}", userRecord);
        return userRecord;
    }

    /**
     * Punctuated watermark assigner that sets the event time of each element and emits
     * a watermark after every element.
     */
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> {

        /** Uses the wall-clock time at extraction as the element's event timestamp. */
        @Override
        public long extractTimestamp(UserRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        /** Logs the extracted timestamp and emits a watermark one millisecond behind it. */
        @Override
        public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
            LOG.info("element {}, extractedTimestamp is: {}", element, extractedTimestamp);
            LOG.info("element {}, extractedTimestamp - 1 is: {}", element, extractedTimestamp - 1);
            return new Watermark(extractedTimestamp - 1);
        }
    }

    /** Key selector used by keyBy: groups records by (name, sex) and logs each record it sees. */
    private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> getKey(UserRecord value) throws Exception {
            LOG.info("the second filter input is: {}", value);
            return Tuple2.of(value.name, value.sexy);
        }
    }

    /** Record type flowing through the job; {@link #toString()} defines the printed format. */
    public static class UserRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }

        @Override
        public String toString() {
            return "name: " + name + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
        }
    }
}

执行之后打印结果如下所示:

1
2
    name: FangBo  sexy: female  shoppingTime: 320
    name: CaiXuyu  sexy: female  shoppingTime: 300
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问