更新时间:2024-11-29 GMT+08:00

对接普通模式Kafka

操作场景

本章节主要介绍ClickHouse连接普通模式的Kafka,消费Kafka的数据。

前提条件

  • 已创建Kafka集群,且为普通模式。
  • 已创建ClickHouse集群,并且ClickHouse集群和Kafka集群网络可以互通,并安装ClickHouse客户端。

操作步骤

  1. 登录ClickHouse服务所在集群的Manager页面,选择“集群 > 服务 > ClickHouse > 配置 > 全部配置 > ClickHouseServer(角色) > 引擎”,修改如下参数:

    参数

    参数说明

    kafka.security_protocol

    参数值:plaintext

    kafka_auth_mode

    ClickHouse连接Kafka的认证方式,参数值选择NoAuth。

  2. 单击“保存”,在弹窗页面中单击“确定”,保存配置。单击“实例”,勾选ClickHouseServer实例,选择“更多 > 滚动重启实例”,重启ClickHouseServer实例。
  3. 参考使用Kafka客户端,登录到Kafka客户端安装目录。

    1. 以Kafka客户端安装用户,登录Kafka安装客户端的节点。
    2. 执行以下命令,切换到客户端安装目录。

      cd /opt/client

    3. 执行以下命令配置环境变量。

      source bigdata_env

  4. 执行以下命令,创建Kafka的Topic。详细的命令使用可以参考管理Kafka主题

    kafka-topics.sh --topic topic1 --create --zookeeper ZooKeeper角色实例IP:ZooKeeper侦听客户端连接的端口/kafka --partitions 2 --replication-factor 1

    • --topic参数值为要创建的Topic名称,本示例创建的名称为topic1
    • --zookeeper:ZooKeeper角色实例所在节点IP地址,填写三个角色实例其中任意一个的IP地址即可ZooKeeper角色实例所在节点IP获取参考如下:

      登录FusionInsight Manager页面,选择“集群 > 服务 > ZooKeeper > 实例”,查看ZooKeeper角色实例的IP地址。

    • --partitions主题分区数和--replication-factor主题备份个数不能大于Kafka角色实例数量。
    • ZooKeeper侦听客户端连接的端口获取方式:登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值,默认为24002。

  5. 登录ClickHouse客户端节点,连接ClickHouse服务端,具体请参考从零开始使用ClickHouse章节。
  6. 创建Kafka的表引擎,示例如下:

    CREATE TABLE queue1 (
    key String,
    value String,
    event_date DateTime
    ) ENGINE = Kafka()
    SETTINGS kafka_broker_list = 'kafka_ip1:21005,kafka_ip2:21005,kafka_ip3:21005',
    kafka_topic_list = 'topic1',
    kafka_group_name = 'group2',
    kafka_format = 'CSV',
    kafka_row_delimiter = '\n',
    kafka_handle_error_mode='stream';

    相关参数说明如下表:

    参数

    参数说明

    kafka_broker_list

    Kafka集群Broker实例的IP和端口列表。例如:kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092。

    Kafka集群broker实例IP获取方法如下:

    登录FusionInsight Manager页面,选择“集群 > 服务 > Kafka”。单击“实例”,查看Kafka角色实例的IP地址。

    kafka_topic_list

    消费Kafka的Topic。

    kafka_group_name

    Kafka消费组。

    kafka_format

    消费数据的格式化类型,JSONEachRow表示每行一条数据的json格式,CSV格式表示逗号分隔的一行数据。

    kafka_row_delimiter

    每个消息体(记录)之间的分隔符。

    kafka_handle_error_mode

    设置为stream,会把每条消息处理的异常打印出来。需要创建视图,通过视图查询异常数据的具体处理异常。

    创建视图语句,示例如下:

    CREATE MATERIALIZED VIEW default.kafka_errors2
    (
    `topic` String,
    `key` String,
    `partition` Int64,
    `offset` Int64,
    `timestamp` Date,
    `timestamp_ms` Int64,
    `raw` String,
    `error` String
    )
    ENGINE = MergeTree
    ORDER BY (topic, partition, offset)
    SETTINGS index_granularity = 8192 AS
    SELECT
    _topic AS topic,
    _key  AS key,
    _partition AS partition,
    _offset AS offset,
    _timestamp AS timestamp,
    _timestamp_ms AS timestamp_ms,
    _raw_message AS raw,
    _error AS error
    FROM default.queue1;

    查询视图,示例如下:

    host1 :) select * from kafka_errors2;  SELECT * FROM kafka_errors2  Query id: bf4d788f-bcb9-44f5-95d0-a6c83c591ddb  ┌─topic──┬─key─┬─partition─┬─offset─┬──timestamp─┬─timestamp_ms─┬─raw─┬─error────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ │ topic1 │     │         1 │      8 │ 2023-06-20 │   1687252213 │ 456 │ Cannot parse date: value is too short: (at row 1) Buffer has gone, cannot extract information about what has been parsed. │ └────────┴─────┴───────────┴────────┴────────────┴──────────────┴─────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘  1 rows in set. Elapsed: 0.003 sec.   host1 :)

    kafka_skip_broken_messages

    (可选)表示忽略解析异常的Kafka数据的条数。如果出现了N条异常后,后台线程结束,Materialized View会被重新安排后台线程去监测数据。

    kafka_num_consumers

    (可选)单个Kafka Engine的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应topic的partitions总数。

    其他配置可参考https://clickhouse.com/docs/zh/engines/table-engines/integrations/kafka

  7. 通过客户端连接ClickHouse创建本地表,示例如下:

    CREATE TABLE daily1(
    key String,
    value String,
    event_date DateTime
    )ENGINE = MergeTree()
    ORDER BY key;

  8. 通过客户端连接ClickHouse创建物化视图,示例如下:

    CREATE MATERIALIZED VIEW default.consumer1 TO default.daily1 (
    `event_date` DateTime,
    `key` String,
    `value` String
    ) AS
    SELECT
    event_date,
    key,
    value
    FROM default.queue1;

  9. 再次执行3,进入Kafka客户端安装目录。
  10. 执行以下命令,在Kafka的Topic中产生消息。例如,如下命令向4中创建的Topic发送消息。

    kafka-console-producer.sh --broker-list kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092 --topic topic1
    >a1,b1,'2020-08-01 10:00:00'
    >a2,b2,'2020-08-02 10:00:00'
    >a3,b3,'2020-08-02 10:00:00'
    >a4,b4,'2023-09-02 10:00:00'

  11. 查询消费到的Kafka数据,查询上述的物化视图,示例如下:

    select * from daily1;