Flink DataStream Scala Sample Code
Function Description
Collect, in real time, statistics on female netizens who spend more than 2 hours shopping online, and print the results directly.
Sample Code
The following code snippet is used as an example. For the complete code, see com.huawei.flink.example.stream.FlinkStreamScalaExample.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
/**
 * Flink DataStream sample: reads comma-separated user records from text files and prints the
 * female users whose summed shopping time within one tumbling window exceeds 120
 * (presumably minutes, i.e. 2 hours -- confirm against the input data).
 *
 * Command-line parameters:
 *  - `--filePath`: paths of the text files to read, separated by commas (,).
 *    Defaults to `/opt/log1.txt,/opt/log2.txt`.
 *  - `--windowTime`: width of the statistics window, in minutes. Defaults to 2.
 */
object FlinkStreamScalaExample {

  /**
   * Prints the usage reference, then builds and executes the streaming job.
   *
   * @param args command-line arguments in `--key value` form, parsed with `ParameterTool`
   * @throws IllegalArgumentException if no usable file path is given or `windowTime` is not positive
   */
  def main(args: Array[String]): Unit = {
    // Print the command reference for flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2")
    System.out.println("******************************************************************************************")
    System.out.println("<filePath> is for text file to read data, use comma to separate")
    System.out.println("<windowTime> is the width of the window, time as minutes")
    System.out.println("******************************************************************************************")

    // Parse the arguments once and reuse the result for every parameter.
    val params = ParameterTool.fromArgs(args)

    // Read text path information. The paths are separated by commas (,).
    // Blank entries (e.g. from "a,,b" or a trailing comma) are dropped so that an empty
    // path is never handed to readTextFile.
    val filePaths = params
      .get("filePath", "/opt/log1.txt,/opt/log2.txt")
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
    // require (unlike assert) cannot be elided by compiler flags.
    require(filePaths.nonEmpty, "filePath must contain at least one non-empty path")

    // windowTime is used to set the time window. The default value is 2 minutes per time window.
    // One time window is sufficient to read all data in the text.
    val windowTime = params.getInt("windowTime", 2)
    require(windowTime > 0, s"windowTime must be a positive number of minutes, but was $windowTime")

    // Construct an execution environment and use eventTime to process the data obtained in a time window.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the text data stream: one source per file, unioned into a single stream.
    val firstStream = env.readTextFile(filePaths.head)
    val unionStream =
      if (filePaths.length > 1) {
        firstStream.union(filePaths.tail.map(path => env.readTextFile(path)): _*)
      } else {
        firstStream
      }

    // Convert the data, construct the entire data processing logic, and calculate and print the results.
    unionStream.map(getRecord(_))
      .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
      .filter(_.sexy == "female")
      .keyBy("name", "sexy")
      .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
      // Sum the shopping time of all records with the same key inside one window.
      .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime))
      .filter(_.shoppingTime > 120)
      .print()

    // Invoke execute to trigger the execution.
    env.execute("FemaleInfoCollectionPrint scala")
  }

  /**
   * Parses one text line of the form `name,sexy,shoppingTime` into a [[UserRecord]].
   * Surrounding whitespace of each field is ignored.
   *
   * @param line one line of input text
   * @return the record built from the three fields
   * @throws IllegalArgumentException if the line does not contain exactly three fields
   * @throws NumberFormatException if the third field is not an integer
   */
  def getRecord(line: String): UserRecord = {
    val elems = line.split(",").map(_.trim)
    require(
      elems.length == 3,
      s"Expected 3 comma-separated fields (name,sexy,shoppingTime) but found ${elems.length} in line: '$line'")
    val Array(name, sexy, time) = elems
    UserRecord(name, sexy, time.toInt)
  }

  /**
   * One input record.
   *
   * @param name         user name (first field of the line)
   * @param sexy         gender field as it appears in the input, e.g. "female"
   * @param shoppingTime shopping time (third field of the line); summed per user within a window
   */
  case class UserRecord(name: String, sexy: String, shoppingTime: Int)

  /**
   * Assigns timestamps and punctuated watermarks to [[UserRecord]] elements.
   */
  private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] {

    // The timestamp is the wall-clock time at which the element is processed; the record
    // itself carries no timestamp field, so `element` is not inspected.
    override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long = {
      System.currentTimeMillis()
    }

    // Emit a watermark after every element, one millisecond behind the timestamp that was
    // just extracted; the watermark is what lets the window operator fire.
    override def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark = {
      new Watermark(extractedTimestamp - 1)
    }
  }
}
The following is the command output:
1 2 |
UserRecord(FangBo,female,320)
UserRecord(CaiXuyu,female,300)
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot