文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Flink开发指南(普通模式)/
开发Flink应用/
Flink DataStream样例程序/
Flink DataStream样例程序(Scala)
更新时间:2024-08-03 GMT+08:00
Flink DataStream样例程序(Scala)
功能介绍
实时统计连续网购时间超过2个小时的女性网民信息,将统计结果直接打印出来。
DataStream FlinkStreamScalaExample代码样例
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.flink.examples.FlinkStreamScalaExample:
// 参数解析: // filePath为文本读取路径,用逗号分隔。 // windowTime;为统计数据的窗口跨度,时间单位都是分。 object FlinkStreamScalaExample { def main(args: Array[String]) { // 打印出执行flink run的参考命令 System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2") System.out.println("******************************************************************************************") System.out.println("<filePath> is for text file to read data, use comma to separate") System.out.println("<windowTime> is the width of the window, time as minutes") System.out.println("******************************************************************************************") // 读取文本路径信息,并使用逗号分隔 val filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",").map(_.trim) assert(filePaths.length > 0) // windowTime设置窗口时间大小,默认2分钟一个窗口足够读取文本内的所有数据了 val windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2) // 构造执行环境,使用eventTime处理窗口数据 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 读取文本数据流 val unionStream = if (filePaths.length > 1) { val firstStream = env.readTextFile(filePaths.apply(0)) firstStream.union(filePaths.drop(1).map(it => env.readTextFile(it)): _*) } else { env.readTextFile(filePaths.apply(0)) } // 数据转换,构造整个数据处理的逻辑,计算并得出结果打印出来 unionStream.map(getRecord(_)) .assignTimestampsAndWatermarks(new Record2TimestampExtractor) .filter(_.sexy == "female") .keyBy("name", "sexy") .window(TumblingEventTimeWindows.of(Time.minutes(windowTime))) .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime)) .filter(_.shoppingTime > 120).print() // 调用execute触发执行 env.execute("FemaleInfoCollectionPrint scala") } // 解析文本行数据,构造UserRecord数据结构 def getRecord(line: String): UserRecord = { val elems = line.split(",") assert(elems.length == 3) val name = elems(0) val sexy = elems(1) val time = elems(2).toInt UserRecord(name, sexy, time) } // UserRecord数据结构的定义 case class UserRecord(name: String, sexy: String, shoppingTime: Int) // 构造继承AssignerWithPunctuatedWatermarks的类,用于设置eventTime以及waterMark private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] { // add tag in the data of datastream elements override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long = { System.currentTimeMillis() } // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp - 1) } } }
执行之后打印结果如下所示:
UserRecord(FangBo,female,320) UserRecord(CaiXuyu,female,300)
执行如图1所示。
父主题: Flink DataStream样例程序