更新时间:2024-04-19 GMT+08:00

Ogg Format

功能描述

Oracle GoldenGate (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。

Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等

Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。

支持的Connector

  • Kafka
  • FileSystem

参数说明

表1 参数说明

参数

是否必须

默认值

类型

描述

format

(none)

String

指定要使用的格式,此处应为 'ogg-json'。

ogg-json.ignore-parse-errors

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。

debezium-json.timestamp-format.standard

'SQL'

String

声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601':

  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。

ogg-json.map-null-key.mode

'FAIL'

String

指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL', 'DROP' 和 'LITERAL':

  • Option 'FAIL' 将抛出异常。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ogg-json.map-null-key.literal 定义。

ogg-json.map-null-key.literal

'null'

String

当 'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

元数据

表2 元数据

Key

数据类型

描述

table

STRING NULL

包含完全限定的表名。完全限定表名的格式为:CATALOG NAME.SCHEMA NAME.TABLE NAME

primary-keys

ARRAY<STRING> NULL

保存源表的主键的列名的数组。

如果includePrimaryKeys配置属性设置为true,则仅在JSON输出中包含primary-keys字段。

ingestion-timestamp

TIMESTAMP_LTZ(6) NULL

connector处理事件的时间戳。对应Ogg记录中的current_ts字段。

event-timestamp

TIMESTAMP_LTZ(6) NULL

源系统创建事件的时间戳。对应Ogg记录的op_ts字段。

元数据的使用用例参考如下:

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'ogg-json'
);

示例

使用ogg-json读取kafka中的ogg记录,并输出到print中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列 > 找到作业的所属队列 > 更多 > 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 创建flink opensource sql作业,选择flink1.15版本,并提交运行,其代码如下:

    CREATE TABLE kafkaSource (
      id bigint,
      name string,
      description string,  
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'ogg-json'
    );
    
    CREATE TABLE printSink (
      id bigint,
      name string,
      description string,  
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'print'
    );
    insert into printSink select * from kafkaSource;    
    

  3. 向kafka的相应topic中插入下列数据,该数据表示Oracle PRODUCTS 表 有 4 列 (id, name, description and weight). 上面的 JSON 消息是 PRODUCTS 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。

    {
      "before": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.18
      },
      "after": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.15
      },
      "op_type": "U",
      "op_ts": "2020-05-13 15:40:06.000000",
      "current_ts": "2020-05-13 15:40:07.000000",
      "primary_keys": [
        "id"
      ],
      "pos": "00000000000000000000143",
      "table": "PRODUCTS"
    }

  4. 按照如下方式查看taskmanager.out文件中的数据结果:

    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
    -U[111, scooter, Big 2-wheel scooter, 5.18]
    +U[111, scooter, Big 2-wheel scooter, 5.15]