FlinkServer对接ClickHouse
操作场景
Flink通过对接ClickHouse的ClickHouseBalancer实例进行读写,有效避免ClickHouse流量分发问题。
前提条件
- 集群中已安装ClickHouse、HDFS、Yarn、Flink和HBase等服务。
- 客户端已安装,例如安装路径为:/opt/Bigdata/client。
FlinkSQL与ClickHouse数据类型对应关系
FlinkSQL数据类型 |
ClickHouse数据类型 |
---|---|
BOOLEAN |
UInt8 |
TINYINT |
Int8 |
SMALLINT |
Int16 |
INTEGER |
Int32 |
BIGINT |
Int64 |
FLOAT |
Float32 |
DOUBLE |
Float64 |
CHAR |
String |
VARCHAR |
String |
VARBINARY |
FixedString |
DATE |
Date |
TIMESTAMP |
DateTime |
DECIMAL |
Decimal |
操作步骤
- 使用root用户登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录。
cd /opt/Bigdata/client
- 执行以下命令配置环境变量。
source bigdata_env
- 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户,当前用户需要具有创建ClickHouse表的权限。如果当前集群未启用Kerberos认证,则无需执行此命令。
kinit 组件业务用户
例如,kinit clickhouseuser。
- 连接ClickHouse客户端,可参考从零开始使用ClickHouse。
- 执行以下命令创建复制表和分布式表。
- 创建复制表“default.test1”。
CREATE TABLE default.test1 on cluster default_cluster ( `pid` Int8, `uid` UInt8, `Int_16` Int16, `Int_32` Int32, `Int_64` Int64, `String_x` String, `String_y` String, `float_32` Float32, `float_64` Float64, `Decimal_x` Decimal32(2), `Date_x` Date, `DateTime_x` DateTime ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/test1','{replica}') PARTITION BY pid ORDER BY (pid, DateTime_x);
- 创建分布式表“test1_all”。
CREATE TABLE test1_all ON CLUSTER default_cluster ( `pid` Int8, `uid` UInt8, `Int_16` Int16, `Int_32` Int32, `Int_64` Int64, `String_x` String, `String_y` String, `float_32` Float32, `float_64` Float64, `Decimal_x` Decimal32(2), `Date_x` Date, `DateTime_x` DateTime ) ENGINE = Distributed(default_cluster, default, test1, rand());
- 创建复制表“default.test1”。
- 登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL作业,作业类型选择“流作业”。在作业开发界面进行如下作业配置,并启动作业。需勾选“运行参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
- 如果当前MRS集群为安全模式,执行以下操作:
create table kafkasource( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) with( 'connector' = 'kafka', 'topic' = 'input', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'group1', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE TABLE cksink ( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:clickhouse://ClickHouseBalancer实例IP:21422/default?ssl=true&sslmode=none', 'username' = 'ClickHouse用户,详见说明', 'password' = 'ClickHouse用户密码,详见说明', 'table-name' = 'test1_all', 'driver' = 'ru.yandex.clickhouse.ClickHouseDriver', 'sink.buffer-flush.max-rows' = '0', 'sink.buffer-flush.interval' = '60s' ); Insert into cksink select * from kafkasource;
- 如果当前MRS集群为普通模式,执行以下操作:
create table kafkasource( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) with( 'connector' = 'kafka', 'topic' = 'kinput', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'kafka_test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); CREATE TABLE cksink ( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:clickhouse://ClickHouseBalancer实例IP:21425/default', 'table-name' = 'test1_all', 'driver' = 'ru.yandex.clickhouse.ClickHouseDriver', 'sink.buffer-flush.max-rows' = '0', 'sink.buffer-flush.interval' = '60s' ); Insert into cksink select * from kafkasource;
- MRS集群安全模式下,创建的cksink表中username、password参数填写的用户为具有ClickHouse相应表权限的用户及密码,详见ClickHouse用户及权限管理。
- Kafka端口号:
- 21422:ClickHouseBalancer实例IP的https端口。
- 21425:ClickHouseBalancer实例IP的http端口。
- 其他jdbc connector参数请参考Flink官网:http://flink.apache.org/
- 攒批写参数:Flink会将数据先放入内存,到达触发条件时再flush到数据库表中。相关配置如下。
sink.buffer-flush.max-rows:攒批写ClickHouse的行数,默认100。
sink.buffer-flush.interval:攒批写入的间隔时间,默认1s。
这两个条件只要有一个满足,就会触发一次sink,即到达触发条件时再flush到数据库表中。
- 如果当前MRS集群为安全模式,执行以下操作:
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka主题中的消息,向kafka中写入数据。
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic主题名称 --producer.config ../config/producer.properties
例如本示例使用主题名称为kinput:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic kinput --producer.config ../config/producer.properties
输入消息内容:{"pid": "3","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.451236414","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}, {"pid": "4","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.4512314","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}
输入完成后按回车发送消息。
- 连接ClickHouse查询表数据。
clickhouse client --host ClickHouse的实例IP --user 登录名 --password '密码'--port ClickHouse的端口号 --secure --multiline
执行查询命令查询ClickHouse表是否已写入数据。例如,当前ClickHouse表为test1_all。
select * from test1_all;