structured streaming功能与可靠性介绍
Structured Streaming支持的功能
- 支持对流式数据的ETL操作。
- 支持流式DataFrames或Datasets的schema推断和分区。
- 流式DataFrames或Datasets上的操作:包括无类型,类似SQL的操作(比如select、where、groupBy),以及有类型的RDD操作(比如map、filter、flatMap)。
- 支持基于Event Time的聚合计算,支持对迟到数据的处理。
- 支持对流式数据的去除重复数据操作。
- 支持状态计算。
- 支持对流处理任务的监控。
- 支持批流join,流流join。
左表
右表
支持的Join类型
说明
Static
Static
全部类型
即使在流处理中,不涉及流数据的join操作也能全部支持
Stream
Static
Inner
支持,但是无状态
Left Outer
支持,但是无状态
Right Outer
不支持
Full Outer
不支持
Stream
Stream
Inner
支持,左右表可选择使用watermark或者时间范围进行状态清理
Left Outer
有条件的支持,左表可选择使用watermark进行状态清理,右表必须使用watermark+时间范围
Right Outer
有条件的支持,右表可选择使用watermark进行状态清理,左表必须使用watermark+时间范围
Full Outer
不支持
Structured Streaming不支持的功能
- 不支持多个流聚合。
- 不支持limit、first、take这些取N条Row的操作。
- 不支持Distinct。
- 只有当output mode为complete时才支持排序操作。
- 有条件地支持流和静态数据集之间的外连接。
- 不支持部分DataSet上立即运行查询并返回结果的操作:
- count():无法从流式Dataset返回单个计数,而是使用ds.groupBy().count()返回一个包含运行计数的streaming Dataset。
- foreach():使用ds.writeStream.foreach(...)代替。
- show():使用输出console sink代替。
Structured Streaming可靠性说明
Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。
- 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。
从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下:
- 不允许source的个数或者类型发生变化。
- source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如:
- 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
- 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic")
- sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如:
- File sink允许变更为kafka sink,kafka中只处理新数据。
- kafka sink不允许变更为file sink。
- kafka sink允许变更为foreach sink,反之亦然。
- sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如:
- 不允许file sink的输出路径发生变更。
- 允许Kafka sink的输出topic发生变更。
- 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。
- Projection、filter和map-like操作变更,局部场景下能够支持,例如:
- 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...)
- Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream
- Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。
- 状态操作的变更,在部分场景下会导致状态恢复失败:
- Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。
- Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。
- Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。
- 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的schema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。
- Source的容错性支持列表
Sources
支持的Options
容错支持
说明
File source
path:必填,文件路径
maxFilesPerTrigger:每次trigger最大文件数(默认无限大)
latestFirst:是否有限处理新文件(默认值: false)
fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false)
支持
支持通配符路径,但不支持以逗号分隔的多个路径。
文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。
Socket Source
host:连接的节点ip,必填
port:连接的端口,必填
不支持
-
Rate Source
rowsPerSecond:每秒产生的行数,默认值1
rampUpTime:在达到rowsPerSecond速度之前的上升时间
numPartitions:生成数据行的并行度
支持
-
Kafka Source
参见https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-kafka-integration.html
支持
-
- Sink的容错性支持列表
Sinks
支持的output模式
支持Options
容错性
说明
File Sink
Append
Path:必须指定
指定的文件格式,参见DataFrameWriter中的相关接口
exactly-once
支持写入分区表,按时间分区用处较大
Kafka Sink
Append, Update, Complete
参见:https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-kafka-integration.html
at-least-once
参见https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-kafka-integration.html
Foreach Sink
Append, Update, Complete
None
依赖于ForeachWriter实现
ForeachBatch Sink
Append, Update, Complete
None
依赖于算子实现
Console Sink
Append, Update, Complete
numRows:每轮打印的行数,默认20
truncate:输出太长时是否清空,默认true
不支持容错
-
Memory Sink
Append, Complete
None
不支持容错,在complete模式下,重启query会重建整个表
-