Scala
To avoid API compatibility or reliability issues after open-source Flink is updated, you are advised to use the APIs of the matching version.
Common APIs of Flink
Flink mainly uses the following APIs:
- StreamExecutionEnvironment: provides the execution environment, which is the basis of Flink stream processing.
- DataStream: represents streaming data. A DataStream can be regarded as an unmodifiable collection that may contain duplicate data. The number of elements in a DataStream is unlimited.
- KeyedStream: the stream generated by performing the keyBy grouping operation on a DataStream. Data is grouped by the specified key.
- WindowedStream: the stream generated by applying a window function to a KeyedStream. It sets the window type and defines the window triggering conditions.
- AllWindowedStream: the stream generated by applying a window function to a DataStream. It sets the window type, defines the window triggering conditions, and performs operations on the window data.
- ConnectedStreams: the stream generated by connecting two DataStreams while retaining their original data types. The map or flatMap operation can then be performed on it.
- JoinedStreams: performs an equijoin on data in the window. The join operation is a special case of the coGroup operation.
- CoGroupedStreams: performs the coGroup operation on data in the window. CoGroupedStreams can perform various types of join operations.
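The way these stream types follow from one another can be sketched as below. This is an illustrative example only, not code from this guide: it assumes a Flink 1.x Scala DataStream API in which timeWindow is still available, and the object name, sample data, and job name are made up.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Hypothetical example object; the names are not part of the Flink API.
object StreamTypesSketch {
  def main(args: Array[String]): Unit = {
    // StreamExecutionEnvironment: entry point of a streaming job.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // DataStream: a stream of (word, count) pairs built from sample elements.
    val words: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2), ("a", 3))

    // KeyedStream: the same data, grouped by the first tuple field.
    val keyed: KeyedStream[(String, Int), String] = words.keyBy(_._1)

    // WindowedStream: a 5-second time window per key.
    val windowed: WindowedStream[(String, Int), String, TimeWindow] =
      keyed.timeWindow(Time.seconds(5))

    // Aggregating a WindowedStream yields a DataStream again.
    val sums: DataStream[(String, Int)] = windowed.sum(1)

    // With such a small bounded input the job may finish before a
    // processing-time window fires; a real job reads an unbounded source.
    sums.print()
    env.execute("stream-types-sketch")
  }
}
```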
Data Stream Source
API | Description |
---|---|
`def fromElements[T: TypeInformation](data: T*): DataStream[T]` | Use multiple user-defined elements as the data stream source. data specifies the elements. |
`def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]`<br>`def fromCollection[T: TypeInformation](data: Iterator[T]): DataStream[T]` | Use a user-defined collection as the input data stream. data can be a collection (Seq) or an Iterator. |
`def fromParallelCollection[T: TypeInformation](data: SplittableIterator[T]): DataStream[T]` | Use a user-defined collection as a parallel data stream source. data is an iterator that can be split into multiple partitions. |
`def generateSequence(from: Long, to: Long): DataStream[Long]` | Use the sequence of numbers from from to to as the data stream source. |
`def readTextFile(filePath: String): DataStream[String]`<br>`def readTextFile(filePath: String, charsetName: String): DataStream[String]` | Read a text file from the specified path as the data stream source. |
`def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String)`<br>`def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String, watchType: FileProcessingMode, interval: Long): DataStream[T]` | Read a file from the specified path as the data stream source. |
`def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0): DataStream[String]` | Read data from a socket as the data stream source. |
`def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]`<br>`def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T]` | Add a user-defined data source, such as Kafka, by implementing SourceFunction and passing it to addSource. The source logic is implemented in the run method of SourceFunction. |
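A few of the source methods above could be used as in the following sketch. CounterSource is a hypothetical SourceFunction written for this illustration (it is not part of Flink), and the sample values are made up; the code assumes the Flink 1.x Scala DataStream API.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical source: emits 1..limit and then stops.
class CounterSource(limit: Long) extends SourceFunction[Long] {
  @volatile private var running = true

  // run holds the source logic; it returns when the data is exhausted
  // or after cancel has been called.
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 1L
    while (running && i <= limit) {
      ctx.collect(i)
      i += 1
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

object SourceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source built from explicitly listed elements.
    val letters: DataStream[String] = env.fromElements("a", "b", "c")

    // Source built from an existing Scala collection.
    val numbers: DataStream[Int] = env.fromCollection(Seq(1, 2, 3))

    // Source built from a user-defined SourceFunction.
    val counter: DataStream[Long] = env.addSource(new CounterSource(5))

    letters.print()
    numbers.print()
    counter.print()
    env.execute("source-sketch")
  }
}
```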
Data Output
API | Description |
---|---|
`def print(): DataStreamSink[T]` | Print data to the standard output stream. |
`def printToErr()` | Print data to the standard error stream. |
`def writeAsText(path: String): DataStreamSink[T]`<br>`def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T]` | Write data to the specified text file. |
`def writeAsCsv(path: String): DataStreamSink[T]`<br>`def writeAsCsv(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T]`<br>`def writeAsCsv(path: String, writeMode: FileSystem.WriteMode, rowDelimiter: String, fieldDelimiter: String): DataStreamSink[T]` | Write data to the specified .csv file. |
`def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T]` | Write data to a file, for example, a binary file. |
`def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T]): DataStreamSink[T]` | Write data to a socket connection. |
`def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]`<br>`def addSink(fun: T => Unit): DataStreamSink[T]` | Export data in a user-defined manner. With flink-connectors, the addSink method can export data to systems such as Kafka. The export logic is implemented in the invoke method of SinkFunction. |
Filtering and Mapping
API | Description |
---|---|
`def map[R: TypeInformation](fun: T => R): DataStream[R]`<br>`def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]` | Transform each element into exactly one element, which may be of a different type R. |
`def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R]`<br>`def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]`<br>`def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]` | Transform each element into zero, one, or multiple elements. |
`def filter(filter: FilterFunction[T]): DataStream[T]`<br>`def filter(fun: T => Boolean): DataStream[T]` | Run a Boolean function on each element and retain the elements for which it returns true. |
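The three transformations can be chained as in the sketch below. It is an illustration under assumptions: the tokenize helper, object name, and sample sentences are invented for the example, and the lambda parameter types are written out explicitly so that the function-typed overloads are selected.

```scala
import org.apache.flink.streaming.api.scala._

object MapFlatMapFilterSketch {
  // Hypothetical helper: splits a line into lower-case words.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\s+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] = env.fromElements("To be or", "not to be")

    // flatMap: one line becomes zero or more words.
    val words: DataStream[String] = lines.flatMap((line: String) => tokenize(line))

    // map: each word becomes exactly one (word, length) pair of a new type.
    val lengths: DataStream[(String, Int)] = words.map((w: String) => (w, w.length))

    // filter: keep only the pairs whose word is longer than two characters.
    val longWords: DataStream[(String, Int)] =
      lengths.filter((p: (String, Int)) => p._2 > 2)

    longWords.print()
    env.execute("map-flatmap-filter-sketch")
  }
}
```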
Aggregation
API | Description |
---|---|
`def keyBy(fields: Int*): KeyedStream[T, JavaTuple]`<br>`def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple]`<br>`def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]` | Logically divide a stream into partitions, each containing the elements with the same key. The partitioning is implemented internally with hash partitioning. keyBy returns a KeyedStream, on which KeyedStream functions such as reduce, fold, sum, min, minBy, max, and maxBy can be called. |
`def reduce(fun: (T, T) => T): DataStream[T]`<br>`def reduce(reducer: ReduceFunction[T]): DataStream[T]` | Perform a rolling reduce on a KeyedStream. The current element and the last reduced value are aggregated into a new value. All three values have the same type. |
`def fold[R: TypeInformation](initialValue: R)(fun: (R,T) => R): DataStream[R]`<br>`def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]): DataStream[R]` | Perform a rolling fold on a KeyedStream, starting from an initial value. The current element and the last folded value are aggregated into a new value. The input type and the return type of fold can differ. |
`def sum(position: Int): DataStream[T]`<br>`def sum(field: String): DataStream[T]` | Calculate the rolling sum in a KeyedStream. position or field specifies the column to sum. |
`def min(position: Int): DataStream[T]`<br>`def min(field: String): DataStream[T]` | Calculate the rolling minimum in a KeyedStream. min returns the minimum value of the specified column; the values of the other columns in the result are not guaranteed to come from the same element. position or field specifies the column. |
`def max(position: Int): DataStream[T]`<br>`def max(field: String): DataStream[T]` | Calculate the rolling maximum in a KeyedStream. max returns the maximum value of the specified column; the values of the other columns in the result are not guaranteed to come from the same element. position or field specifies the column. |
`def minBy(position: Int): DataStream[T]`<br>`def minBy(field: String): DataStream[T]` | Obtain the element that holds the minimum value of a column in a KeyedStream. minBy returns all fields of that element. position or field specifies the column on which the minBy operation is performed. |
`def maxBy(position: Int): DataStream[T]`<br>`def maxBy(field: String): DataStream[T]` | Obtain the element that holds the maximum value of a column in a KeyedStream. maxBy returns all fields of that element. position or field specifies the column on which the maxBy operation is performed. |
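The rolling aggregations can be compared side by side in a small sketch. The tuple layout (player, round, score), the keepHigherScore helper, and the sample records are assumptions made for this illustration; positions are zero-based tuple indexes.

```scala
import org.apache.flink.streaming.api.scala._

object RollingAggregationSketch {
  // Hypothetical reduce logic: keep the record with the higher score.
  def keepHigherScore(a: (String, Int, Int), b: (String, Int, Int)): (String, Int, Int) =
    if (b._3 > a._3) b else a

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // (player, round, score): made-up sample records.
    val scores: DataStream[(String, Int, Int)] = env.fromElements(
      ("ann", 1, 30), ("bob", 1, 25), ("ann", 2, 10), ("bob", 2, 40))

    // Group by player; every aggregation below is computed per key.
    val byPlayer: KeyedStream[(String, Int, Int), String] = scores.keyBy(_._1)

    // Rolling sum of the score column (position 2).
    byPlayer.sum(2).print()

    // Rolling minimum of the score column; only that column is reliable.
    byPlayer.min(2).print()

    // Whole element that holds the minimum score so far.
    byPlayer.minBy(2).print()

    // Rolling reduce with user-defined logic.
    byPlayer
      .reduce((a: (String, Int, Int), b: (String, Int, Int)) => keepHigherScore(a, b))
      .print()

    env.execute("rolling-aggregation-sketch")
  }
}
```

Each aggregation emits one updated result per incoming element, so the printed output interleaves the four result streams.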
DataStream Distribution
API | Description |
---|---|
`def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int): DataStream[T]`<br>`def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String): DataStream[T]`<br>`def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]` | Use a user-defined partitioner to select the target task for each element. |
`def shuffle: DataStream[T]` | Randomly and evenly partition elements. |
`def rebalance: DataStream[T]` | Partition elements in a round-robin manner so that the load is balanced across partitions. This is useful for skewed data. |
`def rescale: DataStream[T]` | Distribute elements to a subset of downstream operations in a round-robin manner. NOTE: In code, this method is used in a similar way to rebalance. |
`def broadcast: DataStream[T]` | Broadcast each element to all partitions. |
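The distribution methods might be called as follows. ModPartitioner is a hypothetical Partitioner defined only for this sketch, and the sample data is invented; streams that are not connected to a sink are shown only to illustrate the calls.

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

// Hypothetical partitioner: routes a key to partition (key mod numPartitions).
class ModPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int =
    Math.floorMod(key, numPartitions)
}

object PartitioningSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events: DataStream[(Int, String)] =
      env.fromElements((1, "a"), (2, "b"), (3, "c"), (4, "d"))

    // Built-in strategies; each returns a repartitioned view of the stream.
    val shuffled: DataStream[(Int, String)] = events.shuffle
    val balanced: DataStream[(Int, String)] = events.rebalance
    val rescaled: DataStream[(Int, String)] = events.rescale
    val everywhere: DataStream[(Int, String)] = events.broadcast

    // Custom strategy: the key selector extracts the Int field and the
    // partitioner decides which downstream task receives the element.
    val custom: DataStream[(Int, String)] =
      events.partitionCustom(new ModPartitioner, (e: (Int, String)) => e._1)

    custom.print()
    env.execute("partitioning-sketch")
  }
}
```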
Configuring the EventTime Attribute
API | Description |
---|---|
`def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]`<br>`def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]` | Extract timestamps from records and generate watermarks so that event-time windows can be triggered. |
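A periodic assigner could look like the sketch below. BoundedLatenessAssigner, the (key, timestamp, value) tuple layout, and the one-second lateness bound are all assumptions of this example; it targets Flink 1.x releases in which AssignerWithPeriodicWatermarks and setStreamTimeCharacteristic are available.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical assigner: the watermark trails the highest timestamp seen
// so far by maxLatenessMs (assumed to be non-negative).
class BoundedLatenessAssigner(maxLatenessMs: Long)
    extends AssignerWithPeriodicWatermarks[(String, Long, Int)] {

  // Start high enough that the subtraction below cannot underflow.
  private var maxSeen: Long = Long.MinValue + maxLatenessMs

  override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
    maxSeen = math.max(maxSeen, element._2)
    element._2 // the second field is the event time in milliseconds
  }

  override def getCurrentWatermark(): Watermark = new Watermark(maxSeen - maxLatenessMs)
}

object EventTimeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // (key, event time in ms, value): made-up sample records.
    val events: DataStream[(String, Long, Int)] =
      env.fromElements(("a", 1000L, 1), ("a", 2500L, 2), ("b", 1800L, 5))

    // Attach timestamps and watermarks, then sum values per 2-second window.
    events
      .assignTimestampsAndWatermarks(new BoundedLatenessAssigner(1000L))
      .keyBy(_._1)
      .timeWindow(Time.seconds(2))
      .sum(2)
      .print()

    env.execute("event-time-sketch")
  }
}
```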
Iteration
API | Description |
---|---|
`def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), maxWaitTimeMillis: Long = 0, keepPartitioning: Boolean = false): DataStream[R]`<br>`def iterate[R, F: TypeInformation](stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]), maxWaitTimeMillis: Long): DataStream[R]` | Create a feedback loop in the stream that redirects the output of an operator to a preceding operator. |
Stream Splitting
API | Description |
---|---|
`def split(selector: OutputSelector[T]): SplitStream[T]` | Implement the select method of OutputSelector to specify the splitting rule (by tagging) and construct a SplitStream. That is, each element is tagged with a string so that the stream with a specific tag can be selected later. |
`def select(outputNames: String*): DataStream[T]` | Select one or multiple streams from a SplitStream. outputNames specifies the string tags that were assigned to elements by the split method. |
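Tagging and selecting might look like the following sketch. ParitySelector and the tag names "even" and "odd" are invented for the illustration, and the code assumes a Flink 1.x release that still provides split and select.

```scala
import java.util.Collections

import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala._

// Hypothetical selector: tags every number as "even" or "odd".
class ParitySelector extends OutputSelector[Int] {
  override def select(value: Int): java.lang.Iterable[String] =
    Collections.singletonList(if (value % 2 == 0) "even" else "odd")
}

object SplitSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val numbers: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6)

    // split attaches the tags; select picks the elements with a given tag.
    val tagged: SplitStream[Int] = numbers.split(new ParitySelector)
    val evens: DataStream[Int] = tagged.select("even")
    val all: DataStream[Int] = tagged.select("even", "odd")

    evens.print()
    all.print()
    env.execute("split-sketch")
  }
}
```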
Window
Windows can be classified as tumbling windows and sliding windows.
- APIs such as Window, TimeWindow, CountWindow, WindowAll, TimeWindowAll, and CountWindowAll can be used to generate windows.
- APIs such as Window Apply, Window Reduce, Window Fold, and Aggregations on windows can be used to operate windows.
- Multiple Window Assigners are supported, including TumblingEventTimeWindows, TumblingProcessingTimeWindows, SlidingEventTimeWindows, SlidingProcessingTimeWindows, EventTimeSessionWindows, ProcessingTimeSessionWindows, and GlobalWindows.
- ProcessingTime, EventTime, and IngestionTime are supported.
- For EventTime, the timestamp assigners AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks are supported.
Table 9 lists APIs for generating windows.
API | Description |
---|---|
`def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]` | Define windows in partitioned KeyedStreams. A window groups the data of each key according to some characteristic (for example, data received within the latest 5 seconds). |
`def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]` | Define windows in DataStreams. |
`def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]`<br>`def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]` | Define time windows in partitioned KeyedStreams. ProcessingTime or EventTime is selected based on environment.getStreamTimeCharacteristic(), and whether the window is a tumbling window or a sliding window depends on the number of parameters. |
`def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]`<br>`def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]` | Define time windows in DataStreams. ProcessingTime or EventTime is selected based on environment.getStreamTimeCharacteristic(), and whether the window is a tumbling window or a sliding window depends on the number of parameters. |
`def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]`<br>`def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]` | Divide windows according to the number of elements and define windows in partitioned KeyedStreams. |
`def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]`<br>`def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]` | Divide windows according to the number of elements and define windows in DataStreams. |
Table 10 lists APIs for operating windows.
Method | API | Description |
---|---|---|
Window | `def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]`<br>`def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]` | Apply a general function to a window. The data in the window is processed as a whole. function specifies the window function to be executed. |
Window | `def reduce(function: ReduceFunction[T]): DataStream[T]`<br>`def reduce(function: (T, T) => T): DataStream[T]`<br>`def reduce[R: TypeInformation](preAggregator: ReduceFunction[T], function: WindowFunction[T, R, K, W]): DataStream[R]`<br>`def reduce[R: TypeInformation](preAggregator: (T, T) => T, windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]` | Apply a reduce function to the window and return the result. |
Window | `def fold[R: TypeInformation](initialValue: R, function: FoldFunction[T,R]): DataStream[R]`<br>`def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R]`<br>`def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, foldFunction: FoldFunction[T, ACC], function: WindowFunction[ACC, R, K, W]): DataStream[R]`<br>`def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, foldFunction: (ACC, T) => ACC, windowFunction: (K, W, Iterable[ACC], Collector[R]) => Unit): DataStream[R]` | Apply a fold function to the window and return the result. |
WindowAll | `def apply[R: TypeInformation](function: AllWindowFunction[T, R, W]): DataStream[R]`<br>`def apply[R: TypeInformation](function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R]` | Apply a general function to a window. The data in the window is processed as a whole. |
WindowAll | `def reduce(function: ReduceFunction[T]): DataStream[T]`<br>`def reduce(function: (T, T) => T): DataStream[T]`<br>`def reduce[R: TypeInformation](preAggregator: ReduceFunction[T], windowFunction: AllWindowFunction[T, R, W]): DataStream[R]`<br>`def reduce[R: TypeInformation](preAggregator: (T, T) => T, windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R]` | Apply a reduce function to the window and return the result. |
WindowAll | `def fold[R: TypeInformation](initialValue: R, function: FoldFunction[T,R]): DataStream[R]`<br>`def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R]`<br>`def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, preAggregator: FoldFunction[T, ACC], windowFunction: AllWindowFunction[ACC, R, W]): DataStream[R]`<br>`def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, preAggregator: (ACC, T) => ACC, windowFunction: (W, Iterable[ACC], Collector[R]) => Unit): DataStream[R]` | Apply a fold function to the window and return the result. |
Window and WindowAll | `def sum(position: Int): DataStream[T]`<br>`def sum(field: String): DataStream[T]` | Sum a specified column of the window data. position or field specifies the column. |
Window and WindowAll | `def min(position: Int): DataStream[T]`<br>`def min(field: String): DataStream[T]` | Calculate the minimum value of a specified column of the window data. min returns the minimum value of that column; the values of the other columns in the result are not guaranteed to come from the same element. position or field specifies the column. |
Window and WindowAll | `def max(position: Int): DataStream[T]`<br>`def max(field: String): DataStream[T]` | Calculate the maximum value of a specified column of the window data. max returns the maximum value of that column; the values of the other columns in the result are not guaranteed to come from the same element. position or field specifies the column. |
Window and WindowAll | `def minBy(position: Int): DataStream[T]`<br>`def minBy(field: String): DataStream[T]` | Obtain the element that holds the minimum value of a column in the window data. minBy returns all fields of that element. position or field specifies the column on which the minBy operation is performed. |
Window and WindowAll | `def maxBy(position: Int): DataStream[T]`<br>`def maxBy(field: String): DataStream[T]` | Obtain the element that holds the maximum value of a column in the window data. maxBy returns all fields of that element. position or field specifies the column on which the maxBy operation is performed. |
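The window operations can be tried on a count window, which fires as soon as a key has collected the given number of elements and therefore does not depend on time. The sketch below is illustrative: the countPerWindow function, the object name, and the sample data are assumptions, and the function value is declared with an explicit type so that the function-typed apply overload is chosen.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

object WindowOperationSketch {
  // Hypothetical window function in the (K, W, Iterable[T], Collector[R]) => Unit
  // form: emits (key, number of elements in the window).
  val countPerWindow: (String, GlobalWindow, Iterable[(String, Int)], Collector[(String, Int)]) => Unit =
    (key, _, elements, out) => out.collect((key, elements.size))

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val clicks: DataStream[(String, Int)] =
      env.fromElements(("a", 1), ("a", 2), ("b", 5), ("a", 3), ("a", 4))

    // Helper that builds a fresh count window of 2 elements per key.
    def windows: WindowedStream[(String, Int), String, GlobalWindow] =
      clicks.keyBy(_._1).countWindow(2)

    // apply: process the whole window content at once.
    windows.apply(countPerWindow).print()

    // reduce: combine the window elements pairwise.
    windows
      .reduce((a: (String, Int), b: (String, Int)) => (a._1, a._2 + b._2))
      .print()

    // sum: built-in aggregation on the second tuple field.
    windows.sum(1).print()

    env.execute("window-operation-sketch")
  }
}
```

In this sample, key "b" receives only one element, so its window never reaches the size of 2 and produces no result.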
Combining Multiple DataStreams
API | Description |
---|---|
`def union(dataStreams: DataStream[T]*): DataStream[T]` | Perform the union operation on multiple DataStreams to generate a new data stream containing all data from the original DataStreams. NOTE: If a DataStream is unioned with itself, each element appears twice in the resulting stream. |
`def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]` | Connect two DataStreams and retain their original types. The connect method allows the two DataStreams to share state. After a ConnectedStreams is generated, the map or flatMap operation can be called on it. |
`def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R]`<br>`def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]` | Perform a map operation on the elements of a ConnectedStreams, similar to the map operation on a DataStream. One function is applied to each of the two input streams, and the result is a DataStream of type R. |
`def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R]`<br>`def flatMap[R: TypeInformation](fun1: (IN1, Collector[R]) => Unit, fun2: (IN2, Collector[R]) => Unit): DataStream[R]`<br>`def flatMap[R: TypeInformation](fun1: IN1 => TraversableOnce[R], fun2: IN2 => TraversableOnce[R]): DataStream[R]` | Perform a flatMap operation on the elements of a ConnectedStreams, similar to the flatMap operation on a DataStream. Each element of either input stream is transformed into zero, one, or multiple elements of type R. |
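The difference between union (same element type) and connect (two types, mapped to a common one) can be sketched as follows. The helper functions, object name, and sample data are made up for this illustration.

```scala
import org.apache.flink.streaming.api.scala._

object ConnectSketch {
  // Hypothetical helpers that bring both input types to a common String form.
  def describeInt(i: Int): String = s"int:$i"
  def describeString(s: String): String = s"str:$s"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ints: DataStream[Int] = env.fromElements(1, 2, 3)
    val moreInts: DataStream[Int] = env.fromElements(4, 5)
    val strings: DataStream[String] = env.fromElements("x", "y")

    // union: both inputs must have the same element type.
    val allInts: DataStream[Int] = ints.union(moreInts)

    // connect: the inputs keep their own types.
    val connected: ConnectedStreams[Int, String] = allInts.connect(strings)

    // map on ConnectedStreams: one function per input, one common result type.
    val merged: DataStream[String] = connected.map(
      (i: Int) => describeInt(i),
      (s: String) => describeString(s))

    merged.print()
    env.execute("connect-sketch")
  }
}
```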
Join Operation
API | Description |
---|---|
`def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]` | Join two DataStreams on a given key in a specified window. The keys of the join operation are specified by the where and equalTo methods, which select the data from the two DataStreams that satisfies the equality condition. |
`def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]` | Co-group two DataStreams on a given key in a specified window. The keys of the coGroup operation are specified by the where and equalTo methods, which partition the two DataStreams by the equality condition. |
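A windowed join built with where and equalTo might look like the sketch below. The order and payment tuples, the pair helper, and the 5-second processing-time window are assumptions of this example; with such a small bounded input the job may end before the window fires, so a real job would read unbounded sources.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object JoinSketch {
  // Hypothetical join result: (orderId, item, amount).
  def pair(order: (Int, String), payment: (Int, Double)): (Int, String, Double) =
    (order._1, order._2, payment._2)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // (orderId, item) and (orderId, amount): made-up sample records.
    val orders: DataStream[(Int, String)] = env.fromElements((1, "book"), (2, "pen"))
    val payments: DataStream[(Int, Double)] = env.fromElements((1, 12.5), (2, 1.2))

    val joined: DataStream[(Int, String, Double)] = orders
      .join(payments)
      .where((o: (Int, String)) => o._1)   // key of the first stream
      .equalTo((p: (Int, Double)) => p._1) // key of the second stream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .apply((o: (Int, String), p: (Int, Double)) => pair(o, p))

    joined.print()
    env.execute("join-sketch")
  }
}
```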