更新时间:2023-11-23 GMT+08:00
Java样例代码
功能简介
统计连续网购时间超过2个小时的女性网民信息（即在每个统计窗口内，按用户累计的网购时长超过120分钟），并将统计结果直接打印。
代码样例
下面代码片段仅为演示，具体代码请参见com.huawei.flink.example.stream.FlinkStreamJavaExample：
/**
 * Flink DataStream example: reads comma-separated user records from one or more text files and
 * prints every female user whose summed shopping time within one tumbling window exceeds two hours
 * (120 minutes).
 *
 * <p>Command-line arguments:
 *
 * <ul>
 *   <li>{@code --filePath}: paths of the text files to read, separated by commas.
 *   <li>{@code --windowTime}: width of the tumbling window, in minutes.
 * </ul>
 *
 * <p>Each input line is expected to have the form {@code name,sex,shoppingTime}.
 */
public class FlinkStreamJavaExample {

    /** Summed shopping times strictly greater than this value (2 hours, in minutes) are printed. */
    private static final int SHOPPING_TIME_THRESHOLD_MINUTES = 120;

    public static void main(String[] args) throws Exception {
        // Print a reference "flink run" command line.
        // NOTE(review): the package in this command ("examples") should be checked against the
        // class's actual package.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.flink.examples.stream.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2");
        System.out.println("******************************************************************************************");
        System.out.println("<filePath> is for text file to read data, use comma to separate");
        System.out.println("<windowTime> is the width of the window, time as minutes");
        System.out.println("******************************************************************************************");

        // Parse the command line once and reuse the result.
        final ParameterTool params = ParameterTool.fromArgs(args);
        // Comma-separated input paths. Note that "," alone splits to an EMPTY array, so the paths
        // are validated below rather than indexed blindly.
        final String[] filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
        // Window width in minutes; the 2-minute default is enough to read all sample data.
        final int windowTime = params.getInt("windowTime", 2);

        // Build the execution environment; windows are evaluated on event time.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read every non-blank path and union the resulting streams into one.
        DataStream<String> unionStream = null;
        for (String rawPath : filePaths) {
            final String path = rawPath.trim();
            if (path.isEmpty()) {
                // Tolerate stray commas such as "a.txt,,b.txt" or ", b.txt".
                continue;
            }
            final DataStream<String> fileStream = env.readTextFile(path);
            unionStream = (unionStream == null) ? fileStream : unionStream.union(fileStream);
        }
        if (unionStream == null) {
            throw new IllegalArgumentException("no input file path given in --filePath");
        }

        // Pipeline: parse -> timestamp -> keep females -> sum per user per window -> threshold -> print.
        unionStream
                .map(new MapFunction<String, UserRecord>() {
                    @Override
                    public UserRecord map(String value) throws Exception {
                        return getRecord(value);
                    }
                })
                .assignTimestampsAndWatermarks(new Record2TimestampExtractor())
                .filter(new FilterFunction<UserRecord>() {
                    @Override
                    public boolean filter(UserRecord value) throws Exception {
                        // Constant-first comparison is null-safe.
                        return "female".equals(value.sexy);
                    }
                })
                .keyBy(new UserRecordSelector())
                .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
                .reduce(new ReduceFunction<UserRecord>() {
                    @Override
                    public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception {
                        // Return a fresh record instead of mutating an input object handed in by
                        // the runtime. Both inputs share the same key, so name and sex are equal.
                        return new UserRecord(
                                value1.name, value1.sexy, value1.shoppingTime + value2.shoppingTime);
                    }
                })
                .filter(new FilterFunction<UserRecord>() {
                    @Override
                    public boolean filter(UserRecord value) throws Exception {
                        return value.shoppingTime > SHOPPING_TIME_THRESHOLD_MINUTES;
                    }
                })
                .print();

        // Nothing runs until execute() is called.
        env.execute("FemaleInfoCollectionPrint java");
    }

    /** Groups records by the (name, sex) pair. */
    private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> getKey(UserRecord value) throws Exception {
            return Tuple2.of(value.name, value.sexy);
        }
    }

    /**
     * Parses one input line of the form {@code name,sex,shoppingTime} into a {@link UserRecord}.
     * Each field is trimmed, so whitespace around the commas is tolerated.
     *
     * @param line a single line read from an input file
     * @return the parsed record
     * @throws IllegalArgumentException if the line does not have exactly three comma-separated
     *     fields, or if the third field is not an integer
     */
    private static UserRecord getRecord(String line) {
        final String[] elems = line.split(",");
        // Validate explicitly: "assert" is disabled unless the JVM runs with -ea. Without this
        // check, a blank or short line would fail with a bare ArrayIndexOutOfBoundsException.
        if (elems.length != 3) {
            throw new IllegalArgumentException(
                    "expected 3 comma-separated fields but got " + elems.length + ": \"" + line + "\"");
        }
        final int shoppingTime;
        try {
            shoppingTime = Integer.parseInt(elems[2].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid shopping time in line: \"" + line + "\"", e);
        }
        return new UserRecord(elems[0].trim(), elems[1].trim(), shoppingTime);
    }

    /** One parsed input line: user name, sex, and shopping time (minutes). */
    public static class UserRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }

        @Override
        public String toString() {
            return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime;
        }
    }

    /**
     * Implements {@code AssignerWithPunctuatedWatermarks} to assign timestamps and watermarks to
     * {@link UserRecord}s.
     *
     * <p>NOTE: the timestamp is the wall-clock time at which a record passes through this assigner
     * ({@link System#currentTimeMillis()}), not a time carried by the record. The "event time"
     * windows therefore group records by processing moment. If reading crosses a window boundary,
     * one user's records may be split across two windows.
     */
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> {
        /** Tags each element with the current wall-clock time. */
        @Override
        public long extractTimestamp(UserRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        /**
         * Emits a watermark one millisecond below the timestamp just assigned. It is emitted for
         * every element and is intended to trigger window evaluation once a window's elements are
         * complete.
         */
        @Override
        public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }
}
执行之后打印结果如下所示：
name: FangBo sexy: female shoppingTime: 320
name: CaiXuyu sexy: female shoppingTime: 300
父主题：DataStream程序