更新时间:2023-02-24 GMT+08:00

Debezium Format

功能描述

Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON消息。

Flink 支持将 Debezium JSON解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等。

参数说明

表1

参数

是否必选

默认值

是否必选

描述

format

(none)

String

指定要使用的格式,此处应为 'debezium-json'。

debezium-json.schema-include

false

Boolean

设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。

debezium-json.ignore-parse-errors

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。

debezium-json.timestamp-format.standard

'SQL'

String

声明输入和输出的时间戳格式。当前支持的格式为'SQL'和'ISO-8601'。

  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。

debezium-json.map-null-key.mode

'FAIL'

String

指定处理 Map 中 key 值为空的方法。 当前支持的值有FAIL、DROP和LITERAL。

  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'debezium-json.map-null-key.literal' 定义。

debezium-json.map-null-key.literal

'null'

String

当 'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

支持的Connector

  • Kafka

示例

使用kafka发送数据,输出到print中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列-->找到作业的所属队列-->更多-->测试地址连通性-->输入kafka的地址-->测试)。若能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 创建flink opensource sql作业,并提交运行,其代码如下:

    create table kafkaSource(
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
      ) with (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.group.id' = '<yourGroupId>',
        'properties.bootstrap.servers' = '<yourKafkaAddress>:<yourKafkaPort>',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'debezium-json'
    );
    create table printSink(
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
       ) with (
         'connector' = 'print'
       );
    insert into printSink select * from kafkaSource;

  3. 向kafka的相应topic中插入下列数据:

    {
      "before": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.18
      },
      "after": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.15
      },
      "source": {
        "version": "0.9.5.Final",
    	"connector": "mysql",
    	"name": "fullfillment",
    	"server_id" :1,
    	"ts_sec": 1629607909,
    	"gtid": "mysql-bin.000001",
    	"pos": 2238,"row": 0,
    	"snapshot": false,
    	"thread": 7,
    	"db": "inventory",
    	"table": "test",
    	"query": null},
      "op": "u",
      "ts_ms": 1589362330904,
      "transaction": null
    }

  4. 用户可按下述操作查看输出结果:

    • 方法一:"更多" -> "FlinkUI" -> "Task Managers" -> "Stdout"。
    • 方法二:若在提交运行作业前选择了保存日志,则可以从日志的taskmanager.out文件中查看。
    -U(111,scooter,Big2-wheel scooter,5.18)
    +U(111,scooter,Big2-wheel scooter,5.15)