更新时间:2024-08-03 GMT+08:00

Flink Scala API接口介绍

由于Flink开源版本升级,为避免出现API兼容性或可靠性问题,建议用户使用配套版本的API。

Flink常用接口

Flink主要使用到如下这几个类:

  • StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。
  • DataStream:Flink用特别的类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。
  • KeyedStream:DataStream通过keyBy分组操作生成流,数据经过对设置的key值进行分组。
  • WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。
  • AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。
  • ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。
  • JoinedStreams:在窗口上对数据进行等值join操作,join操作是coGroup操作的一种特殊场景。
  • CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。
图1 Flink Stream的各种流类型转换

流数据输入

表1 流数据输入的相关接口

API

说明

def fromElements[T: TypeInformation](data: T*): DataStream[T]

获取用户定义的多个元素的数据,作为输入流数据。

data是多个元素的具体数据。

def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]

获取用户定义的集合数据,作为输入流数据。

data可以是集合数据或者可迭代的数据体。

def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T]

def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T]): DataStream[T]

获取用户定义的集合数据,作为输入并行流数据。

data是可被分割成多个partition的迭代数据体。

def generateSequence(from: Long, to: Long): DataStream[Long]

获取用户定义的一串序列数据,作为输入流数据。

  • from是指数值串的起点。
  • to是指数值串的终点。

def readTextFile(filePath: String): DataStream[String]

获取用户定义的某路径下的文本文件数据,作为输入流数据。

  • filePath是指文本文件的路径。
  • charsetName指的是编码格式的名字。

def readTextFile(filePath: String, charsetName: String): DataStream[String]

def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String)

获取用户定义的某路径下的文件数据,作为输入流数据。

  • filePath是指文件的路径。
  • inputFormat是指文件的格式。
  • watchType指的是文件的处理模式“PROCESS_ONCE”或者“PROCESS_CONTINUOUSLY”。
  • interval指的是多长时间判断目录或文件变化进行处理。

def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String, watchType: FileProcessingMode, interval: Long): DataStream[T]

def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0): DataStream[String]

获取用户定义的Socket数据,作为输入流数据。

  • hostname是指Socket的服务器端的主机名称。
  • port指的是服务器的监测端口。
  • delimiter和maxRetry两个参数scala接口暂时不支持设置。

def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]

用户自定义SourceFunction,addSource方法可以添加Kafka等数据源,主要实现方法为SourceFunction的run。

  • function指的是用户自定义的SourceFunction函数。
  • scala支持简化写法。

def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T]

数据输出

表2 数据输出的相关接口

API

说明

def print(): DataStreamSink[T]

数据输出以标准输出流打印出来。

def printToErr()

数据输出以标准error输出流打印出来。

def writeAsText(path: String): DataStreamSink[T]

数据输出写入到某个文本文件中。

  • path指的是文本文件的路径。
  • writeMode为文本文件写入模式“OVERWRITE”或者“NO_OVERWRITE”。

def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T]

def writeAsCsv(path: String): DataStreamSink[T]

数据输出写入到某个csv格式的文件中。

  • path指的是文本文件的路径。
  • writeMode为文本文件写入模式“OVERWRITE”或者“NO_OVERWRITE”。
  • rowDelimiter为行分隔符。
  • fieldDelimiter为列分隔符。

def writeAsCsv(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T]

def writeAsCsv(path: String, writeMode: FileSystem.WriteMode, rowDelimiter: String, fieldDelimiter: String): DataStreamSink[T]

def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T]

数据输出到普通文件中,例如二进制文件。

def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T]): DataStreamSink[T]

数据输出写入到Socket连接中。

  • hostName为主机名称。
  • port为端口。

def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]

用户自定义的数据输出,addSink方法通过flink-connectors支持数据输出到Kafka,主要实现方法为SinkFunction的invoke方法。

def addSink(fun: T => Unit): DataStreamSink[T]

过滤和映射能力

表3 过滤和映射能力的相关接口

API

说明

def map[R: TypeInformation](fun: T => R): DataStream[R]

输入一个元素,生成另一个元素,元素类型不变。

def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]

def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R]

输入一个元素,生成零个、一个或者多个元素。

def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]

