FlinkServer对接ClickHouse
操作场景
Flink通过对接ClickHouse的ClickHouseBalancer实例进行读写,有效避免ClickHouse流量分发问题。
FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。
前提条件
- 集群中已安装ClickHouse、HDFS、Yarn、Flink和Kafka等服务,ClickHouse服务中的逻辑集群已存在并运行正常。
- 已安装集群客户端,例如安装路径为:/opt/client。
- 集群已启用Kerberos认证(安全模式),需提前在FusionInsight Manager中创建具有创建ClickHouse数据表、创建FlinkServer作业、Kafka操作权限的用户。
FlinkSQL与ClickHouse数据类型对应关系说明
FlinkSQL数据类型 |
ClickHouse数据类型 |
---|---|
BOOLEAN |
UInt8 |
TINYINT |
Int8 |
SMALLINT |
Int16 |
INTEGER |
Int32 |
BIGINT |
Int64 |
FLOAT |
Float32 |
DOUBLE |
Float64 |
CHAR |
String |
VARCHAR |
String |
VARBINARY |
FixedString |
DATE |
Date |
TIMESTAMP |
DateTime |
DECIMAL |
Decimal |
操作步骤
- 使用root用户登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录。
cd /opt/client
- 执行以下命令配置环境变量。
source bigdata_env
- 如果当前集群已启用Kerberos认证(安全模式),执行以下命令认证用户,用户需要具有创建ClickHouse表的权限。如果当前集群未启用Kerberos认证(普通模式),则无需执行此命令。
kinit 组件业务用户
例如,kinit clickhouseuser。
- 执行以下命令连接ClickHouse服务端。
- 集群未启用Kerberos认证(普通模式):
clickhouse client --host 待连接的ClickHouseServer实例IP地址 --user 登录名 --password 密码 --port ClickHouse的端口号 --multiline
命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。
- 集群已启用Kerberos认证(安全模式):
clickhouse client --host 待连接的ClickHouse的实例IP地址 --port ClickHouse的端口号 --secure --multiline
更多ClickHouse客户端操作可参考从零开始使用ClickHouse。
- 集群未启用Kerberos认证(普通模式):
- 执行以下命令创建一个复制表和分布式表。
- 例如创建的复制表名称为“default.test1”,所连接的ClickHouse逻辑集群名称为“default_cluster”。
CREATE TABLE default.test1 on cluster default_cluster
(
`pid` Int8,
`uid` UInt8,
`Int_16` Int16,
`Int_32` Int32,
`Int_64` Int64,
`String_x` String,
`String_y` String,
`float_32` Float32,
`float_64` Float64,
`Decimal_x` Decimal32(2),
`Date_x` Date,
`DateTime_x` DateTime
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/{数据库名}/test1','{replica}')
PARTITION BY pid
ORDER BY (pid, DateTime_x);
- 创建分布式表“test1_all”。
CREATE TABLE test1_all on cluster default_cluster
(
`pid` Int8,
`uid` UInt8,
`Int_16` Int16,
`Int_32` Int32,
`Int_64` Int64,
`String_x` String,
`String_y` String,
`float_32` Float32,
`float_64` Float64,
`Decimal_x` Decimal32(2),
`Date_x` Date,
`DateTime_x` DateTime
)
ENGINE = Distributed(default_cluster, default, test1, rand());
- 例如创建的复制表名称为“default.test1”,所连接的ClickHouse逻辑集群名称为“default_cluster”。
- 使用具有FlinkServer操作权限的用户,登录FusionInsight Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,进入FlinkServer的WebUI界面。
更多FlinkServer权限说明信息可参考Flink WebUI权限管理。
- 在“作业管理”界面单击“新建作业”,新建一个Flink SQL作业并提交。
作业类型选择“流作业”,在作业开发界面参考以下示例语句进行配置,然后提交作业。在作业“基础参数”中,需勾选“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”使用默认值即可。
更多FlinkServer作业参数详细说明可参考新建作业。
- 如果当前MRS集群已启用Kerberos认证(安全模式),示例语句如下:
create table kafkasource( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) with( 'connector' = 'kafka', 'topic' = 'input', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP1:Kafka端口号,Kafka的Broker实例业务IP2:Kafka端口号,Kafka的Broker实例业务IP3:Kafka端口号', 'properties.group.id' = 'group1', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE TABLE cksink ( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:clickhouse://ClickHouseBalancer实例业务IP1:ClickHouseBalancer端口,ClickHouseBalancer实例业务IP2:ClickHouseBalancer端口/default?ssl=true&sslmode=none', 'username' = 'ClickHouse用户', 'password' = 'ClickHouse用户密码', 'table-name' = 'test1_all', 'driver' = 'com.clickhouse.ClickHouseDriver', 'sink.buffer-flush.max-rows' = '0', 'sink.buffer-flush.interval' = '60s' ); Insert into cksink select * from kafkasource;
- 如果当前MRS集群未启用Kerberos认证(普通模式),示例语句如下:
create table kafkasource( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) with( 'connector' = 'kafka', 'topic' = 'kinput', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP1:Broker端口号,Kafka的Broker实例业务IP2:Kafka端口号,Kafka的Broker实例业务IP3:Kafka端口号', 'properties.group.id' = 'kafka_test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); CREATE TABLE cksink ( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:clickhouse://ClickHouseBalancer实例业务IP1:ClickHouseBalancer端口,ClickHouseBalancer实例业务IP2:ClickHouseBalancer端口/default', 'table-name' = 'test1_all', 'driver' = 'com.clickhouse.ClickHouseDriver', 'sink.buffer-flush.max-rows' = '0', 'sink.buffer-flush.interval' = '60s' ); Insert into cksink select * from kafkasource;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- ClickHouseBalancer实例IP地址及端口号说明:
服务的实例IP地址可通过登录FusionInsight Manager后,选择“集群 > 服务 > ClickHouse > 实例”,在实例列表页面中查询。
ClickHouseBalancer端口号要根据对接的ClickHouse逻辑集群选择:登录FusionInsight Manager后,选择“集群 > 服务 > ClickHouse > 逻辑集群”,查看“HTTP Balancer端口号”。建议配置多个ClickHouseBalancer实例IP以避免ClickHouseBalancer实例单点故障。
- 集群已启用Kerberos认证(安全模式),创建的cksink表中username、password参数需填写具有ClickHouse相应表操作权限的用户及密码,可参考ClickHouse用户及权限管理。
- 写入ClickHouse时会过滤Flink计算过程中产生的DELETE消息。
- 攒批写参数:Flink会将数据先放入内存,到达触发条件时再flush到数据库表中。相关配置如下。
sink.buffer-flush.max-rows:攒批写ClickHouse的行数,默认100。
sink.buffer-flush.interval:攒批写入的间隔时间,默认1s。
这两个条件只要有一个满足,就会触发一次sink,即到达触发条件时再flush到数据库表中。
- 如果当前MRS集群已启用Kerberos认证(安全模式),示例语句如下:
- 作业提交完成后,查看Flink作业管理界面,等待作业状态变为“运行中”。
- 使用Kafka客户端向Kafka Topic中写入数据。
cd /opt/client
source bigdata_env
kinit Kafka用户 (集群未启用Kerberos认证(普通模式)无需执行kinit命令)
cd Kafka/kafka/bin
sh kafka-console-producer.sh --broker-list Kafka的Broker实例业务IP:Broker端口号 --topic Topic名称 --producer.config 客户端安装目录/Kafka/kafka/config/producer.properties
例如本示例中,Kafka Topic名称为“kinput”,执行以下命令:
sh kafka-console-producer.sh --broker-list 192.168.67.136:21007 --topic kinput --producer.config /opt/client/Kafka/kafka/config/producer.properties
向Topic中输入以下消息内容:{"pid": "3","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.451236414","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"} {"pid": "4","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.4512314","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}
输入完成后按回车发送消息。
更多Kafka客户端操作可参考管理Kafka主题中的消息。
- 参考5,通过客户端连接ClickHouse,然后执行查询命令查询ClickHouse表是否已写入数据。
例如当前示例中,ClickHouse表为“test1_all”。
select * from test1_all;