更新时间:2024-11-08 GMT+08:00

Maxwell Format

功能描述

Maxwell是一个CDC(Changelog Data Capture)工具,可以将MySql中的更改实时流式写入到Kafka等流式connector。Maxwell为changelog提供了统一的格式,而且支持使用JSON对消息进行序列化。

Flink 支持将 Maxwell JSON 消息解释为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在许多情况下,这对于利用此功能很有用。

例如:

  • 将数据库中的增量数据同步到其他系统
  • 审计日志
  • 数据库的实时物化视图
  • 临时连接更改数据库表的历史等等。

Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Maxwell JSON 消息,并发送到 Kafka 等外部系统。但是,目前 Flink 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UDPATE_AFTER 编码为 DELETE 和 INSERT Maxwell 消息。

更多具体使用可参考开源社区文档:Maxwell Format

支持的Connector

  • Kafka
  • Filesystem

注意事项

Maxwell应用允许将每个变动消息精确地传递一次。在这种情况下,Flink在消费Maxwell生成的消息时处理得很好。如果Maxwell应用程序在at-least-once模式处理,它可能向Kafka写入重复的改动消息,Flink将获得重复的消息。这可能会导致Flink查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置table.exec.source.cdc-events-duplicate设置为true,并在源表上定义PRIMARY KEY。Framework将生成一个额外的有状态操作符,并使用主键对变更事件进行去重,并生成一个规范化的changelog流。

参数说明

表1 参数说明

参数

是否必选

默认值

类型

说明

format

(none)

String

指定使用格式,此处使用'maxwell-json'。

maxwell-json.ignore-parse-errors

false

Boolean

跳过解析错误而不是失败的字段和行。出现错误时,字段设置为空。

maxwell-json.timestamp-format.standard

'SQL'

String

指定输入和输出时间戳格式。当前支持的值为“SQL”和“ISO-8601”:

  • 'SQL'将以“yyyy-MM-dd HH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30 12:13:14.123' 并以相同格式输出时间戳。
  • 'ISO-8601'将以“yyyy-MM-ddTHH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30T12:13:14.123' 并以相同格式输出时间戳。

maxwell-json.map-null-key.mode

'FAIL'

String

指定序列化map数据的null键时的处理模式。当前支持的值为“FAIL”、“DROP”和“LITERAL”:

  • 'FAIL'将在遇到带有null键的map时抛出异常。
  • 'DROP'将删除map数据的null键条目。
  • 'LITERAL'将使用字符串代替null键。字符串由 maxwell-json.map-null-key.literal 选项定义。

maxwell-json.map-null-key.literal

'null'

String

当 'maxwell-json.map-null-key.mode' 为 LITERAL 时,指定字符串以替换null键。

maxwell-json.encode.decimal-as-plain-number

false

Boolean

将所有小数编码为普通数字,而不是可能的科学计数法。默认情况下,小数可以使用科学计数法书写。例如,0.000000027在默认情况下被编码为2.7E-8,如果将此选项设置为true,则将被写入为0.000000027。

元数据

元数据可以在 DDL 中作为只读(虚拟)meta 列声明。

表2 元数据

Key

数据类型

说明

database

STRING NULL

源数据库。对应于Maxwell记录中的数据库字段(如果可用)。

table

STRING NULL

源数据库中的表。对应于Maxwell记录中的表字段(如果可用)。

primary-key-columns

ARRAY<STRING> NULL

主键名数组。对应于Maxwell记录中的primary_key_columns字段(如果可用)。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

连接器处理事件的时间戳。对应Maxwell记录中的ts字段。

元数据使用示例如下:

CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'maxwell-json'
);

示例

使用kafka发送数据,输出到print中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列 > 找到作业的所属队列 > 更多 > 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 创建flink opensource sql作业,选择flink1.15,并提交运行,其代码如下:

    CREATE TABLE kafkaSource (
      id bigint,
      name string,
      description string,  
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'maxwell-json'
    );
    
    
    CREATE TABLE printSink (
      id bigint,
      name string,
      description string,  
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'print'
    );
    insert into printSink select * from kafkaSource;

  3. 向kafka的相应topic中插入下列数据(每个字段的含义请参考Maxwell documentation):

    {
       "database":"test",
       "table":"e",
       "type":"insert",
       "ts":1477053217,
       "xid":23396,
       "commit":true,
       "position":"master.000006:800911",
       "server_id":23042,
       "thread_id":108,
       "primary_key": [1, "2016-10-21 05:33:37.523000"],
       "primary_key_columns": ["id", "c"],
       "data":{
         "id":111,
         "name":"scooter",
         "description":"Big 2-wheel scooter",
         "weight":5.15
       },
       "old":{
         "weight":5.18
       }
    }

  4. 按照如下方式查看taskmanager.out文件中的数据结果:

    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
    +I[111, scooter, Big 2-wheel scooter, 5.15]