更新时间:2024-04-23 GMT+08:00

Kafka

功能描述

Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

表1 支持类别

类别

详情

支持表类型

源表、结果表

支持数据格式

CSV

JSON

Apache Avro

Confluent Avro

Debezium CDC

Canal CDC

Maxwell CDC

OGG CDC

Raw

前提条件

  • 确保已创建Kafka集群。
  • 该场景作业需要运行在DLI的独享队列上,因此要与kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

注意事项

  • 更多具体使用可参考开源社区文档:Apache Kafka SQL 连接器
  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • with参数中字段只能使用单引号,不能使用双引号。
  • 建表时数据类型的使用请参考Format章节。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
create table kafkaSource(
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
  (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression)
)
with (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = '',
  'format' = ''
);

源表参数说明

表2 源表参数说明

参数

是否必选

默认值

数据类型

参数说明

connector

String

指定使用的连接器,Kafka 连接器使用 'kafka'。

topic

String

当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 'topic-1;topic-2'。

对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。

当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。

topic-pattern

String

匹配读取 topic 名称的正则表达式。

在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。

注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。

了解更多请参考Topic和Partition的探测

properties.bootstrap.servers

String

逗号分隔的 Kafka broker 列表。

properties.group.id

对 source 可选,不适用于 sink

String

Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 "KafkaSource-{tableIdentifier}" 作为消费组 ID。

properties.*

String

设置和传递任意 Kafka 的配置项。

  • “properties.”中的后缀名必须匹配在Apache Kafka中定义的配置键。

    Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用 topic 的自动创建。

  • 某些配置项不支持进行配置,因为 Flink 会覆盖这些配置如'key.deserializer'和'value.deserializer'。

format

String

序列化和反序列化Kafka消息的value的格式。

该配置项和 'value.format' 二者必需其一。

key.format

String

用来序列化和反序列化 Kafka 消息键(Key)的格式。

  • 如果定义了键格式,则配置项 'key.fields' 也是必需的。 否则 Kafka 记录将使用空值作为键。
  • 请参考Format页面以获取更多详细信息和格式参数。

key.fields

[]

List<String>

表结构中用来配置消息键(Key)格式数据类型的字段列表。

默认情况下该列表为空,因此消息键没有定义。 列表格式为 'field1;field2'。

key.fields-prefix

String

为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。

如果定义了前缀,表结构和配置项 'key.fields' 都需要使用带前缀的名称。

当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。

该配置项要求必须将 'value.fields-include' 配置为 'EXCEPT_KEY'。

value.format

String

序列化和反序列化 Kafka 消息体时使用的格式。

  • value.format和format参数只能配置其中一个,如果同时配置两个,则会有冲突。
  • 请参考Format页面以获取更多详细信息和格式参数。

value.fields-include

ALL

枚举类型

可选值:[ALL, EXCEPT_KEY]

定义消息体(Value)格式如何处理消息键(Key)字段的策略。

默认情况下,表结构中 'ALL' 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。

scan.startup.mode

group-offsets

String

Kafka consumer 的启动模式。

取值如下:

  • earliest-offset:从可能的最早偏移量开始。
  • latest-offset:从最末尾偏移量开始。
  • group-offsets(默认值):从 Zookeeper/Kafka 中某个指定的消费组已提交的偏移量开始。
  • timestamp:从用户为每个 partition 指定的时间戳开始,时间戳通过scan.startup.timestamp-millis指定。
  • specific-offsets:从用户为每个 partition 指定的偏移量开始,位点通过scan.startup.specific-offsets指定。

scan.startup.specific-offsets

String

在使用 'specific-offsets' 启动模式时为每个 partition 指定 offset,例如 'partition:0,offset:42;partition:1,offset:300'。

scan.startup.timestamp-millis

Long

在使用 'timestamp' 启动模式时指定启动的时间戳(单位毫秒)。

scan.topic-partition-discovery.interval

Duration

Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。

结果表参数说明

表3 结果表参数说明

参数

是否必选

默认值

数据类型

参数说明

connector

