Debezium Format
功能描述
Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
Flink 支持将 Debezium JSON 和 Avro 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如:
- 将增量数据从数据库同步到其他系统
 - 日志审计
 - 数据库的实时物化视图
 - 关联维度数据库的变更历史
 
Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 或 Avro 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。
更多具体使用可参考开源社区文档:Debezium Format。
支持的Connector
- Kafka
 - Filesystem
 
注意事项
- 重复的变更事件
    
在正常的操作环境下,Debezium 应用能以exactly-once的语义投递每条变更事件。在这种情况下,Flink 消费 Debezium 产生的变更事件能够工作得很好。 当发生故障时,Debezium应用只能保证at-least-once的投递语义。即在非正常情况下,Debezium可能会投递重复的变更事件到Kafka中,当Flink从Kafka中消费的时候就会得到重复的事件。 这可能会导致Flink query的运行得到错误的结果或者非预期的异常。
解决方案:将作业参数 table.exec.source.cdc-events-duplicate 设置成true,并在该source上定义PRIMARY KEY。
框架会生成一个额外的有状态算子,使用该primary key来对变更事件去重并生成一个规范化的changelog流。
更新信息请参考Debezium 官方文档。
 - 消费Debezium Postgres Connector产生的数据
    
如果您正在使用Debezium PostgreSQL Connector捕获变更到 Kafka,请确保被监控表的REPLICA IDENTITY 已经被配置成FULL ,默认值是DEFAULT。 否则,Flink SQL将无法正确解析Debezium数据。
当配置为 FULL 时,更新和删除事件将完整包含所有列的之前的值。
当为其他配置时,更新和删除事件的“before”字段将只包含primary key字段的值,或者为 null(没有 primary key)。
您可以通过运行 ALTER TABLE <your-table-name> REPLICA IDENTITY FULL 来更改 REPLICA IDENTITY 的配置。
 
参数说明
Flink 提供了 debezium-avro-confluent 和 debezium-json 两种 format 来解析 Debezium 生成的 JSON 格式和 Avro 格式的消息。 请使用 debezium-avro-confluent 来解析 Debezium 的 Avro 消息,使用 debezium-json 来解析 Debezium 的 JSON 消息。
| 
        参数  | 
      
        是否必选  | 
      
        默认值  | 
      
        类型  | 
      
        说明  | 
     
|---|---|---|---|---|
| 
        format  | 
      
        是  | 
      
        (none)  | 
      
        String  | 
      
        指定使用格式,此处使用'debezium-avro-confluent'。  | 
     
| 
        debezium-avro-confluent.basic-auth.credentials-source  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        Schema Registry的基本身份验证凭据源。  | 
     
| 
        debezium-avro-confluent.basic-auth.user-info  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        Schema Registry的基本身份验证用户信息。  | 
     
| 
        debezium-avro-confluent.bearer-auth.credentials-source  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        Schema Registry的承载身份验证凭据源。  | 
     
| 
        debezium-avro-confluent.bearer-auth.token  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        Schema Registry的承载身份验证Token。  | 
     
| 
        debezium-avro-confluent.properties  | 
      
        否  | 
      
        (none)  | 
      
        Map  | 
      
        转发到底层Schema Registry的属性Map。这对于没有通过Flink显示配置的配置项非常有用。但是,请注意,Flink配置项具有更高的优先级。  | 
     
| 
        debezium-avro-confluent.ssl.keystore.location  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        SSL keystore的位置/文件。  | 
     
| 
        debezium-avro-confluent.ssl.keystore.password  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        SSL keystore的密码。  | 
     
| 
        debezium-avro-confluent.ssl.truststore.location  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        SSL truststore的位置/文件。  | 
     
| 
        debezium-avro-confluent.ssl.truststore.password  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        SSL truststore的密码。  | 
     
| 
        debezium-avro-confluent.subject  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        用于在序列化期间此格式使用的注册schema的Confluent Schema Registry主题。默认情况下,'kafka'和'upsert-kafka'连接器使用'<topic_name>-value'或'<topic_name>-key'作为默认主题名称,如果此格式用作键或值的格式。但是对于其他连接器(例如'filesystem'),在用作sink时需要使用主题选项。  | 
     
| 
        debezium-avro-confluent.url  | 
      
        否  | 
      
        (none)  | 
      
        String  | 
      
        用于获取/注册架构的Confluent Schema Registry的URL。  | 
     
| 
        参数  | 
      
        是否必选  | 
      
        默认值  | 
      
        是否必选  | 
      
        描述  | 
     
