Updated on 2024-08-10 GMT+08:00

Flink DataStream Sample Program (Scala)

Function Description

Collect information about female netizens whose total online shopping time within a time window exceeds 2 hours, and print the results.

DataStream FlinkStreamScalaExample Sample Code

The following code is an example. For details, see com.huawei.bigdata.flink.examples.FlinkStreamScalaExample.

    // Parameter description
    //   filePath:   paths of the text files to read; separate multiple paths with commas (,).
    //   windowTime: size of the statistics window, in minutes.
    object FlinkStreamScalaExample {

      /** Entry point: reads the input files, sums each female user's shopping time per
        * tumbling event-time window, and prints the users whose total exceeds 120.
        *
        * @param args command-line arguments; supports `--filePath` and `--windowTime`
        */
      def main(args: Array[String]): Unit = {
        // Print the reference command used to execute flink run.
        println("use command as: ")
        println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2")
        println("******************************************************************************************")
        println("<filePath> is for text file to read data, use comma to separate")
        println("<windowTime> is the width of the window, time as minutes")
        println("******************************************************************************************")

        // Parse the command-line arguments once and reuse the result.
        val params = ParameterTool.fromArgs(args)

        // Paths of the text files to read, separated by commas (,). Empty entries
        // (e.g. from "a,,b" or a trailing comma) are dropped. String.split never returns
        // an empty array, so the non-empty check is only meaningful after filtering.
        val filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt")
          .split(",")
          .map(_.trim)
          .filter(_.nonEmpty)
        require(filePaths.nonEmpty, "--filePath must contain at least one non-empty path")

        // Size of the tumbling time window, in minutes (default 2). A zero or negative
        // size cannot form a window, so reject it up front with a clear message.
        val windowTime = params.getInt("windowTime", 2)
        require(windowTime > 0, "--windowTime must be a positive number of minutes")

        // Build the execution environment and process windows by event time.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Read every file as a DataStream of lines and union them into a single stream.
        // filePaths is non-empty (checked above), so reduce is safe.
        val unionStream = filePaths
          .map(path => env.readTextFile(path))
          .reduce((left, right) => left.union(right))

        // Parse records, attach timestamps/watermarks, keep only female users, sum the
        // shopping time per (name, sexy) within each window, and print the users whose
        // total exceeds 120 (i.e. 2 hours, per the function description).
        unionStream
          .filter(_.trim.nonEmpty) // skip blank lines instead of failing the job in getRecord
          .map(getRecord(_))
          .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
          .filter(_.sexy == "female")
          .keyBy(record => (record.name, record.sexy))
          .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
          .reduce((e1, e2) => e1.copy(shoppingTime = e1.shoppingTime + e2.shoppingTime))
          .filter(_.shoppingTime > 120)
          .print()

        // Call execute to trigger the execution.
        env.execute("FemaleInfoCollectionPrint scala")
      }

      /** Parses one text line of the form `name,sexy,shoppingTime` into a [[UserRecord]].
        * Whitespace around each field (including a stray `\r`) is ignored.
        *
        * @param line one line read from an input file
        * @return the parsed record
        * @throws IllegalArgumentException if the line does not contain exactly three fields
        * @throws NumberFormatException if the third field is not an integer
        */
      def getRecord(line: String): UserRecord = {
        val elems = line.split(",").map(_.trim)
        require(elems.length == 3, s"Expected 'name,sexy,shoppingTime' but got: '$line'")
        UserRecord(elems(0), elems(1), elems(2).toInt)
      }

      /** One input record: the user's name, sex, and shopping time. */
      final case class UserRecord(name: String, sexy: String, shoppingTime: Int)

      /** Assigns a timestamp to each record and emits a watermark after every record.
        *
        * NOTE: the input lines carry no time field, so the wall-clock time at which a record
        * is processed is used as its "event time". Tumbling windows are aligned to the epoch,
        * so if reading the input happens to cross a window boundary, one user's records can
        * be split across two windows and summed separately.
        */
      private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] {

        // Timestamp each element with the current processing (wall-clock) time.
        override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long =
          System.currentTimeMillis()

        // Emit a watermark just below this record's timestamp after every element, so any
        // window ending at or before this record's timestamp becomes eligible to fire.
        override def checkAndGetNextWatermark(lastElement: UserRecord,
                                              extractedTimestamp: Long): Watermark =
          new Watermark(extractedTimestamp - 1)
      }
    }

The following is the command output:

  UserRecord(FangBo,female,320) 
  UserRecord(CaiXuyu,female,300)

Figure 1 shows the process of execution.

Figure 1 Process of execution