更新时间:2024-07-24 GMT+08:00

同步Kafka数据至ClickHouse

您可以通过创建Kafka引擎表将Kafka数据自动同步至ClickHouse集群,具体操作详见本章节描述。

前提条件

  • 已创建Kafka集群。已安装Kafka客户端,详细可以参考安装客户端
  • 已创建ClickHouse集群,并且ClickHouse集群和Kafka集群在同一VPC下,网络可以互通,并安装ClickHouse客户端。

约束限制

当前ClickHouse不支持和开启安全模式的Kafka集群进行对接。

Kafka引擎表使用语法说明

  • 语法
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'host1:port1,host2:port2',
        kafka_topic_list = 'topic1,topic2,...',
        kafka_group_name = 'group_name',
        kafka_format = 'data_format';
        [kafka_row_delimiter = 'delimiter_symbol',]
        [kafka_schema = '',]
        [kafka_num_consumers = N]
  • 参数说明
    表1 Kafka引擎表参数说明

    参数名

    是否必选

    参数说明

    kafka_broker_list

    Kafka集群broker实例的IP和端口列表。例如:kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092

    说明:

    启用Kerberos认证下,使用21005端口需要“allow.everyone.if.no.acl.found”参数值设置为true; 若不设置此参数,操作会报错。

    Kafka集群broker实例IP获取方法如下:

    • MRS 3.x及后续版本,登录FusionInsight Manager,然后选择“集群 > 待操作的集群名称 > 服务 > Kafka”。单击“实例”,查看Kafka角色实例的IP地址。

    kafka_topic_list

    Kafka的topic列表。

    kafka_group_name

    Kafka的Consumer Group名称,可以自己指定。

    kafka_format

    Kafka消息体格式。例如JSONEachRow、CSV、XML等。

    kafka_row_delimiter

    每个消息体(记录)之间的分隔符。

    kafka_schema

    如果解析格式需要一个schema时,此参数必填。

    kafka_num_consumers

    单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过topic中分区的数量,因为每个分区只能分配一个消费者。

Kafka数据同步至ClickHouse操作示例

  1. 参考Kafka客户端使用实践,切换到Kafka客户端安装目录。

    1. 以Kafka客户端安装用户,登录Kafka安装客户端的节点。
    2. 执行以下命令,切换到客户端安装目录。

      cd /opt/client

    3. 执行以下命令配置环境变量。

      source bigdata_env

    4. 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。

      如果是MRS 3.1.0版本集群,则需要先执行:export CLICKHOUSE_SECURITY_ENABLED=true

      kinit 组件业务用户

  2. 执行以下命令,创建Kafka的Topic。详细的命令使用可以参考创建Kafka Topic

    kafka-topics.sh --topic kafkacktest2 --create --zookeeper ZooKeeper角色实例IP:ZooKeeper侦听客户端连接的端口/kafka --partitions 2 --replication-factor 1

    • --topic参数值为要创建的Topic名称,本示例创建的名称为kafkacktest2
    • --zookeeper:ZooKeeper角色实例所在节点IP地址,填写三个角色实例其中任意一个的IP地址即可ZooKeeper角色实例所在节点IP获取参考如下。
      • MRS 3.x之前版本,单击集群名称,登录集群详情页面,选择“组件管理 > ZooKeeper > 实例”。查看ZooKeeper角色实例的IP地址。
      • MRS 3.x及后续版本,登录FusionInsight Manager,具体请参见访问FusionInsight Manager(MRS 3.x及之后版本)。然后选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 实例”。查看ZooKeeper角色实例的IP地址。
    • --partitions主题分区数和--replication-factor主题备份个数不能大于Kafka角色实例数量。
    • ZooKeeper侦听客户端连接的端口获取方式:登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。默认为24002。

  3. 参考使用ClickHouse客户端登录ClickHouse客户端。

    1. 执行以下命令,切换到客户端安装目录。

      cd /opt/client

    2. 执行以下命令配置环境变量。

      source bigdata_env

    3. 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户,当前用户需要具有创建ClickHouse表的权限,具体请参见ClickHouse用户及权限管理章节,为用户绑定对应角色。如果当前集群未启用Kerberos认证,则无需执行本步骤。

      kinit 组件业务用户

      例如,kinit clickhouseuser。

    4. 执行以下命令连接到要导入数据的ClickHouse实例节点。

      clickhouse client --host ClickHouse的实例IP --user 登录名 --password --port ClickHouse的端口号 --database 数据库名 --multiline

      输入用户密码

  4. 参考Kafka引擎表使用语法说明,在ClickHouse中创建Kafka引擎表。例如,如下建表语句在default数据库下,创建表名为kafka_src_tbl3,Topic名为kafkacktest2、消息格式为JSONEachRow的Kafka引擎表。

    create table kafka_src_tbl3 on cluster default_cluster 
    (id UInt32, age UInt32, msg String)  
    ENGINE=Kafka() 
    SETTINGS 
     kafka_broker_list='kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092',
     kafka_topic_list='kafkacktest2',
     kafka_group_name='cg12',
     kafka_format='JSONEachRow';

  5. 创建ClickHouse本地复制表。例如,如下创建表名为kafka_dest_tbl3的ReplicatedMergeTree表。

    create table kafka_dest_tbl3 on cluster default_cluster 
    ( id UInt32, age UInt32, msg String )
    engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/kafka_dest_tbl3', '{replica}')
    partition by age 
    order by id;

  6. 创建MATERIALIZED VIEW,该视图会在后台转换Kafka引擎中的数据并将其放入创建的ClickHouse表中。

    create materialized view consumer3 on cluster default_cluster to kafka_dest_tbl3 as select * from kafka_src_tbl3;

  7. 再次执行1,进入Kafka客户端安装目录。
  8. 执行以下命令,在Kafka的Topic中产生消息。例如,如下命令向2中创建的Topic发送消息。

    kafka-console-producer.sh --broker-list kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092 --topic kafkacktest2
    >{"id":31, "age":30, "msg":"31 years old"}
    >{"id":32, "age":30, "msg":"31 years old"}
    >{"id":33, "age":30, "msg":"31 years old"}
    >{"id":35, "age":30, "msg":"31 years old"}

  9. 使用ClickHouse客户端登录3中ClickHouse实例节点,查询ClickHouse表数据。例如,查询kafka_dest_tbl3本地复制表,Kafka消息中的数据已经同步到该表。

    select * from kafka_dest_tbl3;