更新时间:2024-08-29 GMT+08:00
分享

创建FlinkServer作业写入数据至ClickHouse表

本章节适用于MRS 3.1.2及之后的版本。

操作场景

Flink通过对接ClickHouse的ClickHouseBalancer实例进行读写,有效避免ClickHouse流量分发问题。FlinkSQL与ClickHouse数据类型对应关系如下表所示。

MRS 3.2.0及以后版本,根据安全需求,FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。

表1 FlinkSQL与ClickHouse数据类型对应关系

FlinkSQL数据类型

ClickHouse数据类型

BOOLEAN

UInt8

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

FLOAT

Float32

DOUBLE

Float64

CHAR

String

VARCHAR

String

VARBINARY

FixedString

DATE

Date

TIMESTAMP

DateTime

DECIMAL

Decimal

前提条件

  • 集群中已安装ClickHouse、HDFS、Yarn、Flink和Kafka等服务。
  • 客户端已安装,例如安装路径为:/opt/client

创建作业步骤

  1. 使用root用户登录安装客户端的节点。
  2. 执行以下命令,切换到客户端安装目录。

    cd /opt/client

  3. 执行以下命令配置环境变量。

    source bigdata_env

  4. 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户,当前用户需要具有创建ClickHouse表的权限。如果当前集群未启用Kerberos认证,则无需执行此命令。

    kinit 组件业务用户

    例如,kinit clickhouseuser

  5. 连接ClickHouse客户端,可参考ClickHouse客户端使用实践命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。

    • 普通模式:

      clickhouse client --host ClickHouse的实例IP --user 登录名 --password '密码' --port ClickHouse的端口号 --multiline

    • 安全模式:

      clickhouse client --host ClickHouse的实例IP --user 登录名 --password '密码'--port ClickHouse的端口号 --secure --multiline

  6. 执行以下命令创建复制表和分布式表。

    1. 创建复制表“default.test1”。
      CREATE TABLE default.test1 on cluster default_cluster
      (
      `pid` Int8,
      `uid` UInt8,
      `Int_16` Int16,
      `Int_32` Int32,
      `Int_64` Int64,
      `String_x` String,
      `String_y` String,
      `float_32` Float32,
      `float_64` Float64,
      `Decimal_x` Decimal32(2),
      `Date_x` Date,
      `DateTime_x` DateTime
      )
      ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/test1','{replica}')
      PARTITION BY pid
      ORDER BY (pid, DateTime_x);
    2. 创建分布式表“test1_all”。
      CREATE TABLE test1_all ON CLUSTER default_cluster
      (
      `pid` Int8,
      `uid` UInt8,
      `Int_16` Int16,
      `Int_32` Int32,
      `Int_64` Int64,
      `String_x` String,
      `String_y` String,
      `float_32` Float32,
      `float_64` Float64,
      `Decimal_x` Decimal32(2),
      `Date_x` Date,
      `DateTime_x` DateTime
      )
      ENGINE = Distributed(default_cluster, default, test1, rand());

  7. 登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  8. 参考如何创建FlinkServer作业,新建Flink SQL作业,作业类型选择“流作业”。在作业开发界面进行如下作业配置,并启动作业。需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    • 如果当前MRS集群为安全模式,执行以下操作:
      create table kafkasource(
      `pid` TINYINT,
      `uid` BOOLEAN,
      `Int_16` SMALLINT,
      `Int_32` INTEGER,
      `Int_64` BIGINT,
      `String_x` CHAR,
      `String_y` VARCHAR(10),
      `float_32` FLOAT,
      `float_64` DOUBLE,
      `Decimal_x` DECIMAL(9,2),
      `Date_x` DATE,
      `DateTime_x` TIMESTAMP
      ) with(
        'connector' = 'kafka',
        'topic' = 'input',
        'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
        'properties.group.id' = 'group1',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.kerberos.domain.name' = 'hadoop.系统域名'
      );
      CREATE TABLE cksink (
      `pid` TINYINT,
      `uid` BOOLEAN,
      `Int_16` SMALLINT,
      `Int_32` INTEGER,
      `Int_64` BIGINT,
      `String_x` CHAR,
      `String_y` VARCHAR(10),
      `float_32` FLOAT,
      `float_64` DOUBLE,
      `Decimal_x` DECIMAL(9,2),
      `Date_x` DATE,
      `DateTime_x` TIMESTAMP
      ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:clickhouse://ClickHouseBalancer实例IP1:ClickHouseBalancer端口,ClickHouseBalancer实例IP2:ClickHouseBalancer端口/default?ssl=true&sslmode=none',
      'username' = 'ClickHouse用户,详见说明',
      'password' = 'ClickHouse用户密码,详见说明',
      'table-name' = 'test1_all',
      'driver' = 'com.clickhouse.jdbc.ClickHouseDriver',
      'sink.buffer-flush.max-rows' = '0',
      'sink.buffer-flush.interval' = '60s'
      );
      Insert into cksink 
      select
      * 
      from 
      kafkasource;
    • 如果当前MRS集群为普通模式,执行以下操作:
      create table kafkasource(
      `pid` TINYINT,
      `uid` BOOLEAN,
      `Int_16` SMALLINT,
      `Int_32` INTEGER,
      `Int_64` BIGINT,
      `String_x` CHAR,
      `String_y` VARCHAR(10),
      `float_32` FLOAT,
      `float_64` DOUBLE,
      `Decimal_x` DECIMAL(9,2),
      `Date_x` DATE,
      `DateTime_x` TIMESTAMP
      ) with(
        'connector' = 'kafka',
        'topic' = 'kinput',
        'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
        'properties.group.id' = 'kafka_test',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
      );
      CREATE TABLE cksink (
      `pid` TINYINT,
      `uid` BOOLEAN,
      `Int_16` SMALLINT,
      `Int_32` INTEGER,
      `Int_64` BIGINT,
      `String_x` CHAR,
      `String_y` VARCHAR(10),
      `float_32` FLOAT,
      `float_64` DOUBLE,
      `Decimal_x` DECIMAL(9,2),
      `Date_x` DATE,
      `DateTime_x` TIMESTAMP
      ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:clickhouse://ClickHouseBalancer实例IP1:ClickHouseBalancer端口,ClickHouseBalancer实例IP2:ClickHouseBalancer端口/default',
      'table-name' = 'test1_all',
      'username' = 'ClickHouse用户,详见说明',
      'password' = 'ClickHouse用户密码,详见说明',
      'driver' = 'com.clickhouse.jdbc.ClickHouseDriver',
      'sink.buffer-flush.max-rows' = '0',
      'sink.buffer-flush.interval' = '60s'
      );
      Insert into cksink 
      select
      * 
      from 
      kafkasource;
    • 创建的cksink表中username、password参数填写的用户为具有ClickHouse相应表权限的用户及密码,详见创建ClickHouse角色
    • Kafka端口号:
      • 集群的“认证模式”为“安全模式”时为“sasl.port”的值,默认为“21007”。
      • 集群的“认证模式”为“普通模式”时为“port”的值,默认为“9092”。

        如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • ClickHouseBalancer端口号要根据对接的ClickHouse集群选择:
      • 当ClickHouse所在集群为安全模式集群时,ClickHouseBalancer端口号默认为“21428”。
      • 当ClickHouse所在集群为普通模式集群时,ClickHouseBalancer端口号默认为“21426”。
    • url:可配置多个ClickHouseBalancer实例IP以避免ClickHouseBalancer实例单点故障。
    • 写入ClickHouse时会过滤Flink计算过程中产生的DELETE消息。
    • 攒批写参数:Flink会将数据先放入内存,到达触发条件时再flush到数据库表中。相关配置如下。

      sink.buffer-flush.max-rows:攒批写ClickHouse的行数,默认100。

      sink.buffer-flush.interval:攒批写入的间隔时间,默认1s。

      这两个条件只要有一个满足,就会触发一次sink,即到达触发条件时再flush到数据库表中。

      • 情况一:60s sink一次

        'sink.buffer-flush.max-rows' = '0',

        'sink.buffer-flush.interval' = '60s'

      • 情况二:100条 sink一次

        'sink.buffer-flush.max-rows' = '100',

        'sink.buffer-flush.interval' = '0s'

      • 情况三:数据不sink

        'sink.buffer-flush.max-rows' = '0',

        'sink.buffer-flush.interval' = '0s'

  9. 查看作业管理界面,作业状态为“运行中”。
  10. 参考管理Kafka Topic中的消息,向kafka中写入数据。

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为kinput:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic kinput --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:
    {"pid": "3","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.451236414","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}
    {"pid": "4","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.4512314","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}

    输入完成后按回车发送消息。

  11. 连接ClickHouse查询表数据。

    clickhouse client --host ClickHouse的实例IP --user 登录名 --password '密码'--port ClickHouse的端口号 --secure --multiline

    执行查询命令查询ClickHouse表是否已写入数据。例如,当前ClickHouse表为test1_all。

    select * from test1_all;

相关文档