创建FlinkServer作业写入数据至ClickHouse表
本章节适用于MRS 3.1.2及之后的版本。
操作场景
Flink通过对接ClickHouse的ClickHouseBalancer实例进行读写,有效避免ClickHouse流量分发问题。FlinkSQL与ClickHouse数据类型对应关系如下表所示。
MRS 3.2.0及以后版本,根据安全需求,FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。
FlinkSQL数据类型 |
ClickHouse数据类型 |
---|---|
BOOLEAN |
UInt8 |
TINYINT |
Int8 |
SMALLINT |
Int16 |
INTEGER |
Int32 |
BIGINT |
Int64 |
FLOAT |
Float32 |
DOUBLE |
Float64 |
CHAR |
String |
VARCHAR |
String |
VARBINARY |
FixedString |
DATE |
Date |
TIMESTAMP |
DateTime |
DECIMAL |
Decimal |
前提条件
- 集群中已安装ClickHouse、HDFS、Yarn、Flink和Kafka等服务。
- 客户端已安装,例如安装路径为:/opt/client。
创建作业步骤
- 使用root用户登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录。
cd /opt/client
- 执行以下命令配置环境变量。
source bigdata_env
- 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户,当前用户需要具有创建ClickHouse表的权限。如果当前集群未启用Kerberos认证,则无需执行此命令。
kinit 组件业务用户
例如,kinit clickhouseuser。
- 连接ClickHouse客户端,可参考ClickHouse客户端使用实践。命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。
- 执行以下命令创建复制表和分布式表。
- 创建复制表“default.test1”。
CREATE TABLE default.test1 on cluster default_cluster ( `pid` Int8, `uid` UInt8, `Int_16` Int16, `Int_32` Int32, `Int_64` Int64, `String_x` String, `String_y` String, `float_32` Float32, `float_64` Float64, `Decimal_x` Decimal32(2), `Date_x` Date, `DateTime_x` DateTime ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/test1','{replica}') PARTITION BY pid ORDER BY (pid, DateTime_x);
- 创建分布式表“test1_all”。
CREATE TABLE test1_all ON CLUSTER default_cluster ( `pid` Int8, `uid` UInt8, `Int_16` Int16, `Int_32` Int32, `Int_64` Int64, `String_x` String, `String_y` String, `float_32` Float32, `float_64` Float64, `Decimal_x` Decimal32(2), `Date_x` Date, `DateTime_x` DateTime ) ENGINE = Distributed(default_cluster, default, test1, rand());
- 创建复制表“default.test1”。
- 登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考如何创建FlinkServer作业,新建Flink SQL作业,作业类型选择“流作业”。在作业开发界面进行如下作业配置,并启动作业。需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
- 如果当前MRS集群为安全模式,执行以下操作:
create table kafkasource( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) with( 'connector' = 'kafka', 'topic' = 'input', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'group1', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE TABLE cksink ( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:clickhouse://ClickHouseBalancer实例IP1:ClickHouseBalancer端口,ClickHouseBalancer实例IP2:ClickHouseBalancer端口/default?ssl=true&sslmode=none', 'username' = 'ClickHouse用户,详见说明', 'password' = 'ClickHouse用户密码,详见说明', 'table-name' = 'test1_all', 'driver' = 'com.clickhouse.jdbc.ClickHouseDriver', 'sink.buffer-flush.max-rows' = '0', 'sink.buffer-flush.interval' = '60s' ); Insert into cksink select * from kafkasource;
- 如果当前MRS集群为普通模式,执行以下操作:
create table kafkasource( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) with( 'connector' = 'kafka', 'topic' = 'kinput', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'kafka_test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); CREATE TABLE cksink ( `pid` TINYINT, `uid` BOOLEAN, `Int_16` SMALLINT, `Int_32` INTEGER, `Int_64` BIGINT, `String_x` CHAR, `String_y` VARCHAR(10), `float_32` FLOAT, `float_64` DOUBLE, `Decimal_x` DECIMAL(9,2), `Date_x` DATE, `DateTime_x` TIMESTAMP ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:clickhouse://ClickHouseBalancer实例IP1:ClickHouseBalancer端口,ClickHouseBalancer实例IP2:ClickHouseBalancer端口/default', 'table-name' = 'test1_all', 'username' = 'ClickHouse用户,详见说明', 'password' = 'ClickHouse用户密码,详见说明', 'driver' = 'com.clickhouse.jdbc.ClickHouseDriver', 'sink.buffer-flush.max-rows' = '0', 'sink.buffer-flush.interval' = '60s' ); Insert into cksink select * from kafkasource;
- 创建的cksink表中username、password参数填写的用户为具有ClickHouse相应表权限的用户及密码,详见创建ClickHouse角色。
- Kafka端口号:
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- ClickHouseBalancer端口号要根据对接的ClickHouse集群选择:
- 当ClickHouse所在集群为安全模式集群时,ClickHouseBalancer端口号默认为“21426”。
- 当ClickHouse所在集群为普通模式集群时,ClickHouseBalancer端口号默认为“21425”。
- url:可配置多个ClickHouseBalancer实例IP以避免ClickHouseBalancer实例单点故障。
- 写入ClickHouse时会过滤Flink计算过程中产生的DELETE消息。
- 攒批写参数:Flink会将数据先放入内存,到达触发条件时再flush到数据库表中。相关配置如下。
sink.buffer-flush.max-rows:攒批写ClickHouse的行数,默认100。
sink.buffer-flush.interval:攒批写入的间隔时间,默认1s。
这两个条件只要有一个满足,就会触发一次sink,即到达触发条件时再flush到数据库表中。
- 如果当前MRS集群为安全模式,执行以下操作:
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka Topic中的消息,向kafka中写入数据。
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为kinput:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic kinput --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:{"pid": "3","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.451236414","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"} {"pid": "4","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.4512314","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}
输入完成后按回车发送消息。
- 连接ClickHouse查询表数据。
clickhouse client --host ClickHouse的实例IP --user 登录名 --password '密码'--port ClickHouse的端口号 --secure --multiline
执行查询命令查询ClickHouse表是否已写入数据。例如,当前ClickHouse表为test1_all。
select * from test1_all;