更新时间:2022-07-12 GMT+08:00

Raw Format

功能描述

Raw format 允许读写原始(基于字节)值作为单个列。

注意: 这种格式将 null 值编码成 byte[] 类型的 null。这样在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。

Raw format 连接器是内置的。

参数说明

表1

参数

是否必选

默认值

类型

描述

format

(none)

String

指定要使用的格式, 这里应该是 'raw'。

raw.charset

UTF-8

String

指定字符集来编码文本字符串。

raw.endianness

big-endian

String

指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。 更多细节可查阅字节序。

支持的Connector

  • Kafka
  • UpsertKafka

示例

使用kafka发送数据,输出到print中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列-->找到作业的所属队列-->更多-->测试地址连通性-->输入kafka的地址-->测试)。若能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 创建flink opensource sql作业,选择flink1.12,并提交运行,其代码如下:

    create table kafkaSource(
      log string
      ) with (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.group.id' = '<yourGroupId>',
        'properties.bootstrap.servers' = '<yourKafkaAddress>:<yourKafkaPort>',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'raw'
    );
    create table printSink(
      log string
       ) with (
         'connector' = 'print'
       );
    insert into printSink select * from kafkaSource;

  3. 向kafka的相应topic中插入下列数据:

    47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"

  4. 用户可按下述操作查看输出结果:

    • 方法一:"更多" -> "FlinkUI" -> "Task Managers" -> "Stdout"。
    • 方法二:若在提交运行作业前选择了保存日志,则可以从日志的taskmanager.out文件中查看。
    +I(47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0"2005316"https://domain.com/?p=1"
    "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75")