Maxwell Format
功能描述
Maxwell是一个CDC(Changelog Data Capture)工具,可以将MySql中的更改实时流式写入到Kafka等流式connector。Maxwell为changelog提供了统一的格式,而且支持使用JSON对消息进行序列化。
Flink 支持将 Maxwell JSON 消息解释为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在许多情况下,这对于利用此功能很有用。
例如:
- 将数据库中的增量数据同步到其他系统
- 审计日志
- 数据库的实时物化视图
- 临时连接更改数据库表的历史等等。
Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Maxwell JSON 消息,并发送到 Kafka 等外部系统。但是,目前 Flink 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UDPATE_AFTER 编码为 DELETE 和 INSERT Maxwell 消息。
更多具体使用可参考开源社区文档:Maxwell Format。
支持的Connector
- Kafka
- Filesystem
注意事项
Maxwell应用允许将每个变动消息精确地传递一次。在这种情况下,Flink在消费Maxwell生成的消息时处理得很好。如果Maxwell应用程序在at-least-once模式处理,它可能向Kafka写入重复的改动消息,Flink将获得重复的消息。这可能会导致Flink查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置table.exec.source.cdc-events-duplicate设置为true,并在源表上定义PRIMARY KEY。Framework将生成一个额外的有状态操作符,并使用主键对变更事件进行去重,并生成一个规范化的changelog流。
参数说明
参数 |
是否必选 |
默认值 |
类型 |
说明 |
---|---|---|---|---|
format |
是 |
(none) |
String |
指定使用格式,此处使用'maxwell-json'。 |
maxwell-json.ignore-parse-errors |
否 |
false |
Boolean |
跳过解析错误而不是失败的字段和行。出现错误时,字段设置为空。 |
maxwell-json.timestamp-format.standard |
否 |
'SQL' |
String |
指定输入和输出时间戳格式。当前支持的值为“SQL”和“ISO-8601”:
|
maxwell-json.map-null-key.mode |
否 |
'FAIL' |
String |
指定序列化map数据的null键时的处理模式。当前支持的值为“FAIL”、“DROP”和“LITERAL”:
|
maxwell-json.map-null-key.literal |
否 |
'null' |
String |
当 'maxwell-json.map-null-key.mode' 为 LITERAL 时,指定字符串以替换null键。 |
maxwell-json.encode.decimal-as-plain-number |
否 |
false |
Boolean |
将所有小数编码为普通数字,而不是可能的科学计数法。默认情况下,小数可以使用科学计数法书写。例如,0.000000027在默认情况下被编码为2.7E-8,如果将此选项设置为true,则将被写入为0.000000027。 |
元数据
元数据可以在 DDL 中作为只读(虚拟)meta 列声明。
Key |
数据类型 |
说明 |
---|---|---|
database |
STRING NULL |
源数据库。对应于Maxwell记录中的数据库字段(如果可用)。 |
table |
STRING NULL |
源数据库中的表。对应于Maxwell记录中的表字段(如果可用)。 |
primary-key-columns |
ARRAY<STRING> NULL |
主键名数组。对应于Maxwell记录中的primary_key_columns字段(如果可用)。 |
ingestion-timestamp |
TIMESTAMP_LTZ(3) NULL |
连接器处理事件的时间戳。对应Maxwell记录中的ts字段。 |
元数据使用示例如下:
CREATE TABLE KafkaTable ( origin_database STRING METADATA FROM 'value.database' VIRTUAL, origin_table STRING METADATA FROM 'value.table' VIRTUAL, origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL, origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'maxwell-json' );
示例
使用kafka发送数据,输出到print中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列 > 找到作业的所属队列 > 更多 > 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
- 创建flink opensource sql作业,选择flink1.15,并提交运行,其代码如下:
CREATE TABLE kafkaSource ( id bigint, name string, description string, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'maxwell-json' ); CREATE TABLE printSink ( id bigint, name string, description string, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'print' ); insert into printSink select * from kafkaSource;
- 向kafka的相应topic中插入下列数据(每个字段的含义请参考Maxwell documentation):
{ "database":"test", "table":"e", "type":"insert", "ts":1477053217, "xid":23396, "commit":true, "position":"master.000006:800911", "server_id":23042, "thread_id":108, "primary_key": [1, "2016-10-21 05:33:37.523000"], "primary_key_columns": ["id", "c"], "data":{ "id":111, "name":"scooter", "description":"Big 2-wheel scooter", "weight":5.15 }, "old":{ "weight":5.18 } }
- 按照如下方式查看taskmanager.out文件中的数据结果:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
+I[111, scooter, Big 2-wheel scooter, 5.15]