Esta página aún no está disponible en su idioma local. Estamos trabajando arduamente para agregar más versiones de idiomas. Gracias por su apoyo.

On this page

Scala Sample Code

Updated on 2022-09-14 GMT+08:00

Function Description

Collect real-time statistics on female netizens who spend more than 2 hours on online shopping, and print the results directly.

Sample Code

The following code snippet is used as an example. For the complete code, see com.huawei.flink.example.stream.FlinkStreamScalaExample.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
    // Parameter description:
    // filePath is the path for reading text files. The paths are separated by commas (,).
    // windowTime is the width of the statistics time window, in minutes.
/**
 * Sample streaming job: reads comma-separated `name,sexy,shoppingTime` records from one or
 * more text files, keeps the records whose `sexy` field is "female", sums the shopping time
 * per user inside tumbling event-time windows and prints every user whose total is greater
 * than the shopping-time threshold.
 *
 * Command-line parameters (parsed with ParameterTool):
 *  - `--filePath`: comma-separated list of text files to read
 *    (default `/opt/log1.txt,/opt/log2.txt`).
 *  - `--windowTime`: width of the time window in minutes (default `2`).
 */
object FlinkStreamScalaExample {

  // A user is printed only when the summed shopping time is strictly greater than this value.
  // NOTE(review): the unit is presumably minutes (120 = 2 hours) - confirm against the input data.
  private final val ShoppingTimeThreshold = 120

  /**
   * Job entry point: prints the usage banner, builds the data flow and executes it.
   *
   * @param args command-line arguments; see the object documentation for the supported keys
   * @throws IllegalArgumentException if no usable file path is supplied or if windowTime is
   *                                  not positive
   */
  def main(args: Array[String]): Unit = {
    // Print the command reference for flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2")
    System.out.println("******************************************************************************************")
    System.out.println("<filePath> is for text file to read data, use comma to separate")
    System.out.println("<windowTime> is the width of the window, time as minutes")
    System.out.println("******************************************************************************************")

    // Parse the arguments once and reuse the result for every parameter.
    val params = ParameterTool.fromArgs(args)

    // Read text path information. The paths are separated by commas (,).
    // Blank entries (for example from a trailing comma) are dropped so that they are never
    // handed to readTextFile.
    val filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt")
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
    // require (unlike assert) cannot be elided by compiler flags.
    require(filePaths.nonEmpty, "filePath must contain at least one non-empty path")

    // windowTime sets the width of the time window in minutes. The default value is 2.
    val windowTime = params.getInt("windowTime", 2)
    require(windowTime > 0, s"windowTime must be a positive number of minutes, got $windowTime")

    // Construct an execution environment and use eventTime to process the data obtained in a time window.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the text data streams: one source per path, merged into a single stream when
    // more than one path was given.
    val sources = filePaths.map(path => env.readTextFile(path))
    val unionStream =
      if (sources.length > 1) sources.head.union(sources.tail: _*)
      else sources.head

    // Convert the data, construct the entire data processing logic, and calculate and print the results.
    unionStream.map(getRecord(_))
      .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
      .filter(_.sexy == "female")
      .keyBy("name", "sexy")
      .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
      .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime))
      .filter(_.shoppingTime > ShoppingTimeThreshold).print()

    // Invoke execute to trigger the execution.
    env.execute("FemaleInfoCollectionPrint scala")
  }

  /**
   * Parses one text line of the form `name,sexy,shoppingTime` into a [[UserRecord]].
   * Surrounding whitespace of every field is ignored.
   *
   * @param line one input line
   * @return the parsed record
   * @throws IllegalArgumentException if the line does not consist of exactly three
   *                                  comma-separated fields
   * @throws NumberFormatException    if the third field is not a valid integer
   */
  def getRecord(line: String): UserRecord =
    line.split(",").map(_.trim) match {
      case Array(name, sexy, time) => UserRecord(name, sexy, time.toInt)
      case _ =>
        throw new IllegalArgumentException(
          s"Expected 3 comma-separated fields (name,sexy,shoppingTime) but got: '$line'")
    }

  /**
   * One shopping record.
   *
   * @param name         user name (first field of the input line)
   * @param sexy         gender label as found in the input, e.g. "female"
   * @param shoppingTime shopping time of this record, or the summed time after the reduce step
   */
  case class UserRecord(name: String, sexy: String, shoppingTime: Int)


  /**
   * Assigns a timestamp to every record and emits a watermark after each element.
   */
  private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] {

    // The timestamp is the wall-clock time at the moment of extraction; nothing is read from
    // the record itself.
    // NOTE(review): the window a record lands in therefore depends on when the job runs -
    // confirm this is intended for the sample.
    override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long = {
      System.currentTimeMillis()
    }

    // Emit a watermark one millisecond behind the timestamp that was just extracted; the
    // watermark is what lets the window operator decide that a window can be evaluated.
    override def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark = {
      new Watermark(extractedTimestamp - 1)
    }
  }

}

The following is the command output:

1
2
  UserRecord(FangBo,female,320)
  UserRecord(CaiXuyu,female,300)
Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback