更新时间:2022-09-29 GMT+08:00

Confluent Avro Format

功能描述

Avro Schema Registry (avro-confluent) 格式能让你读取被 io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化的记录。

当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。

当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.schema-registry.subject 参数来制定。

支持的connector

  • kafka
  • upsert kafka

参数说明

表1 参数说明

参数

是否必选

默认值

类型

说明

format

(none)

String

指定使用格式,这里应该是'avro-confluent'。

avro-confluent.schema-registry.subject

(none)

String

序列化期间,Confluent Schema Registry中注册schema所在的subject。

对于kafka和upsert-kafka,默认subject值是'<topic_name>-value' 或 '<topic_name>-key'

avro-confluent.schema-registry.url

(none)

String

注册或抓取schema的Confluent Schema Registry的URL。

示例

1. 从kafka中作为source的topic中读取json数据,并以confluent avro的形式写入作为sink的topic中

  1. 根据kafka和ecs所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka和ecs的地址测试队列连通性(通用队列-->找到作业的所属队列-->更多-->测试地址连通性-->输入kafka或ecs的地址-->测试)。若能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 购买ecs集群,并下载5.5.2版本的confluent(https://packages.confluent.io/archive/5.5/)和jdk1.8.0_232,并上传到购买的ecs集群中,然后使用下述命令解压(假设解压目录分别为confluent-5.5.2和jdk1.8.0_232)。

    tar zxvf confluent-5.5.2-2.11.tar.gz
    tar zxvf jdk1.8.0_232.tar.gz

  3. 使用下述命令在当前ecs集群中安装jdk1.8.0_232(其中<yourJdkPath>可以在jdk1.8.0_232文件夹下使用"pwd"查看):

    export JAVA_HOME=<yourJdkPath>
    export PATH=$JAVA_HOME/bin:$PATH
    export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib

  4. 进入confluent-5.5.2/etc/schema-registry/目录下,修改schema-registry.properties文件中如下配置项:

    listeners=http://<yourEcsIp>:8081  
    kafkastore.bootstrap.servers=<yourKafkaAddress1>:<yourKafkaPort>,<yourKafkaAddress2>:<yourKafkaPort>

  5. 将ecs切换到confluent-5.5.2目录下,使用下述命令启动confluent:

    bin/schema-registry-start etc/schema-registry/schema-registry.properties

  6. 创建flink opensource sql作业,选择版本flink 1.12,并选择保存日志,然后提交运行:

    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,  
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '<yourKafkaAddress1>:<yourKafkaPort>,<yourKafkaAddress2>:<yourKafkaPort>',
      'topic' = '<yourSourceTopic>',
      'properties.group.id' = '<yourGroupId>',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    CREATE TABLE kafkaSink (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,  
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '<yourKafkaAddress1>:<yourKafkaPort>,<yourKafkaAddress2>:<yourKafkaPort>',
      'topic' = '<yourSinkTopic>',
      'format' = 'avro-confluent',
      'avro-confluent.schema-registry.url' = 'http://<yourEcsIp>:8081',
      'avro-confluent.schema-registry.subject' = '<yourSubject>'
    );
    insert into kafkaSink select * from kafkaSource;

  7. 向kafka中插入如下数据:

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}

  8. 读取kafka的作为sink的topic的数据,则可发现数据已经写入,且schema已经保存到kafka的_schema的topic中。