Java Sample Code
Function Description
Collect the information about female netizens who continuously spend more than 2 hour in online shopping and print the result.
DataStream FlinkStreamJavaExampleSample Code
The following code segment is an example. For details, see com.huawei.bigdata.flink.examples.FlinkStreamJavaExample in the FlinkStreamJavaExample project.
//Parameter description:
//<filePath> indicates paths where text is read. Paths are separated by commas (,).
//<windowTime> indicates the period covered by statistics window. The unit is minute.
public class FlinkStreamJavaExample {
public static void main(String[] args) throws Exception {
//Print reference command for executing flink run.
System.out.println("use command as: ");
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2");
System.out.println("******************************************************************************************");
System.out.println("<filePath> is for text file to read data, use comma to separate");
System.out.println("<windowTime> is the width of the window, time as minutes");
System.out.println("******************************************************************************************");
//Paths where text is read. Paths are separated by commas (,)
final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
assert filePaths.length > 0;
//windowTime specifies the size of the time window. By default, a window with 2 minutes as the size can read all data in a text file.
final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2);
//Build the execution environment and run eventTime to process window data.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
//Read DataStream of the text.
DataStream<String> unionStream = env.readTextFile(filePaths[0]);
if (filePaths.length > 1) {
for (int i = 1; i < filePaths.length; i++) {
unionStream = unionStream.union(env.readTextFile(filePaths[i]));
}
}
//Convert data, build the logic for the entire data process, calculate, and print results.
unionStream.map(new MapFunction<String, UserRecord>() {
@Override
public UserRecord map(String value) throws Exception {
return getRecord(value);
}
}).assignTimestampsAndWatermarks(
new Record2TimestampExtractor()
).filter(new FilterFunction<UserRecord>() {
@Override
public boolean filter(UserRecord value) throws Exception {
return value.sexy.equals("female");
}
}).keyBy(
new UserRecordSelector()
).window(
TumblingEventTimeWindows.of(Time.minutes(windowTime))
).reduce(new ReduceFunction<UserRecord>() {
@Override
public UserRecord reduce(UserRecord value1, UserRecord value2)
throws Exception {
value1.shoppingTime += value2.shoppingTime;
return value1;
}
}).filter(new FilterFunction<UserRecord>() {
@Override
public boolean filter(UserRecord value) throws Exception {
return value.shoppingTime > 120;
}
}).print();
//Call execute to trigger the execution.
env.execute("FemaleInfoCollectionPrint java");
}
//Build the keyword of keyBy as the grouping basis.
private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> {
@Override
public Tuple2<String, String> getKey(UserRecord value) throws Exception {
return Tuple2.of(value.name, value.sexy);
}
}
//Parse the row data of the text and build UserRecord data structure.
private static UserRecord getRecord(String line) {
String[] elems = line.split(",");
assert elems.length == 3;
return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
}
//Defines UserRecord data structure and rewrite toString printing method.
public static class UserRecord {
private String name;
private String sexy;
private int shoppingTime;
public UserRecord(String n, String s, int t) {
name = n;
sexy = s;
shoppingTime = t;
}
public String toString() {
return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime;
}
}
//Build class that inherits AssignerWithPunctuatedWatermarks to configure eventTime and waterMark.
private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> {
//add tag in the data of datastream elements
@Override
public long extractTimestamp(UserRecord element, long previousTimestamp) {
return System.currentTimeMillis();
}
//give the watermark to trigger the window to execute, and use the value to check if the window elements are ready
@Override
public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
return new Watermark(extractedTimestamp - 1);
}
}
}
The following is the command output:
name: FangBo sexy: female shoppingTime: 320
name: CaiXuyu sexy: female shoppingTime: 300
Figure 1 shows the process of execution.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.
