Upsert Kafka
功能描述
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。Upsert Kafka 连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。表类型支持源表和结果表。
- 作为source,upsert-kafka 连接器生产changelog流,其中每条数据记录代表一个更新或删除事件。
数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key(如果不存在相应的key,则该更新被视为INSERT)。用表来类比,changelog 流中的数据记录被解释为UPSERT,也称为INSERT/UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。
- 作为sink,upsert-kafka连接器可以消费changelog流。它会将INSERT/UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入(表示对应 key 的消息被删除)。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
类别 |
详情 |
---|---|
支持表类型 |
源表、结果表 |
前提条件
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
- Upsert Kafka 始终以upsert方式工作,并且需要在DDL中定义主键。在具有相同主键值的消息按序存储在同一个分区的前提下,在 changlog source 定义主键意味着 在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在Kafka消息的key中。
- 由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。
- 数据类型的使用,请参考Format章节。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 |
create table kafkaTable( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'upsert-kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'key.format' = '', 'value.format' = '' ); |
参数说明
参数 |
是否必选 |
默认参数 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
connector类型,对于upsert kafka连接器,需配置为'upsert-kafka'。 |
topic |
是 |
无 |
String |
Kafka topic名。 |
properties.bootstrap.servers |
是 |
无 |
String |
Kafka brokers地址,以逗号分隔。 |
key.format |
是 |
无 |
String |
用于对Kafka消息中key部分序列化和反序列化的格式。key字段由PRIMARY KEY语法指定。支持的格式如下:
请参考Format页面以获取更多详细信息和格式参数。 |
key.fields-prefix |
否 |
无 |
String |
为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。 默认情况下,前缀为空。如果定义了自定义前缀,则表架构和'key.fields'都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求'value.fields-include' 必须设置为'EXCEPT_KEY'。 |
value.format |
是 |
无 |
String |
用于对 Kafka消息中 value 部分序列化和反序列化的格式。支持的格式:
请参考Format页面以获取更多详细信息和格式参数。 |
value.fields-include |
是 |
ALL |
String |
控制哪些字段应该出现在值中。取值范围如下:
|
properties.* |
否 |
无 |
String |
该选项可以传递任意的Kafka参数。 “properties.”后的后缀名必须匹配定义在 kafka参数文档中的参数名。 Flink会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入KafkaClient。 例如:您可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。 但是'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为Flink会重写这些参数的值。 |
sink.parallelism |
否 |
无 |
Integer |
定义upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游连接算子的并行度保持一致。 |
sink.buffer-flush.max-rows |
否 |
0 |
Integer |
缓存刷新前,最多能缓存的记录条数。 当sink收到很多同key上的更新时,缓存将保留同 key 的最后一条记录,因此sink缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。可以通过设置为'0'来禁用它。 默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。 |
sink.buffer-flush.interval |
否 |
0 |
Duration |
缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如'sink.buffer-flush.interval'='10 ms'。 默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。 |
元数据
可用的元数据字段列表,请参阅Kafka连接器。
示例
- 示例1:该示例是从DMS Kafka数据源中读取数据,并写入到Print结果表中。
- 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE upsertKafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'key.format' = 'csv', 'value.format' = 'json' ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'print' ); INSERT INTO printSink SELECT * FROM upsertKafkaSource;
- 向Kafka中的指定topic中插入如下数据(注意:kafka插入数据时请指定key)。
{"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}
- 查看taskmanager的out文件,数据结果参考如下:
+I(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110) +I(202303251505050001,appshop,2023-03-25 15:05:05,500.0,400.0,2023-03-2515:10:00,0003,Cindy,330108) -U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110) +U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330111)
- 示例2:从Kafka源表获取DMS Kafka source topic数据,通过Upsert Kafka结果表将Kafka source topic数据写入到Kafka sink topic中。
- 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE upsertKafkaSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'key.format' = 'csv', 'value.format' = 'json' ); insert into upsertKafkaSink select * from orders;
- 连接Kafka集群,kafka中source topic发送如下测试数据:
{"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}
- 连接Kafka集群,获取kafka sink topic的数据,结果参考如下:
{"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}
- 示例3:MRS集群开启Kerberos认证,并且Kafka使用SASL_PLAINTEXT协议,从Kafka源表获取数据,并写入到Print结果表中。
- 参考增强型跨源连接,根据MRS集群所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置MRS集群的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE upsertKafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'key.format' = 'csv', 'value.format' = 'json', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.kerberos.service.name' = 'kafka', --mrs中配置 'properties.connector.auth.open' = 'true', 'properties.connector.kerberos.principal' = 'username', --用户名 'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', --krb5_conf路径 'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab' --keytab路径 ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'print' ); INSERT INTO printSink SELECT * FROM upsertKafkaSource;
- 向Kafka中的指定topic中插入如下数据(注意:kafka插入数据时请指定key):
{"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}
- 查看taskmanager的out文件,数据结果参考如下:
+I(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110) +I(202303251505050001,appshop,2023-03-2515:05:05,500.0,400.0,2023-03-2515:10:00,0003,Cindy,330108) -U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110) +U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330111)
常见问题
无