更新时间:2024-11-08 GMT+08:00

Confluent Avro Format

功能描述

Avro Schema Registry (avro-confluent) 格式能让你读取被 io.confluent.kafka.serializers.KafkaAvroSerializer 序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer 反序列化的记录。

当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。

当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.subject 参数来制定。

支持的connector

  • kafka
  • upsert kafka

参数说明

表1 参数说明

参数

是否必选

默认值

类型

说明

format

String

指定使用格式,此处使用'avro-confluent'。

avro-confluent.basic-auth.credentials-source

String

Schema Registry的基本身份验证凭据源。

avro-confluent.basic-auth.user-info

String

Schema Registry的基本身份验证用户信息。

avro-confluent.bearer-auth.credentials-source

String

Schema Registry的承载身份验证凭据源。

avro-confluent.bearer-auth.token

String

Schema Registry的承载身份验证Token。

avro-confluent.properties

Map

转发到底层Schema Registry的属性Map。这对于没有通过Flink显示配置的配置项非常有用。但是,请注意,Flink配置项具有更高的优先级。

avro-confluent.ssl.keystore.location

String

SSL keystore的位置/文件。

avro-confluent.ssl.keystore.password

String

SSL keystore的密码。

avro-confluent.ssl.truststore.location

String

SSL truststore的位置/文件。

avro-confluent.ssl.truststore.password

String

SSL truststore的密码。

avro-confluent.subject

String

用于在序列化期间此格式使用的注册schema的Confluent Schema Registry主题。默认情况下,'kafka'和'upsert-kafka'连接器使用'<topic_name>-value'或'<topic_name>-key'作为默认主题名称,如果此格式用作键或值的格式。但是对于其他连接器(例如'filesystem'),在用作sink时需要使用主题选项。

avro-confluent.url

String

用于获取/注册架构的Confluent Schema Registry的URL。

数据类型映射

目前 Apache Flink 都是从 table schema 去推断反序列化期间的 Avro reader schema 和序列化期间的 Avro writer schema。显式地定义 Avro schema 暂不支持。 Avro Format中描述了 Flink 数据类型和 Avro 类型的对应关系。

除了此处列出的类型之外,Flink 还支持读取/写入可为空(nullable)的类型。 Flink 将可为空的类型映射到 Avro union(something, null), 其中 something 是从 Flink 类型转换的 Avro 类型。

示例

从kafka中作为source的topic中读取json数据,并以confluent avro的形式写入作为sink的topic中。

  1. 根据kafka和ecs所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka和ecs的地址测试队列连通性(通用队列 > 找到作业的所属队列 > 更多 > 测试地址连通性 > 输入kafka或ecs的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 购买ecs集群,并下载5.5.2版本的confluent和jdk1.8.0_232,并上传到购买的ecs集群中,然后使用下述命令解压(假设解压目录分别为confluent-5.5.2和jdk1.8.0_232)。
    tar zxvf confluent-5.5.2-2.11.tar.gz
    tar zxvf jdk1.8.0_232.tar.gz
  3. 使用下述命令在当前ecs集群中安装jdk1.8.0_232(其中<yourJdkPath>可以在jdk1.8.0_232文件夹下使用"pwd"查看):
    export JAVA_HOME=<yourJdkPath>
    export PATH=$JAVA_HOME/bin:$PATH
    export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
  4. 进入confluent-5.5.2/etc/schema-registry/目录下,修改schema-registry.properties文件中如下配置项:
    listeners=http://<yourEcsIp>:8081  
    kafkastore.bootstrap.servers=<yourKafkaAddress1>:<yourKafkaPort>,<yourKafkaAddress2>:<yourKafkaPort>
  5. 将ecs切换到confluent-5.5.2目录下,使用下述命令启动confluent:
    bin/schema-registry-start etc/schema-registry/schema-registry.properties
  6. 创建flink opensource sql作业,选择版本flink 1.15,并选择保存日志,然后提交运行:
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,    
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaSourceTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE kafkaSink (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,    
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaSinkTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'format' = 'avro-confluent',
      'avro-confluent.url' = 'http://EcsIp:8081'
    );
    insert into kafkaSink select * from kafkaSource;
  7. 向kafka中插入如下数据:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  8. 读取kafka的作为sink的topic的数据,则可发现数据已经写入,且schema已经保存到kafka的_schema的topic中。