ClickHouse结果表
功能描述
DLI将Flink作业数据输出到ClickHouse中。
ClickHouse是面向联机分析处理的列式数据库,支持SQL查询,且查询性能好,特别是基于大宽表的聚合分析查询性能非常优异,比其他分析型数据库速度快一个数量级。详细请参考ClickHouse组件操作。
前提条件
该场景需要与ClickHouse建立增强型跨源连接,并根据实际情况设置ClickHouse集群所在安全组规则中的端口。
建立增强型跨源连接,请参考《数据湖探索用户指南》中的“增强型跨源连接”章节。
如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
注意事项
- 创建MRS的ClickHouse集群,集群版本选择MRS 3.1.0,且勿开启kerberos认证。
- Flink SQL语句中不能定义主键。同时不能使用任何产生主键的语法,例如insert into clickhouseSink select id, cout(*) from sourceName group by id。
- Flink中支持字段类型范围为:string、tinyint、smallint、int、long、float、double、date、timestamp、decimal以及Array。
语法格式
1 2 3 4 5 6 7 8 9 |
create table clickhouseSink ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector.type' = 'clickhouse', 'connector.url' = '', 'connector.table' = '' ); |
参数说明
参数 |
是否必选 |
说明 |
---|---|---|
connector.type |
是 |
固定为clickhouse |
connector.url |
是 |
ClickHouse的url。 参数格式为:jdbc:clickhouse://ClickHouseBalancer实例的IP:ClickHouseBalancer实例的http端口/数据库名 |
connector.table |
是 |
要创建的ClickHouse的表名。 |
connector.driver |
否 |
连接数据库所需要的驱动。
|
connector.username |
否 |
访问ClickHouse数据库的账号。 |
connector.password |
否 |
访问ClickHouse数据库账号的密码。 |
connector.write.flush.max-rows |
否 |
写数据时刷新数据的最大行数,默认值为:5000。 |
connector.write.flush.interval |
否 |
刷新数据的时间间隔,单位可以为ms、milli、millisecond/s、sec、second/min、minute等。 |
connector.write.max-retries |
否 |
写数据失败时的最大尝试次数,默认值为:3。 |
示例
从dis中读取数据,并将数据插入到数据库为flinktest、表名为test的ClickHouse数据库中。
- 创建dis数据源表disSource。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
create table disSource( attr0 string, attr1 TINYINT, attr2 smallint, attr3 int, attr4 bigint, attr5 float, attr6 double, attr7 String, attr8 string, attr9 timestamp(3), attr10 timestamp(3), attr11 date, attr12 decimal(38, 18), attr13 decimal(38, 18) ) with ( "connector.type" = "dis", "connector.region" = "cn-xxxx-x", "connector.channel" = "xxxx", "format.type" = 'csv' );
- 创建ClickHouse结果表clickhouse,将disSource表数据插入到clickhouse结果表中。
create table clickhouse( attr0 string, attr1 TINYINT, attr2 smallint, attr3 int, attr4 bigint, attr5 float, attr6 double, attr7 String, attr8 string, attr9 timestamp(3), attr10 timestamp(3), attr11 date, attr12 decimal(38, 18), attr13 decimal(38, 18), attr14 array < int >, attr15 array < bigint >, attr16 array < float >, attr17 array < double >, attr18 array < varchar >, attr19 array < String > ) with ( 'connector.type' = 'clickhouse', 'connector.url' = 'jdbc:clickhouse://xx.xx.xx.xx:xx/flinktest', 'connector.table' = 'test' ); insert into clickhouse select attr0, attr1, attr2, attr3, attr4, attr5, attr6, attr7, attr8, attr9, attr10, attr11, attr12, attr13, array [attr3, attr3+1], array [cast(attr4 as bigint), cast(attr4+1 as bigint)], array [cast(attr12 as float), cast(attr12+1 as float)], array [cast(attr13 as double), cast(attr13+1 as double)], array ['TEST1', 'TEST2'], array [attr7, attr7] from disSource;