Flink Scala API接口介绍
由于Flink开源版本升级,为避免出现API兼容性或可靠性问题,建议用户使用配套版本的API。
Flink常用接口
Flink主要使用到如下这几个类:
- StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。
- DataStream:Flink用特别的类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。
- KeyedStream:DataStream通过keyBy分组操作生成流,数据经过对设置的key值进行分组。
- WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。
- AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。
- ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。
- JoinedStreams:在窗口上对数据进行等值join操作,join操作是coGroup操作的一种特殊场景。
- CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。
流数据输入
API |
说明 |
---|---|
def fromElements[T: TypeInformation](data: T*): DataStream[T] |
获取用户定义的多个元素的数据,作为输入流数据。 data是多个元素的具体数据。 |
def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T] |
获取用户定义的集合数据,作为输入流数据。 data可以是集合数据或者可迭代的数据体。 |
def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T] |
|
def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T]): DataStream[T] |
获取用户定义的集合数据,作为输入并行流数据。 data是可被分割成多个partition的迭代数据体。 |
def generateSequence(from: Long, to: Long): DataStream[Long] |
获取用户定义的一串序列数据,作为输入流数据。
|
def readTextFile(filePath: String): DataStream[String] |
获取用户定义的某路径下的文本文件数据,作为输入流数据。
|
def readTextFile(filePath: String, charsetName: String): DataStream[String] |
|
def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String) |
获取用户定义的某路径下的文件数据,作为输入流数据。
|
def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String, watchType: FileProcessingMode, interval: Long): DataStream[T] |
|
def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0): DataStream[String] |
获取用户定义的Socket数据,作为输入流数据。
|
def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] |
用户自定义SourceFunction,addSource方法可以添加Kafka等数据源,主要实现方法为SourceFunction的run。
|
def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] |
数据输出
API |
说明 |
---|---|
def print(): DataStreamSink[T] |
数据输出以标准输出流打印出来。 |
def printToErr() |
数据输出以标准error输出流打印出来。 |
def writeAsText(path: String): DataStreamSink[T] |
数据输出写入到某个文本文件中。
|
def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T] |
|
def writeAsCsv(path: String): DataStreamSink[T] |
数据输出写入到某个csv格式的文件中。
|
def writeAsCsv(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T] |
|
def writeAsCsv(path: String, writeMode: FileSystem.WriteMode, rowDelimiter: String, fieldDelimiter: String): DataStreamSink[T] |
|
def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T] |
数据输出到普通文件中,例如二进制文件。 |
def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T]): DataStreamSink[T] |
数据输出写入到Socket连接中。
|
def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T] |
用户自定义的数据输出,addSink方法通过flink-connectors支持数据输出到Kafka,主要实现方法为SinkFunction的invoke方法。 |
def addSink(fun: T => Unit): DataStreamSink[T] |
过滤和映射能力
API |
说明 |
---|---|
def map[R: TypeInformation](fun: T => R): DataStream[R] |
输入一个元素,生成另一个元素,元素类型不变。 |
def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R] |
|
def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R] |
输入一个元素,生成零个、一个或者多个元素。 |
def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R] |
|
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] |
|
def filter(filter: FilterFunction[T]): DataStream[T] |
对每个元素执行一个布尔函数,只保留返回true的元素。 |
def filter(fun: T => Boolean): DataStream[T] |
聚合能力
API |
说明 |
---|---|
def keyBy(fields: Int*): KeyedStream[T, JavaTuple] |
将流逻辑分区成不相交的分区,每个分区包含相同key的元素。内部是用hash分区来实现的。这个转换返回了一个KeyedStream。 KeyBy操作之后返回KeyedStream,然后再调用KeyedStream的函数(例如reduce/fold/min/minby/max/maxby/sum/sumby等)进行相应操作。
|
def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] |
|
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] |
|
def reduce(fun: (T, T) => T): DataStream[T] |
在一个KeyedStream上“滚动”reduce。合并当前元素与上一个被reduce的值,然后输出新的值。注意三者的类型是一致的。 |
def reduce(reducer: ReduceFunction[T]): DataStream[T] |
|
def fold[R: TypeInformation](initialValue: R)(fun: (R,T) => R): DataStream[R] |
在一个KeyedStream上基于一个初始值“滚动”折叠。合并当前元素和上一个被折叠的值,然后输出新值。注意Fold的输入值与返回值类型可以不一致。 |
def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]): DataStream[R] |
|
def sum(position: Int): DataStream[T] |
在一个KeyedStream上滚动求和操作。 position和field代表对某一列求和。 |
def sum(field: String): DataStream[T] |
|
def min(position: Int): DataStream[T] |
在一个KeyedStream上滚动求最小值。min返回了最小值,不保证非最小值列的准确性。 position和field代表对某一列求最小值。 |
def min(field: String): DataStream[T] |
|
def max(position: Int): DataStream[T] |
在一个KeyedStream上滚动求最大值。max返回了最大值,不保证非最大值列的准确性。 position和field代表对某一列求最大值。 |
def max(field: String): DataStream[T] |
|
def minBy(position: Int): DataStream[T] |
在一个KeyedStream上求某一列最小值所在的该行数据,minBy返回了该行数据的所有元素。 position和field代表对某一列做minBy操作。 |
def minBy(field: String): DataStream[T] |
|
def maxBy(position: Int): DataStream[T] |
在一个KeyedStream上求某一列最大值所在的该行数据,maxBy返回了该行数据的所有元素。 position和field代表对某一列做maxBy操作。 |
def maxBy(field: String): DataStream[T] |
数据流分发能力
API |
说明 |
---|---|
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] |
使用一个用户自定义的Partitioner对每一个元素选择目标task。
|
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String):DataStream[T] |
|
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T] |
|
def shuffle: DataStream[T] |
以均匀分布的形式将元素随机地进行分区。 |
def rebalance: DataStream[T] |
基于round-robin对元素进行分区,使得每个分区负责均衡。对于存在数据倾斜的性能优化是很有用的。 |
def rescale: DataStream[T] |
以round-robin的形式将元素分区到下游操作的子集中。
说明:
查看代码和rebalance的方式是一样的。 |
def broadcast: DataStream[T] |
广播每个元素到所有分区。 |
提供设置eventtime属性的能力
API |
说明 |
---|---|
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] |
为了能让event time窗口可以正常触发窗口计算操作,需要从记录中提取时间戳。 |
def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T] |
提供迭代的能力
API |
说明 |
---|---|
def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),maxWaitTimeMillis:Long = 0,keepPartitioning: Boolean = false) : DataStream[R] |
在流(flow)中创建一个带反馈的循环,通过重定向一个operator的输出到之前的operator。
说明:
|
def iterate[R, F: TypeInformation](stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),maxWaitTimeMillis:Long): DataStream[R] |
提供分流能力
API |
说明 |
---|---|
def split(selector: OutputSelector[T]): SplitStream[T] |
传入OutputSelector,重写select方法确定分流的依据(即打标记),构建SplitStream流。即对每个元素做一个字符串的标记,作为选择的依据,打好标记之后就可以通过标记选出并新建某个标记的流。 |
def select(outputNames: String*): DataStream[T] |
从一个SplitStream中选出一个或多个流。 outputNames指的是使用split方法对每个元素做的字符串标记的序列。 |
窗口能力
窗口分为跳跃窗口和滑动窗口。
- 支持Window、TimeWindow、CountWindow以及WindowAll、TimeWindowAll、CountWindowAll API窗口生成。
- 支持Window Apply、Window Reduce、Window Fold、Aggregations on windows API窗口操作。
- 支持多种Window Assigner(TumblingEventTimeWindows、TumblingProcessingTimeWindows、SlidingEventTimeWindows、SlidingProcessingTimeWindows、EventTimeSessionWindows、ProcessingTimeSessionWindows、GlobalWindows)。
- 支持三种时间ProcessingTime、EventTime和IngestionTime。
- 支持两种EventTime时间戳方式:AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks。
窗口生成类API如表9所示。
API |
说明 |
---|---|
def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] |
窗口可以被定义在已经被分区的KeyedStreams上。窗口会对数据的每一个key根据一些特征(例如在最近5秒钟内到达的数据)进行分组。 |
def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] |
窗口可以被定义在DataStream上。 |
def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] |
时间窗口定义在已经被分区的KeyedStreams上,根据environment.getStreamTimeCharacteristic()参数选择是ProcessingTime还是EventTime,根据参数个数确定是TumblingWindow还是SlidingWindow。
说明:
|
def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] |
|
def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow] |
时间窗口定义在DataStream上,根据environment.getStreamTimeCharacteristic()参数选择是ProcessingTime还是EventTime,根据参数个数确定是TumblingWindow还是SlidingWindow。
|
def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow] |
|
def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] |
按照元素个数区分窗口,定义在已经被分区的KeyedStreams上。
说明:
|
def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] |
|
def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow] |
按照元素个数区分窗口,定义在DataStream上。
|
def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] |
窗口操作类API如表10所示。
方法 |
API |
说明 |
---|---|---|
Window |
def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R] |
应用一个一般的函数到窗口上,窗口中的数据会作为一个整体被计算。 function指的是执行的窗口函数。 |
def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] |
||
def reduce(function: ReduceFunction[T]): DataStream[T] |
应用一个reduce函数到窗口上,返回reduce后的值。
|
|
def reduce(function: (T, T) => T): DataStream[T] |
||
def reduce[R: TypeInformation](preAggregator: ReduceFunction[T], function: WindowFunction[T, R, K, W]): DataStream[R] |
||
def reduce[R: TypeInformation](preAggregator: (T, T) => T, windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] |
||
def fold[R: TypeInformation](initialValue: R, function: FoldFunction[T,R]): DataStream[R] |
应用一个fold函数到窗口上,然后返回折叠后的值。
|
|
def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] |
||
def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, foldFunction: FoldFunction[T, ACC], function: WindowFunction[ACC, R, K, W]): DataStream[R] |
||
def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, foldFunction: (ACC, T) => ACC, windowFunction: (K, W, Iterable[ACC], Collector[R]) => Unit): DataStream[R] |
||
WindowAll |
def apply[R: TypeInformation](function: AllWindowFunction[T, R, W]): DataStream[R] |
应用一个一般的函数到窗口上,窗口中的数据会作为一个整体被计算。 |
def apply[R: TypeInformation](function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] |
||
def reduce(function: ReduceFunction[T]): DataStream[T] |
应用一个reduce函数到窗口上,返回reduce后的值。
|
|
def reduce(function: (T, T) => T): DataStream[T] |
||
def reduce[R: TypeInformation](preAggregator: ReduceFunction[T], windowFunction: AllWindowFunction[T, R, W]): DataStream[R] |
||
def reduce[R: TypeInformation](preAggregator: (T, T) => T, windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] |
||
def fold[R: TypeInformation](initialValue: R, function: FoldFunction[T,R]): DataStream[R] |
应用一个fold函数到窗口上,然后返回折叠后的值。
|
|
def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] |
||
def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, preAggregator: FoldFunction[T, ACC], windowFunction: AllWindowFunction[ACC, R, W]): DataStream[R] |
||
def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, preAggregator: (ACC, T) => ACC, windowFunction: (W, Iterable[ACC], Collector[R]) => Unit): DataStream[R] |
||
Window和WindowAll |
def sum(position: Int): DataStream[T] |
对窗口数据的某一列求和。 position和field代表数据的某一列。 |
def sum(field: String): DataStream[T] |
||
def min(position: Int): DataStream[T] |
对窗口数据的某一列求最小值。min返回了最小值,不保证非最小值列的准确性。 position和field代表对某一列求最小值。 |
|
def min(field: String): DataStream[T] |
||
def max(position: Int): DataStream[T] |
对窗口数据的某一列求最大值。max返回了最大值,不保证非最大值列的准确性。 position和field代表对某一列求最大值。 |
|
def max(field: String): DataStream[T] |
||
def minBy(position: Int): DataStream[T] |
对窗口数据的某一列求最小值所在的该行数据,minBy返回了该行数据的所有元素。 position和field代表对某一列做minBy操作。 |
|
def minBy(field: String): DataStream[T] |
||
def maxBy(position: Int): DataStream[T] |
对窗口数据的某一列求最大值所在的该行数据,maxBy返回了该行数据的所有元素。 position和field代表对某一列做maxBy操作。 |
|
def maxBy(field: String): DataStream[T] |
提供多流合并的能力
API |
说明 |
---|---|
def union(dataStreams: DataStream[T]*): DataStream[T] |
Union两个或多个数据流,生成一个新的包含了来自所有流的所有数据的数据流。
说明:
如果你将一个数据流与其自身进行了合并,在结果流中对于每个元素你都会拿到两份。 |
def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] |
“连接”两个数据流并保持原先的类型。Connect可以让两条流之间共享状态。产生ConnectedStreams之后,调用map或者flatmap进行操作计算。 |
def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R] |
在ConnectedStreams上做元素映射,类似DataStream的map操作,元素映射之后流数据类型统一。 |
def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R] |
|
def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R] |
在ConnectedStreams上做元素映射,类似DataStream的flatMap操作,元素映射之后流数据类型统一。 |
def flatMap[R: TypeInformation](fun1: (IN1, Collector[R]) => Unit, fun2: (IN2, Collector[R]) => Unit): DataStream[R] |
|
def flatMap[R: TypeInformation](fun1·IN1 => TraversableOnce[R], fun2: IN2 => TraversableOnce[R]): DataStream[R] |
提供Join能力
API |
说明 |
---|---|
def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2] |
通过给定的key在一个窗口范围内join两条数据流。 join操作的key值通过where和eaualTo方法进行指定,代表两条流过滤出包含等值条件的数据。 |
def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2] |
通过给定的key在一个窗口范围内co-group两条数据流。 coGroup操作的key值通过where和eaualTo方法进行指定,代表两条流通过该等值条件进行分区处理。 |