更新时间:2024-10-23 GMT+08:00

structured streaming功能与可靠性介绍

Structured Streaming支持的功能

  1. 支持对流式数据的ETL操作。
  2. 支持流式DataFrames或Datasets的schema推断和分区。
  3. 流式DataFrames或Datasets上的操作:包括无类型,类似SQL的操作(比如select、where、groupBy),以及有类型的RDD操作(比如map、filter、flatMap)。
  4. 支持基于Event Time的聚合计算,支持对迟到数据的处理。
  5. 支持对流式数据的去除重复数据操作。
  6. 支持状态计算。
  7. 支持对流处理任务的监控。
  8. 支持批流join,流流join。

    当前JOIN操作支持列表如下:

    左表

    右表

    支持的Join类型

    说明

    Static

    Static

    全部类型

    即使在流处理中,不涉及流数据的join操作也能全部支持

    Stream

    Static

    Inner

    支持,但是无状态

    Left Outer

    支持,但是无状态

    Right Outer

    不支持

    Full Outer

    不支持

    Stream

    Stream

    Inner

    支持,左右表可选择使用watermark或者时间范围进行状态清理

    Left Outer

    有条件的支持,左表可选择使用watermark进行状态清理,右表必须使用watermark+时间范围

    Right Outer

    有条件的支持,右表可选择使用watermark进行状态清理,左表必须使用watermark+时间范围

    Full Outer

    不支持

Structured Streaming不支持的功能

  1. 不支持多个流聚合。
  2. 不支持limit、first、take这些取N条Row的操作。
  3. 不支持Distinct。
  4. 只有当output mode为complete时才支持排序操作。
  5. 有条件地支持流和静态数据集之间的外连接。
  6. 不支持部分DataSet上立即运行查询并返回结果的操作:
    • count():无法从流式Dataset返回单个计数,而是使用ds.groupBy().count()返回一个包含运行计数的streaming Dataset。
    • foreach():使用ds.writeStream.foreach(...)代替。
    • show():使用输出console sink代替。

Structured Streaming可靠性说明

Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。

  1. 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。

    从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下:

    1. 不允许source的个数或者类型发生变化。
    2. source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如:
      • 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
      • 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic")
    3. sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如:
      • File sink允许变更为kafka sink,kafka中只处理新数据。
      • kafka sink不允许变更为file sink。
      • kafka sink允许变更为foreach sink,反之亦然。
    4. sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如:
      • 不允许file sink的输出路径发生变更。
      • 允许Kafka sink的输出topic发生变更。
      • 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。
    5. Projection、filter和map-like操作变更,局部场景下能够支持,例如:
      • 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...)
      • Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream
      • Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。
    6. 状态操作的变更,在部分场景下会导致状态恢复失败:
      • Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。
      • Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。
      • Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。
      • 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的schema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。
  1. Source的容错性支持列表

    Sources

    支持的Options

    容错支持

    说明

    File source

    path:必填,文件路径

    maxFilesPerTrigger:每次trigger最大文件数(默认无限大)

    latestFirst:是否有限处理新文件(默认值: false)

    fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false)

    支持

    支持通配符路径,但不支持以逗号分隔的多个路径。

    文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。

    Socket Source

    host:连接的节点ip,必填

    port:连接的端口,必填

    不支持

    -

    Rate Source

    rowsPerSecond:每秒产生的行数,默认值1

    rampUpTime:在达到rowsPerSecond速度之前的上升时间

    numPartitions:生成数据行的并行度

    支持

    -

    Kafka Source

    参见https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-kafka-integration.html

    支持

    -

  2. Sink的容错性支持列表

    Sinks

    支持的output模式

    支持Options

    容错性

    说明

    File Sink

    Append

    Path:必须指定

    指定的文件格式,参见DataFrameWriter中的相关接口

    exactly-once

    支持写入分区表,按时间分区用处较大

    Kafka Sink

    Append, Update, Complete

    参见:https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-kafka-integration.html

    at-least-once

    参见https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-kafka-integration.html

    Foreach Sink

    Append, Update, Complete

    None

    依赖于ForeachWriter实现

    参见https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-programming-guide.html#using-foreach

    ForeachBatch Sink

    Append, Update, Complete

    None

    依赖于算子实现

    参见https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

    Console Sink

    Append, Update, Complete

    numRows:每轮打印的行数,默认20

    truncate:输出太长时是否清空,默认true

    不支持容错

    -

    Memory Sink

    Append, Complete

    None

    不支持容错,在complete模式下,重启query会重建整个表

    -