更新时间:2024-04-19 GMT+08:00

Avro Format

功能描述

Avro格式允许基于Avro schema 读取和写入Avro 数据。目前,Avro schema 从表schema 推导。

更多具体使用可参考开源社区文档:Avro Format

支持的Connector

  • Kafka
  • Upsert Kafka
  • FileSystem

参数说明

表1 参数说明

参数

是否必选

默认值

类型

说明

format

(none)

String

指定使用格式,这里应该是'avro'。

avro.codec

(none)

String

仅用于FileSystem,avro 压缩编解码器。默认 snappy 压缩。目前支持:null, deflate、snappy、bzip2、xz。

数据类型映射

目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。

除了下面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null),其中 something 是从 Flink 类型转换的 Avro 类型。

您可以参考 Avro 规范 获取更多有关 Avro 类型的信息。

表2 数据类型映射

Flink SQL类型

Avro类型

Avro逻辑类型

CHAR / VARCHAR / STRING

string

-

BOOLEAN

boolean

-

BINARY / VARBINARY

bytes

-

DECIMAL

fixed

decimal

TINYINT

int

-

SMALLINT

int

-

INT

int

-

BIGINT

long

-

FLOAT

float

-

DOUBLE

double

-

DATE

int

date

TIME

int

time-millis

TIMESTAMP

long

timestamp-millis

ARRAY

array

-

MAP(key 必须是 string/char/varchar 类型)

map

-

MULTISET(元素必须是 string/char/varchar 类型)

map

-

ROW

record

-

示例

读取kafka中的数据,以avro格式反序列化,并输出到print中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列 > 找到作业的所属队列 > 更多 > 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 创建flink opensource sql作业,选择flink1.15,并提交运行,其代码如下:

    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'avro'
    );
    
    
    CREATE TABLE printSink (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );
    insert into printSink select * from kafkaSource;

  3. 向kafka中以avro的序列化方式插入如下数据:

    {"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"}
    
    {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}

  4. 按照如下方式查看taskmanager.out文件中的数据结果:

    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
    +I[202103241000000001, webShop, 2021-03-24 10:00:00, 100.0, 100.0, 2021-03-24 10:02:03, 0001, Alice, 330106]
    +I[202103241606060001, appShop, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106]