String

指定使用的连接器,Kafka 连接器使用 'kafka'。

topic

String

当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 'topic-1;topic-2'。

注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。

当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。

properties.bootstrap.servers

String

逗号分隔的 Kafka broker 列表。

properties.*

String

设置和传递任意 Kafka 的配置项。

  • “properties.”中的后缀名必须匹配在Apache Kafka中定义的配置键。

    Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用 topic 的自动创建。

  • 某些配置项不支持进行配置,因为 Flink 会覆盖这些配置如'key.deserializer'和'value.deserializer'。

format

String

序列化和反序列化Kafka消息的value的格式。注意:该配置项和 'value.format' 二者必需其一。

key.format

String

用来序列化和反序列化 Kafka 消息键(Key)的格式。

  • 如果定义了键格式,则配置项 'key.fields' 也是必需的。 否则 Kafka 记录将使用空值作为键。
  • 请参考Format页面以获取更多详细信息和格式参数。

key.fields

[]

List<String>

表结构中用来配置消息键(Key)格式数据类型的字段列表。

默认情况下该列表为空,因此消息键没有定义。 列表格式为 'field1;field2'。

key.fields-prefix

String

为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。

如果定义了前缀,表结构和配置项 'key.fields' 都需要使用带前缀的名称。

当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 注意:该配置项要求必须将 'value.fields-include' 配置为 'EXCEPT_KEY'。

value.format

String

序列化和反序列化 Kafka 消息体时使用的格式。

  • value.format和format参数只能配置其中一个,如果同时配置两个,则会有冲突。
  • 请参考Format页面以获取更多详细信息和格式参数。

value.fields-include

ALL

枚举类型

可选值:[ALL, EXCEPT_KEY]

定义消息体(Value)格式如何处理消息键(Key)字段的策略。

默认情况下,表结构中 'ALL' 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。

sink.partitioner

'default'

String

Flink partition 到 Kafka partition 的分区映射关系,可选值有:

  • default:使用 Kafka 默认的分区器对消息进行分区。
  • fixed:每个 Flink partition 最终对应最多一个 Kafka partition。
  • round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。
  • 自定义 FlinkKafkaPartitioner 的子类:例如 'org.mycompany.MyPartitioner'。

sink.semantic

at-least-once

String

定义 Kafka sink 的语义。有效值为 'at-least-once','exactly-once' 和 'none'。

sink.parallelism

Integer

定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

元数据

您可以在源表中定义元数据,以获取Kafka消息的元数据。

例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元数据,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据。

表4 元数据

Key

数据类型

是否可读(R)写(W)

说明

topic

STRING NOT NULL

R

Kafka 记录的 Topic 名。

partition

INT NOT NULL

R

Kafka 记录的 partition ID。

headers

MAP<STRING, BYTES> NOT NULL

R/W

二进制 Map 类型的 Kafka 记录头(Header)。

leader-epoch

INT NULL

R

Kafka 记录的 Leader epoch(如果可用)。

offset

BIGINT NOT NULL

R

Kafka 记录在 partition 中的 offset。

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL

R/W

Kafka 记录的时间戳。

timestamp-type

STRING NOT NULL

R

Kafka 记录的时间戳类型:

  • NoTimestampType:消息中没有定义时间戳。
  • CreateTime:消息产生的时间。
  • LogAppendTime:消息被添加到Kafka Broker的时间。

消息键(Key)与消息体(Value)的格式

