Java Sample Code
Function Description
Collect the information about female netizens who continuously spend more than 2 hour in online shopping and print the result.
DataStream FlinkStreamJavaExampleSample Code
The following code segment is an example. For details, see com.huawei.bigdata.flink.examples.FlinkStreamJavaExample.
//Parameter description: //<filePath> indicates paths where text is read. Paths are separated by commas (,). //<windowTime> indicates the period covered by statistics window. The unit is minute. public class FlinkStreamJavaExample { public static void main(String[] args) throws Exception { //Print reference command for executing flink run. System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2"); System.out.println("******************************************************************************************"); System.out.println("<filePath> is for text file to read data, use comma to separate"); System.out.println("<windowTime> is the width of the window, time as minutes"); System.out.println("******************************************************************************************"); //Paths where text is read. Paths are separated by commas (,) final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(","); assert filePaths.length > 0; //windowTime specifies the size of the time window. By default, a window with 2 minutes as the size can read all data in a text file. final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2); //Build the execution environment and run eventTime to process window data. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); //Read DataStream of the text. DataStream<String> unionStream = env.readTextFile(filePaths[0]); if (filePaths.length > 1) { for (int i = 1; i < filePaths.length; i++) { unionStream = unionStream.union(env.readTextFile(filePaths[i])); } } //Convert data, build the logic for the entire data process, calculate, and print results. unionStream.map(new MapFunction<String, UserRecord>() { @Override public UserRecord map(String value) throws Exception { return getRecord(value); } }).assignTimestampsAndWatermarks( new Record2TimestampExtractor() ).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.sexy.equals("female"); } }).keyBy( new UserRecordSelector() ).window( TumblingEventTimeWindows.of(Time.minutes(windowTime)) ).reduce(new ReduceFunction<UserRecord>() { @Override public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception { value1.shoppingTime += value2.shoppingTime; return value1; } }).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.shoppingTime > 120; } }).print(); //Call execute to trigger the execution. env.execute("FemaleInfoCollectionPrint java"); } //Build the keyword of keyBy as the grouping basis. private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> { @Override public Tuple2<String, String> getKey(UserRecord value) throws Exception { return Tuple2.of(value.name, value.sexy); } } //Parse the row data of the text and build UserRecord data structure. private static UserRecord getRecord(String line) { String[] elems = line.split(","); assert elems.length == 3; return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2])); } //Defines UserRecord data structure and rewrite toString printing method. public static class UserRecord { private String name; private String sexy; private int shoppingTime; public UserRecord(String n, String s, int t) { name = n; sexy = s; shoppingTime = t; } public String toString() { return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime; } } //Build class that inherits AssignerWithPunctuatedWatermarks to configure eventTime and waterMark. private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> { //add tag in the data of datastream elements @Override public long extractTimestamp(UserRecord element, long previousTimestamp) { return System.currentTimeMillis(); } //give the watermark to trigger the window to execute, and use the value to check if the window elements are ready @Override public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1); } } }
The following is the command output:
name: FangBo sexy: female shoppingTime: 320 name: CaiXuyu sexy: female shoppingTime: 300
Figure 1 shows the process of execution.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.