Maxwell Format
功能描述
Flink 支持将 Maxwell JSON 消息解释为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在许多情况下,这对于利用此功能很有用。
例如:
- 将数据库中的增量数据同步到其他系统
- 审计日志
- 数据库的实时物化视图
- 临时连接更改数据库表的历史等等。
Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Maxwell JSON 消息,并发送到 Kafka 等外部系统。但是,目前 Flink 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UDPATE_AFTER 编码为 DELETE 和 INSERT Maxwell 消息。
参数说明
参数 |
是否必选 |
默认值 |
类型 |
说明 |
---|---|---|---|---|
format |
是 |
(none) |
String |
指定使用格式,此处使用'maxwell-json'。 |
maxwell-json.ignore-parse-errors |
否 |
false |
Boolean |
跳过解析错误而不是失败的字段和行。出现错误时,字段设置为空。 |
maxwell-json.timestamp-format.standard |
否 |
'SQL' |
String |
指定输入和输出时间戳格式。当前支持的值为“SQL”和“ISO-8601”:选项“SQL”将以“yyyy-MM-dd HH:mm:ss.s{precision}”格式解析输入时间戳,例如“2020-12-30 12” :13:14.123' 并以相同格式输出时间戳。选项'ISO-8601'将以“yyyy-MM-ddTHH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30T12: 13:14.123' 并以相同格式输出时间戳。 |
maxwell-json.map-null-key.mode |
否 |
'FAIL' |
String |
在序列化地图数据的空键时指定处理模式。当前支持的值为“FAIL”、“DROP”和“LITERAL”:选项“FAIL”将在遇到带有空键的地图时抛出异常。选项“DROP”将删除地图数据的空键条目。选项“LITERAL”将替换空带字符串文字的键。字符串文字由 maxwell-json.map-null-key.literal 选项定义。 |
maxwell-json.map-null-key.literal |
否 |
'null' |
String |
当 'maxwell-json.map-null-key.mode' 为 LITERAL 时,指定字符串文字以替换空键。 |
支持的Connector
- Kafka
示例
使用kafka发送数据,输出到print中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列-->找到作业的所属队列-->更多-->测试地址连通性-->输入kafka的地址-->测试)。若能连通,则表示跨源已经绑定成功;否则表示未成功。
- 创建flink opensource sql作业,选择flink1.12,并提交运行,其代码如下:
create table kafkaSource( id bigint, name string, description string, weight DECIMAL(10, 2) ) with ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.group.id' = '<yourGroupId>', 'properties.bootstrap.servers' = '<yourKafkaAddress1>:<yourKafkaPort>,<yourKafkaAddress2>:<yourKafkaPort>', 'scan.startup.mode' = 'latest-offset', 'format' = 'maxwell-json' ); create table printSink( id bigint, name string, description string, weight DECIMAL(10, 2) ) with ( 'connector' = 'print' ); insert into printSink select * from kafkaSource;
- 向kafka的相应topic中插入下列数据:
{ "database":"test", "table":"e", "type":"insert", "ts":1477053217, "xid":23396, "commit":true, "position":"master.000006:800911", "server_id":23042, "thread_id":108, "primary_key": [1, "2016-10-21 05:33:37.523000"], "primary_key_columns": ["id", "c"], "data":{ "id":111, "name":"scooter", "description":"Big 2-wheel scooter", "weight":5.15 }, "old":{ "weight":5.18 } }
- 用户可按下述操作查看输出结果:
- 方法一:"更多" -> "FlinkUI" -> "Task Managers" -> "Stdout"。
- 方法二:若在提交运行作业前选择了保存日志,则可以从日志的taskmanager.out文件中查看。
+I(111,scooter,Big 2-wheel scooter,5.15)