Kafka消息的消息键和消息体部分都可以使用某种格式来序列化或反序列化成二进制数据。

  • 消息体格式

    由于 Kafka 消息中消息键是可选的,以下语句将使用消息体格式读取和写入消息,但不使用消息键格式。 'format' 选项与 'value.format' 意义相同。 所有的格式配置使用格式识别符作为前缀。

    CREATE TABLE KafkaTable (
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      ...
    
      'format' = 'json',
      'json.ignore-parse-errors' = 'true'
    )

    消息体格式将配置为以下的数据类型:

    ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
  • 消息键和消息体格式

    以下示例展示了如何配置和使用消息键和消息体格式。 格式配置使用 'key' 或 'value' 加上格式识别符作为前缀。

    CREATE TABLE KafkaTable (
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      ...
    
      'key.format' = 'json',
      'key.json.ignore-parse-errors' = 'true',
      'key.fields' = 'user_id;item_id',
    
      'value.format' = 'json',
      'value.json.fail-on-missing-field' = 'false',
      'value.fields-include' = 'ALL'
    )

    消息键格式包含了在 'key.fields' 中列出的字段(使用 ';' 分隔)和字段顺序。 因此将配置为以下的数据类型:

    ROW<`user_id` BIGINT, `item_id` BIGINT>

    由于消息体格式配置为 'value.fields-include' = 'ALL',所以消息键字段也会出现在消息体格式的数据类型中:

    ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
  • 重名的格式字段

    如果消息键字段和消息体字段重名,连接器无法根据表结构信息将这些列区分开。 'key.fields-prefix' 配置项可以在表结构中为消息键字段指定一个唯一名称,并在配置消息键格式的时候保留原名。

    以下示例展示了在消息键和消息体中同时包含 version 字段的情况:

    CREATE TABLE KafkaTable (
      `k_version` INT,
      `k_user_id` BIGINT,
      `k_item_id` BIGINT,
      `version` INT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      ...
    
      'key.format' = 'json',
      'key.fields-prefix' = 'k_',
      'key.fields' = 'k_version;k_user_id;k_item_id',
    
      'value.format' = 'json',
      'value.fields-include' = 'EXCEPT_KEY'
    )

    消息体格式必须配置为 'EXCEPT_KEY' 模式。格式将被配置为以下的数据类型:

    消息键格式:
    ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>
    
    消息体格式:
    ROW<`version` INT, `behavior` STRING>

Topic和Partition的探测

topic 和 topic-pattern 配置项决定了 source 消费的 topic 或 topic 的匹配规则。topic 配置项可接受使用分号间隔的 topic 列表,例如 topic-1;topic-2。 topic-pattern 配置项使用正则表达式来探测匹配的 topic。例如 topic-pattern 设置为 test-topic-[0-9],则在作业启动时,所有匹配该正则表达式的 topic(以 test-topic- 开头,以一位数字结尾)都将被 consumer 订阅。

为允许 consumer 在作业启动之后探测到动态创建的 topic,请将 scan.topic-partition-discovery.interval 配置为一个非负值。这将使 consumer 能够探测匹配名称规则的 topic 中新的 partition。

topic列表和topic匹配规则只适用于 source。对于sink端,Flink目前只支持单一topic。

