Scala Sample Code
Function Description
Collects statistics, in real time, on female netizens who spend more than 2 hours continuously on online shopping, and prints the results directly.
Sample Code
The following code snippets are used as an example. For complete codes, see com.huawei.flink.example.stream.FlinkStreamScalaExample.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
// Parameter description:
// --filePath: comma-separated list of text file paths to read.
// --windowTime: width of the tumbling window, in minutes.
object FlinkStreamScalaExample {

  /**
   * Entry point. Reads user records from one or more text files, keeps only
   * records whose `sexy` field is "female", sums `shoppingTime` per
   * (name, sexy) key inside a tumbling event-time window of `windowTime`
   * minutes, and prints every aggregated record whose total exceeds
   * 120 minutes (i.e. more than 2 hours).
   */
  def main(args: Array[String]): Unit = {
    // Print the command reference for flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2")
    System.out.println("******************************************************************************************")
    System.out.println("<filePath> is for text file to read data, use comma to separate")
    System.out.println("<windowTime> is the width of the window, time as minutes")
    System.out.println("******************************************************************************************")

    // Parse the command-line arguments once and reuse the result.
    val params = ParameterTool.fromArgs(args)

    // Read text path information. The paths are separated by commas (,).
    val filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",").map(_.trim)
    assert(filePaths.nonEmpty)

    // windowTime sets the time window width in minutes. The default of 2 minutes
    // is wide enough for a single window to cover all data in the sample files.
    val windowTime = params.getInt("windowTime", 2)

    // Construct an execution environment and use eventTime to process the data
    // obtained in a time window. Parallelism 1 keeps the printed output ordered.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the text data stream; union all sources when more than one path is given.
    val unionStream = if (filePaths.length > 1) {
      val firstStream = env.readTextFile(filePaths.head)
      firstStream.union(filePaths.drop(1).map(env.readTextFile): _*)
    } else {
      env.readTextFile(filePaths.head)
    }

    // Convert the data, construct the entire data processing logic,
    // and calculate and print the results.
    unionStream.map(getRecord)
      .assignTimestampsAndWatermarks(new Record2TimestampExtractor)
      .filter(_.sexy == "female")
      .keyBy("name", "sexy")
      .window(TumblingEventTimeWindows.of(Time.minutes(windowTime)))
      // Sum shopping time for records sharing the same (name, sexy) key.
      .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime))
      // "more than 2 hours" = strictly greater than 120 minutes.
      .filter(_.shoppingTime > 120).print()

    // Invoke execute to trigger the execution.
    env.execute("FemaleInfoCollectionPrint scala")
  }

  /**
   * Parses one CSV text line ("name,sexy,shoppingTime") into a [[UserRecord]].
   *
   * @param line raw input line; must contain exactly three comma-separated
   *             fields, the last of which is an integer number of minutes.
   * @throws AssertionError  if the line does not have exactly three fields
   * @throws NumberFormatException if the time field is not a valid integer
   */
  def getRecord(line: String): UserRecord = {
    val elems = line.split(",")
    assert(elems.length == 3)
    val name = elems(0)
    val sexy = elems(1)
    val time = elems(2).toInt
    UserRecord(name, sexy, time)
  }

  /** Immutable record of one user's accumulated shopping time (minutes). */
  final case class UserRecord(name: String, sexy: String, shoppingTime: Int)

  /**
   * Assigns timestamps and punctuated watermarks so the event-time window
   * can fire. NOTE(review): extractTimestamp stamps each element with the
   * current wall-clock time (System.currentTimeMillis), so despite the
   * EventTime characteristic the effective ordering is arrival time — this
   * mirrors the original sample's behavior; confirm before reusing in
   * production pipelines.
   */
  private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] {

    // Tag each element with its ingestion (wall-clock) time.
    override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long = {
      System.currentTimeMillis()
    }

    // Emit a watermark just behind each element's timestamp so windows
    // containing only earlier elements are eligible to fire.
    override def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark = {
      new Watermark(extractedTimestamp - 1)
    }
  }
}
|
The following is the command output:
1 2 |
UserRecord(FangBo,female,320)
UserRecord(CaiXuyu,female,300)
|
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation.