Debezium Format
功能描述
Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON消息。
Flink 支持将 Debezium JSON解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如
- 将增量数据从数据库同步到其他系统
- 日志审计
- 数据库的实时物化视图
- 关联维度数据库的变更历史,等等。
参数说明
参数 |
是否必选 |
默认值 |
是否必选 |
描述 |
---|---|---|---|---|
format |
是 |
(none) |
String |
指定要使用的格式,此处应为 'debezium-json'。 |
debezium-json.schema-include |
否 |
false |
Boolean |
设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。 |
debezium-json.ignore-parse-errors |
否 |
false |
Boolean |
当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 |
debezium-json.timestamp-format.standard |
否 |
'SQL' |
String |
声明输入和输出的时间戳格式。当前支持的格式为'SQL'和'ISO-8601'。
|
debezium-json.map-null-key.mode |
否 |
'FAIL' |
String |
指定处理 Map 中 key 值为空的方法。 当前支持的值有FAIL、DROP和LITERAL。
|
debezium-json.map-null-key.literal |
否 |
'null' |
String |
当 'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 |
支持的Connector
- Kafka
示例
使用kafka发送数据,输出到print中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列-->找到作业的所属队列-->更多-->测试地址连通性-->输入kafka的地址-->测试)。若能连通,则表示跨源已经绑定成功;否则表示未成功。
- 创建flink opensource sql作业,并提交运行,其代码如下:
create table kafkaSource( id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2) ) with ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.group.id' = '<yourGroupId>', 'properties.bootstrap.servers' = '<yourKafkaAddress>:<yourKafkaPort>', 'scan.startup.mode' = 'latest-offset', 'format' = 'debezium-json' ); create table printSink( id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2) ) with ( 'connector' = 'print' ); insert into printSink select * from kafkaSource;
- 向kafka的相应topic中插入下列数据:
{ "before": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.18 }, "after": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.15 }, "source": { "version": "0.9.5.Final", "connector": "mysql", "name": "fullfillment", "server_id" :1, "ts_sec": 1629607909, "gtid": "mysql-bin.000001", "pos": 2238,"row": 0, "snapshot": false, "thread": 7, "db": "inventory", "table": "test", "query": null}, "op": "u", "ts_ms": 1589362330904, "transaction": null }
- 用户可按下述操作查看输出结果:
- 方法一:"更多" -> "FlinkUI" -> "Task Managers" -> "Stdout"。
- 方法二:若在提交运行作业前选择了保存日志,则可以从日志的taskmanager.out文件中查看。
-U(111,scooter,Big2-wheel scooter,5.18) +U(111,scooter,Big2-wheel scooter,5.15)