def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]

def filter(filter: FilterFunction[T]): DataStream[T]

对每个元素执行一个布尔函数,只保留返回true的元素。

def filter(fun: T => Boolean): DataStream[T]

聚合能力

表4 聚合能力的相关接口

API

说明

def keyBy(fields: Int*): KeyedStream[T, JavaTuple]

将流逻辑分区成不相交的分区,每个分区包含相同key的元素。内部是用hash分区来实现的。这个转换返回了一个KeyedStream。

KeyBy操作之后返回KeyedStream,然后再调用KeyedStream的函数(例如reduce/fold/min/minby/max/maxby/sum/sumby等)进行相应操作。

  • fields为数据某几列的序号。
  • firstField和otherFields为数据结构的成员变量的名称。
  • key为用户自定义的指定分区依据的方法。

def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple]

def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]

def reduce(fun: (T, T) => T): DataStream[T]

在一个KeyedStream上“滚动”reduce。合并当前元素与上一个被reduce的值,然后输出新的值。注意三者的类型是一致的。

def reduce(reducer: ReduceFunction[T]): DataStream[T]

def fold[R: TypeInformation](initialValue: R)(fun: (R,T) => R): DataStream[R]

在一个KeyedStream上基于一个初始值“滚动”折叠。合并当前元素和上一个被折叠的值,然后输出新值。注意Fold的输入值与返回值类型可以不一致。

def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]): DataStream[R]

def sum(position: Int): DataStream[T]

在一个KeyedStream上滚动求和操作。

position和field代表对某一列求和。

def sum(field: String): DataStream[T]

def min(position: Int): DataStream[T]

在一个KeyedStream上滚动求最小值。min返回了最小值,不保证非最小值列的准确性。

position和field代表对某一列求最小值。

def min(field: String): DataStream[T]

def max(position: Int): DataStream[T]

在一个KeyedStream上滚动求最大值。max返回了最大值,不保证非最大值列的准确性。

position和field代表对某一列求最大值。

def max(field: String): DataStream[T]

def minBy(position: Int): DataStream[T]

在一个KeyedStream上求某一列最小值所在的该行数据,minBy返回了该行数据的所有元素。

position和field代表对某一列做minBy操作。

def minBy(field: String): DataStream[T]

def maxBy(position: Int): DataStream[T]

在一个KeyedStream上求某一列最大值所在的该行数据,maxBy返回了该行数据的所有元素。

position和field代表对某一列做maxBy操作。

def maxBy(field: String): DataStream[T]

数据流分发能力

表5 数据流分发能力的相关接口

API

说明

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T]

使用一个用户自定义的Partitioner对每一个元素选择目标task。

  • partitioner指的是用户自定义的分区类重写partition方法。
  • field指的是partitioner的输入参数。
  • keySelector指的是用户自定义的partitioner的输入参数。

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String):DataStream[T]

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]

def shuffle: DataStream[T]

以均匀分布的形式将元素随机地进行分区。

def rebalance: DataStream[T]

基于round-robin对元素进行分区,使得每个分区负责均衡。对于存在数据倾斜的性能优化是很有用的。

def rescale: DataStream[T]

以round-robin的形式将元素分区到下游操作的子集中。

说明:

查看代码和rebalance的方式是一样的。

def broadcast: DataStream[T]

广播每个元素到所有分区。

提供设置eventtime属性的能力

表6 提供设置eventtime属性的能力的相关接口

API

说明

def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

为了能让event time窗口可以正常触发窗口计算操作,需要从记录中提取时间戳。

def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]

提供迭代的能力

表7 提供迭代的能力的相关接口

API

说明

def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),maxWaitTimeMillis:Long = 0,keepPartitioning: Boolean = false) : DataStream[R]

在流(flow)中创建一个带反馈的循环,通过重定向一个operator的输出到之前的operator。

说明:
  • 对于定义一些需要不断更新模型的算法是非常有帮助的。
  • long maxWaitTimeMillis:该超时时间指的是每一轮迭代体执行的超时时间。

def iterate[R, F: TypeInformation](stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),maxWaitTimeMillis:Long): DataStream[R]

提供分流能力

表8 提供分流能力的相关接口

