Flink DataStream Sample Program (Scala)
Function Description
Collect statistics on female netizens whose continuous online shopping time exceeds 2 hours in real time, and print the result directly.
Code Sample
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.flink.examples.FlinkStreamScalaExample in the FlinkStreamScalaExample sample project:
// Parameter description:
// filePath: paths of the text files to read, separated by commas.
// windowTime: width of the statistics window, in minutes.
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkStreamScalaExample {
  def main(args: Array[String]) {
    // Print a reference command for flink run
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2")
    System.out.println("******************************************************************************************")
    System.out.println("<filePath> is for text file to read data, use comma to separate")
    System.out.println("<windowTime> is the width of the window, time as minutes")
    System.out.println("******************************************************************************************")
    // Read the text file paths, separated by commas
    val filePaths = ParameterTool.fromArgs(args).get("filePath",
      "/opt/log1.txt,/opt/log2.txt").split(",").map(_.trim)
    assert(filePaths.length > 0)
    // windowTime sets the window size; by default a 2-minute window is enough to read all data in the text files
    val windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2)
    // Construct the execution environment and use event time to process window data
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    // Read the text data streams
    val unionStream = if (filePaths.length > 1) {
      val firstStream = env.readTextFile(filePaths.apply(0))
      firstStream.union(filePaths.drop(1).map(it => env.readTextFile(it)): _*)
    } else {
      env.readTextFile(filePaths.apply(0))
    }
    // Transform the data, build the processing logic, compute the result, and print it
    unionStream.map(getRecord(_))
      .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
      .filter(_.sexy == "female")
      .keyBy("name", "sexy")
      .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
      .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime))
      .filter(_.shoppingTime > 120).print()
    // Call execute to trigger the job
    env.execute("FemaleInfoCollectionPrint scala")
  }
  // Parse a line of text and construct a UserRecord
  def getRecord(line: String): UserRecord = {
    val elems = line.split(",")
    assert(elems.length == 3)
    val name = elems(0)
    val sexy = elems(1)
    val time = elems(2).toInt
    UserRecord(name, sexy, time)
  }
  // Definition of the UserRecord data structure
  case class UserRecord(name: String, sexy: String, shoppingTime: Int)
  // Class extending AssignerWithPunctuatedWatermarks, used to set the event time and the watermark
  private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] {
    // Assign a timestamp to each element of the data stream
    override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long = {
      System.currentTimeMillis()
    }
    // Emit a watermark to trigger window execution; its value is used to check whether the window elements are ready
    def checkAndGetNextWatermark(lastElement: UserRecord,
                                  extractedTimestamp: Long): Watermark = {
      new Watermark(extractedTimestamp - 1)
    }
  }
}
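getRecord expects each input line to contain three comma-separated fields: name, gender, and shopping time in minutes. As a minimal sketch, an input file such as /opt/log1.txt could look like the following (these records are illustrative assumptions, not data shipped with the sample project):

LiuYang,female,20
GuoYijun,male,5
CaiXuyu,female,50
FangBo,female,60

Only records whose gender is "female" are kept; the shopping times of each name are summed within every tumbling window, and only totals above 120 minutes are printed.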
The following result is printed after the program runs:
UserRecord(FangBo,female,320)
UserRecord(CaiXuyu,female,300)
Figure 1 shows the execution result.
Parent topic: Flink DataStream Sample Programs