更新时间:2024-04-19 GMT+08:00

Upsert Kafka

功能描述

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。Upsert Kafka 连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。表类型支持源表和结果表。

  • 作为source,upsert-kafka 连接器生产changelog流,其中每条数据记录代表一个更新或删除事件。

    数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key(如果不存在相应的key,则该更新被视为INSERT)。用表来类比,changelog 流中的数据记录被解释为UPSERT,也称为INSERT/UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。

  • 作为sink,upsert-kafka连接器可以消费changelog流。它会将INSERT/UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入(表示对应 key 的消息被删除)。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
表1 支持类别

类别

详情

支持表类型

源表、结果表

前提条件

该场景作业需要运行在DLI的独享队列上,要与kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • Upsert Kafka 始终以upsert方式工作,并且需要在DDL中定义主键。在具有相同主键值的消息按序存储在同一个分区的前提下,在 changlog source 定义主键意味着 在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在Kafka消息的key中。
  • 由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。
  • 数据类型的使用,请参考Format章节。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
create table kafkaTable(
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'upsert-kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'key.format' = '',
  'value.format' = ''
);

参数说明

表2 参数说明

参数

是否必选

默认参数

数据类型

说明

connector

String

connector类型,对于upsert kafka连接器,需配置为'upsert-kafka'。

topic

String

Kafka topic名。

properties.bootstrap.servers

String

Kafka brokers地址,以逗号分隔。

key.format

String

用于对Kafka消息中key部分序列化和反序列化的格式。key字段由PRIMARY KEY语法指定。支持的格式如下:

  • csv
  • json
  • avro

请参考Format页面以获取更多详细信息和格式参数。

key.fields-prefix

String

为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。

默认情况下,前缀为空。如果定义了自定义前缀,则表架构和'key.fields'都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求'value.fields-include' 必须设置为'EXCEPT_KEY'。

value.format

String

用于对 Kafka消息中 value 部分序列化和反序列化的格式。支持的格式:

  • csv
  • json
  • avro

请参考Format页面以获取更多详细信息和格式参数。

value.fields-include

ALL

String

控制哪些字段应该出现在值中。取值范围如下:

  • ALL:消息的value部分将包含schema的所有字段,包括定义中键的字段。
  • EXCEPT_KEY:记录的value部分包含schema的所有内容,定义为主键的字段除外。

properties.*

String

该选项可以传递任意的Kafka参数。

“properties.”后的后缀名必须匹配定义在 kafka参数文档中的参数名。 Flink会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入KafkaClient。

例如:你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。

但是'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为Flink会重写这些参数的值。

sink.parallelism

Integer

定义upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。

sink.buffer-flush.max-rows

0

Integer

缓存刷新前,最多能缓存的记录条数。

当sink收到很多同key上的更新时,缓存将保留同 key 的最后一条记录,因此sink缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。可以通过设置为'0'来禁用它。

默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。

sink.buffer-flush.interval

0

Duration

缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如'sink.buffer-flush.interval'='10 ms'。

默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。

元数据

可用的元数据字段列表,请参阅Kafka连接器

示例

  • 示例1:该示例是从DMS Kafka数据源中读取数据,并写入到Print结果表中。
    1. 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
    2. 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
    3. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
      注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
      CREATE TABLE upsertKafkaSource (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,  
        area_id string,
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'KafkaTopic',
        'properties.bootstrap.servers' =  'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort',
        'key.format' = 'csv',
        'value.format' = 'json'
      );
      
      CREATE TABLE printSink (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,  
        area_id string,
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO printSink SELECT * FROM upsertKafkaSource;
    4. 向Kafka中的指定topic中插入如下数据(注意:kafka插入数据时请指定key)。
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
      
      {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
      
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}
    5. 查看taskmanager的out文件,数据结果参考如下:
      +I(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110)
      +I(202303251505050001,appshop,2023-03-25 15:05:05,500.0,400.0,2023-03-2515:10:00,0003,Cindy,330108)
      -U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110)
      +U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330111)
  • 示例2:从Kafka源表获取DMS Kafka source topic数据,通过Upsert Kafka结果表将Kafka source topic数据写入到Kafka sink topic中。
    1. 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
    2. 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
    3. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。

      注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改

      CREATE TABLE orders (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'KafkaTopic',
        'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort',
        'properties.group.id' = 'GroupId',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
      );
      
      CREATE TABLE upsertKafkaSink (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string,
        PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'KafkaTopic',
        'properties.bootstrap.servers' =  'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort',
        'key.format' = 'csv',
        'value.format' = 'json'
      );
      
      insert into upsertKafkaSink select * from orders;
    4. 连接Kafka集群,kafka中source topic发送如下测试数据:
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
      
      {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
      
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}
    5. 连接Kafka集群,获取kafka sink topic的数据,结果参考如下:
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
      
      {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
      
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}
  • 示例3:MRS集群开启Kerberos认证,并且Kafka使用SASL_PLAINTEXT协议,从Kafka源表获取数据,并写入到Print结果表中。
    1. 参考增强型跨源连接,根据MRS集群所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
    2. 设置MRS集群的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
    3. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。

      注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。

      CREATE TABLE upsertKafkaSource (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string,
        PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'KafkaTopic',
        'properties.bootstrap.servers' =  'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort',
        'key.format' = 'csv',
        'value.format' = 'json',
        'properties.sasl.mechanism' = 'GSSAPI',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.kerberos.service.name' = 'kafka', --mrs中配置
        'properties.connector.auth.open' = 'true',
        'properties.connector.kerberos.principal' = 'username', --用户名
        'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', --krb5_conf路径
        'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab' --keytab路径
      );
      
      CREATE TABLE printSink (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,  
        area_id string,
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO printSink SELECT * FROM upsertKafkaSource;
    4. 向Kafka中的指定topic中插入如下数据(注意:kafka插入数据时请指定key):
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
      
      {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
      
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}
    5. 查看taskmanager的out文件,数据结果参考如下:
      +I(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110)
      +I(202303251505050001,appshop,2023-03-2515:05:05,500.0,400.0,2023-03-2515:10:00,0003,Cindy,330108)
      -U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110)
      +U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330111)

常见问题