Debezium Format
功能描述
Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
Flink 支持将 Debezium JSON 和 Avro 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如:
- 将增量数据从数据库同步到其他系统
- 日志审计
- 数据库的实时物化视图
- 关联维度数据库的变更历史
Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 或 Avro 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。
更多具体使用可参考开源社区文档:Debezium Format。
支持的Connector
- Kafka
- Filesystem
注意事项
- 重复的变更事件
在正常的操作环境下,Debezium 应用能以exactly-once的语义投递每条变更事件。在这种情况下,Flink 消费 Debezium 产生的变更事件能够工作得很好。 单当发生故障时,Debezium应用只能保证at-least-once的投递语义。即在非正常情况下,Debezium可能会投递重复的变更事件到Kafka中,当Flink从Kafka中消费的时候就会得到重复的事件。 这可能会导致Flink query的运行得到错误的结果或者非预期的异常。
解决方案:将作业参数 table.exec.source.cdc-events-duplicate 设置成true,并在该source上定义PRIMARY KEY。
框架会生成一个额外的有状态算子,使用该primary key来对变更事件去重并生成一个规范化的changelog流。
更新信息请参考Debezium 官方文档。
- 消费Debezium Postgres Connector产生的数据
如果你正在使用Debezium PostgreSQL Connector捕获变更到 Kafka,请确保被监控表的REPLICA IDENTITY 已经被配置成FULL ,默认值是DEFAULT。 否则,Flink SQL将无法正确解析Debezium数据。
当配置为 FULL 时,更新和删除事件将完整包含所有列的之前的值。
当为其他配置时,更新和删除事件的“before”字段将只包含primary key字段的值,或者为 null(没有 primary key)。
您可以通过运行 ALTER TABLE <your-table-name> REPLICA IDENTITY FULL 来更改 REPLICA IDENTITY 的配置。
参数说明
Flink 提供了 debezium-avro-confluent 和 debezium-json 两种 format 来解析 Debezium 生成的 JSON 格式和 Avro 格式的消息。 请使用 debezium-avro-confluent 来解析 Debezium 的 Avro 消息,使用 debezium-json 来解析 Debezium 的 JSON 消息。
参数 |
是否必选 |
默认值 |
类型 |
说明 |
---|---|---|---|---|
format |
是 |
(none) |
String |
指定使用格式,此处使用'debezium-avro-confluent'。 |
debezium-avro-confluent.basic-auth.credentials-source |
否 |
(none) |
String |
Schema Registry的基本身份验证凭据源。 |
debezium-avro-confluent.basic-auth.user-info |
否 |
(none) |
String |
Schema Registry的基本身份验证用户信息。 |
debezium-avro-confluent.bearer-auth.credentials-source |
否 |
(none) |
String |
Schema Registry的承载身份验证凭据源。 |
debezium-avro-confluent.bearer-auth.token |
否 |
(none) |
String |
Schema Registry的承载身份验证Token。 |
debezium-avro-confluent.properties |
否 |
(none) |
Map |
转发到底层Schema Registry的属性Map。这对于没有通过Flink显示配置的配置项非常有用。但是,请注意,Flink配置项具有更高的优先级。 |
debezium-avro-confluent.ssl.keystore.location |
否 |
(none) |
String |
SSL keystore的位置/文件。 |
debezium-avro-confluent.ssl.keystore.password |
否 |
(none) |
String |
SSL keystore的密码。 |
debezium-avro-confluent.ssl.truststore.location |
否 |
(none) |
String |
SSL truststore的位置/文件。 |
debezium-avro-confluent.ssl.truststore.password |
否 |
(none) |
String |
SSL truststore的密码。 |
debezium-avro-confluent.subject |
否 |
(none) |
String |
用于在序列化期间此格式使用的注册schema的Confluent Schema Registry主题。默认情况下,'kafka'和'upsert-kafka'连接器使用'<topic_name>-value'或'<topic_name>-key'作为默认主题名称,如果此格式用作键或值的格式。但是对于其他连接器(例如'filesystem'),在用作sink时需要使用主题选项。 |
debezium-avro-confluent.url |
否 |
(none) |
String |
用于获取/注册架构的Confluent Schema Registry的URL。 |
参数 |
是否必选 |
默认值 |
是否必选 |
描述 |
---|---|---|---|---|
format |
是 |
(none) |
String |
指定要使用的格式,此处应为 'debezium-json'。 |
debezium-json.schema-include |
否 |
false |
Boolean |
设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。 |
debezium-json.ignore-parse-errors |
否 |
false |
Boolean |
当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 |
debezium-json.timestamp-format.standard |
否 |
'SQL' |
String |
声明输入和输出的时间戳格式。当前支持的格式为'SQL'和'ISO-8601'。
|
debezium-json.map-null-key.mode |
否 |
'FAIL' |
String |
指定处理 Map 中 key 值为空的方法。 当前支持的值有FAIL、DROP和LITERAL。
|
debezium-json.map-null-key.literal |
否 |
'null' |
String |
当 'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 |
debezium-json.encode.decimal-as-plain-number |
否 |
false |
Boolean |
将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。 |
元数据
元数据可以在 DDL 中作为只读(虚拟)meta 列声明。
Format的元数据只有在相应的连接器使用元数据时才可用。目前,只有Kafka连接器。
Key |
数据类型 |
说明 |
---|---|---|
schema |
STRING NULL |
描述payload Schema的JSON字符串。如果Schema不再Debezium记录中,则为NULL。 |
ingestion-timestamp |
TIMESTAMP_LTZ(3) NULL |
connector处理事件的时间戳。队医Debezium记录中的ts_ms字段。 |
source.timestamp |
TIMESTAMP_LTZ(3) NULL |
源系统创建事件时的时间戳。对应于Debezium记录中的source.ts_ms字段。 |
source.database |
STRING NULL |
源数据库。对应于Debezium记录中的source.db字段(如果可用)。 |
source.schema |
STRING NULL |
源数据库Schema。对应于Debezium记录中的source.schema字段(如果可用)。 |
source.table |
STRING NULL |
源数据库表。对应于Debezium中的source.table或source.collection字段(如果可用)。 |
source.properties |
MAP<STRING, STRING> NULL |
各种源属性的Map。对应于Debezium记录中的source字段。 |
元数据的使用用例参考如下:
CREATE TABLE KafkaTable ( origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, origin_database STRING METADATA FROM 'value.source.database' VIRTUAL, origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL, origin_table STRING METADATA FROM 'value.source.table' VIRTUAL, origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL, user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'debezium-json' );
示例
使用kafka解析Debezium Json数据,并将结果输出到print中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列 > 找到作业的所属队列 > 更多 > 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
- 创建flink opensource sql作业,选择flink版本为1.15,并提交运行,其代码如下:
CREATE TABLE kafkaSource ( id bigint, name string, description string, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'debezium-json' ); CREATE TABLE printSink ( id bigint, name string, description string, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'print' ); insert into printSink select * from kafkaSource;
- 向kafka的相应topic中插入下列数据,该数据表示MySQL 产品表有4列(id、name、description、weight)。该JSON 消息是 products 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。
{ "before": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.18 }, "after": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.15 }, "source": { "version": "0.9.5.Final", "connector": "mysql", "name": "fullfillment", "server_id" :1, "ts_sec": 1629607909, "gtid": "mysql-bin.000001", "pos": 2238,"row": 0, "snapshot": false, "thread": 7, "db": "inventory", "table": "test", "query": null}, "op": "u", "ts_ms": 1589362330904, "transaction": null }
- 按照如下方式查看taskmanager.out文件中的数据结果:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
-U[111, scooter, Big 2-wheel scooter, 5.18] +U[111, scooter, Big 2-wheel scooter, 5.15]