文档首页/ MapReduce服务 MRS/ 组件操作指南(LTS版)/ 使用Flink/ 创建FlinkServer作业/ FlinkServer作业对接ClickHouse(ClickHouse Connector)
更新时间:2025-12-10 GMT+08:00
分享

FlinkServer作业对接ClickHouse(ClickHouse Connector)

本章节适用于MRS 3.6.0-LTS及之后的版本。

操作场景

Flink通过对接ClickHouse的ClickHouseBalancer实例进行读写,有效避免ClickHouse流量分发问题。ClickHouse可以作为Source表、Sink表和维表。

前提条件

  • 集群中已安装ClickHouse、HDFS、Yarn、Flink和Kafka等服务,ClickHouse服务中的逻辑集群已存在并运行正常。
  • 已安装集群客户端,例如安装路径为:/opt/client
  • 需提前在FusionInsight Manager中创建具有创建ClickHouse数据表、创建FlinkServer作业、Kafka操作权限的用户。

FlinkSQL与ClickHouse数据类型对应关系说明

FlinkSQL数据类型

ClickHouse数据类型

CHAR

String

VARCHAR

String / IP / UUID

STRING

String / Enum

BOOLEAN

UInt8

BYTES

FixedString

DECIMAL

Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256

TINYINT

Int8

SMALLINT

Int16 / UInt8

INTEGER

Int32 / UInt16 / Interval

BIGINT

Int64 / UInt32

FLOAT

Float32

DOUBLE

Float64

DATE

Date

TIME

DateTime

TIMESTAMP

DateTime

TIMESTAMP_LTZ

DateTime

INTERVAL_YEAR_MONTH

Int32

INTERVAL_DAY_TIME

Int64

ARRAY

Array

MAP

Map

ROW

不支持

MULTISET

不支持

RAW

不支持

