Flink DataStream Sample Program (Java)
Function Description
Collect statistics on female netizens who continuously spend more than 2 hours on online shopping, and print the result.
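As a rough illustration of the computation only (not the Flink job itself), the sketch below sums shopping time per female user in plain Java and keeps the totals above 120 minutes. It ignores the time window entirely. The class name ShoppingTimeSketch, the method longShoppers, and the input records are hypothetical; the name,sex,minutes line format is an assumption based on how the sample code parses each line.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShoppingTimeSketch {

    // Sums minutes per female user and keeps totals strictly above thresholdMinutes.
    // Assumes each line has the form "name,sex,minutes".
    public static Map<String, Integer> longShoppers(List<String> lines, int thresholdMinutes) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] elems = line.split(",");
            if (elems.length != 3 || !"female".equals(elems[1])) {
                continue; // skip malformed lines and non-female records
            }
            totals.merge(elems[0], Integer.parseInt(elems[2].trim()), Integer::sum);
        }
        // Drop users whose total does not exceed the threshold.
        totals.values().removeIf(total -> total <= thresholdMinutes);
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical records, made up for this sketch.
        List<String> lines = Arrays.asList(
                "UserA,female,80",
                "UserB,male,200",
                "UserA,female,60",
                "UserC,female,90");
        System.out.println(longShoppers(lines, 120)); // prints {UserA=140}
    }
}
```

In the Flink job the same three steps (filter by sex, sum per user, filter by total) are applied per time window rather than over the whole input.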
DataStream FlinkStreamJavaExample Sample Code
The following code segment is an example. For details, see com.huawei.bigdata.flink.examples.FlinkStreamJavaExample.
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

//Parameter description:
//<filePath> indicates the paths of the text files to read. Paths are separated by commas (,).
//<windowTime> indicates the period covered by the statistics window. The unit is minute.
public class FlinkStreamJavaExample {
    public static void main(String[] args) throws Exception {
        //Print a reference command for executing flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2");
        System.out.println("******************************************************************************************");
        System.out.println("<filePath> is for text file to read data, use comma to separate");
        System.out.println("<windowTime> is the width of the window, time as minutes");
        System.out.println("******************************************************************************************");

        //Paths of the text files to read. Paths are separated by commas (,).
        final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
        assert filePaths.length > 0;

        //windowTime specifies the size of the time window. By default, a 2-minute window can read all data in a text file.
        final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2);

        //Build the execution environment and use event time to process window data.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        //Read each text file as a DataStream and union the streams.
        DataStream<String> unionStream = env.readTextFile(filePaths[0]);
        if (filePaths.length > 1) {
            for (int i = 1; i < filePaths.length; i++) {
                unionStream = unionStream.union(env.readTextFile(filePaths[i]));
            }
        }

        //Convert the data, build the processing logic, calculate, and print the results.
        unionStream.map(new MapFunction<String, UserRecord>() {
            @Override
            public UserRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
            new Record2TimestampExtractor()
        ).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                //Keep only records of female users.
                return "female".equals(value.sexy);
            }
        }).keyBy(
            new UserRecordSelector()
        ).window(
            TumblingEventTimeWindows.of(Time.minutes(windowTime))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception {
                //Sum the shopping time of records that share the same key within a window.
                value1.shoppingTime += value2.shoppingTime;
                return value1;
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                //Keep users whose total shopping time exceeds 120 minutes (2 hours).
                return value.shoppingTime > 120;
            }
        }).print();

        //Call execute to trigger the execution.
        env.execute("FemaleInfoCollectionPrint java");
    }

    //Build the key used by keyBy as the grouping basis.
    private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> getKey(UserRecord value) throws Exception {
            return Tuple2.of(value.name, value.sexy);
        }
    }

    //Parse a line of text and build the UserRecord data structure.
    private static UserRecord getRecord(String line) {
        String[] elems = line.split(",");
        assert elems.length == 3;
        return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
    }

    //Define the UserRecord data structure and override the toString method used for printing.
    public static class UserRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }

        @Override
        public String toString() {
            return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime;
        }
    }

    //Build a class that implements AssignerWithPunctuatedWatermarks to configure the event time and watermark.
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> {
        //Assign a timestamp to each element of the DataStream.
        @Override
        public long extractTimestamp(UserRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        //Emit the watermark that triggers window execution; its value is used to check whether the window elements are ready.
        @Override
        public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }
}
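The window() call in the sample groups records into fixed, non-overlapping windows of windowTime minutes, and the watermark (extractedTimestamp - 1 in the sample) tells the job how far event time has progressed. As a simplified model of that arithmetic, assuming a zero window offset and non-negative timestamps, the sketch below maps a timestamp to its window and checks whether a given watermark would complete it. The class TumblingWindowSketch and its methods are hypothetical and are not part of the Flink API or of the sample project.

```java
public class TumblingWindowSketch {

    // Start of the tumbling window containing timestampMs, assuming a zero
    // window offset and a non-negative timestamp.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Exclusive end of the same window.
    public static long windowEnd(long timestampMs, long windowSizeMs) {
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs;
    }

    // In this model a window is complete once the watermark has reached the
    // last timestamp that belongs to it (end - 1).
    public static boolean isComplete(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long sizeMs = 2 * 60 * 1000L; // windowTime = 2 minutes
        long ts = 250_000L;           // hypothetical event timestamp
        long start = windowStart(ts, sizeMs);
        long end = windowEnd(ts, sizeMs);
        System.out.println(start + " " + end);          // prints 240000 360000
        System.out.println(isComplete(ts - 1, end));    // prints false
        System.out.println(isComplete(end - 1, end));   // prints true
    }
}
```

Under this model, a watermark one millisecond behind the latest record does not close that record's own window; the window closes only when a later watermark passes its end.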
The following is the command output:
name: FangBo sexy: female shoppingTime: 320
name: CaiXuyu sexy: female shoppingTime: 300
Figure 1 shows the process of execution.