更新时间:2024-02-07 GMT+08:00

ClickHouse结果表

功能描述

DLI支持将Flink作业数据输出到ClickHouse数据库中。ClickHouse是面向联机分析处理的列式数据库,支持SQL查询,且查询性能好,特别是基于大宽表的聚合分析查询性能非常优异,比其他分析型数据库速度快一个数量级。详细请参考ClickHouse组件操作

前提条件

  • 该场景作业需要运行在DLI的独享队列即非共享队列上。
  • 该场景需要与ClickHouse建立增强型跨源连接,并根据实际情况设置ClickHouse集群所在安全组规则中的端口。

    如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。

    如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 创建MRS的ClickHouse集群,集群版本选择MRS 3.1.0及以上版本,且勿开启kerberos认证。
  • ClickHouse结果表不支持删除表数据操作。
  • Flink中支持字段类型范围为:string、tinyint、smallint、int、long、float、double、date、timestamp、decimal以及Array。

    其中Array中的数据类型仅支持int、bigint、string、float、double。

语法格式

1
2
3
4
5
6
7
8
9
create table clickhouseSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
)
with (
  'connector.type' = clickhouse,
  'connector.url' = '',
  'connector.table' = ''
);

参数说明

表1 参数说明

参数

是否必选

默认值

数据类型

说明

connector.type

String

固定为:clickhouse

connector.url

String

ClickHouse的url。

参数格式为:jdbc:clickhouse://ClickHouseBalancer实例的IP:ClickHouseBalancer实例的http端口/数据库名

  • ClickHouseBalancer实例的IP地址:

    登录MRS管理控制台,选择“集群名称 > 组件管理 > ClickHouse > 实例”,获取ClickHouseBalancer实例的业务IP。

  • ClickHouseBalancer实例的http端口:

    登录MRS管理控制台,选择“集群名称 > 组件管理 > ClickHouse > 服务配置”,角色选择“ClickHouseBalancer”,搜索“lb_http_port”配置参数值。默认为:21425。

  • 数据库名为ClickHouse集群创建的数据库名称。

connector.table

String

要创建的ClickHouse的表名。

connector.driver

ru.yandex.clickhouse.ClickHouseDriver

String

连接数据库所需要的驱动。

  • 如果建表时不指定该参数,驱动会自动通过ClickHouse的url提取。
  • 如果建表时指定该参数,则该参数值固定为“ru.yandex.clickhouse.ClickHouseDriver”。

connector.username

String

访问ClickHouse数据库的账号。

connector.password

String

访问ClickHouse数据库账号的密码。

connector.write.flush.max-rows

5000

Integer

写数据时刷新数据的最大行数,默认值为:5000。

connector.write.flush.interval

0

Duration

刷新数据的时间间隔,单位可以为ms、milli、millisecond/s、sec、second/min、minute等。

为0则表示不根据时间刷新

connector.write.max-retries

3

Integer

写数据失败时的最大尝试次数,默认值为:3。

示例

从Kafka中读取数据,并将数据插入到数据库为flink、表名为order的ClickHouse数据库中,其具体步骤如下(clickhouse版本为MRS的21.3.4.25):

  1. 参考增强型跨源连接,在DLI上根据ClickHouse和Kafka集群所在的虚拟私有云和子网分别创建跨源连接,并绑定所要使用的Flink作业队列。
  2. 设置ClickHouse和Kafka集群安全组的入向规则,使其对当前将要使用的Flink作业队列网段放通。参考测试地址连通性根据ClickHouse和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 参考从零开始使用ClickHouse使用ClickHouse客户端连接到ClickHouse服务端,并使用以下命令查询集群标识符cluster等其他环境参数信息。
    select cluster,shard_num,replica_num,host_name from system.clusters;
    其返回信息如下图:
    ┌─cluster────┬────┬─shard_num─┐
    │ default_cluster │    1   │           1 │
    │ default_cluster │    1   │           2 │
    └──────── ┴────┴────── ┘
  4. 根据获取到的集群标识符cluster,例如当前为default_cluster ,使用以下命令在ClickHouse的default_cluster集群节点上创建数据库flink。
    CREATE DATABASE flink ON CLUSTER default_cluster;
  5. 使用以下命令在default_cluster集群节点上和flink数据库下创建表名为order的ReplicatedMergeTree表。
    CREATE TABLE flink.order ON CLUSTER default_cluster(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id;
  6. 创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Kafka作为数据源,ClickHouse作业结果表。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    create table clickhouseSink(
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) with (
      'connector.type' = 'clickhouse',
      'connector.url' = 'jdbc:clickhouse://ClickhouseAddress:ClickhousePort/flink',
      'connector.table' = 'order',
      'connector.write.flush.max-rows' = '1'
    );
    
    insert into clickhouseSink select * from orders;
  7. 连接Kafka集群,向Kafka中插入以下测试数据:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
  8. 使用ClickHouse客户端连接到ClickHouse,执行以下查询命令,查询写入flink数据库下order表中的数据。
    select * from flink.order;
    查询结果参考如下:
    202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106
    
    202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 
    
    202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110 

常见问题