操作步骤

  1. 使用root用户登录安装客户端的节点。
  2. 执行以下命令,切换到客户端安装目录。

    cd /opt/client

  3. 执行以下命令配置环境变量。

    source bigdata_env

  4. 如果当前集群已启用Kerberos认证(安全模式),执行以下命令认证用户,用户需要具有创建ClickHouse表的权限。如果当前集群未启用Kerberos认证(普通模式),则无需执行此命令。

    kinit 组件业务用户

    例如,kinit clickhouseuser

  5. 执行以下命令连接ClickHouse服务端。

    • 集群未启用Kerberos认证(普通模式)

      clickhouse client --host 待连接的ClickHouseServer实例IP地址 --user 登录名 --password 密码 --port ClickHouse的端口号 --multiline

      命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。

    • 集群已启用Kerberos认证(安全模式)

      clickhouse client --host 待连接的ClickHouse的实例IP地址 --port ClickHouse的端口号 --secure --multiline

    更多ClickHouse客户端操作可参考ClickHouse客户端使用实践

  6. 执行以下命令创建一个复制表和分布式表。

    1. 例如创建的复制表名称为“default.test1”,所连接的ClickHouse逻辑集群名称为“default_cluster”。

      CREATE TABLE default.test1 on cluster default_cluster

      (

      `pid` Int8,

      `uid` UInt8,

      `Int_16` Int16,

      `Int_32` Int32,

      `Int_64` Int64,

      `String_x` String,

      `String_y` String,

      `float_32` Float32,

      `float_64` Float64,

      `Decimal_x` Decimal32(2),

      `Date_x` Date,

      `DateTime_x` DateTime

      )

      ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/{数据库名}/test1','{replica}')

      PARTITION BY pid

      ORDER BY (pid, DateTime_x);

    2. 创建分布式表“test1_all”。

      CREATE TABLE test1_all on cluster default_cluster

      (

      `pid` Int8,

      `uid` UInt8,

      `Int_16` Int16,

      `Int_32` Int32,

      `Int_64` Int64,

      `String_x` String,

      `String_y` String,

      `float_32` Float32,

      `float_64` Float64,

      `Decimal_x` Decimal32(2),

      `Date_x` Date,

      `DateTime_x` DateTime

      )

      ENGINE = Distributed(default_cluster, default, test1, rand());

  7. 使用具有FlinkServer操作权限的用户,登录FusionInsight Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,进入FlinkServer的WebUI界面。

    更多FlinkServer权限说明信息可参考Flink用户权限说明

  8. 在“作业管理”界面单击“新建作业”,新建一个Flink SQL作业并提交。

    作业类型选择“流作业”,在作业开发界面参考以下示例语句进行配置,然后提交作业。在作业“基础参数”中,需勾选“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”使用默认值即可。

    更多FlinkServer作业参数详细说明可参考创建FlinkServer作业

    • 如果当前MRS集群已启用Kerberos认证(安全模式),示例语句如下:
      create table kafkasource(
      `pid` TINYINT,
      `uid` BOOLEAN,
      `Int_16` SMALLINT,
      `Int_32` INTEGER,
      `Int_64` BIGINT,
      `String_x` CHAR,
      `String_y` VARCHAR(10),
      `float_32` FLOAT,
      `float_64` DOUBLE,
      `Decimal_x` DECIMAL(9,2),
      `Date_x` DATE,
      `DateTime_x` TIMESTAMP
      ) with(
        'connector' = 'kafka',
        'topic' = 'input',
        'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP1:Kafka端口号,Kafka的Broker实例业务IP2:Kafka端口号,Kafka的Broker实例业务IP3:Kafka端口号',
        'properties.group.id' = 'group1',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.kerberos.domain.name' = 'hadoop.系统域名'
      );
      CREATE TABLE cksink (
      `pid` TINYINT,
      `uid` BOOLEAN,
      `Int_16` SMALLINT,
      `Int_32` INTEGER,
      `Int_64` BIGINT,
      `String_x` CHAR,
      `String_y` VARCHAR(10),
      `float_32` FLOAT,
      `float_64` DOUBLE,
      `Decimal_x` DECIMAL(9,2),
      `Date_x` DATE,
      `DateTime_x` TIMESTAMP
      ) WITH (
      'connector' = 'clickhouse',
      'url' = 'jdbc:clickhouse://ClickHouseBalancer实例业务IP1:ClickHouseBalancer端口,ClickHouseBalancer实例业务IP2:ClickHouseBalancer端口/default?ssl=true&sslmode=none',
      'username' = 'ClickHouse用户',
      'password' = 'ClickHouse用户密码',
      'table-name' = 'test1_all',
      'sink.batch-size' = '0',
      'sink.flush-interval' = '60s'
      );
      Insert into cksink 
      select
      * 
      from 
      kafkasource;
    • 如果当前MRS集群未启用Kerberos认证(普通模式),示例语句如下:
      create table kafkasource(
      `pid` TINYINT,
      `uid` BOOLEAN,
      `Int_16` SMALLINT,
      `Int_32` INTEGER,
      `Int_64` BIGINT,
      `String_x` CHAR,
      `String_y` VARCHAR(10),
      `float_32` FLOAT,
      `float_64` DOUBLE,
      `Decimal_x` DECIMAL(9,2),
      `Date_x` DATE,
      `DateTime_x` TIMESTAMP
      ) with(
        'connector' = 'kafka',
        'topic' = 'kinput',
        'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP1:Broker端口号,Kafka的Broker实例业务IP2:Kafka端口号,Kafka的Broker实例业务IP3:Kafka端口号',
        'properties.group.id' = 'kafka_test',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
      );
      CREATE TABLE cksink (
      `pid` TINYINT,
      `uid` BOOLEAN,
      `Int_16` SMALLINT,
      `Int_32` INTEGER,
      `Int_64` BIGINT,
      `String_x` CHAR,
      `String_y` VARCHAR(10),
      `float_32` FLOAT,
      `float_64` DOUBLE,
      `Decimal_x` DECIMAL(9,2),
      `Date_x` DATE,
      `DateTime_x` TIMESTAMP
      ) WITH (
      'connector' = 'clickhouse',
      'url' = 'jdbc:clickhouse://ClickHouseBalancer实例业务IP1:ClickHouseBalancer端口,ClickHouseBalancer实例业务IP2:ClickHouseBalancer端口/default',
      'table-name' = 'test1_all',
      'username' = 'ClickHouse用户',
      'password' = 'ClickHouse用户密码',
      'sink.batch-size' = '0',
      'sink.flush-interval' = '60s'
      );
      Insert into cksink 
      select
      * 
      from 
      kafkasource;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • ClickHouseBalancer实例IP地址及端口号说明:

      服务的实例IP地址可通过登录FusionInsight Manager后,选择“集群 > 服务 > ClickHouse > 实例”,在实例列表页面中查询。

      ClickHouseBalancer端口号要根据对接的ClickHouse逻辑集群选择:登录FusionInsight Manager后,选择“集群 > 服务 > ClickHouse > 逻辑集群”,查看“HTTP Balancer端口号”。

      建议配置多个ClickHouseBalancer实例IP以避免ClickHouseBalancer实例单点故障。

    • 创建的cksink表中username、password参数需填写具有ClickHouse相应表操作权限的用户及密码,可参考创建具有ClickHouse权限的用户
    • 攒批写参数:Flink会将数据先放入内存,到达触发条件时再flush到数据库表中。相关配置如下。

      sink.batch-size:攒批写ClickHouse的行数,默认1000。

      sink.flush-interval:攒批写入的间隔时间,默认1s。

      这两个条件只要有一个满足,就会触发一次sink,即到达触发条件时再flush到数据库表中。

      • 情况一:60s sink一次

        'sink.batch-size' = '0',

        'sink.flush-interval' = '60s'

      • 情况二:1000条 sink一次

        'sink.batch-size' = '1000',

        'sink.flush-interval' = '0s'

      • 情况三:数据不sink

        'sink.batch-size' = '0',

        'sink.flush-interval' = '0s'

  9. 作业提交完成后,查看Flink作业管理界面,等待作业状态变为“运行中”。
  10. 使用Kafka客户端向Kafka Topic中写入数据。

    cd /opt/client

    source bigdata_env

    kinit Kafka用户集群未启用Kerberos认证(普通模式)无需执行kinit命令)

    cd Kafka/kafka/bin

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例业务IP:Broker端口号 --topic Topic名称 --producer.config 客户端安装目录/Kafka/kafka/config/producer.properties

    例如本示例中,Kafka Topic名称为“kinput”,执行以下命令:

    sh kafka-console-producer.sh --broker-list 192.168.67.136:21007 --topic kinput --producer.config /opt/client/Kafka/kafka/config/producer.properties

    向Topic中输入以下消息内容:
    {"pid": "3","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.451236414","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}
    {"pid": "4","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.4512314","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}

    输入完成后按回车发送消息。

    更多Kafka客户端操作可参考管理Kafka Topic中的消息

  11. 参考5,通过客户端连接ClickHouse,然后执行查询命令查询ClickHouse表是否已写入数据。

    例如当前示例中,ClickHouse表为“test1_all”。

    select * from test1_all;

相关表参数配置说明

表1 ClickHouse作为Sink表时配置参数

参数名称

参数默认值

是否必须配置

参数描述

sink.batch-size

1000

攒批写ClickHouse的行数,默认1000。

sink.flush-interval

1s

攒批写入的间隔时间,默认1s。

sink.max-retries

3

当数据写入失败时的重试次数。

sink.ignore-delete

true

是否忽略delete消息:

  • true:忽略delete消息。
  • false:不忽略delete消息。

sink.parallelism

(none) Integer

自定义Sink并行度。

表2 ClickHouse作为Source表时配置参数

参数名称

参数默认值

是否必须配置

参数描述

scan.partition.column

(none) String

用来分区读取数据的列名,选择数值类型的分区字段。

scan.partition.num

(none) Integer

划分成几个分区。

scan.partition.lower-bound

(none) Long

第一个分区的最小值。

scan.partition.upper-bound

(none) Long

最后一个分区的最大值。

表3 ClickHouse作为维表时配置参数

参数名称

参数默认值

是否必须配置

参数描述

lookup.cache

NONE

维表的缓存策略。目前支持NONE(不缓存)和PARTIAL(查找Clickhouse时缓存)。

lookup.partial-cache.max-rows

(none) Integer

维表缓存的最大行数,若超过该值,则最早的行记录将会过期。使用该配置时“lookup.cache”必须设置为“PARTIAL”。

lookup.partial-cache.expire-after-write

(none)Duration

在记录写入缓存后该记录的最大保留时间。使用该配置时“lookup.cache”必须设置为“PARTIAL”。

lookup.partial-cache.expire-after-access

(none)Duration

缓存中的记录最长保留时间。使用该配置时“lookup.cache”必须设置为“PARTIAL”。

lookup.partial-cache.cache-missing-key

true

在HTTP服务中未关联到数据时,是否缓存空记录,默认为true。使用该配置时“lookup.cache”必须设置为“PARTIAL”。

lookup.max-retries

3

查询失败的最大重试次数。

相关文档