Canal Format
功能描述
Canal是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf序列化消息(Canal 默认使用 protobuf)。
Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如
- 将增量数据从数据库同步到其他系统
- 日志审计
- 数据库的实时物化视图
- 关联维度数据库的变更历史,等等。
Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。
更多具体使用可参考开源社区文档:Canal Format。
支持的Connector
- Kafka
- Filesystem
参数说明
参数 |
是否必选 |
默认值 |
类型 |
说明 |
---|---|---|---|---|
format |
是 |
(none) |
String |
指定要使用的格式,此处应为 'canal-json'. |
canal-json.ignore-parse-errors |
否 |
false |
Boolean |
当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 |
canal-json.timestamp-format.standard |
否 |
'SQL' |
String |
指定输入和输出时间戳格式。当前支持的值是:'SQL'和'ISO-8601'。
|
canal-json.map-null-key.mode |
否 |
'FALL' |
String |
指定处理 Map 中 key 值为空的方法. 当前支持的值有'FAIL', 'DROP'和 'LITERAL'。
|
canal-json.map-null-key.literal |
否 |
'null' |
String |
当 'canal-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 |
canal-json.encode.decimal-as-plain-number |
否 |
false |
Boolean |
将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。 |
canal-json.database.include |
否 |
(none) |
String |
一个可选的正则表达式,通过正则匹配 Canal 记录中的 "database" 元字段,仅读取指定数据库的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。 |
canal-json.table.include |
否 |
(none) |
String |
一个可选的正则表达式,通过正则匹配 Canal 记录中的 "table" 元字段,仅读取指定表的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。 |
元数据
元数据可以在 DDL 中作为只读(虚拟)meta 列声明。
Format的元数据只有在相应的连接器使用元数据时才可用。目前,只有Kafka连接器。
Key |
数据类型 |
说明 |
---|---|---|
database |
STRING NULL |
源数据库。对应于Canal记录中的database字段(如果可用)。 |
table |
STRING NULL |
源数据库表。对应于Canal中的table字段(如果可用)。 |
sql-type |
MAP<STRING, INT> NULL |
各种sql类型的Map。对应于Canal记录中的sqlType字段(如果可用)。 |
pk-names |
ARRAY<STRING> NULL |
主键名数组。对应于Canal记录中的pkNames字段(如果可用)。 |
ingestion-timestamp |
TIMESTAMP_LTZ(3) NULL |
connector处理事件的时间戳。对应Canal记录中的ts字段。 |
元数据的使用用例参考如下:
CREATE TABLE KafkaTable ( origin_database STRING METADATA FROM 'value.database' VIRTUAL, origin_table STRING METADATA FROM 'value.table' VIRTUAL, origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL, origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL, origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'canal-json' );
示例
使用canal-json读取kafka中的canal记录,并输出到print中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列 > 找到作业的所属队列 > 更多 > 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
- 创建flink opensource sql作业,选择flink1.15版本,并提交运行,其代码如下:
create table kafkaSource( id bigint, name string, description string, weight DECIMAL(10, 2) ) with ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.group.id' = '<yourGroupId>', 'properties.bootstrap.servers' = '<yourKafkaAddress>:<yourKafkaPort>', 'scan.startup.mode' = 'latest-offset', 'format' = 'canal-json' ); create table printSink( id bigint, name string, description string, weight DECIMAL(10, 2) ) with ( 'connector' = 'print' ); insert into printSink select * from kafkaSource;
- 向kafka的相应topic中插入下列数据,该数据表示MySQL products 表有4列(id,name,description 和 weight)。JSON 消息是 products 表上的一个更新事件,表示 id = 111 的行数据上 weight 字段值从5.15变更成为 5.18。
{ "data": [ { "id": "111", "name": "scooter", "description": "Big 2-wheel scooter", "weight": "5.18" } ], "database": "inventory", "es": 1589373560000, "id": 9, "isDdl": false, "mysqlType": { "id": "INTEGER", "name": "VARCHAR(255)", "description": "VARCHAR(512)", "weight": "FLOAT" }, "old": [ { "weight": "5.15" } ], "pkNames": [ "id" ], "sql": "", "sqlType": { "id": 4, "name": 12, "description": 12, "weight": 7 }, "table": "products", "ts": 1589373560798, "type": "UPDATE" }
- 按照如下方式查看taskmanager.out文件中的数据结果:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
-U[111, scooter, Big 2-wheel scooter, 5.15] +U[111, scooter, Big 2-wheel scooter, 5.18]