Upsert Kafka结果表
功能描述
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。DLI将Flink作业的输出数据以upsert的模式输出到Kafka中。
Upsert Kafka 连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。
upsert-kafka连接器作为 sink,可以消费changelog 流。它会将INSERT/UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入(表示对应 key 的消息被删除)。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
前提条件
- 确保已创建Kafka集群。
- 该场景作业需要运行在DLI的独享队列上,因此要与Kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。
跨源认证简介及操作方法请参考跨源认证简介。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 数据类型的使用,请参考Format章节。
- Upsert Kafka始终以upsert方式工作,并且需要在 DDL 中定义主键。
- 默认情况下,如果启用checkpoint,Upsert Kafka sink会保证至少一次将数据插入Kafka topic。这意味着,Flink可以将具有相同key的重复记录写入Kafka topic。因此,upsert-kafka 连接器可以实现幂等写入。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 |
create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'upsert-kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'key.format' = '', 'value.format' = '' ); |
参数说明
参数 |
是否必选 |
默认参数 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
(none) |
String |
connector类型,对于upsert kafka,需配置为'upsert-kafka'。 |
topic |
是 |
(none) |
String |
Kafka topic名。 |
properties.bootstrap.servers |
是 |
(none) |
String |
Kafka brokers地址,以逗号分隔。 |
key.format |
是 |
(none) |
String |
用于对Kafka消息中key部分序列化和反序列化的格式。key字段由PRIMARY KEY语法指定。支持的格式如下:
请参考Format页面以获取更多详细信息和格式参数。 |
key.fields-prefix |
否 |
(none) |
String |
为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。 默认情况下,前缀为空。如果定义了自定义前缀,则表架构 和'key.fields'都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求'value.fields-include' 必须设置为'EXCEPT_KEY'。 |
value.format |
是 |
(none) |
String |
用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式:
请参考Format页面以获取更多详细信息和格式参数。 |
value.fields-include |
否 |
‘ALL’ |
String |
控制哪些字段应该出现在value中。可取值:
|
sink.parallelism |
否 |
(none) |
Interger |
定义upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。 |
properties.* |
否 |
(none) |
String |
该选项可以传递任意的 Kafka 参数。 选项的后缀名必须匹配定义在 kafka参数文档中的参数名。 Flink会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。 例如:你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。 但是'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为Flink会重写这些参数的值。 |
ssl_auth_name |
否 |
无 |
String |
DLI侧创建的Kafka_SSL类型的跨源认证名称。Kafka配置SSL时使用该配置。 注意:若仅使用SSL类型,则需要同时配置'properties.security.protocol '= 'SSL'; 若使用SASL_SSL类型,则需要同时配置'properties.security.protocol' = 'SASL_SSL'、'properties.sasl.mechanism' = 'GSSAPI或者PLAIN'、'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";' |
krb_auth_name |
否 |
无 |
String |
DLI侧创建的Kerberos类型的跨源认证名称。Kafka配置SASL认证时使用该配置。 注意:如果使用SASL_PLAINTEXT类型,且使用Kerberos认证,则需要同时配置'properties.sasl.mechanism' = 'GSSAPI'和'properties.security.protocol' = 'SASL_PLAINTEXT' |
示例
从Kafka源表获取Kafka source topic数据,通过Upsert Kafka结果表将Kafka source topic数据写入到Kafka sink topic中。
- 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE UPSERTKAFKASINK ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'key.format' = 'json', 'value.format' = 'json' ); insert into UPSERTKAFKASINK select * from orders;
- 连接Kafka集群,kafka中source topic发送如下测试数据:
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
- 连接Kafka集群,获取kafka sink topic的数据,结果参考如下:
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
常见问题
无