更新时间:2024-02-07 GMT+08:00

Kafka源表

功能描述

创建source流从Kafka获取数据,作为作业的输入数据。

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

前提条件

  • 确保已创建Kafka集群。
  • 该场景作业需要运行在DLI的独享队列上,因此要与kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。

    跨源认证简介及操作方法请参考跨源认证简介

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 建表时数据类型的使用请参考Format章节。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
create table kafkaSource(
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
  (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression)
)
with (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = '',
  'format' = ''
);

参数说明

表1 参数说明

参数

是否必选

默认值

数据类型

参数说明

connector

String

指定要使用的连接器,固定为'kafka'。

topic

String

Kafka的topic名称。

注意:

  • “topic”和“topic-pattern”参数只能配置一个,不能同时配置。
  • 若有多个topic,请以';'分隔,如'topic-1;topic-2'。

topic-pattern

String

匹配读取kafka topic名称的正则表达式。

注意:“topic-pattern”和“topic”只能选择一个,不可同时存在。

例如:

'topic.*'

'(topic-c|topic-d)'

'(topic-a|topic-b|topic-\\d*)'

'(topic-a|topic-b|topic-[0-9]*)'

properties.bootstrap.servers

String

Kafka brokers地址,以逗号分隔。

properties.group.id

String

消费组名称

properties.*

String

设置和传入任意的Kafka原生配置文件。

注意:

  • “properties.”中的后缀名必须是Apache Kafka中的配置键。

    例如关闭自动创建topic:'properties.allow.auto.create.topics' = 'false'。

  • 存在一些配置不支持配置,如'key.deserializer'和'value.deserializer'。

format

String

序列化和反序列化Kafka消息的value的格式。注意:该参数和'value.format'参数只能选择一个。

请参考Format页面以获取更多详细信息和格式参数。

key.format

String

序列化和反序列化Kafka消息的key的格式。

注意:

  • 若配置了该参数,则'key.fields'也需要配置,否则kafka的记录中key会为空。
  • 请参考Format页面以获取更多详细信息和格式参数。

key.fields

[]

List<String>

定义表中的列作为key的列表,同时需要配置'key.format'。

该参数默认为空,因此没有定义key。

使用形式如:'field1;field2'。

key.fields-prefix

String

为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。

value.format

String

用于反序列化和序列化 Kafka 消息的值部分的格式。

注意:

  • value.format和format参数只能配置其中一个,如果同时配置两个,则会有冲突。
  • 请参考Format页面以获取更多详细信息和格式参数。

value.fields-include

ALL

枚举类型

可选值:[ALL, EXCEPT_KEY]

在解析消息体时,是否要包含消息键字段。

取值如下:

  • ALL(默认值):所有定义的字段都存放消息体(Value)解析出来的数据。
  • EXCEPT_KEY:除去key.fields定义字段,剩余的定义字段可以用来存放消息体(Value)解析出来的数据。

scan.startup.mode

group-offsets

String

Kafka读取数据的启动位点。

取值如下:

  • earliest-offset:从Kafka最早分区开始读取。
  • latest-offset:从Kafka最新位点开始读取。
  • group-offsets(默认值):根据Group读取。
  • timestamp:从Kafka指定时间点读取。配置该参数时,同时需要在WITH参数中指定scan.startup.timestamp-millis参数。
  • specific-offsets:从Kafka指定分区指定偏移量读取。配置该参数时,同时需要在WITH参数中指定scan.startup.specific-offsets参数。

scan.startup.specific-offsets

String

在scan.startup.mode参数指定为'specific-offsets'模式下生效,为每个分区指定偏移量,例如:partition:0,offset:42;partition:1,offset:300'。

scan.startup.timestamp-millis

Long

在scan.startup.mode参数指定为'timestamp'模式下生效,指定启动位点时间戳。

scan.topic-partition-discovery.interval

Duration

消费者定期发现动态创建的Kafka主题和分区的时间间隔。

ssl_auth_name

String

DLI侧创建的Kafka_SSL类型的跨源认证名称。Kafka配置SSL时使用该配置。

注意:若仅使用SSL类型,则需要同时配置'properties.security.protocol '= 'SSL';

若使用SASL_SSL类型,则需要同时配置

  • 'properties.security.protocol' = 'SASL_SSL';
  • 'properties.sasl.mechanism' = 'GSSAPI或者PLAIN';
  • 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";'

krb_auth_name

String

DLI侧创建的Kerberos类型的跨源认证名称。Kafka配置SASL认证时使用该配置。

注意:如果使用SASL_PLAINTEXT类型,且使用Kerberos认证,则需要同时配置'properties.sasl.mechanism' = 'GSSAPI'和'properties.security.protocol' = 'SASL_PLAINTEXT'

元信息列

您可以在源表中定义元信息列,以获取Kafka消息的元信息。例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元信息列,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据。

表2 元信息列

Key

数据类型

是否可读(R)写(W)

说明

topic

STRING NOT NULL

R

Kafka消息所在的Topic名称。

partition

INT NOT NULL

R

