Flink DataStream Sample Program (Scala)
Function Description
Collect information about female netizens who cumulatively spend more than 2 hours in online shopping, and print the result.
Sample Code
The following code is an example. For details, see com.huawei.bigdata.flink.examples.FlinkStreamScalaExample.
```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Parameter description:
// filePath indicates the paths of the text files to read. Paths are separated by commas (,).
// windowTime indicates the time span of the statistics window, in minutes.
object FlinkStreamScalaExample {
  def main(args: Array[String]) {
    // Print the reference command used to execute "flink run".
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2")
    System.out.println("******************************************************************************************")
    System.out.println("<filePath> is for text file to read data, use comma to separate")
    System.out.println("<windowTime> is the width of the window, time as minutes")
    System.out.println("******************************************************************************************")

    // Paths of the text files to read, separated by commas (,).
    val filePaths = ParameterTool.fromArgs(args).get("filePath",
      "/opt/log1.txt,/opt/log2.txt").split(",").map(_.trim)
    assert(filePaths.length > 0)

    // windowTime specifies the size of the time window. By default, a 2-minute window is wide enough to read all data in a text file.
    val windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2)

    // Build the execution environment and use eventTime to process window data.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the text files into a DataStream.
    val unionStream = if (filePaths.length > 1) {
      val firstStream = env.readTextFile(filePaths.apply(0))
      firstStream.union(filePaths.drop(1).map(it => env.readTextFile(it)): _*)
    } else {
      env.readTextFile(filePaths.apply(0))
    }

    // Transform the data, build the processing logic, aggregate, and print the result.
    unionStream.map(getRecord(_))
      .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
      .filter(_.sexy == "female")
      .keyBy("name", "sexy")
      .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
      .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime))
      .filter(_.shoppingTime > 120)
      .print()

    // Call execute to trigger the execution.
    env.execute("FemaleInfoCollectionPrint scala")
  }

  // Parse a line of text and build the UserRecord data structure.
  def getRecord(line: String): UserRecord = {
    val elems = line.split(",")
    assert(elems.length == 3)
    val name = elems(0)
    val sexy = elems(1)
    val time = elems(2).toInt
    UserRecord(name, sexy, time)
  }

  // The UserRecord data structure.
  case class UserRecord(name: String, sexy: String, shoppingTime: Int)

  // A class that extends AssignerWithPunctuatedWatermarks to assign event timestamps and watermarks.
  private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] {

    // Assign a timestamp to each element of the DataStream.
    override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long = {
      System.currentTimeMillis()
    }

    // Emit a watermark that triggers window evaluation; its value marks the point up to which window elements are considered complete.
    override def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark = {
      new Watermark(extractedTimestamp - 1)
    }
  }
}
```
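The core of the pipeline above (parse `name,sexy,shoppingTime` records, keep female users, sum shopping time per user, keep totals over 120 minutes) can be sketched without Flink using plain Scala collections. This is only an illustration of the aggregation logic, not the Flink job itself, and the input lines in `main` are hypothetical sample data:

```scala
// A minimal, Flink-free sketch of the sample's aggregation logic.
object ShoppingAggregateSketch {
  case class UserRecord(name: String, sexy: String, shoppingTime: Int)

  // Same parsing contract as getRecord: "name,sexy,shoppingTime".
  def parse(line: String): UserRecord = {
    val elems = line.split(",").map(_.trim)
    require(elems.length == 3, s"bad record: $line")
    UserRecord(elems(0), elems(1), elems(2).toInt)
  }

  // Keep female users whose accumulated shopping time exceeds 120 minutes.
  def femalesOverTwoHours(lines: Seq[String]): Seq[UserRecord] =
    lines.map(parse)
      .filter(_.sexy == "female")
      .groupBy(r => (r.name, r.sexy))                        // same key as keyBy("name", "sexy")
      .map { case ((name, sexy), recs) =>
        UserRecord(name, sexy, recs.map(_.shoppingTime).sum) // same sum as the reduce step
      }
      .filter(_.shoppingTime > 120)                          // same threshold as the final filter
      .toSeq

  def main(args: Array[String]): Unit = {
    // Hypothetical input lines for illustration only.
    val lines = Seq(
      "FangBo,female,100",
      "FangBo,female,220",
      "LiuYang,male,200",
      "CaiXuyu,female,50")
    femalesOverTwoHours(lines).foreach(println)
    // prints: UserRecord(FangBo,female,320)
  }
}
```

Note that the sketch aggregates over the whole input at once, whereas the Flink job aggregates per tumbling event-time window.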
The following is the command output:
UserRecord(FangBo,female,320)
UserRecord(CaiXuyu,female,300)
Figure 1 shows the process of execution.