ClickHouse
功能描述
DLI支持将Flink作业数据输出到ClickHouse数据库中,表类型仅支持结果表。
ClickHouse是面向联机分析处理的列式数据库,支持SQL查询,且查询性能好,特别是基于大宽表的聚合分析查询性能非常优异,比其他分析型数据库速度快一个数量级。详细请参考ClickHouse组件操作。
类别 |
详情 |
---|---|
支持表类型 |
结果表 |
前提条件
- 该场景作业需要运行在DLI的独享队列上。
- 该场景需要与ClickHouse建立增强型跨源连接,并根据实际情况设置ClickHouse集群所在安全组规则中的端口。
如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
- 创建MRS的ClickHouse集群,集群版本选择MRS 3.1.0及以上版本。
- ClickHouse结果表不支持删除表数据操作。
- Flink中支持字段类型范围为:string、tinyint、smallint、int、bigint、float、double、date、timestamp、decimal以及Array。
语法格式
1 2 3 4 5 6 7 8 9 |
create table clickhouseSink ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'type' = 'clickhouse', 'url' = '', 'table-name' = '' ); |
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
固定为:clickhouse |
url |
是 |
无 |
String |
ClickHouse的url。 参数格式为:jdbc:clickhouse://ClickHouseBalancer实例业务IP1:ClickHouseBalancer端口,ClickHouseBalancer实例业务IP2:ClickHouseBalancer端口/数据库名
|
table-name |
是 |
无 |
String |
ClickHouse的表名。 |
driver |
否 |
ru.yandex.clickhouse.ClickHouseDriver |
String |
连接数据库所需要的驱动。若未配置,则会自动通过URL提取,默认为ru.yandex.clickhouse.ClickHouseDriver。 |
username |
否 |
无 |
String |
访问ClickHouse数据库的账号名,MRS集群开启Kerberos认证时需要填写。 |
password |
否 |
无 |
String |
访问ClickHouse数据库账号的密码,MRS集群开启Kerberos认证时需要填写。 |
sink.buffer-flush.max-rows |
否 |
100 |
Integer |
写数据时刷新数据的最大行数,默认值为100。 |
sink.buffer-flush.interval |
否 |
1s |
Duration |
刷新数据的时间间隔,单位可以为ms、milli、millisecond/s、sec、second/min、minute等,默认值为1s。设置为0则表示不根据时间刷新。 |
sink.max-retries |
否 |
3 |
Integer |
写数据失败时的最大尝试次数,默认值为3。 |
示例
- 示例1:从Kafka中读取数据,并将数据插入ClickHouse中(ClickHouse版本为MRS的21.3.4.25,且MRS集群未开启Kerberos认证):
- 参考增强型跨源连接,在DLI上根据ClickHouse和Kafka集群所在的虚拟私有云和子网分别创建跨源连接,并绑定所要使用的Flink弹性资源池。
- 设置ClickHouse和Kafka集群安全组的入向规则,使其对当前将要使用的Flink作业队列网段放通。参考测试地址连通性根据ClickHouse和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 使用ClickHouse客户端连接到ClickHouse服务端,并使用以下命令查询集群标识符cluster等其他环境参数信息。
详细操作请参考从零开始使用ClickHouse。
select cluster,shard_num,replica_num,host_name from system.clusters;
其返回信息如下图:┌─cluster────┬────┬─shard_num─┐ │ default_cluster │ 1 │ 1 │ │ default_cluster │ 1 │ 2 │ └──────── ┴────┴────── ┘
根据获取到的集群标识符cluster,例如当前为default_cluster ,使用以下命令在ClickHouse的default_cluster集群节点上创建数据库flink。
CREATE DATABASE flink ON CLUSTER default_cluster;
- 使用以下命令在default_cluster集群节点上和flink数据库下创建表名为order的ReplicatedMergeTree表。
CREATE TABLE flink.order ON CLUSTER default_cluster(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id;
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将DMS Kafka作为数据源,ClickHouse作业结果表。
如下脚本中的加粗参数请根据实际环境修改。
create table orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table clickhouseSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://ClickhouseAddress1:ClickhousePort,ClickhouseAddress2:ClickhousePort/flink', 'username' = 'username', 'password' = 'password', 'table-name' = 'order', 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '3s' ); insert into clickhouseSink select * from orders;
- 连接Kafka集群,向DMS Kafka中插入以下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
- 使用ClickHouse客户端连接到ClickHouse,执行以下查询命令,查询写入flink数据库下order表中的数据。
select * from flink.order;
查询结果参考如下:
202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106 202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110
- 示例2:从Kafka中读取数据,并将数据插入ClickHouse中(ClickHouse版本为MRS的21.3.4.25,且MRS集群开启Kerberos认证)
- 参考增强型跨源连接,在DLI上根据ClickHouse和Kafka集群所在的虚拟私有云和子网分别创建跨源连接,并绑定所要使用的Flink弹性资源池。
- 设置ClickHouse和Kafka集群安全组的入向规则,使其对当前将要使用的Flink作业队列网段放通。参考测试地址连通性根据ClickHouse和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 使用ClickHouse客户端连接到ClickHouse服务端,并使用以下命令查询集群标识符cluster等其他环境参数信息。
select cluster,shard_num,replica_num,host_name from system.clusters;
其返回信息如下图:┌─cluster────┬────┬─shard_num─┐ │ default_cluster │ 1 │ 1 │ │ default_cluster │ 1 │ 2 │ └──────── ┴────┴────── ┘
根据获取到的集群标识符cluster,例如当前为default_cluster ,使用以下命令在ClickHouse的default_cluster集群节点上创建数据库flink。
CREATE DATABASE flink ON CLUSTER default_cluster;
- 使用以下命令在default_cluster集群节点上和flink数据库下创建表名为order的ReplicatedMergeTree表。
CREATE TABLE flink.order ON CLUSTER default_cluster(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id;
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Kafka作为数据源,ClickHouse作业结果表。
如下脚本中的加粗参数请根据实际环境修改。
create table orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table clickhouseSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://ClickhouseAddress1:ClickhousePort,ClickhouseAddress2:ClickhousePort/flink?ssl=true&sslmode=none', 'table-name' = 'order', 'username' = 'username', 'password' = 'password', --DEW凭据中的key 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '3s', 'dew.endpoint'='kms.xx.myhuaweicloud.com', --使用的DEW服务所在的endpoint信息 'dew.csms.secretName'='xx', --DEW服务通用凭据的凭据名称 'dew.csms.decrypt.fields'='password', --password字段值需要利用DEW凭证管理,进行解密替换 'dew.csms.version'='v1' ); insert into clickhouseSink select * from orders;
- 连接Kafka集群,向Kafka中插入以下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
- 使用ClickHouse客户端连接到ClickHouse,执行以下查询命令,查询写入flink数据库下order表中的数据。
select * from flink.order;
查询结果参考如下:
202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106 202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110