Kafka结果表
功能描述
DLI通过Kafka结果表将Flink作业的输出数据输出到Kafka中。
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
前提条件
- 确保已创建kafka集群。
- 该场景作业需要运行在DLI的独享队列上,因此要与Kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。
跨源认证简介及操作方法请参考跨源认证简介。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 数据类型的使用,请参考Format章节。
语法格式
1 2 3 4 5 6 7 8 9 10 11 |
create table kafkaSink( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'format' = '' ); |
参数说明
参数 |
是否必选 |
默认参数 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
string |
固定值为:kafka。 |
topic |
是 |
无 |
string |
结果表对应topic名称。 |
properties.bootstrap.servers |
是 |
无 |
string |
Kafka Broker地址。格式为:host:port,host:port,host:port,以英文逗号(,)分割。 |
format |
是 |
无 |
string |
Flink Kafka Connector在序列化来自Kafka的消息时使用的格式。该选项与'value.format'只能配置其中一个。 格式取值如下:
请参考Format页面以获取更多详细信息和格式参数。 |
topic-pattern |
否 |
无 |
String |
匹配读取kafka topic名称的正则表达式。 注意:“topic-pattern”和“topic”只能选择一个,不可同时存在。 例如:'topic.*' '(topic-c|topic-d)' '(topic-a|topic-b|topic-\\d*)' '(topic-a|topic-b|topic-[0-9]*)' |
properties.* |
否 |
无 |
String |
设置和传入任意的Kafka原生配置文件。 注意:
|
key.format |
否 |
无 |
String |
序列化和反序列化Kafka消息key的格式。 注意:
|
key.fields |
否 |
[] |
List<String> |
定义表中的列作为key的列表,同时需要配置'key.format'。 该参数默认为空,因此没有定义key。 使用形式如:'field1;field2'。 |
key.fields-prefix |
否 |
无 |
String |
为所有kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 |
value.format |
是 |
无 |
String |
用于反序列化和序列化Kafka消息的值部分的格式。 注意:
|
value.fields-include |
否 |
ALL |
枚举类型 可选值:[ALL, EXCEPT_KEY] |
在解析消息体时,是否要包含消息键字段。 取值如下:
|
sink.partitioner |
否 |
无 |
string |
从Flink分区到Kafka分区的映射模式。映射模式的取值如下:
|
sink.semantic |
否 |
at-least-once |
String |
定义kafka sink的语义。 可选值为:
|
sink.parallelism |
否 |
无 |
Integer |
定义Kafka sink算子的并行度。 默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。 |
ssl_auth_name |
否 |
无 |
String |
DLI侧创建的Kafka_SSL类型的跨源认证名称。Kafka配置SSL时使用该配置。 注意:若仅使用SSL类型,则需要同时配置'properties.security.protocol '= 'SSL'; 若使用SASL_SSL类型,则需要同时配置'properties.security.protocol' = 'SASL_SSL'、'properties.sasl.mechanism' = 'GSSAPI或者PLAIN'、'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";' |
krb_auth_name |
否 |
无 |
String |
DLI侧创建的Kerberos类型的跨源认证名称。Kafka配置SASL认证时使用该配置。 注意:如果使用SASL_PLAINTEXT类型,且使用Kerberos认证,则需要同时配置'properties.sasl.mechanism' = 'GSSAPI'和'properties.security.protocol' = 'SASL_PLAINTEXT' |
示例(适用于Kafka集群未开启SASL_SSL场景)
该示例是从Kafka的一个topic中读取数据,并使用Kafka结果表将数据写入到kafka的另一个topic中。
- 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE kafkaSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', "format" = "json" ); insert into kafkaSink select * from kafkaSource;
- 连接Kafka集群,向Kafka的source topic中插入如下测试数据:
{"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}
- 连接Kafka集群,在Kafka的sink topic读取数据,参考如下:
{"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}
示例(适用于Kafka集群已开启SASL_SSL场景)
- 示例1:DMS集群使用SASL_SSL认证方式。
创建DMS的kafka集群,开启SASL_SSL,并下载SSL证书,将下载的证书client.jks上传到OBS桶中。
CREATE TABLE ordersSource ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'properties.connector.auth.open' = 'true', 'properties.ssl.truststore.location' = 'obs://xx/xx.jks', -- 用户上传证书的位置 'properties.sasl.mechanism' = 'PLAIN', -- 按照SASL_PLAINTEXT方式填写 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";', -- 创建kafka集群时设置的账号和密码 "format" = "json" ); CREATE TABLE ordersSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093', 'properties.connector.auth.open' = 'true', 'properties.ssl.truststore.location' = 'obs://xx/xx.jks', 'properties.sasl.mechanism' = 'PLAIN', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";', "format" = "json" ); insert into ordersSink select * from ordersSource;
- 示例2:MRS集群使用kafka SASL_SSL认证方式。
- MRS集群请开启Kerberos认证。
- 在”组件管理 > Kafka > 服务配置”中查找配置项” security.protocol”,并设置为”SASL_SSL”。
- 登录MRS集群的Manager,下载用户凭据:”系统设置 > 用户管理 ,单击用户名后的”更多 > 下载认证凭据”。
根据用户凭据生成相应的truststore.jks文件,并将用户凭据以及truststore.jks文件传入OBS中。
具体方式请参考客户端SSL加密功能使用说明。
- 若运行作业提示“Message stream modified (41)”,可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。
- 端口请使用KafKa服务配置中设置的sasl_ssl.port端口。
- security.protocol请设置为SASL_SSL。
CREATE TABLE ordersSource ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:21009,xx:21009', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.connector.auth.open' = 'true', 'properties.connector.kerberos.principal' = 'xx', -- 用户名 'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', 'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab', 'properties.security.protocol' = 'SASL_SSL', 'properties.ssl.truststore.location' = 'obs://xx/truststore.jks', 'properties.ssl.truststore.password' = 'xx', -- 生成truststore.jks设置的密码 'properties.sasl.mechanism' = 'GSSAPI', "format" = "json" ); CREATE TABLE ordersSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:21009,xx:21009', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.connector.auth.open' = 'true', 'properties.connector.kerberos.principal' = 'xx', 'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', 'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab', 'properties.ssl.truststore.location' = 'obs://xx/truststore.jks', 'properties.ssl.truststore.password' = 'xx', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.mechanism' = 'GSSAPI', "format" = "json" ); insert into ordersSink select * from ordersSource;
- 示例3:MRS集群使用SASL_PAINTEXT的Kerberos认证。
- MRS集群请开启Kerberos认证。
- 将“组件管理 > Kafka > 服务配置”中查找配置项” security.protocol”,并设置为”SASL_PLAINTEXT”。
- 登录MRS集群的Manager,下载用户凭据“系统设置 > 用户管理”,单击用户名后的“更多 > 下载认证凭据”,并上传到OBS中。
- 若运行提示“Message stream modified (41)”的错误,可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。
- 端口请使用KafKa服务配置中设置的sasl.port端口。
- security.protocol请设置为SASL_PLAINTEXT。
CREATE TABLE ordersSources ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:21007,xx:21007', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.connector.auth.open' = 'true', 'properties.connector.kerberos.principal' = 'xx', 'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', 'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', "format" = "json" ); CREATE TABLE ordersSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:21007,xx:21007', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.connector.auth.open' = 'true', 'properties.connector.kerberos.principal' = 'xx', 'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', 'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', "format" = "json" ); insert into ordersSink select * from ordersSource;
- 示例4:MRS集群使用SSL方式。
- MRS集群请不要开启Kerberos认证。
- 登录MRS集群的Manager,下载用户凭据:“系统设置 > 用户管理”。 单击用户名后的“更多 > 下载认证凭据”。
根据用户凭据生成相应的truststore.jks文件,并将用户凭据以及truststore.jks文件传入OBS中。
具体方式请参考客户端SSL加密功能使用说明。
- 端口请注意使用KafKa服务配置中设置的ssl.port端口
- security.protocol请设置为SSL。
- ssl.mode.enable请设置为true。
CREATE TABLE ordersSource ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'properties.connector.auth.open' = 'true', 'properties.ssl.truststore.location' = 'obs://xx/truststore.jks', 'properties.ssl.truststore.password' = 'xx', -- 生成truststore.jks时设置的密码 'properties.security.protocol' = 'SSL', "format" = "json" ); CREATE TABLE ordersSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into ordersSink select * from ordersSource;