Flink DataStream Java Sample Code
Function Description
Collect statistics on female netizens who continuously dwell on online shopping for more than 2 hours and print statistics directly.
Sample Code
The following code snippets are used as an example. For complete codes, see com.huawei.flink.example.stream.FlinkStreamJavaExample.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
// Parameter description: // <filePath> is the path for reading text files. The paths are separated by commas (,). // <windowTime> is the time span of the statistics data. The unit is minute. public class FlinkStreamJavaExample { public static void main(String[] args) throws Exception { //Print the command reference for flink run. System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.flink.examples.stream.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2"); System.out.println("******************************************************************************************"); System.out.println("<filePath> is for text file to read data, use comma to separate"); System.out.println("<windowTime> is the width of the window, time as minutes"); System.out.println("******************************************************************************************"); // Read text path information. The paths are separated by commas (,). final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(","); assert filePaths.length > 0; // windowTime is used to set the time window. The default value is 2 minutes per time window. One time window is sufficient to read all data in the text. final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2); // Construct an execution environment and use eventTime to process the data obtained in a time window. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // Read the text data stream. DataStream<String> unionStream = env.readTextFile(filePaths[0]); if (filePaths.length > 1) { for (int i = 1; i < filePaths.length; i++) { unionStream = unionStream.union(env.readTextFile(filePaths[i])); } } // Convert the data, construct the entire data processing logic, and calculate and print the results. unionStream.map(new MapFunction<String, UserRecord>() { @Override public UserRecord map(String value) throws Exception { return getRecord(value); } }).assignTimestampsAndWatermarks( new Record2TimestampExtractor() ).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.sexy.equals("female"); } }).keyBy( new UserRecordSelector() ).window( TumblingEventTimeWindows.of(Time.minutes(windowTime)) ).reduce(new ReduceFunction<UserRecord>() { @Override public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception { value1.shoppingTime += value2.shoppingTime; return value1; } }).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.shoppingTime > 120; } }).print(); // Invoke execute to trigger the execution. env.execute("FemaleInfoCollectionPrint java"); } // Construct a keyBy keyword as the grouping basis. private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> { @Override public Tuple2<String, String> getKey(UserRecord value) throws Exception { return Tuple2.of(value.name, value.sexy); } } // Resolve the text line data and construct the UserRecord data structure. private static UserRecord getRecord(String line) { String[] elems = line.split(","); assert elems.length == 3; return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2])); } // Define the UserRecord data structure and rewrite the toString printing method. public static class UserRecord { private String name; private String sexy; private int shoppingTime; public UserRecord(String n, String s, int t) { name = n; sexy = s; shoppingTime = t; } public String toString() { return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime; } } // Construct a class inherited from AssignerWithPunctuatedWatermarks to set eventTime and waterMark. private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> { // add tag in the data of datastream elements @Override public long extractTimestamp(UserRecord element, long previousTimestamp) { return System.currentTimeMillis(); } // give the watermark to trigger the window to start execution, and use the value to check if the window elements are ready @Override public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1); } } } |
The following is the command output:
1 2 |
name: FangBo sexy: female shoppingTime: 320 name: CaiXuyu sexy: female shoppingTime: 300 |
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot