更新时间:2023-03-10 GMT+08:00

CSV Format

功能描述

CSV Format 允许我们基于CSV schema 进行解析和生成CSV 数据。目前的CSV schema 是基于table schema 推导出来的。

支持的Connector

  • Kafka
  • Upsert Kafka

参数说明

表1

参数

是否必选

默认值

类型

说明

format

(none)

String

指定要使用的格式,这里应该是 'csv'。

csv.field-delimiter

String

字段分隔符 (默认','),必须为单字符。你可以使用反斜杠字符指定一些特殊字符,例如 '\t' 代表制表符。 你也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如 'csv.field-delimiter' = '\u0001' 代表 0x01 字符。

csv.disable-quote-character

false

Boolean

是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 'csv.quote-character' 不能设置。

csv.quote-character

‘’

String

用于围住字段值的引号字符 (默认").

csv.allow-comments

false

Boolean

是否允许忽略注释行(默认不允许),注释行以 '#' 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。

csv.ignore-parse-errors

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。

csv.array-element-delimiter

String

分隔数组和行元素的字符串(默认';').

csv.escape-character

(none)

String

转义字符(默认关闭).

csv.null-literal

(none)

String

是否将 "null" 字符串转化为 null 值。

示例

使用kafka发送数据,输出到print中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列-->找到作业的所属队列-->更多-->测试地址连通性-->输入kafka的地址-->测试)。若能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 创建flink opensource sql作业,并提交运行,其代码如下:

    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourSourceTopic>',
      'properties.bootstrap.servers' = '<yourKafkaAddress>:<yourKafkaPort>',
      'properties.group.id' = '<yourGroupId>',
      'scan.startup.mode' = 'latest-offset',
      "format" = "csv"
    );
    
    CREATE TABLE kafkaSink (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourSinkTopic>',
      'properties.bootstrap.servers' = '<yourKafkaAddress>:<yourKafkaPort>',
      "format" = "csv"
    );
    
    insert into kafkaSink select * from kafkaSource;
    

  3. 向kafka的作为source的topic中插入下列数据:

    202103251505050001,qqShop,2021-03-25 15:05:05,500.00,400.00,2021-03-25 15:10:00,0003,Cindy,330108
    
    202103241606060001,appShop,2021-03-24 16:06:06,200.00,180.00,2021-03-24 16:10:06,0001,Alice,330106

  4. 读取kafka中作为sink的topic,结果如下:

    202103251505050001,qqShop,"2021-03-25 15:05:05",500.0,400.0,"2021-03-25 15:10:00",0003,Cindy,330108
    
    202103241606060001,appShop,"2021-03-24 16:06:06",200.0,180.0,"2021-03-24 16:10:06",0001,Alice,330106