Kafka消息所在的Partition名称。

headers

MAP<STRING, BYTES> NOT NULL

R/W

Kafka消息的headers。

leader-epoch

INT NULL

R

Kafka消息的Leader epoch。

其书写方式请参考示例1

offset

BIGINT NOT NULL

R

Kafka消息的偏移量(offset)。

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL

R/W

Kafka消息的时间戳。

timestamp-type

STRING NOT NULL

R

Kafka消息的时间戳类型:

  • NoTimestampType:消息中没有定义时间戳。
  • CreateTime:消息产生的时间。
  • LogAppendTime:消息被添加到Kafka Broker的时间。

    其书写方式请参考示例1

示例(适用于Kafka集群未开启SASL_SSL场景)

  • 示例1:读取Kafka的元信息列,输出到Print sink中。
    1. 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
    2. 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
    3. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
      注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
      CREATE TABLE orders (
        `topic` String metadata,
        `partition` int metadata,
        `headers` MAP<STRING, BYTES> metadata,
        `leaderEpoch` INT metadata from 'leader-epoch',
        `offset` bigint metadata,
        `timestamp` TIMESTAMP(3) metadata,
        `timestampType` string metadata from 'timestamp-type',
        `message` string
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'KafkaTopic',
        'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
        'properties.group.id' = 'GroupId',
        'scan.startup.mode' = 'latest-offset',
        "format" = "csv",
        "csv.field-delimiter" = "\u0001",
        "csv.quote-character" = "''"
      );
      
      CREATE TABLE printSink (
        `topic` String,
        `partition` int,
        `headers` MAP<STRING, BYTES>,
        `leaderEpoch` INT,
        `offset` bigint,
        `timestamp` TIMESTAMP(3),
        `timestampType` string,
        `message` string --message表示读取kafka中存储的用户写入数据
      ) WITH (
        'connector' = 'print'
      );
      
      insert into printSink select * from orders;

      若不需要读取整个message的消息,而是需要读取每个字段的值,则需要将使用如下语句:

      CREATE TABLE orders (
        `topic` String metadata,
        `partition` int metadata,
        `headers` MAP<STRING, BYTES> metadata,
        `leaderEpoch` INT metadata from 'leader-epoch',
        `offset` bigint metadata,
        `timestamp` TIMESTAMP(3) metadata,
        `timestampType` string metadata from 'timestamp-type',
        order_id string,
        order_channel string,
        order_time string, 
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
        'properties.group.id' = 'GroupId',
        'scan.startup.mode' = 'latest-offset',
        "format" = "json"
      );
      
      CREATE TABLE printSink (
        `topic` String,
        `partition` int,
        `headers` MAP<STRING, BYTES>,
        `leaderEpoch` INT,
        `offset` bigint,
        `timestamp` TIMESTAMP(3),
        `timestampType` string,
        order_id string,
        order_channel string,
        order_time string, 
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'print'
      );
      
      insert into printSink select * from orders;
    4. 向Kafka的相应topic中发送如下数据:
      {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      
      {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      
      {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    5. 用户可按下述操作查看输出结果:
      1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
      2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
      3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。

      数据结果参考如下:

      +I(fz-source-json,0,{},0,243,2021-12-27T09:23:32.253,CreateTime,{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"})
      +I(fz-source-json,0,{},0,244,2021-12-27T09:23:39.655,CreateTime,{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"})
      +I(fz-source-json,0,{},0,245,2021-12-27T09:23:48.405,CreateTime,{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"})

  • 示例2:将Kafka作为源表,Print作为结果表,从Kafka中读取编码格式为json数据类型的数据,输出到日志文件中。
    1. 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
    2. 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
    3. 创建flink opensource sql作业,输入以下作业运行脚本,并提交运行。
      注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
      CREATE TABLE orders (
        order_id string,
        order_channel string,
        order_time timestamp(3),
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
        'properties.group.id' = 'GroupId',
        'scan.startup.mode' = 'latest-offset',
        "format" = "json"
      );
      
      CREATE TABLE printSink (
        order_id string,
        order_channel string,
        order_time timestamp(3),
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'print'
      );
      
      insert into printSink select * from orders;
    4. 向Kafka的相应topic中发送输入测试数据:
      {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 
      
      {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      
      {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    5. 用户可按下述操作查看输出结果:
      1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
      2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
      3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。

      数据结果参考如下:

      +I(202103241000000001,webShop,2021-03-24T10:00,100.0,100.0,2021-03-2410:02:03,0001,Alice,330106)
      +I(202103241606060001,appShop,2021-03-24T16:06:06,200.0,180.0,2021-03-2416:10:06,0001,Alice,330106)
      +I(202103251202020001,miniAppShop,2021-03-25T12:02:02,60.0,60.0,2021-03-2512:03:00,0002,Bob,330110)

示例(适用于Kafka集群已开启SASL_SSL场景)

  • 示例1:DMS集群使用SASL_SSL认证方式。

    创建DMS的kafka集群,开启SASL_SSL,并下载SSL证书,将下载的证书client.jks上传到OBS桶中。

    CREATE TABLE ordersSource (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'xx',
      'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'properties.connector.auth.open' = 'true',
      'properties.ssl.truststore.location' = 'obs://xx/xx.jks',  -- 用户上传证书的位置
      'properties.sasl.mechanism' = 'PLAIN',  -- 按照SASL_PLAINTEXT方式填写
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";',  -- 创建kafka集群时设置的账号和密码
      "format" = "json"
    );
     
    CREATE TABLE ordersSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'xx',
      'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093',
      'properties.connector.auth.open' = 'true',
      'properties.ssl.truststore.location' = 'obs://xx/xx.jks',
      'properties.sasl.mechanism' = 'PLAIN',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";',
      "format" = "json"
    );
     
    insert into ordersSink select * from ordersSource;
  • 示例2:MRS集群使用kafka SASL_SSL认证方式。
    • MRS集群请开启Kerberos认证。
    • 在”组件管理 > Kafka > 服务配置”中查找配置项” security.protocol”,并设置为”SASL_SSL”。
    • 登陆MRS集群的Manager,下载用户凭据:”系统设置 > 用户管理 ,点击用户名后的”更多 > 下载认证凭据”。

      根据用户凭据生成相应的truststore.jks文件,并将用户凭据以及truststore.jks文件传入OBS中。

    • 若运行作业提示“Message stream modified (41)”,可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。
    • 端口请使用KafKa服务配置中设置的sasl_ssl.port端口。
    • security.protocol请设置为SASL_SSL。
    CREATE TABLE ordersSource (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'xx',
      'properties.bootstrap.servers' = 'xx:21009,xx:21009',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.connector.auth.open' = 'true',
      'properties.connector.kerberos.principal' = 'xx',  -- 用户名
      'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
      'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
      'properties.ssl.truststore.password' = 'xx',  -- 生成truststore.jks设置的密码
      'properties.sasl.mechanism' = 'GSSAPI',
      "format" = "json"
    );
     
    CREATE TABLE ordersSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'xx',
      'properties.bootstrap.servers' = 'xx:21009,xx:21009',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.connector.auth.open' = 'true',
      'properties.connector.kerberos.principal' = 'xx',
      'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
      'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
      'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
      'properties.ssl.truststore.password' = 'xx',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.mechanism' = 'GSSAPI',
      "format" = "json"
    );
     
    insert into ordersSink select * from ordersSource;
  • 示例3:MRS集群使用SASL_PAINTEXT的Kerberos认证。
    • MRS集群请开启Kerberos认证。
    • 将“组件管理 > Kafka > 服务配置”中查找配置项” security.protocol”,并设置为”SASL_PLAINTEXT”。
    • 登陆MRS集群的Manager,下载用户凭据“系统设置 > 用户管理”,点击用户名后的“更多 > 下载认证凭据”,并上传到OBS中。
    • 若运行提示“Message stream modified (41)”的错误,可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。
    • 端口请使用KafKa服务配置中设置的sasl.port端口。
    • security.protocol请设置为SASL_PLAINTEXT。
    CREATE TABLE ordersSources (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'xx',
      'properties.bootstrap.servers' = 'xx:21007,xx:21007',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.connector.auth.open' = 'true',
      'properties.connector.kerberos.principal' = 'xx',
      'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
      'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.mechanism' = 'GSSAPI',
      "format" = "json"
    );
     
    CREATE TABLE ordersSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'xx',
      'properties.bootstrap.servers' = 'xx:21007,xx:21007',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.connector.auth.open' = 'true',
      'properties.connector.kerberos.principal' = 'xx',
      'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
      'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.mechanism' = 'GSSAPI',
      "format" = "json"
    );
     
    insert into ordersSink select * from ordersSource;
  • 示例4:MRS集群使用SSL方式。
    • MRS集群请不要开启Kerberos认证。
    • 登陆MRS集群的Manager,下载用户凭据:“系统设置 > 用户管理”。 点击用户名后的“更多 > 下载认证凭据”。

      根据用户凭据生成相应的truststore.jks文件,并将用户凭据以及truststore.jks文件传入OBS中。

    • 端口请注意使用KafKa服务配置中设置的ssl.port端口
    • security.protocol请设置为SSL。
    • ssl.mode.enable请设置为true。
      CREATE TABLE ordersSource (
        order_id string,
        order_channel string,
        order_time timestamp(3),
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'xx',
        'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093',
        'properties.group.id' = 'GroupId',
        'scan.startup.mode' = 'latest-offset',
        'properties.connector.auth.open' = 'true',
        'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
        'properties.ssl.truststore.password' = 'xx',  -- 生成truststore.jks时设置的密码
        'properties.security.protocol' = 'SSL',
        "format" = "json"
      );
       
      CREATE TABLE ordersSink (
        order_id string,
        order_channel string,
        order_time timestamp(3),
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'print'
      );
       
      insert into ordersSink select * from ordersSource;

常见问题

  • Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

    跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通DLI队列的网段地址。参考增强型跨源连接重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。

  • Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword.

    sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。