更新时间:2022-07-12 GMT+08:00
Raw Format
功能描述
Raw format 允许读写原始(基于字节)值作为单个列。
注意: 这种格式将 null 值编码成 byte[] 类型的 null。这样在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。
Raw format 连接器是内置的。
参数说明
参数 |
是否必选 |
默认值 |
类型 |
描述 |
---|---|---|---|---|
format |
是 |
(none) |
String |
指定要使用的格式, 这里应该是 'raw'。 |
raw.charset |
否 |
UTF-8 |
String |
指定字符集来编码文本字符串。 |
raw.endianness |
否 |
big-endian |
String |
指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。 更多细节可查阅字节序。 |
支持的Connector
- Kafka
- UpsertKafka
示例
使用kafka发送数据,输出到print中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列-->找到作业的所属队列-->更多-->测试地址连通性-->输入kafka的地址-->测试)。若能连通,则表示跨源已经绑定成功;否则表示未成功。
- 创建flink opensource sql作业,选择flink1.12,并提交运行,其代码如下:
create table kafkaSource( log string ) with ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.group.id' = '<yourGroupId>', 'properties.bootstrap.servers' = '<yourKafkaAddress>:<yourKafkaPort>', 'scan.startup.mode' = 'latest-offset', 'format' = 'raw' ); create table printSink( log string ) with ( 'connector' = 'print' ); insert into printSink select * from kafkaSource;
- 向kafka的相应topic中插入下列数据:
47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"
- 用户可按下述操作查看输出结果:
- 方法一:"更多" -> "FlinkUI" -> "Task Managers" -> "Stdout"。
- 方法二:若在提交运行作业前选择了保存日志,则可以从日志的taskmanager.out文件中查看。
+I(47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0"2005316"https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75")
父主题: Format