
Scala

To avoid API compatibility or reliability issues after updates to the open-source Flink, you are advised to use APIs of the matching version.

Common APIs of Flink

Flink mainly uses the following APIs:

  • StreamExecutionEnvironment: provides the execution environment, which is the basis of Flink stream processing.
  • DataStream: represents streaming data. A DataStream can be regarded as an unmodifiable collection that may contain duplicate data. The number of elements in a DataStream is unlimited.
  • KeyedStream: generated by performing the keyBy operation on a DataStream. Data is grouped based on the specified key value.
  • WindowedStream: generated by performing the window operation on a KeyedStream; it sets the window type and defines the window triggering conditions.
  • AllWindowedStream: generated by performing the window operation on a DataStream; it sets the window type, defines the window triggering conditions, and performs operations on the window data.
  • ConnectedStreams: generated by connecting two DataStreams and retaining their original data types; the map or flatMap operation can then be performed.
  • JoinedStreams: performs an equijoin operation on data in a window. The join operation is a special case of the coGroup operation.
  • CoGroupedStreams: performs the coGroup operation on data in a window. CoGroupedStreams can implement various types of join operations.
Figure 1 Conversion of Flink stream types
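The following is a minimal sketch of how these stream types convert into one another (the data and job name are illustrative, assuming the Flink 1.10 Scala API referenced in this section):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object StreamTypeConversion {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // DataStream: the basic stream abstraction.
    val words: DataStream[(String, Int)] =
      env.fromElements(("a", 1), ("b", 2), ("a", 3))

    // KeyedStream: produced by keyBy on a DataStream.
    val keyed: KeyedStream[(String, Int), String] = words.keyBy(_._1)

    // WindowedStream: produced by applying a window to the KeyedStream.
    val windowed = keyed.timeWindow(Time.seconds(5))

    // An aggregation on the window converts back to a DataStream.
    windowed.sum(1).print()

    env.execute("Stream type conversion sketch")
  }
}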

Data Stream Source

Table 1 APIs about data stream source

API

Description

def fromElements[T: TypeInformation](data: T*): DataStream[T]

Obtain user-defined data of multiple elements as the data stream source.

data is the specific data of multiple elements.

def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]

Obtain a user-defined data collection as the input data stream.

data can be a data collection or an iterable data body.

def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T]

def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T]): DataStream[T]

Obtain a user-defined data collection as a parallel data stream source.

data indicates the iterator that can be divided into multiple partitions.

def generateSequence(from: Long, to: Long): DataStream[Long]

Obtain a sequence of user-defined data as the data stream source.

  • from indicates the starting point of numbers.
  • to indicates the end point of numbers.

def readTextFile(filePath: String): DataStream[String]

Obtain the user-defined text file from a specific path as the data stream source.

  • filePath indicates the path of the text file.
  • charsetName indicates the encoding format.

def readTextFile(filePath: String, charsetName: String): DataStream[String]

def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String)

Obtain the user-defined file from a specific path as the data stream source.

  • filePath indicates the file path.
  • inputFormat indicates the format of the file.
  • watchType indicates the file processing mode, which can be PROCESS_ONCE or PROCESS_CONTINUOUSLY.
  • interval indicates the interval for processing directories or files.

def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String, watchType: FileProcessingMode, interval: Long): DataStream[T]

def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0): DataStream[String]

Obtain user-defined socket data as the data stream source.

  • hostname indicates the host name of the socket server.
  • port indicates the listening port of the server.
  • delimiter and maxRetry are not supported by Scala APIs.

def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]

Add a data source (such as Kafka) by customizing a SourceFunction and calling the addSource method. The data-producing logic is implemented in the run method of SourceFunction.

  • function indicates the user-defined SourceFunction.
  • Scala also supports the simplified format shown in the second signature.

def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T]
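The following sketch shows several of the sources above, including a user-defined SourceFunction whose run method emits the records (data and names are illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction

object SourceExamples {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Element, collection, and sequence sources.
    val fromElems: DataStream[Int] = env.fromElements(1, 2, 3)
    val fromColl: DataStream[String] = env.fromCollection(Seq("a", "b"))
    val seq: DataStream[Long] = env.generateSequence(1L, 100L)

    // A user-defined source added with addSource; run() emits records
    // until cancel() is called.
    val custom: DataStream[Long] = env.addSource(new SourceFunction[Long] {
      @volatile private var running = true
      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        var i = 0L
        while (running && i < 10) {
          ctx.collect(i)
          i += 1
        }
      }
      override def cancel(): Unit = running = false
    })

    custom.print()
    env.execute("Source examples")
  }
}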

Data Output

Table 2 APIs about data output

API

Description

def print(): DataStreamSink[T]

Print data to the standard output stream.

def printToErr(): DataStreamSink[T]

Print data to the standard error stream.

def writeAsText(path: String): DataStreamSink[T]

Write data to a specific text file.

  • path indicates the path of the text file.
  • writeMode indicates the writing mode, which can be OVERWRITE or NO_OVERWRITE.

def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T]

def writeAsCsv(path: String): DataStreamSink[T]

Write data to a specific .csv file.

  • path indicates the path of the .csv file.
  • writeMode indicates the writing mode, which can be OVERWRITE or NO_OVERWRITE.
  • rowDelimiter indicates the row separator.
  • fieldDelimiter indicates the column separator.

def writeAsCsv(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T]

def writeAsCsv(path: String, writeMode: FileSystem.WriteMode, rowDelimiter: String, fieldDelimiter: String): DataStreamSink[T]

def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T]

Write data to a file, for example, a binary file.

def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T]): DataStreamSink[T]

Write data to the socket connection.

  • hostname indicates the host name.
  • port indicates the port number.

def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]

Export data in a user-defined manner. The flink-connectors module allows the addSink method to export data to Kafka and other systems. The sink logic is implemented in the invoke method of SinkFunction.

def addSink(fun: T => Unit): DataStreamSink[T]
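The following sketch writes a stream to the standard output and to a user-defined sink whose invoke method is called once per record (names are illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object SinkExamples {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2))

    // Built-in sinks.
    stream.print()
    stream.printToErr()

    // A user-defined sink added with addSink.
    stream.addSink(new SinkFunction[(String, Int)] {
      override def invoke(value: (String, Int)): Unit =
        println(s"custom sink got: $value")
    })

    env.execute("Sink examples")
  }
}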

Filtering and Mapping

Table 3 APIs about filtering and mapping

API

Description

def map[R: TypeInformation](fun: T => R): DataStream[R]

Transform one element into another element.

def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]

def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R]

Transform an element into zero, one, or multiple elements.

def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]

def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]

def filter(filter: FilterFunction[T]): DataStream[T]

Run a Boolean function on each element and retain elements that return true.

def filter(fun: T => Boolean): DataStream[T]
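A minimal sketch combining the three transformations (illustrative data):

import org.apache.flink.streaming.api.scala._

object TransformExamples {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] = env.fromElements("hello world", "hello flink")

    // flatMap: one line becomes zero or more words.
    val words: DataStream[String] = lines.flatMap(_.split("\\s+"))

    // filter: keep only words longer than four characters.
    val longWords: DataStream[String] = words.filter(_.length > 4)

    // map: one element in, one element out (the type may change).
    val lengths: DataStream[(String, Int)] = longWords.map(w => (w, w.length))

    lengths.print()
    env.execute("Transform examples")
  }
}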

Aggregation

Table 4 APIs about aggregation

API

Description

def keyBy(fields: Int*): KeyedStream[T, JavaTuple]

Logically divide a stream into multiple partitions, each containing elements with the same key. The partitioning is internally implemented using hash partitioning. A KeyedStream is returned.

After the keyBy operation, functions of the returned KeyedStream, such as reduce, fold, min, minBy, max, maxBy, and sum, can be called.

  • fields indicates the indexes of certain columns.
  • firstField and otherFields are names of member variables.
  • fun indicates the user-defined key-extraction function used as the basis for partitioning.

def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple]

def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]

def reduce(fun: (T, T) => T): DataStream[T]

Perform reduce on KeyedStream in a rolling manner. Aggregate the current element and the last reduced value into a new value. The types of the three values are the same.

def reduce(reducer: ReduceFunction[T]): DataStream[T]

def fold[R: TypeInformation](initialValue: R)(fun: (R,T) => R): DataStream[R]

Perform fold operation on KeyedStream based on an initial value in a rolling manner. Aggregate the current element and the last folded value into a new value. The input and returned values of Fold can be different.

def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]): DataStream[R]

def sum(position: Int): DataStream[T]

Calculate the sum in a KeyedStream in a rolling manner.

position and field indicate calculating the sum of a specific column.

def sum(field: String): DataStream[T]

def min(position: Int): DataStream[T]

Calculate the minimum value in a KeyedStream in a rolling manner. min returns the minimum value but does not guarantee the correctness of the other columns.

position and field indicate calculating the minimum value of a specific column.

def min(field: String): DataStream[T]

def max(position: Int): DataStream[T]

Calculate the maximum value in a KeyedStream in a rolling manner. max returns the maximum value but does not guarantee the correctness of the other columns.

position and field indicate calculating the maximum value of a specific column.

def max(field: String): DataStream[T]

def minBy(position: Int): DataStream[T]

Obtain the row in a KeyedStream where the minimum value of a column is located. minBy returns all elements of that row.

position and field indicate the column on which the minBy operation is performed.

def minBy(field: String): DataStream[T]

def maxBy(position: Int): DataStream[T]

Obtain the row in a KeyedStream where the maximum value of a column is located. maxBy returns all elements of that row.

position and field indicate the column on which the maxBy operation is performed.

def maxBy(field: String): DataStream[T]
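The following sketch shows rolling aggregations on a KeyedStream (illustrative data; each incoming element produces an updated per-key result):

import org.apache.flink.streaming.api.scala._

object AggregationExamples {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val scores: DataStream[(String, Int)] =
      env.fromElements(("a", 3), ("a", 5), ("b", 4), ("b", 1))

    val keyed = scores.keyBy(_._1)

    // Rolling sum of column 1 per key: emits ("a",3), ("a",8), ("b",4), ("b",5).
    keyed.sum(1).print()

    // Rolling reduce: keep the per-key maximum seen so far.
    keyed.reduce((l, r) => if (l._2 >= r._2) l else r).print()

    env.execute("Aggregation examples")
  }
}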

DataStream Distribution

Table 5 APIs about DataStream distribution

API

Description

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T]

Use a user-defined partitioner to select the target task for each element.

  • partitioner indicates the user-defined method for repartitioning.
  • field indicates the input parameter of partitioner.
  • fun indicates the user-defined key selector that produces the input of partitioner.

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String):DataStream[T]

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]

def shuffle: DataStream[T]

Randomly and evenly partition elements.

def rebalance: DataStream[T]

Partition elements in a round-robin manner, ensuring that partitions are load-balanced. This partitioning is helpful for skewed data.

def rescale: DataStream[T]

Distribute elements to a downstream subset in a round-robin manner.

NOTE:

This method is invoked in the same way as the rebalance method.

def broadcast: DataStream[T]

Broadcast each element to all partitions.
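A sketch of the redistribution operators, including a custom partitioner that routes records by key hash (the partitioner logic is illustrative):

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object PartitionExamples {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    val stream: DataStream[(String, Int)] =
      env.fromElements(("a", 1), ("b", 2), ("c", 3))

    // Built-in redistributions.
    stream.shuffle.print()   // random
    stream.rebalance.print() // round-robin

    // Custom partitioner; the second argument extracts the key.
    stream.partitionCustom(new Partitioner[String] {
      override def partition(key: String, numPartitions: Int): Int =
        math.abs(key.hashCode) % numPartitions
    }, _._1).print()

    env.execute("Partition examples")
  }
}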

Configuring the EventTime Attribute

Table 6 APIs about configuring the EventTime attribute

API

Description

def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

Extract timestamps from records so that event time windows can trigger computation.

def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]
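A sketch of assigning event-time timestamps with a periodic watermark assigner (BoundedOutOfOrdernessTimestampExtractor is one AssignerWithPeriodicWatermarks implementation shipped with Flink; the data is illustrative):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object EventTimeExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Records carry their own timestamp (milliseconds) in the second field.
    val events: DataStream[(String, Long)] =
      env.fromElements(("a", 1000L), ("b", 2000L), ("a", 7000L))

    // Allow 1 second of out-of-orderness when generating watermarks.
    val withTs = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(1)) {
        override def extractTimestamp(e: (String, Long)): Long = e._2
      })

    withTs.keyBy(_._1).timeWindow(Time.seconds(5)).sum(1).print()
    env.execute("EventTime example")
  }
}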

Iteration

Table 7 APIs about iteration

API

Description

def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),maxWaitTimeMillis:Long = 0,keepPartitioning: Boolean = false) : DataStream[R]

Create a feedback loop in the stream by redirecting the output of an operator to a preceding operator.

NOTE:
  • This API is helpful for algorithms that require constant updates of a model.
  • maxWaitTimeMillis indicates the timeout period of each round of iteration.

def iterate[R, F: TypeInformation](stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),maxWaitTimeMillis:Long): DataStream[R]
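A minimal iteration sketch: each value is decremented inside the loop; values above zero are fed back, the rest leave the loop (the data is illustrative):

import org.apache.flink.streaming.api.scala._

object IterateExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val numbers: DataStream[Long] = env.fromElements(4L, 9L, 13L)

    val result: DataStream[Long] = numbers.iterate(
      (input: DataStream[Long]) => {
        val minusOne = input.map(_ - 1)
        val feedback = minusOne.filter(_ > 0) // redirected to the loop head
        val output = minusOne.filter(_ <= 0)  // leaves the loop
        (feedback, output)
      },
      maxWaitTimeMillis = 5000 // end the iteration after 5s without feedback data
    )

    result.print()
    env.execute("Iterate example")
  }
}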

Stream Splitting

Table 8 APIs about stream splitting

API

Description

def split(selector: OutputSelector[T]): SplitStream[T]

Use OutputSelector to override the select method, specify the basis for stream splitting (by tagging), and construct SplitStream streams. That is, a string tag is attached to each element so that a stream with a specific tag can be selected or created.

def select(outputNames: String*): DataStream[T]

Select one or multiple streams from a SplitStream.

outputNames indicates the sequence of string tags created for each element using the split method.
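A sketch of tagging and selecting sub-streams (the tags "even" and "odd" are illustrative):

import java.util.Collections
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala._

object SplitExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val numbers: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5)

    // Tag each element; select() returns the tags for one element.
    val splitStream = numbers.split(new OutputSelector[Int] {
      override def select(value: Int): java.lang.Iterable[String] =
        Collections.singletonList(if (value % 2 == 0) "even" else "odd")
    })

    // Pick one tagged sub-stream.
    splitStream.select("even").print()
    env.execute("Split example")
  }
}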

Window

Windows can be classified as tumbling windows and sliding windows.

  • APIs such as Window, TimeWindow, CountWindow, WindowAll, TimeWindowAll, and CountWindowAll can be used to generate windows.
  • APIs such as Window Apply, Window Reduce, Window Fold, and Aggregations on windows can be used to operate windows.
  • Multiple Window Assigners are supported, including TumblingEventTimeWindows, TumblingProcessingTimeWindows, SlidingEventTimeWindows, SlidingProcessingTimeWindows, EventTimeSessionWindows, ProcessingTimeSessionWindows, and GlobalWindows.
  • ProcessingTime, EventTime, and IngestionTime are supported.
  • The EventTime timestamp assigners AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks are supported.

Table 9 lists APIs for generating windows.

Table 9 APIs for generating windows

API

Description

def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]

Define windows in partitioned KeyedStreams. A window groups the data of each key according to some characteristic (for example, data received within the latest 5 seconds). For details about windows, see:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html.

def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]

Define windows in DataStreams.

def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]

Define time windows in partitioned KeyedStreams. Whether ProcessingTime or EventTime is used is selected based on the environment.getStreamTimeCharacteristic() setting, and the number of parameters determines whether the window is a TumblingWindow or a SlidingWindow.

  • size indicates the duration of the window.
  • slide indicates the sliding time of the window.
NOTE:
  • WindowedStream and AllWindowedStream indicate two types of streams.
  • If the API contains only one parameter, the window is a TumblingWindow. If the API contains two parameters, the window is a SlidingWindow.

def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]

def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]

Define time windows in DataStreams. Whether ProcessingTime or EventTime is used is selected based on the environment.getStreamTimeCharacteristic() setting, and the number of parameters determines whether the window is a TumblingWindow or a SlidingWindow.

  • size indicates the duration of the window.
  • slide indicates the sliding time of the window.

def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]

def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]

Divide windows according to the number of elements and define windows in partitioned KeyedStreams.

  • size indicates the number of elements in the window.
  • slide indicates the number of elements by which the window slides.
NOTE:
  • WindowedStream and AllWindowedStream indicate two types of streams.
  • If the API contains only one parameter, the window is a TumblingWindow. If the API contains two parameters, the window is a SlidingWindow.

def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]

def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]

Divide windows according to the number of elements and define windows in DataStreams.

  • size indicates the number of elements in the window.
  • slide indicates the number of elements by which the window slides.

def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
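A sketch of the window-generation APIs on a KeyedStream (window sizes are illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowGeneration {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val keyed = env.fromElements(("a", 1), ("a", 2), ("b", 3)).keyBy(_._1)

    // Explicit assigner: tumbling 10-second processing-time windows.
    keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(1).print()

    // Shorthand: one argument creates a tumbling window, two a sliding window.
    keyed.timeWindow(Time.seconds(10)).sum(1).print()
    keyed.timeWindow(Time.seconds(10), Time.seconds(5)).sum(1).print()

    // Count window: fires every 100 elements per key.
    keyed.countWindow(100L).sum(1).print()

    env.execute("Window generation")
  }
}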

Table 10 lists APIs for operating windows.

Table 10 APIs for operating windows

Method

API

Description

Window

def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]

Apply a general function to a window. The data in the window is calculated as a whole.

function indicates the window function to be executed.

def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]

def reduce(function: ReduceFunction[T]): DataStream[T]

Apply a reduce function to the window and return the result.

  • reduceFunction indicates the reduce function to be executed.
  • The WindowFunction parameter indicates the operation performed on the window after the reduce operation.

def reduce(function: (T, T) => T): DataStream[T]

def reduce[R: TypeInformation](preAggregator: ReduceFunction[T], function: WindowFunction[T, R, K, W]): DataStream[R]

def reduce[R: TypeInformation](preAggregator: (T, T) => T, windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]

def fold[R: TypeInformation](initialValue: R, function: FoldFunction[T,R]): DataStream[R]

Apply a fold function to the window and return the result.

  • initialValue indicates the initial value.
  • foldFunction indicates the fold function.
  • The WindowFunction parameter indicates the operation performed on the window after the fold operation.

def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R]

def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, foldFunction: FoldFunction[T, ACC], function: WindowFunction[ACC, R, K, W]): DataStream[R]

def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, foldFunction: (ACC, T) => ACC, windowFunction: (K, W, Iterable[ACC], Collector[R]) => Unit): DataStream[R]

WindowAll

def apply[R: TypeInformation](function: AllWindowFunction[T, R, W]): DataStream[R]

Apply a general function to a window. The data in the window is calculated as a whole.

def apply[R: TypeInformation](function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R]

def reduce(function: ReduceFunction[T]): DataStream[T]

Apply a reduce function to the window and return the result.

  • reduceFunction indicates the reduce function to be executed.
  • The AllWindowFunction parameter indicates the operation performed on the window after the reduce operation.

def reduce(function: (T, T) => T): DataStream[T]

def reduce[R: TypeInformation](preAggregator: ReduceFunction[T], windowFunction: AllWindowFunction[T, R, W]): DataStream[R]

def reduce[R: TypeInformation](preAggregator: (T, T) => T, windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R]

def fold[R: TypeInformation](initialValue: R, function: FoldFunction[T,R]): DataStream[R]

Apply a fold function to the window and return the result.

  • initialValue indicates the initial value.
  • foldFunction indicates the fold function.
  • The AllWindowFunction parameter indicates the operation performed on the window after the fold operation.

def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R]

def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, preAggregator: FoldFunction[T, ACC], windowFunction: AllWindowFunction[ACC, R, W]): DataStream[R]

def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, preAggregator: (ACC, T) => ACC, windowFunction: (W, Iterable[ACC], Collector[R]) => Unit): DataStream[R]

Window and WindowAll

def sum(position: Int): DataStream[T]

Sum a specified column of the window data.

field and position indicate a specific column of the data.

def sum(field: String): DataStream[T]

def min(position: Int): DataStream[T]

Calculate the minimum value of a specified column of the window data. min returns the minimum value but does not guarantee the correctness of the other columns.

position and field indicate calculating the minimum value of a specific column.

def min(field: String): DataStream[T]

def max(position: Int): DataStream[T]

Calculate the maximum value of a specified column of the window data. max returns the maximum value but does not guarantee the correctness of the other columns.

position and field indicate calculating the maximum value of a specific column.

def max(field: String): DataStream[T]

def minBy(position: Int): DataStream[T]

Obtain the row in the window data where the minimum value of a column is located. minBy returns all elements of that row.

position and field indicate the column on which the minBy operation is performed.

def minBy(field: String): DataStream[T]

def maxBy(position: Int): DataStream[T]

Obtain the row in the window data where the maximum value of a column is located. maxBy returns all elements of that row.

position and field indicate the column on which the maxBy operation is performed.

def maxBy(field: String): DataStream[T]
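A sketch of reduce and apply on a WindowedStream (the data is illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WindowApplyExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val windowed = env.fromElements(("a", 1), ("a", 2), ("b", 3))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))

    // reduce: incrementally combine the elements inside each window.
    windowed.reduce((l, r) => (l._1, l._2 + r._2)).print()

    // apply: receive all elements of the window at once.
    windowed.apply(new WindowFunction[(String, Int), String, String, TimeWindow] {
      override def apply(key: String, window: TimeWindow,
                         input: Iterable[(String, Int)],
                         out: Collector[String]): Unit =
        out.collect(s"$key -> ${input.map(_._2).sum} in [${window.getStart}, ${window.getEnd})")
    }).print()

    env.execute("Window apply example")
  }
}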

Combining Multiple DataStreams

Table 11 APIs about combining multiple DataStreams

API

Description

def union(dataStreams: DataStream[T]*): DataStream[T]

Perform union operation on multiple DataStreams to generate a new data stream containing all data from original DataStreams.

NOTE:

If you perform the union operation on a stream with itself, each piece of data appears twice in the result.

def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]

Connect two DataStreams while retaining their original types. The connect API allows the two DataStreams to share state. After a ConnectedStreams is generated, the map or flatMap operation can be called.

def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R]

Perform a map operation on the elements of a ConnectedStreams, which works like map on a DataStream. The two input types are mapped to the common output type R.

def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]

def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R]

Perform a flatMap operation on the elements of a ConnectedStreams, which works like flatMap on a DataStream. The two input types are flat-mapped to the common output type R.

def flatMap[R: TypeInformation](fun1: (IN1, Collector[R]) => Unit, fun2: (IN2, Collector[R]) => Unit): DataStream[R]

def flatMap[R: TypeInformation](fun1: IN1 => TraversableOnce[R], fun2: IN2 => TraversableOnce[R]): DataStream[R]
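A sketch of union and connect (the data is illustrative; connect maps the two input types to a common output type):

import org.apache.flink.streaming.api.scala._

object CombineExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val a: DataStream[Int] = env.fromElements(1, 2)
    val b: DataStream[Int] = env.fromElements(3, 4)
    val words: DataStream[String] = env.fromElements("five", "six")

    // union: streams of the same type are merged into one.
    a.union(b).print()

    // connect: streams of different types share one operator.
    val connected: ConnectedStreams[Int, String] = a.connect(words)
    connected.map(i => s"int: $i", s => s"str: $s").print()

    env.execute("Combine example")
  }
}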

Join Operation

Table 12 APIs about join operation

API

Description

def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]

Join two DataStreams using a given key in a specified window.

The join key is specified by the where and equalTo methods, which select the data from the two DataStreams that meets the equality condition.

def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]

Co-group two DataStreams using a given key in a specified window.

The coGroup key is specified by the where and equalTo methods, which partition the two DataStreams by the equality condition.
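A sketch of a windowed equijoin; where and equalTo extract the join key from each stream (the data and field meanings are illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object JoinExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val orders: DataStream[(String, Double)] = env.fromElements(("u1", 9.5), ("u2", 3.0))
    val users: DataStream[(String, String)] = env.fromElements(("u1", "Alice"), ("u2", "Bob"))

    // Equijoin on the first field within a 10-second tumbling window.
    orders.join(users)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .apply((order, user) => s"${user._2} spent ${order._2}")
      .print()

    env.execute("Join example")
  }
}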