示例1:读取CSV格式DMS Kafka的元数据,输出到Kafka sink中(适用于Kafka集群未开启SASL_SSL场景)

  1. 参考,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
  2. 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE kafkaSource(
      `topic` String metadata virtual,
      `partition` int metadata virtual,
      `headers` MAP<STRING, BYTES> metadata virtual,
      `leader-epoch` INT metadata virtual,
      `offset` bigint metadata virtual,
      `timestamp-type` string metadata virtual,
      `event_time` TIMESTAMP(3) metadata FROM 'timestamp',
      `message` string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'SourceKafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv',
      'csv.field-delimiter' = '\u0001',  
      'csv.quote-character' = ''''
    );
    
    CREATE TABLE kafkaSink (
      `topic` String,
      `partition` int,
      `headers` MAP<STRING, BYTES>,
      `leader-epoch` INT,
      `offset` bigint,
      `timestampType` string,
      `event_time` TIMESTAMP(3),
      `message` string --message表示读取kafka中存储的用户写入数据
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'SinkKafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'format' = 'json'
    );
    insert into kafkaSink select * from kafkaSource;
  4. 向Kafka的源表的topic中发送如下数据,Kafka topic为kafkaSource。

    具体操作可参考:Kafka客户端接入示例

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
  5. 读取Kafka的结果表的topic,Kafka topic为kafkaSink。

    具体操作可参考:Kafka客户端接入示例

    {"topic":"kafkaSource","partition":1,"headers":{},"leader-epoch":0,"offset":4,"timestampType":"LogAppendTime","event_time":"2023-11-16 11:16:30.369","message":"{\"order_id\":\"202103251202020001\", \"order_channel\":\"miniAppShop\", \"order_time\":\"2021-03-25 12:02:02\", \"pay_amount\":\"60.00\", \"real_pay\":\"60.00\", \"pay_time\":\"2021-03-25 12:03:00\", \"user_id\":\"0002\", \"user_name\":\"Bob\", \"area_id\":\"330110\"}"}
    
    {"topic":"kafkaSource","partition":0,"headers":{},"leader-epoch":0,"offset":6,"timestampType":"LogAppendTime","event_time":"2023-11-16 11:16:30.367","message":"{\"order_id\":\"202103241000000001\",\"order_channel\":\"webShop\",\"order_time\":\"2021-03-24 10:00:00\",\"pay_amount\":100.0,\"real_pay\":100.0,\"pay_time\":\"2021-03-24 10:02:03\",\"user_id\":\"0001\",\"user_name\":\"Alice\",\"area_id\":\"330106\"}"}
    
    {"topic":"kafkaSource","partition":2,"headers":{},"leader-epoch":0,"offset":5,"timestampType":"LogAppendTime","event_time":"2023-11-16 11:16:30.368","message":"{\"order_id\":\"202103241606060001\",\"order_channel\":\"appShop\",\"order_time\":\"2021-03-24 16:06:06\",\"pay_amount\":200.0,\"real_pay\":180.0,\"pay_time\":\"2021-03-24 16:10:06\",\"user_id\":\"0001\",\"user_name\":\"Alice\",\"area_id\":\"330106\"}"}

示例2:将json格式DMS Kafka作为源表,输出到Kafka sink中(适用于Kafka集群未开启SASL_SSL场景)

将Kafka作为源表,Kafka作为结果表,从Kafka中读取编码格式为json数据类型的数据,输出到日志文件中。
  1. 参考,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
  2. 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 创建flink opensource sql作业,输入以下作业运行脚本,并提交运行。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE kafkaSource(
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,  
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaSourceTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE kafkaSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,  
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaSinkTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'format' = 'json'
    );
    insert into kafkaSink select * from kafkaSource;
  4. 向Kafka的源表的topic中发送如下数据:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
  5. 读取Kafka的结果表的topic,其数据结果参考如下:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}

示例3:将DMS Kafka作为源表,Print作为结果表(适用于Kafka集群已开启SASL_SSL场景)

创建DMS的kafka集群,开启SASL_SSL,并下载SSL证书,将下载的证书client.jks上传到OBS桶中。

其中,properties.sasl.jaas.config字段包含账号密码,使用DEW进行加密。

CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:9093,KafkaAddress2:9093',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.connector.auth.open' = 'true',
  'properties.ssl.truststore.location' = 'obs://xx/client.jks',  -- 用户上传证书的位置
  'properties.sasl.mechanism' = 'PLAIN', 
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.jaas.config' = 'xx',  -- dew凭据管理中的key,其值如:org.apache.kafka.common.security.plain.PlainLoginModule required username=xx password=xx;
  'format' = 'json',
  'dew.endpoint' = 'kms.xx.com', --使用的DEW服务所在的endpoint信息
  'dew.csms.secretName' = 'xx', --DEW服务通用凭据的凭据名称
  'dew.csms.decrypt.fields' = 'properties.sasl.jaas.config', --其中properties.sasl.jaas.config字段值,需要利用DEW凭证管理,进行解密替换
  'dew.csms.version' = 'v1'
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'print'
);
 insert into ordersSink select * from ordersSource;

示例4:将Kafka(MRS集群)作为源表,Print作为结果表(适用于Kafka已开启SASL_SSL场景,MRS使用Kerberos认证。)

  • MRS集群请开启Kerberos认证。
  • 在”组件管理 > Kafka > 服务配置”中查找配置项“ssl.mode.enable”,并设置为“true”,并重启kafka。
  • 登录MRS集群的Manager,下载用户凭据:“系统设置 > 用户管理” ,单击用户名后的“更多 > 下载认证凭据”。

    根据用户凭据生成相应的truststore.jks文件,并将用户凭据以及truststore.jks文件传入OBS中。

  • 如果运行作业提示“Message stream modified (41)”,可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。
  • 端口请使用KafKa服务配置中设置的sasl_ssl.port端口,默认为21009。
  • security.protocol请设置为SASL_SSL。
  • with参数中properties.ssl.truststore.password字段使用dew进行加密。
CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka', -- mrs集群中配置值
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx',  -- 用户名
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
  'properties.ssl.truststore.password' = 'xx',  -- dew凭据中的key
  'properties.sasl.mechanism' = 'GSSAPI',
  'format' = 'json',
  'dew.endpoint'='kms.xx.myhuaweicloud.com', --使用的DEW服务所在的endpoint信息
  'dew.csms.secretName'='xx', --DEW服务通用凭据的凭据名称
  'dew.csms.decrypt.fields'='properties.ssl.truststore.password', --其中properties.ssl.truststore.password字段值,需要利用DEW凭证管理,进行解密替换
  'dew.csms.version'='v1'
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'print'
);
 insert into ordersSink select * from ordersSource;

示例5:将Kafka(MRS集群)作为源表,Print作为结果表(适用于Kafka已开启SASL_SSL场景,MRS使用SASL_PAINTEXT的Kerberos认证。)

  • MRS集群请开启Kerberos认证。
  • 将“组件管理 > Kafka > 服务配置”中查找配置项“ssl.mode.enable”,并设置为“True”, 并重启kafka。
  • 登录MRS集群的Manager,下载用户凭据“系统设置 > 用户管理”,单击用户名后的“更多 > 下载认证凭据”,并上传到OBS中。
  • 如果运行提示“Message stream modified (41)”的错误,可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。
  • 端口请使用KafKa服务配置中设置的sasl.port端口,默认为21007。
  • security.protocol请设置为SASL_PLAINTEXT。
CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka', -- mrs集群中配置
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx',
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'format' = 'json'
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'print'
);
 insert into ordersSink select * from ordersSource;

示例6:将Kafka(MRS集群)作为源表,Print作为结果表(适用于Kafka已开启SSL场景,MRS未开启Kerberos认证。)

  • MRS集群请不要开启Kerberos认证。
  • 登录MRS集群的Manager,下载用户凭据:“系统设置 > 用户管理”。 单击用户名后的“更多 > 下载认证凭据”。

    根据用户凭据生成相应的truststore.jks文件,并将用户凭据以及truststore.jks文件传入OBS中。

  • 端口请注意使用KafKa服务配置中设置的ssl.port端口,默认值为9093。
  • with参数中security.protocol请设置为SSL。
  • MRS集群kafka服务配置中,设置ssl.mode.enable请设置为true,并重启kafka
  • with参数中properties.ssl.truststore.password字段使用dew进行加密。
    CREATE TABLE ordersSource (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'properties.connector.auth.open' = 'true',
      'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
      'properties.ssl.truststore.password' = 'xx',  -- dew凭据管理的key,其值为生成truststore.jks时设置的密码
      'properties.security.protocol' = 'SSL',
      'format' = 'json',
      'dew.endpoint' = 'kms.xx.com', --使用的DEW服务所在的endpoint信息
      'dew.csms.secretName' = 'xx', --DEW服务通用凭据的凭据名称
      'dew.csms.decrypt.fields' = 'properties.ssl.truststore.password', --其中properties.ssl.truststore.password字段值,需要利用DEW凭证管理,进行解密替换
      'dew.csms.version' = 'v1'
    );
    
    CREATE TABLE ordersSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );
     insert into ordersSink select * from ordersSource;

常见问题

  • Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

    跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通DLI队列的网段地址。重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。

    具体操作请参考增强型跨源连接

  • Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword.

    sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。