API

说明

def split(selector: OutputSelector[T]): SplitStream[T]

传入OutputSelector,重写select方法确定分流的依据(即打标记),构建SplitStream流。即对每个元素做一个字符串的标记,作为选择的依据,打好标记之后就可以通过标记选出并新建某个标记的流。

def select(outputNames: String*): DataStream[T]

从一个SplitStream中选出一个或多个流。

outputNames指的是使用split方法对每个元素做的字符串标记的序列。

窗口能力

窗口分为跳跃窗口和滑动窗口。

  • 支持Window、TimeWindow、CountWindow以及WindowAll、TimeWindowAll、CountWindowAll API窗口生成。
  • 支持Window Apply、Window Reduce、Window Fold、Aggregations on windows API窗口操作。
  • 支持多种Window Assigner(TumblingEventTimeWindows、TumblingProcessingTimeWindows、SlidingEventTimeWindows、SlidingProcessingTimeWindows、EventTimeSessionWindows、ProcessingTimeSessionWindows、GlobalWindows)。
  • 支持三种时间ProcessingTime、EventTime和IngestionTime。
  • 支持两种EventTime时间戳方式:AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks。

窗口生成类API如表9所示。

表9 窗口生成类的相关接口

API

说明

def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]

窗口可以被定义在已经被分区的KeyedStreams上。窗口会对数据的每一个key根据一些特征(例如在最近5秒钟内到达的数据)进行分组。

def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]

窗口可以被定义在DataStream上。

def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]

时间窗口定义在已经被分区的KeyedStreams上,根据environment.getStreamTimeCharacteristic()参数选择是ProcessingTime还是EventTime,根据参数个数确定是TumblingWindow还是SlidingWindow。

  • size指的是窗口时间的大小。
  • slide指的是窗口的滑动时间。
说明:
  • WindowedStream和AllWindowedStream代表不同的两种流。
  • 接口中只有一个参数则是TumblingWindow;有两个或两个以上的参数则是SlidingWindow。

def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]

def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]

时间窗口定义在DataStream上,根据environment.getStreamTimeCharacteristic()参数选择是ProcessingTime还是EventTime,根据参数个数确定是TumblingWindow还是SlidingWindow。

  • size指的是窗口时间的大小。
  • slide指的是窗口的滑动时间。

def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]

def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]

按照元素个数区分窗口,定义在已经被分区的KeyedStreams上。

  • size指的是窗口时间的大小。
  • slide指的是窗口的滑动时间。
说明:
  • WindowedStream和AllWindowedStream代表不同的两种流。
  • 接口中只有一个参数则是TumblingWindow;有两个或两个以上的参数则是SlidingWindow。

def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]

def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]

按照元素个数区分窗口,定义在DataStream上。

  • size指的是窗口时间的大小。
  • slide指的是窗口的滑动时间。

def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]

窗口操作类API如表10所示。

表10 窗口操作类的相关接口

方法

API

说明

Window

def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]

应用一个一般的函数到窗口上,窗口中的数据会作为一个整体被计算。

function指的是执行的窗口函数

def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]

def reduce(function: ReduceFunction[T]): DataStream[T]

应用一个reduce函数到窗口上,返回reduce后的值。

  • reduceFunction指的是执行的reduce函数。
  • WindowFunction的function指的是在经过reduce操作之后再触发一次窗口操作。

def reduce(function: (T, T) => T): DataStream[T]

def reduce[R: TypeInformation](preAggregator: ReduceFunction[T], function: WindowFunction[T, R, K, W]): DataStream[R]

def reduce[R: TypeInformation](preAggregator: (T, T) => T, windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]

def fold[R: TypeInformation](initialValue: R, function: FoldFunction[T,R]): DataStream[R]

应用一个fold函数到窗口上,然后返回折叠后的值。

  • initialValue指的是初始值。
  • foldFunction指的是折叠函数。
  • WindowFunction的function指的是在经过fold操作之后再触发一次窗口操作。

def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R]

def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, foldFunction: FoldFunction[T, ACC], function: WindowFunction[ACC, R, K, W]): DataStream[R]

def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, foldFunction: (ACC, T) => ACC, windowFunction: (K, W, Iterable[ACC], Collector[R]) => Unit): DataStream[R]

