Scala Sample Code
Function Description
Collect information about female netizens who spend more than 2 hours in total on online shopping within the statistics window, and print the result.
DataStream FlinkStreamScalaExample Sample Code
The following code is an example. For details, see com.huawei.bigdata.flink.examples.FlinkStreamScalaExample.
/**
 * Flink DataStream sample: reads comma-separated user records from one or more text files,
 * keeps the female users, sums each user's shopping time inside a tumbling event-time window
 * and prints the users whose summed time exceeds the threshold.
 *
 * Command-line parameters (parsed with ParameterTool):
 *  - `--filePath`   paths where text is read, separated by commas (,).
 *                   Default: `/opt/log1.txt,/opt/log2.txt`.
 *  - `--windowTime` period covered by the statistics window, in minutes. Default: 2.
 */
object FlinkStreamScalaExample {

  /**
   * Records whose summed shoppingTime is strictly greater than this value are printed.
   * 120 corresponds to "2 hours" in the page description, presumably because shoppingTime
   * is in minutes (TODO confirm unit). Declared as a literal `final val` so it is inlined
   * into the filter closure.
   */
  private final val ShoppingTimeThreshold = 120

  def main(args: Array[String]): Unit = {
    // Print the reference command used to execute flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2")
    System.out.println("******************************************************************************************")
    System.out.println("<filePath> is for text file to read data, use comma to separate")
    System.out.println("<windowTime> is the width of the window, time as minutes")
    System.out.println("******************************************************************************************")

    // Parse the command line once and reuse it for every parameter.
    val params = ParameterTool.fromArgs(args)

    // Paths where text is read, separated by commas (,). Empty entries (e.g. from a leading
    // or doubled comma) are dropped so readTextFile is never called with an empty path.
    val filePaths = params
      .get("filePath", "/opt/log1.txt,/opt/log2.txt")
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
    require(filePaths.nonEmpty, "<filePath> must contain at least one non-empty path")

    // windowTime specifies the size of the time window in minutes. By default, a window with
    // 2 minutes as the size can read all data in a text file.
    val windowTime = params.getInt("windowTime", 2)

    // Build the execution environment and use event time to process window data.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read every file as a DataStream of lines and union them into a single stream.
    val unionStream =
      if (filePaths.length > 1) {
        val firstStream = env.readTextFile(filePaths.head)
        firstStream.union(filePaths.tail.map(path => env.readTextFile(path)): _*)
      } else {
        env.readTextFile(filePaths.head)
      }

    // Convert data, build the logic for the entire data process, calculate, and print results.
    unionStream
      // Skip blank lines instead of failing the whole job inside getRecord.
      .filter(_.trim.nonEmpty)
      .map(getRecord(_))
      .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
      .filter(_.sexy == "female")
      .keyBy("name", "sexy")
      .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
      // Sum shoppingTime per (name, sexy) key within each window.
      .reduce((e1, e2) => e1.copy(shoppingTime = e1.shoppingTime + e2.shoppingTime))
      .filter(_.shoppingTime > ShoppingTimeThreshold)
      .print()

    // Call execute to trigger the execution.
    env.execute("FemaleInfoCollectionPrint scala")
  }

  /**
   * Parses one text line of the form `name,sexy,shoppingTime` into a [[UserRecord]].
   * Whitespace around each field is trimmed, so lines such as "a, female, 10" or lines
   * ending in a carriage return still parse.
   *
   * @param line a comma-separated line with exactly three fields
   * @return the parsed record
   * @throws IllegalArgumentException if the line does not have exactly three fields
   * @throws NumberFormatException    if the third field is not an integer
   */
  def getRecord(line: String): UserRecord = {
    val elems = line.split(",").map(_.trim)
    require(elems.length == 3, s"Expected 3 comma-separated fields (name,sexy,time) but got: $line")
    UserRecord(elems(0), elems(1), elems(2).toInt)
  }

  /**
   * One user record parsed from a text line.
   *
   * @param name         user name (first field of the line)
   * @param sexy         gender string (second field); only records equal to "female" are kept
   * @param shoppingTime shopping time (third field); summed per key within each window
   */
  final case class UserRecord(name: String, sexy: String, shoppingTime: Int)

  /**
   * Assigns timestamps to records and emits a punctuated watermark after every element.
   *
   * NOTE: the timestamp is the wall-clock time at which the record is processed
   * (System.currentTimeMillis()), not a value read from the record. The "event-time" windows
   * therefore group records by arrival time, which is why a short window (2 minutes by
   * default) can hold all data read from a text file.
   */
  private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] {

    // Tag each element with the current system time in milliseconds.
    override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long =
      System.currentTimeMillis()

    // After every element, emit a watermark one millisecond below that element's timestamp;
    // this is what lets the event-time windows fire.
    override def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark =
      new Watermark(extractedTimestamp - 1)
  }
}
The following is the command output:
UserRecord(FangBo,female,320)
UserRecord(CaiXuyu,female,300)
Figure 1 shows the process of execution.
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation.