更新时间:2024-03-12 GMT+08:00

ClickHouse结果表

功能描述

DLI将Flink作业数据输出到ClickHouse中。

ClickHouse是面向联机分析处理的列式数据库,支持SQL查询,且查询性能好,特别是基于大宽表的聚合分析查询性能非常优异,比其他分析型数据库速度快一个数量级。详细请参考ClickHouse组件操作

前提条件

  • 该场景作业需要运行在DLI的独享队列即非共享队列上。
  • 该场景需要与ClickHouse建立增强型跨源连接,并根据实际情况设置ClickHouse集群所在安全组规则中的端口。

    建立增强型跨源连接,请参考《数据湖探索用户指南》中的“增强型跨源连接”章节。

    如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。

注意事项

  • 创建MRS的ClickHouse集群,集群版本选择MRS 3.1.0,且勿开启kerberos认证。
  • Flink SQL语句中不能定义主键。同时不能使用任何产生主键的语法,例如insert into clickhouseSink select id, cout(*) from sourceName group by id。
  • Flink中支持字段类型范围为:string、tinyint、smallint、int、long、float、double、date、timestamp、decimal以及Array。

    其中Array中的数据类型仅支持int、bigint、string、float、double。

语法格式

1
2
3
4
5
6
7
8
9
create table clickhouseSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
)
with (
  'connector.type' = 'clickhouse',
  'connector.url' = '',
  'connector.table' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

固定为clickhouse

connector.url

ClickHouse的url。

参数格式为:jdbc:clickhouse://ClickHouseBalancer实例的IP:ClickHouseBalancer实例的http端口/数据库名

  • ClickHouseBalancer实例的IP地址:

    登录MRS管理控制台,选择“集群名称 > 组件管理 > ClickHouse > 实例”,获取ClickHouseBalancer实例的业务IP。

  • ClickHouseBalancer实例的http端口:

    登录MRS管理控制台,选择“集群名称 > 组件管理 > ClickHouse > 服务配置”,角色选择“ClickHouseBalancer”,搜索“lb_http_port”配置参数值。默认为:21425。

  • 数据库名为ClickHouse集群创建的数据库名称。

connector.table

要创建的ClickHouse的表名。

connector.driver

连接数据库所需要的驱动。

  • 如果建表时不指定该参数,驱动会自动通过ClickHouse的url提取。
  • 如果建表时指定该参数,则该参数值固定为“ru.yandex.clickhouse.ClickHouseDriver”。

connector.username

访问ClickHouse数据库的账号。

connector.password

访问ClickHouse数据库账号的密码。

connector.write.flush.max-rows

写数据时刷新数据的最大行数,默认值为:5000。

connector.write.flush.interval

刷新数据的时间间隔,单位可以为ms、milli、millisecond/s、sec、second/min、minute等。

connector.write.max-retries

写数据失败时的最大尝试次数,默认值为:3。

示例

从dis中读取数据,并将数据插入到数据库为flinktest、表名为test的ClickHouse数据库中。

  1. 创建dis数据源表disSource。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    create table disSource(
      attr0 string,
      attr1 TINYINT,
      attr2 smallint,
      attr3 int,
      attr4 bigint,
      attr5 float,
      attr6 double,
      attr7 String,
      attr8 string,
      attr9 timestamp(3),
      attr10 timestamp(3),
      attr11 date,
      attr12 decimal(38, 18),
      attr13 decimal(38, 18)
    ) with (
      "connector.type" = "dis",
      "connector.region" = "cn-xxxx-x",
      "connector.channel" = "xxxx",
      "format.type" = 'csv'
    );
    
  2. 创建ClickHouse结果表clickhouse,将disSource表数据插入到clickhouse结果表中。
    create table clickhouse(
      attr0 string,
      attr1 TINYINT,
      attr2 smallint,
      attr3 int,
      attr4 bigint,
      attr5 float,
      attr6 double,
      attr7 String,
      attr8 string,
      attr9 timestamp(3),
      attr10 timestamp(3),
      attr11 date,
      attr12 decimal(38, 18),
      attr13 decimal(38, 18),
      attr14 array < int >,
      attr15 array < bigint >,
      attr16 array < float >,
      attr17 array < double >,
      attr18 array < varchar >,
      attr19 array < String >
    ) with (
      'connector.type' = 'clickhouse',
      'connector.url' = 'jdbc:clickhouse://xx.xx.xx.xx:xx/flinktest',
      'connector.table' = 'test'
    );
    
    insert into
      clickhouse
    select
      attr0,
      attr1,
      attr2,
      attr3,
      attr4,
      attr5,
      attr6,
      attr7,
      attr8,
      attr9,
      attr10,
      attr11,
      attr12,
      attr13,
      array [attr3, attr3+1],
      array [cast(attr4 as bigint), cast(attr4+1 as bigint)],
      array [cast(attr12 as float), cast(attr12+1 as float)],
      array [cast(attr13 as double), cast(attr13+1 as double)],
      array ['TEST1', 'TEST2'],
      array [attr7, attr7]
    from
      disSource;