Updated on 2024-08-16 GMT+08:00

Flink DataStream Scala Sample Code

Function Description

Collect statistics, in real time, on female netizens whose total online shopping time exceeds 2 hours, and print the results directly.

Sample Code

The following code snippet is used as an example. For the complete code, see com.huawei.flink.example.stream.FlinkStreamScalaExample.

// Parameter description:
// filePath: comma-separated paths of the text files to read.
// windowTime: width of the statistics time window, in minutes.
/**
 * Flink DataStream sample job.
 *
 * Reads `name,sex,shoppingTime` lines from one or more text files and keeps only female users.
 * It sums each user's shopping time within a tumbling event-time window and prints the users
 * whose summed time exceeds 120 (i.e. 2 hours, assuming shoppingTime is in minutes).
 */
object FlinkStreamScalaExample {

  /**
   * Entry point of the job.
   *
   * Arguments are parsed with `ParameterTool`:
   *  - `--filePath`: comma-separated text file paths; defaults to `/opt/log1.txt,/opt/log2.txt`.
   *  - `--windowTime`: tumbling window width in minutes; defaults to 2.
   *
   * @throws IllegalArgumentException if no non-blank file path is given or windowTime is not positive
   */
  def main(args: Array[String]): Unit = {
    // Print the command reference for flink run.
    // NOTE(review): the class name below differs from the one named in the surrounding
    // documentation (com.huawei.flink.example.stream...) — confirm which package the jar uses.
    println("use command as: ")
    println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2")
    println("******************************************************************************************")
    println("<filePath> is for text file to read data, use comma to separate")
    println("<windowTime> is the width of the window, time as minutes")
    println("******************************************************************************************")

    // Parse the command line once and reuse the result.
    val params = ParameterTool.fromArgs(args)

    // Comma-separated input paths. Blank entries (e.g. "a,,b" or a trailing comma) are dropped
    // so that readTextFile is never handed an empty path.
    val filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt")
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
    require(filePaths.nonEmpty, "at least one non-blank --filePath must be given")

    // Window width in minutes. The default of 2 minutes is assumed to be long enough for one
    // window to cover all data in the text files.
    val windowTime = params.getInt("windowTime", 2)
    require(windowTime > 0, s"--windowTime must be a positive number of minutes, got $windowTime")

    // Totals strictly greater than this are reported (2 hours, assuming minutes).
    val minShoppingTime = 120

    // Construct an execution environment and use eventTime to process the data obtained in a time window.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // One text source per path, merged into a single stream.
    val sources = filePaths.map(path => env.readTextFile(path))
    val unionStream =
      if (sources.length > 1) sources.head.union(sources.tail: _*) else sources.head

    // Parse, keep female users, sum shopping time per user per window, and print heavy shoppers.
    unionStream
      .filter(_.trim.nonEmpty) // skip blank lines instead of failing the whole job in getRecord
      .map(getRecord(_))
      .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
      .filter(_.sexy == "female")
      .keyBy(record => (record.name, record.sexy))
      .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
      .reduce((acc, next) => acc.copy(shoppingTime = acc.shoppingTime + next.shoppingTime))
      .filter(_.shoppingTime > minShoppingTime)
      .print()

    // Invoke execute to trigger the execution.
    env.execute("FemaleInfoCollectionPrint scala")
  }

  /**
   * Parses one text line of the form `name,sex,shoppingTime` into a [[UserRecord]].
   * Whitespace around each field is ignored.
   *
   * @param line a single, non-blank input line
   * @return the parsed record
   * @throws IllegalArgumentException if the line does not have exactly three comma-separated fields
   * @throws NumberFormatException    if the third field is not an integer
   */
  def getRecord(line: String): UserRecord = {
    val fields = line.split(",").map(_.trim)
    require(fields.length == 3, s"expected 'name,sex,shoppingTime' but got: '$line'")
    UserRecord(fields(0), fields(1), fields(2).toInt)
  }

  /**
   * One user's shopping record.
   *
   * @param name         user name
   * @param sexy         user's sex as written in the input (e.g. "female")
   * @param shoppingTime shopping duration (presumably minutes, given the 2-hour / 120 threshold)
   */
  final case class UserRecord(name: String, sexy: String, shoppingTime: Int)

  /**
   * Assigns timestamps and emits a watermark after every element.
   *
   * The timestamp is the wall-clock time at which the record is processed, not a value taken
   * from the data. The "event time" is therefore effectively ingestion time, and results depend
   * on when the records are read relative to window boundaries.
   */
  private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] {

    // Stamp each element with the current wall-clock time.
    override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long =
      System.currentTimeMillis()

    // Emit a watermark just behind the element's timestamp. Windows whose end lies before it
    // can then fire.
    override def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark =
      new Watermark(extractedTimestamp - 1)
  }

}

The following is the command output:

  UserRecord(FangBo,female,320)
  UserRecord(CaiXuyu,female,300)