Avro Format
功能描述
Avro格式允许基于Avro schema 读取和写入Avro 数据。目前,Avro schema 从表schema 推导。
支持的Connector
- Kafka
- Upsert Kafka
参数说明
参数 |
是否必选 |
默认值 |
类型 |
说明 |
---|---|---|---|---|
format |
是 |
(none) |
String |
指定使用格式,这里应该是'avro'。 |
avro.codec |
否 |
(none) |
String |
仅用于文件系统,avro 压缩编解码器。默认不压缩。目前支持:deflate、snappy、bzip2、xz。 |
数据类型映射
目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。
除了下面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null),其中 something 是从 Flink 类型转换的 Avro 类型。
Flink SQL类型 |
Avro类型 |
Avro逻辑类型 |
---|---|---|
CHAR / VARCHAR / STRING |
string |
- |
BOOLEAN |
boolean |
- |
BINARY / VARBINARY |
bytes |
- |
DECIMAL |
fixed |
decimal |
TINYINT |
int |
- |
SMALLINT |
int |
- |
INT |
int |
- |
BIGINT |
long |
- |
FLOAT |
float |
- |
DOUBLE |
double |
- |
DATE |
int |
date |
TIME |
int |
time-millis |
TIMESTAMP |
long |
timestamp-millis |
ARRAY |
array |
- |
MAP(key 必须是 string/char/varchar 类型) |
map |
- |
MULTISET(元素必须是 string/char/varchar 类型) |
map |
- |
ROW |
record |
- |
示例
读取kafka中的数据,以avro格式反序列化,并输出到print中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列-->找到作业的所属队列-->更多-->测试地址连通性-->输入kafka的地址-->测试)。若能连通,则表示跨源已经绑定成功;否则表示未成功。
- 创建flink opensource sql作业,选择flink1.12,并提交运行,其代码如下:
CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaAddress1>:<yourKafkaPort>,<yourKafkaAddress2>:<yourKafkaPort>,<yourKafkaAddress3>:<yourKafkaPort>', 'properties.group.id' = '<yourGroupId>', 'scan.startup.mode' = 'latest-offset', "format" = "avro" ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from kafkaSource;
- 向kafka中以avro的序列化方式插入如下数据:
{"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}
- 用户可按下述操作查看输出结果:
- 方法一:"更多" -> "FlinkUI" -> "Task Managers" -> "Stdout"。
- 方法二:若在提交运行作业前选择了保存日志,则可以从日志的taskmanager.out文件中查看。
+I(202103241000000001,webShop,2021-03-2410:00:00,100.0,100.0,2021-03-2410:02:03,0001,Alice,330106) +I(202103241606060001,appShop,2021-03-2416:06:06,200.0,180.0,2021-03-2416:10:06,0001,Alice,330106)