更新时间:2024-11-08 GMT+08:00

Maxwell Format

功能描述

Flink 支持将 Maxwell JSON 消息解释为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在许多情况下,这对于利用此功能很有用。

例如:

  • 将数据库中的增量数据同步到其他系统
  • 审计日志
  • 数据库的实时物化视图
  • 临时连接更改数据库表的历史等等。

Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Maxwell JSON 消息,并发送到 Kafka 等外部系统。但是,目前 Flink 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UDPATE_AFTER 编码为 DELETE 和 INSERT Maxwell 消息。

参数说明

参数

是否必选

默认值

类型

说明

format

(none)

String

指定使用格式,此处使用'maxwell-json'。

maxwell-json.ignore-parse-errors

false

Boolean

跳过解析错误而不是失败的字段和行。出现错误时,字段设置为空。

maxwell-json.timestamp-format.standard

'SQL'

String

指定输入和输出时间戳格式。当前支持的值为“SQL”和“ISO-8601”:选项“SQL”将以“yyyy-MM-dd HH:mm:ss.s{precision}”格式解析输入时间戳,例如“2020-12-30 12” :13:14.123' 并以相同格式输出时间戳。选项'ISO-8601'将以“yyyy-MM-ddTHH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30T12: 13:14.123' 并以相同格式输出时间戳。

maxwell-json.map-null-key.mode

'FAIL'

String

在序列化地图数据的空键时指定处理模式。当前支持的值为“FAIL”、“DROP”和“LITERAL”:选项“FAIL”将在遇到带有空键的地图时抛出异常。选项“DROP”将删除地图数据的空键条目。选项“LITERAL”将替换空带字符串文字的键。字符串文字由 maxwell-json.map-null-key.literal 选项定义。

maxwell-json.map-null-key.literal

'null'

String

当 'maxwell-json.map-null-key.mode' 为 LITERAL 时,指定字符串文字以替换空键。

支持的Connector

  • Kafka

示例

使用kafka发送数据,输出到print中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列-->找到作业的所属队列-->更多-->测试地址连通性-->输入kafka的地址-->测试)。若能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 创建flink opensource sql作业,选择flink1.12,并提交运行,其代码如下:

    create table kafkaSource(
      id bigint,
      name string,
      description string,
      weight DECIMAL(10, 2)  
      ) with (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.group.id' = '<yourGroupId>',
        'properties.bootstrap.servers' = '<yourKafkaAddress1>:<yourKafkaPort>,<yourKafkaAddress2>:<yourKafkaPort>',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'maxwell-json'
    );
    create table printSink(
      id bigint,
      name string,
      description string,
      weight DECIMAL(10, 2)
       ) with (
         'connector' = 'print'
       );
    insert into printSink select * from kafkaSource;

  3. 向kafka的相应topic中插入下列数据:

    {
       "database":"test",
       "table":"e",
       "type":"insert",
       "ts":1477053217,
       "xid":23396,
       "commit":true,
       "position":"master.000006:800911",
       "server_id":23042,
       "thread_id":108,
       "primary_key": [1, "2016-10-21 05:33:37.523000"],
       "primary_key_columns": ["id", "c"],
       "data":{
         "id":111,
         "name":"scooter",
         "description":"Big 2-wheel scooter",
         "weight":5.15
       },
       "old":{
         "weight":5.18
       }
    }

  4. 用户可按下述操作查看输出结果:

    • 方法一:"更多" -> "FlinkUI" -> "Task Managers" -> "Stdout"。
    • 方法二:若在提交运行作业前选择了保存日志,则可以从日志的taskmanager.out文件中查看。
    +I(111,scooter,Big 2-wheel scooter,5.15)