更新时间:2024-12-10 GMT+08:00
分享

做结果表

格式语法

SQL语法格式可能在不同Flink环境下有细微差异,具体以事件环境格式为准,with后面的参数名称及参数值以此文档为准。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
create table dwsSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'dws',
  'url' = '',
  'tableName' = '',
  'username' = '',
  'password' = ''
);

Flink SQL配置参数

Flink SQL中设置的PRIMARY KEY将自动映射到dws-client中的uniqueKeys。参数跟随client版本发布,参数功能与client一致,以下参数说明表示为最新参数。

表1 数据库配置

参数

说明

默认值

connector

flink框架区分connector参数,固定为dws。

-

url

数据库连接地址。

-

username

配置连接用户。

-

password

配置密码。

-

tableName

对应dws表。

-

表2 连接配置

参数

说明

默认值

connectionSize

初始dws-client时的并发数量。

1

connectionMaxUseTimeSeconds

连接创建多少秒后强制释放(单位:秒)。

3600(一小时)

connectionMaxIdleMs

连接最大空闲时间,超过后将释放(单位:毫秒)。

60000(一分钟)

表3 写入参数

参数

说明

默认值

conflictStrategy

有主键表数据写入时主键冲突策略:

  • ignore:保持原数据,忽略更新数据。
  • update:用新数据中非主键列更新原数据中对应列。
  • replace:用新数据替换原数据。
    说明:

    update和replace在全字段upsert时等效,在部分字段upsert时,replace相当于将数据中不包含的列设置为null。

update

writeMode

入库方式:

  • auto:系统自动选择。
  • copy_merge:当存在主键时使用copy方式入临时表,从临时表merge至目标表;无主键时直接copy至目标表。
  • copy_upsert:当存在主键时使用copy方式入临时表,从临时表upsert至目标表;无主键时直接copy至目标表。
  • upsert: 有主键用upsert sql入库;无主键用insert into入库。
  • UPDATE:使用update where语法更新数据,若原表无主键可选择指定uniqueKeys,指定字段不要求必须是唯一索引,但非唯一索引可能会影响性能。
  • COPY_UPDATE:数据先通过copy方式入库到临时表,通过临时表加速使用update from where方式更新目标数据。
  • UPDATE_AUTO:批量小于copyWriteBatchSize使用UPDATE,否则使用COPY_UPDATE。

auto

maxFlushRetryTimes

在入库时最大尝试次数,次数内执行成功则不抛出异常,每次重试间隔为 1秒 * 次数。

3

autoFlushBatchSize

自动刷库的批大小(攒批大小)。

5000

autoFlushMaxInterval

自动刷库的最大间隔时间(攒批时长)。

5s

copyWriteBatchSize

在“writeMode == auto”下,使用copy的批大小。

5000

ignoreDelete

忽略flink任务中的delete。

false (1.0.10前默认true)

ignoreNullWhenUpdate

是否忽略flink中字段值为null的更新,只有在“conflictStrategy == update”时有效。

false

metadataCacheSeconds

系统中对元数据的最大缓存时间,例如表定义信息(单位秒)。

180

copyMode

copy入库格式:

  • CSV:将数据拼接为CSV格式入库,该方式稳定,但性能略低。
  • DELIMITER:用分隔符将数据拼接,然后入库,该方式需要数据中不包含分隔符。

CSV

createTempTableMode

创建临时表方式:

  • AS
  • LIKE

AS

numberAsEpochMsForDatetime

如果数据库为时间类型数据源为数字类型,是否将数据当成时间戳转换为对应时间类型。

false

stringToDatetimeFormat

如果数据库为时间类型数据源为字符串类型,按该格式转换为时间类型,该参数配置即开启。

null

sink.parallelism

flink系统参数用于设置sink并发数量。

跟随上游算子

printDataPk

是否在connector接收到数据时打印数据主键,用于排查问题。

false

ignoreUpdateBefore

忽略flink任务中的update_before,在大表局部更新时该参数一定打开,否则有update时会导致数据的其它列被设置为null,因为会先删除再写入数据。

true

示例

该示例是从kafka数据源中读取数据,写入DWS结果表中,并指定攒批时间不超过10秒,每批数据最大30000条,其具体步骤如下:

  1. 在GaussDB(DWS)数据库中创建表public.dws_order
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    create table public.dws_order(
      order_id VARCHAR,
      order_channel VARCHAR,
      order_time VARCHAR,
      pay_amount FLOAT8,
      real_pay FLOAT8,
      pay_time VARCHAR,
      user_id VARCHAR,
      user_name VARCHAR,
      area_id VARCHAR
      );
    
  2. 消费Kafka中order_test topic中的数据作为数据源,public.dws_order作为结果表,Kafka数据为JSON格式,并且字段名称和数据库字段名称一一对应:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'order_test',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE dwsSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:gaussdb://DWSAddress:DWSPort/DWSdbName',
      'tableName' = 'dws_order',
      'username' = 'DWSUserName',
      'password' = 'DWSPassword',
      'autoFlushMaxInterval' = '10s',
      'autoFlushBatchSize' = '30000'
    );
    
    insert into dwsSink select * from kafkaSource;
    
  3. 给Kafka写入测试数据:
    1
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
  4. 等10秒后在GaussDB(DWS)表中查询结果:
    1
     select * from dws_order
    

    结果如下:

常见问题

  • Q:writeMode参数设置什么值比较合适?

    A:根据业务场景分update(只更新存在的数据)和upsert(对于同一主键数据如果存在就更新,不存在就新增一条数据)两个类型,推荐直接使用auto方式即可,该方式下会根据数据量的大小自动选择,如果数据量较大会增大攒批参数autoFlushBatchSize,即可提升入库性能。

  • Q:autoFlushBatchSizeautoFlushMaxInterval怎么设置比较合适?
    A:autoFlushBatchSize参数用于设置最大攒批条数,autoFlushMaxInterval参数用于设置最大攒批间隔,两个参数分别从时间和空间维度管控攒批。
    • 通过autoFlushMaxInterval可保证数据量较小时的时效性,如对时效性无强制要求通常不建议设置的太小,建议不低于3s走默认值即可。
    • 通过autoFlushBatchSize可控制一批数据的最大条数,一般来说攒批量越大,对于整体入库性能会更好,对性能来说通常该参数的设置推荐越大越好,参数的设置根据业务数据的大小以及flink运行内存来设置,保证不内存溢出。

      对于大多业务来说无需设置autoFlushMaxInterval,将autoFlushBatchSize设置为50000即可。

  • Q: 遇到数据库死锁了怎么办?

    A:通常出现死锁大致分为行锁死锁和分布式死锁。

    • 行锁:该场景通常为同一主键数据的并发更新造成行锁,该情况可以通过对数据做key by解决,key by必须根据数据库主键做,保证同一个主键数据会在同一个并发中,破坏掉并发更新的条件,无法造成死锁。Flink SQL做key by需要Flink本身支持,对于DLI/MRS均能实现,如MRS flink通过增加参数“key-by-before-sink=true”可实现key by。具体怎么使用以实现方为准,对于无法使用的建议使用API方式入库。
    • 分布式死锁:该场景通常为列存表的并发更新造成分布式死锁,暂无法解决,建议使用行存或者hstore。

相关文档