Deze pagina is nog niet beschikbaar in uw eigen taal. We werken er hard aan om meer taalversies toe te voegen. Bedankt voor uw steun.

Scala Sample Code

Updated on 2022-09-14 GMT+08:00

Function Description

Collect information about female netizens whose total online-shopping time within a statistics window exceeds 2 hours, and print the result.

DataStream FlinkStreamScalaExample Sample Code

The following code is an example. For details, see com.huawei.bigdata.flink.examples.FlinkStreamScalaExample.

    // Parameter description
    // filePath indicates paths where text is read. Paths are separated by commas (,); blank entries are ignored.
    // windowTime indicates the period covered by the tumbling statistics window. The unit is minute; must be > 0.
    object FlinkStreamScalaExample {

      // A user is reported when the summed shoppingTime of their records within one window exceeds this value
      // (2 hours, assuming shoppingTime is in minutes as the usage text implies).
      private final val ShoppingTimeThreshold = 120

      def main(args: Array[String]): Unit = {
        // Print the reference command used to execute flink run.
        println("use command as: ")
        println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2")
        println("******************************************************************************************")
        println("<filePath> is for text file to read data, use comma to separate")
        println("<windowTime> is the width of the window, time as minutes")
        println("******************************************************************************************")

        // Parse the command line once and reuse it for every parameter.
        val params = ParameterTool.fromArgs(args)

        // Paths where text is read. String.split always yields at least one element (possibly ""),
        // so blank entries are removed before checking that at least one real path remains.
        val filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt")
          .split(",")
          .map(_.trim)
          .filter(_.nonEmpty)
        require(filePaths.nonEmpty, "filePath must contain at least one non-empty path")

        // windowTime specifies the size of the time window in minutes. By default, a window with 2 minutes
        // as the size can read all data in a text file.
        val windowTime = params.getInt("windowTime", 2)
        require(windowTime > 0, s"windowTime must be a positive number of minutes, got $windowTime")

        // Build the execution environment with event-time windowing. Note that the timestamps themselves come
        // from Record2TimestampExtractor, which uses the wall-clock time at extraction (the text has no time field).
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Read one DataStream per text file and union them into a single stream.
        val sources = filePaths.map(path => env.readTextFile(path))
        val unionStream =
          if (sources.length > 1) sources.head.union(sources.tail: _*)
          else sources.head

        // Convert data, build the logic for the entire data process, calculate, and print results.
        unionStream
          .filter(_.trim.nonEmpty) // skip blank lines instead of failing the whole job on them
          .map(getRecord(_))
          .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
          .filter(_.sexy == "female")
          .keyBy(record => (record.name, record.sexy)) // typed key selector instead of string field expressions
          .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
          .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime))
          .filter(_.shoppingTime > ShoppingTimeThreshold)
          .print()

        // Call execute to trigger the execution.
        env.execute("FemaleInfoCollectionPrint scala")
      }

      // Parse one row of the text, "name,sex,shoppingTime", into a UserRecord.
      // Each field is trimmed, so "FangBo, female, 320" parses the same as "FangBo,female,320".
      // Throws IllegalArgumentException if the row does not have exactly three comma-separated fields,
      // and NumberFormatException if the third field is not an integer.
      def getRecord(line: String): UserRecord = {
        val elems = line.split(",").map(_.trim)
        require(elems.length == 3,
          s"expected 3 comma-separated fields (name,sex,shoppingTime) but got ${elems.length}: '$line'")
        val name = elems(0)
        val sexy = elems(1)
        val time = elems(2).toInt
        UserRecord(name, sexy, time)
      }

      // One parsed row: user name, sex, and shopping time.
      final case class UserRecord(name: String, sexy: String, shoppingTime: Int)

      // Timestamp and watermark assigner for the event-time window.
      // The text rows carry no time field, so each record is stamped with the wall-clock time at which it is
      // extracted; windows therefore group records by arrival time, not by a time stored in the data.
      private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] {

        // Use the current wall-clock time in milliseconds as the record's timestamp; the record's fields are ignored.
        override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long =
          System.currentTimeMillis()

        // Emit a watermark after every record, one millisecond behind its timestamp, so that windows
        // ending at or before that point are considered complete.
        override def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark =
          new Watermark(extractedTimestamp - 1)
      }
    }

The following is the command output:

  UserRecord(FangBo,female,320) 
  UserRecord(CaiXuyu,female,300)

Figure 1 shows the process of execution.

Figure 1 Process of execution
Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback