Kafka源表
功能描述
创建source流从Kafka获取数据,作为作业的输入数据。
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
前提条件
- 确保已创建Kafka集群。
- 该场景作业需要运行在DLI的独享队列上,因此要与kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。
跨源认证简介及操作方法请参考跨源认证简介。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 建表时数据类型的使用请参考Format章节。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = '', 'format' = '' ); |
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
参数说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
指定要使用的连接器,固定为'kafka'。 |
topic |
是 |
无 |
String |
Kafka的topic名称。 注意:
|
topic-pattern |
否 |
无 |
String |
匹配读取kafka topic名称的正则表达式。 注意:“topic-pattern”和“topic”只能选择一个,不可同时存在。 例如: 'topic.*' '(topic-c|topic-d)' '(topic-a|topic-b|topic-\\d*)' '(topic-a|topic-b|topic-[0-9]*)' |
properties.bootstrap.servers |
是 |
无 |
String |
Kafka brokers地址,以逗号分隔。 |
properties.group.id |
是 |
无 |
String |
消费组名称 |
properties.* |
否 |
无 |
String |
设置和传入任意的Kafka原生配置文件。 注意:
|
format |
是 |
无 |
String |
序列化和反序列化Kafka消息的value的格式。注意:该参数和'value.format'参数只能选择一个。 请参考Format页面以获取更多详细信息和格式参数。 |
key.format |
否 |
无 |
String |
序列化和反序列化Kafka消息的key的格式。 注意:
|
key.fields |
否 |
[] |
List<String> |
定义表中的列作为key的列表,同时需要配置'key.format'。 该参数默认为空,因此没有定义key。 使用形式如:'field1;field2'。 |
key.fields-prefix |
否 |
无 |
String |
为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 |
value.format |
是 |
无 |
String |
用于反序列化和序列化 Kafka 消息的值部分的格式。 注意:
|
value.fields-include |
否 |
ALL |
枚举类型 可选值:[ALL, EXCEPT_KEY] |
在解析消息体时,是否要包含消息键字段。 取值如下:
|
scan.startup.mode |
否 |
group-offsets |
String |
Kafka读取数据的启动位点。 取值如下:
|
scan.startup.specific-offsets |
否 |
无 |
String |
在scan.startup.mode参数指定为'specific-offsets'模式下生效,为每个分区指定偏移量,例如:partition:0,offset:42;partition:1,offset:300'。 |
scan.startup.timestamp-millis |
否 |
无 |
Long |
在scan.startup.mode参数指定为'timestamp'模式下生效,指定启动位点时间戳。 |
scan.topic-partition-discovery.interval |
否 |
无 |
Duration |
消费者定期发现动态创建的Kafka主题和分区的时间间隔。 |
ssl_auth_name |
否 |
无 |
String |
DLI侧创建的Kafka_SSL类型的跨源认证名称。Kafka配置SSL时使用该配置。 注意:若仅使用SSL类型,则需要同时配置'properties.security.protocol '= 'SSL'; 若使用SASL_SSL类型,则需要同时配置
|
krb_auth_name |
否 |
无 |
String |
DLI侧创建的Kerberos类型的跨源认证名称。Kafka配置SASL认证时使用该配置。 注意:如果使用SASL_PLAINTEXT类型,且使用Kerberos认证,则需要同时配置'properties.sasl.mechanism' = 'GSSAPI'和'properties.security.protocol' = 'SASL_PLAINTEXT' |
元信息列
您可以在源表中定义元信息列,以获取Kafka消息的元信息。例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元信息列,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据。
Key |
数据类型 |
是否可读(R)写(W) |
说明 |
---|---|---|---|
topic |
STRING NOT NULL |
R |
Kafka消息所在的Topic名称。 |
partition |
INT NOT NULL |
R |
Kafka消息所在的Partition名称。 |
headers |
MAP<STRING, BYTES> NOT NULL |
R/W |
Kafka消息的headers。 |
leader-epoch |
INT NULL |
R |
Kafka消息的Leader epoch。 |
offset |
BIGINT NOT NULL |
R |
Kafka消息的偏移量(offset)。 |
timestamp |
TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL |
R/W |
Kafka消息的时间戳。 |
timestamp-type |
STRING NOT NULL |
R |
Kafka消息的时间戳类型:
|
示例(适用于Kafka集群未开启SASL_SSL场景)
- 示例1:读取Kafka的元信息列,输出到Print sink中。
- 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE orders ( `topic` String metadata, `partition` int metadata, `headers` MAP<STRING, BYTES> metadata, `leaderEpoch` INT metadata from 'leader-epoch', `offset` bigint metadata, `timestamp` TIMESTAMP(3) metadata, `timestampType` string metadata from 'timestamp-type', `message` string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "csv", "csv.field-delimiter" = "\u0001", "csv.quote-character" = "''" ); CREATE TABLE printSink ( `topic` String, `partition` int, `headers` MAP<STRING, BYTES>, `leaderEpoch` INT, `offset` bigint, `timestamp` TIMESTAMP(3), `timestampType` string, `message` string --message表示读取kafka中存储的用户写入数据 ) WITH ( 'connector' = 'print' ); insert into printSink select * from orders;
若不需要读取整个message的消息,而是需要读取每个字段的值,则需要将使用如下语句:
CREATE TABLE orders ( `topic` String metadata, `partition` int metadata, `headers` MAP<STRING, BYTES> metadata, `leaderEpoch` INT metadata from 'leader-epoch', `offset` bigint metadata, `timestamp` TIMESTAMP(3) metadata, `timestampType` string metadata from 'timestamp-type', order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE printSink ( `topic` String, `partition` int, `headers` MAP<STRING, BYTES>, `leaderEpoch` INT, `offset` bigint, `timestamp` TIMESTAMP(3), `timestampType` string, order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from orders;
- 向Kafka的相应topic中发送如下数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
- 用户可按下述操作查看输出结果:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。
数据结果参考如下:
+I(fz-source-json,0,{},0,243,2021-12-27T09:23:32.253,CreateTime,{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}) +I(fz-source-json,0,{},0,244,2021-12-27T09:23:39.655,CreateTime,{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}) +I(fz-source-json,0,{},0,245,2021-12-27T09:23:48.405,CreateTime,{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"})
- 示例2:将Kafka作为源表,Print作为结果表,从Kafka中读取编码格式为json数据类型的数据,输出到日志文件中。
- 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 创建flink opensource sql作业,输入以下作业运行脚本,并提交运行。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE orders ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE printSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from orders;
- 向Kafka的相应topic中发送输入测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
- 用户可按下述操作查看输出结果:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。
数据结果参考如下:
+I(202103241000000001,webShop,2021-03-24T10:00,100.0,100.0,2021-03-2410:02:03,0001,Alice,330106) +I(202103241606060001,appShop,2021-03-24T16:06:06,200.0,180.0,2021-03-2416:10:06,0001,Alice,330106) +I(202103251202020001,miniAppShop,2021-03-25T12:02:02,60.0,60.0,2021-03-2512:03:00,0002,Bob,330110)
示例(适用于Kafka集群已开启SASL_SSL场景)
- 示例1:DMS集群使用SASL_SSL认证方式。
创建DMS的kafka集群,开启SASL_SSL,并下载SSL证书,将下载的证书client.jks上传到OBS桶中。
CREATE TABLE ordersSource ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'properties.connector.auth.open' = 'true', 'properties.ssl.truststore.location' = 'obs://xx/xx.jks', -- 用户上传证书的位置 'properties.sasl.mechanism' = 'PLAIN', -- 按照SASL_PLAINTEXT方式填写 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";', -- 创建kafka集群时设置的账号和密码 "format" = "json" ); CREATE TABLE ordersSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093', 'properties.connector.auth.open' = 'true', 'properties.ssl.truststore.location' = 'obs://xx/xx.jks', 'properties.sasl.mechanism' = 'PLAIN', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";', "format" = "json" ); insert into ordersSink select * from ordersSource;
- 示例2:MRS集群使用kafka SASL_SSL认证方式。
- MRS集群请开启Kerberos认证。
- 在”组件管理 > Kafka > 服务配置”中查找配置项” security.protocol”,并设置为”SASL_SSL”。
- 登录MRS集群的Manager,下载用户凭据:”系统设置 > 用户管理 ,单击用户名后的”更多 > 下载认证凭据”。
- 若运行作业提示“Message stream modified (41)”,可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。
- 端口请使用KafKa服务配置中设置的sasl_ssl.port端口。
- security.protocol请设置为SASL_SSL。
CREATE TABLE ordersSource ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:21009,xx:21009', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.connector.auth.open' = 'true', 'properties.connector.kerberos.principal' = 'xx', -- 用户名 'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', 'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab', 'properties.security.protocol' = 'SASL_SSL', 'properties.ssl.truststore.location' = 'obs://xx/truststore.jks', 'properties.ssl.truststore.password' = 'xx', -- 生成truststore.jks设置的密码 'properties.sasl.mechanism' = 'GSSAPI', "format" = "json" ); CREATE TABLE ordersSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:21009,xx:21009', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.connector.auth.open' = 'true', 'properties.connector.kerberos.principal' = 'xx', 'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', 'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab', 'properties.ssl.truststore.location' = 'obs://xx/truststore.jks', 'properties.ssl.truststore.password' = 'xx', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.mechanism' = 'GSSAPI', "format" = "json" ); insert into ordersSink select * from ordersSource;
- 示例3:MRS集群使用SASL_PAINTEXT的Kerberos认证。
- MRS集群请开启Kerberos认证。
- 将“组件管理 > Kafka > 服务配置”中查找配置项” security.protocol”,并设置为”SASL_PLAINTEXT”。
- 登录MRS集群的Manager,下载用户凭据“系统设置 > 用户管理”,单击用户名后的“更多 > 下载认证凭据”,并上传到OBS中。
- 若运行提示“Message stream modified (41)”的错误,可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。
- 端口请使用KafKa服务配置中设置的sasl.port端口。
- security.protocol请设置为SASL_PLAINTEXT。
CREATE TABLE ordersSources ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:21007,xx:21007', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.connector.auth.open' = 'true', 'properties.connector.kerberos.principal' = 'xx', 'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', 'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', "format" = "json" ); CREATE TABLE ordersSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:21007,xx:21007', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.connector.auth.open' = 'true', 'properties.connector.kerberos.principal' = 'xx', 'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf', 'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', "format" = "json" ); insert into ordersSink select * from ordersSource;
- 示例4:MRS集群使用SSL方式。
- MRS集群请不要开启Kerberos认证。
- 登录MRS集群的Manager,下载用户凭据:“系统设置 > 用户管理”。 单击用户名后的“更多 > 下载认证凭据”。
- 端口请注意使用KafKa服务配置中设置的ssl.port端口
- security.protocol请设置为SSL。
- ssl.mode.enable请设置为true。
CREATE TABLE ordersSource ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'properties.connector.auth.open' = 'true', 'properties.ssl.truststore.location' = 'obs://xx/truststore.jks', 'properties.ssl.truststore.password' = 'xx', -- 生成truststore.jks时设置的密码 'properties.security.protocol' = 'SSL', "format" = "json" ); CREATE TABLE ordersSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into ordersSink select * from ordersSource;
常见问题
- Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通DLI队列的网段地址。参考增强型跨源连接重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。
- Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword.
sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。