更新时间:2024-11-08 GMT+08:00

Debezium Format

功能描述

Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。

Flink 支持将 Debezium JSON 和 Avro 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如:

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 或 Avro 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。

更多具体使用可参考开源社区文档:Debezium Format

支持的Connector

  • Kafka
  • Filesystem

注意事项

  • 重复的变更事件

    在正常的操作环境下,Debezium 应用能以exactly-once的语义投递每条变更事件。在这种情况下,Flink 消费 Debezium 产生的变更事件能够工作得很好。 单当发生故障时,Debezium应用只能保证at-least-once的投递语义。即在非正常情况下,Debezium可能会投递重复的变更事件到Kafka中,当Flink从Kafka中消费的时候就会得到重复的事件。 这可能会导致Flink query的运行得到错误的结果或者非预期的异常。

    解决方案:将作业参数 table.exec.source.cdc-events-duplicate 设置成true,并在该source上定义PRIMARY KEY。

    框架会生成一个额外的有状态算子,使用该primary key来对变更事件去重并生成一个规范化的changelog流。

    更新信息请参考Debezium 官方文档

  • 消费Debezium Postgres Connector产生的数据

    如果你正在使用Debezium PostgreSQL Connector捕获变更到 Kafka,请确保被监控表的REPLICA IDENTITY 已经被配置成FULL ,默认值是DEFAULT。 否则,Flink SQL将无法正确解析Debezium数据。

    当配置为 FULL 时,更新和删除事件将完整包含所有列的之前的值。

    当为其他配置时,更新和删除事件的“before”字段将只包含primary key字段的值,或者为 null(没有 primary key)。

    您可以通过运行 ALTER TABLE <your-table-name> REPLICA IDENTITY FULL 来更改 REPLICA IDENTITY 的配置。

参数说明

Flink 提供了 debezium-avro-confluent 和 debezium-json 两种 format 来解析 Debezium 生成的 JSON 格式和 Avro 格式的消息。 请使用 debezium-avro-confluent 来解析 Debezium 的 Avro 消息,使用 debezium-json 来解析 Debezium 的 JSON 消息。

表1 Debezium Avro参数说明

参数

是否必选

默认值

类型

说明

format

(none)

String

指定使用格式,此处使用'debezium-avro-confluent'。

debezium-avro-confluent.basic-auth.credentials-source

(none)

String

Schema Registry的基本身份验证凭据源。

debezium-avro-confluent.basic-auth.user-info

(none)

String

Schema Registry的基本身份验证用户信息。

debezium-avro-confluent.bearer-auth.credentials-source

(none)

String

Schema Registry的承载身份验证凭据源。

debezium-avro-confluent.bearer-auth.token

(none)

String

Schema Registry的承载身份验证Token。

debezium-avro-confluent.properties

(none)

Map

转发到底层Schema Registry的属性Map。这对于没有通过Flink显示配置的配置项非常有用。但是,请注意,Flink配置项具有更高的优先级。

debezium-avro-confluent.ssl.keystore.location

(none)

String

SSL keystore的位置/文件。

debezium-avro-confluent.ssl.keystore.password

(none)

String

SSL keystore的密码。

debezium-avro-confluent.ssl.truststore.location

(none)

String

SSL truststore的位置/文件。

debezium-avro-confluent.ssl.truststore.password

(none)

String

SSL truststore的密码。

debezium-avro-confluent.subject

(none)

String

用于在序列化期间此格式使用的注册schema的Confluent Schema Registry主题。默认情况下,'kafka'和'upsert-kafka'连接器使用'<topic_name>-value'或'<topic_name>-key'作为默认主题名称,如果此格式用作键或值的格式。但是对于其他连接器(例如'filesystem'),在用作sink时需要使用主题选项。

debezium-avro-confluent.url

(none)

String

用于获取/注册架构的Confluent Schema Registry的URL。

表2 Debezium Json参数说明

参数

是否必选

默认值

是否必选

描述

format

(none)

String

指定要使用的格式,此处应为 'debezium-json'。

debezium-json.schema-include

false

Boolean

设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。

debezium-json.ignore-parse-errors

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。

debezium-json.timestamp-format.standard

'SQL'

String

声明输入和输出的时间戳格式。当前支持的格式为'SQL'和'ISO-8601'。

  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。

debezium-json.map-null-key.mode

'FAIL'

String

指定处理 Map 中 key 值为空的方法。 当前支持的值有FAIL、DROP和LITERAL。

  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'debezium-json.map-null-key.literal' 定义。

debezium-json.map-null-key.literal

'null'

String

当 'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

debezium-json.encode.decimal-as-plain-number

false

Boolean

将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。

元数据

元数据可以在 DDL 中作为只读(虚拟)meta 列声明。

Format的元数据只有在相应的连接器使用元数据时才可用。目前,只有Kafka连接器。

表3 元数据

Key

数据类型

说明

schema

STRING NULL

描述payload Schema的JSON字符串。如果Schema不再Debezium记录中,则为NULL。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

connector处理事件的时间戳。队医Debezium记录中的ts_ms字段。

source.timestamp

TIMESTAMP_LTZ(3) NULL

源系统创建事件时的时间戳。对应于Debezium记录中的source.ts_ms字段。

source.database

STRING NULL

源数据库。对应于Debezium记录中的source.db字段(如果可用)。

source.schema

STRING NULL

源数据库Schema。对应于Debezium记录中的source.schema字段(如果可用)。

source.table

STRING NULL

源数据库表。对应于Debezium中的source.table或source.collection字段(如果可用)。

source.properties

MAP<STRING, STRING> NULL

各种源属性的Map。对应于Debezium记录中的source字段。

元数据的使用用例参考如下:

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

示例

使用kafka解析Debezium Json数据,并将结果输出到print中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列 > 找到作业的所属队列 > 更多 > 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 创建flink opensource sql作业,选择flink版本为1.15,并提交运行,其代码如下:

    CREATE TABLE kafkaSource (
      id bigint,
      name string,
      description string,  
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'debezium-json'
    );
    CREATE TABLE printSink (
      id bigint,
      name string,
      description string,  
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'print'
    );
    insert into printSink select * from kafkaSource;

  3. 向kafka的相应topic中插入下列数据,该数据表示MySQL 产品表有4列(id、name、description、weight)。该JSON 消息是 products 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。

    {
      "before": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.18
      },
      "after": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.15
      },
      "source": {
        "version": "0.9.5.Final",
    	"connector": "mysql",
    	"name": "fullfillment",
    	"server_id" :1,
    	"ts_sec": 1629607909,
    	"gtid": "mysql-bin.000001",
    	"pos": 2238,"row": 0,
    	"snapshot": false,
    	"thread": 7,
    	"db": "inventory",
    	"table": "test",
    	"query": null},
      "op": "u",
      "ts_ms": 1589362330904,
      "transaction": null
    }

  4. 按照如下方式查看taskmanager.out文件中的数据结果:

    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
    -U[111, scooter, Big 2-wheel scooter, 5.18]
    +U[111, scooter, Big 2-wheel scooter, 5.15]