WindowAll

def apply[R: TypeInformation](function: AllWindowFunction[T, R, W]): DataStream[R]

应用一个一般的函数到窗口上,窗口中的数据会作为一个整体被计算。

def apply[R: TypeInformation](function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R]

def reduce(function: ReduceFunction[T]): DataStream[T]

应用一个reduce函数到窗口上,返回reduce后的值。

  • reduceFunction指的是执行的reduce函数。
  • AllWindowFunction的function指的是在经过reduce操作之后再触发一次窗口操作。

def reduce(function: (T, T) => T): DataStream[T]

def reduce[R: TypeInformation](preAggregator: ReduceFunction[T], windowFunction: AllWindowFunction[T, R, W]): DataStream[R]

def reduce[R: TypeInformation](preAggregator: (T, T) => T, windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R]

def fold[R: TypeInformation](initialValue: R, function: FoldFunction[T,R]): DataStream[R]

应用一个fold函数到窗口上,然后返回折叠后的值。

  • initialValue指的是初始值。
  • foldFunction指的是折叠函数。
  • WindowFunction的function指的是在经过fold操作之后再触发一次窗口操作。

def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R]

def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, preAggregator: FoldFunction[T, ACC], windowFunction: AllWindowFunction[ACC, R, W]): DataStream[R]

def fold[ACC: TypeInformation, R: TypeInformation](initialValue: ACC, preAggregator: (ACC, T) => ACC, windowFunction: (W, Iterable[ACC], Collector[R]) => Unit): DataStream[R]

Window和WindowAll

def sum(position: Int): DataStream[T]

对窗口数据的某一列求和。

position和field代表数据的某一列。

def sum(field: String): DataStream[T]

def min(position: Int): DataStream[T]

对窗口数据的某一列求最小值。min返回了最小值,不保证非最小值列的准确性。

position和field代表对某一列求最小值。

def min(field: String): DataStream[T]

def max(position: Int): DataStream[T]

对窗口数据的某一列求最大值。max返回了最大值,不保证非最大值列的准确性。

position和field代表对某一列求最大值。

def max(field: String): DataStream[T]

def minBy(position: Int): DataStream[T]

对窗口数据的某一列求最小值所在的该行数据,minBy返回了该行数据的所有元素。

position和field代表对某一列做minBy操作。

def minBy(field: String): DataStream[T]

def maxBy(position: Int): DataStream[T]

对窗口数据的某一列求最大值所在的该行数据,maxBy返回了该行数据的所有元素。

position和field代表对某一列做maxBy操作。

def maxBy(field: String): DataStream[T]

提供多流合并的能力

表11 提供多流合并的能力的相关接口

API

说明

def union(dataStreams: DataStream[T]*): DataStream[T]

Union两个或多个数据流,生成一个新的包含了来自所有流的所有数据的数据流。

说明:

如果你将一个数据流与其自身进行了合并,在结果流中对于每个元素你都会拿到两份。

def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]

“连接”两个数据流并保持原先的类型。Connect可以让两条流之间共享状态。产生ConnectedStreams之后,调用map或者flatmap进行操作计算。

def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R]

在ConnectedStreams上做元素映射,类似DataStream的map操作,元素映射之后流数据类型统一。

def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]

def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R]

在ConnectedStreams上做元素映射,类似DataStream的flatMap操作,元素映射之后流数据类型统一。

def flatMap[R: TypeInformation](fun1: (IN1, Collector[R]) => Unit, fun2: (IN2, Collector[R]) => Unit): DataStream[R]

def flatMap[R: TypeInformation](fun1·IN1 => TraversableOnce[R], fun2: IN2 => TraversableOnce[R]): DataStream[R]

提供Join能力

表12 提供Join能力的相关接口

API

说明

def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]

通过给定的key在一个窗口范围内join两条数据流。

join操作的key值通过where和eaualTo方法进行指定,代表两条流过滤出包含等值条件的数据。

def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]

通过给定的key在一个窗口范围内co-group两条数据流。

coGroup操作的key值通过where和eaualTo方法进行指定,代表两条流通过该等值条件进行分区处理。