|---|---|---|---|---|
| 
        format  | 
      
        是  | 
      
        (none)  | 
      
        String  | 
      
        指定要使用的格式,此处应为 'debezium-json'。  | 
     
| 
        debezium-json.schema-include  | 
      
        否  | 
      
        false  | 
      
        Boolean  | 
      
        设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。  | 
     
| 
        debezium-json.ignore-parse-errors  | 
      
        否  | 
      
        false  | 
      
        Boolean  | 
      
        当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。  | 
     
| 
        debezium-json.timestamp-format.standard  | 
      
        否  | 
      
        'SQL'  | 
      
        String  | 
      
        声明输入和输出的时间戳格式。当前支持的格式为'SQL'和'ISO-8601'。 
  | 
     
| 
        debezium-json.map-null-key.mode  | 
      
        否  | 
      
        'FAIL'  | 
      
        String  | 
      
        指定处理 Map 中 key 值为空的方法。 当前支持的值有FAIL、DROP和LITERAL。 
  | 
     
| 
        debezium-json.map-null-key.literal  | 
      
        否  | 
      
        'null'  | 
      
        String  | 
      
        当 'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。  | 
     
| 
        debezium-json.encode.decimal-as-plain-number  | 
      
        否  | 
      
        false  | 
      
        Boolean  | 
      
        将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。  | 
     
元数据
元数据可以在 DDL 中作为只读(虚拟)meta 列声明。
Format的元数据只有在相应的连接器使用元数据时才可用。目前,只有Kafka连接器。
| 
        Key  | 
      
        数据类型  | 
      
        说明  | 
     
|---|---|---|
| 
        schema  | 
      
        STRING NULL  | 
      
        描述payload Schema的JSON字符串。如果Schema不在Debezium记录中,则为NULL。  | 
     
| 
        ingestion-timestamp  | 
      
        TIMESTAMP_LTZ(3) NULL  | 
      
        connector处理事件的时间戳。对应Debezium记录中的ts_ms字段。  | 
     
| 
        source.timestamp  | 
      
        TIMESTAMP_LTZ(3) NULL  | 
      
        源系统创建事件时的时间戳。对应于Debezium记录中的source.ts_ms字段。  | 
     
| 
        source.database  | 
      
        STRING NULL  | 
      
        源数据库。对应于Debezium记录中的source.db字段(如果可用)。  | 
     
| 
        source.schema  | 
      
        STRING NULL  | 
      
        源数据库Schema。对应于Debezium记录中的source.schema字段(如果可用)。  | 
     
| 
        source.table  | 
      
        STRING NULL  | 
      
        源数据库表。对应于Debezium中的source.table或source.collection字段(如果可用)。  | 
     
| 
        source.properties  | 
      
        MAP<STRING, STRING> NULL  | 
      
        各种源属性的Map。对应于Debezium记录中的source字段。  | 
     
元数据的使用用例参考如下:
CREATE TABLE KafkaTable ( origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, origin_database STRING METADATA FROM 'value.source.database' VIRTUAL, origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL, origin_table STRING METADATA FROM 'value.source.table' VIRTUAL, origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL, user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'debezium-json' );
示例
使用kafka解析Debezium Json数据,并将结果输出到print中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列 > 找到作业的所属队列 > 更多 > 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
 - 创建flink opensource sql作业,选择flink版本为1.15,并提交运行,其代码如下:
    
    
CREATE TABLE kafkaSource ( id bigint, name string, description string, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'debezium-json' ); CREATE TABLE printSink ( id bigint, name string, description string, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'print' ); insert into printSink select * from kafkaSource;
 - 向kafka的相应topic中插入下列数据,该数据表示MySQL 产品表有4列(id、name、description、weight)。该JSON 消息是 products 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。
    
    
{ "before": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.18 }, "after": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.15 }, "source": { "version": "0.9.5.Final", "connector": "mysql", "name": "fullfillment", "server_id" :1, "ts_sec": 1629607909, "gtid": "mysql-bin.000001", "pos": 2238,"row": 0, "snapshot": false, "thread": 7, "db": "inventory", "table": "test", "query": null}, "op": "u", "ts_ms": 1589362330904, "transaction": null } - 按照如下方式查看taskmanager.out文件中的数据结果:
    
    
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
 - 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
 - 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
 
-U[111, scooter, Big 2-wheel scooter, 5.18] +U[111, scooter, Big 2-wheel scooter, 5.15]