更新时间:2024-11-29 GMT+08:00

通过用户密码对接Kafka

操作场景

本章节主要介绍ClickHouse通过用户名和密码的方式连接Kafka,消费Kafka的数据。

前提条件

  • 已创建Kafka集群,且为安全模式。
  • 已安装集群客户端。
  • 如果ClickHouse与Kafka不在同一个集群需要建立跨集群互信。

操作步骤

  1. 登录Kafka服务所在Manager页面,选择“系统 > 权限 > 用户 > 添加用户”,创建一个具有Kafka权限的人机用户,例如创建人机用户ck_user1,首次使用需要修改初始密码。Kafka用户权限介绍请参考管理Kafka用户权限
  2. 选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索参数“sasl.enabled.mechanisms”,修改参数值为“GSSAPI,PLAIN”,单击“保存”。

  3. 登录ClickHouse服务所在Manager页面,选择“集群 > 服务 > ClickHouse > 配置 > 全部配置 > ClickHouseServer(角色) > 引擎”,修改如下参数,配置连接Kafka的用户名和密码。

    参数

    参数说明

    kafka.sasl_mechanisms

    指定连接Kafka使用的SASL认证机制,参数值为PLAIN。

    kafka.sasl_password

    连接Kafka用户的密码,新建的用户ck_user1需要先修改初始密码,否则会导致认证失败。

    kafka.sasl_username

    连接Kafka的用户名,输入1创建的用户名。

    kafka_auth_mode

    ClickHouse连接Kafka的认证方式,参数值选择UserPassword。

  4. 单击“保存”,在弹窗页面中单击“确定”,保存配置。单击“实例”,勾选ClickHouseServer实例,选择“更多 > 滚动重启实例”,重启ClickHouseServer实例。
  5. 参考使用Kafka客户端,登录到Kafka客户端安装目录。

    1. 以Kafka客户端安装用户,登录Kafka安装客户端的节点。
    2. 执行以下命令,切换到客户端安装目录。

      cd /opt/client

    3. 执行以下命令配置环境变量。

      source bigdata_env

    4. 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。

      kinit 组件业务用户

  6. 执行以下命令,创建Kafka的Topic。详细的命令使用可以参考管理Kafka主题

    kafka-topics.sh --topic topic1 --create --zookeeper ZooKeeper角色实例IP:ZooKeeper侦听客户端连接的端口/kafka --partitions 2 --replication-factor 1

    • --topic参数值为要创建的Topic名称,本示例创建的名称为topic1
    • --zookeeper:ZooKeeper角色实例所在节点IP地址,填写三个角色实例其中任意一个的IP地址即可ZooKeeper角色实例所在节点IP获取参考如下:

      登录FusionInsight Manager页面,选择“集群 > 服务 > ZooKeeper > 实例”,查看ZooKeeper角色实例的IP地址。

    • --partitions主题分区数和--replication-factor主题备份个数不能大于Kafka角色实例数量。
    • ZooKeeper侦听客户端连接的端口获取方式:登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值,默认为24002。

  7. 登录ClickHouse客户端节点,连接ClickHouse服务端,具体请参考从零开始使用ClickHouse章节。
  8. 创建Kafka的表引擎,示例如下:

    CREATE TABLE queue1 (
    key String,
    value String,
    event_date DateTime
    ) ENGINE = Kafka()
    SETTINGS kafka_broker_list = 'kafka_ip1:21007,kafka_ip2:21007,kafka_ip3:21007',
    kafka_topic_list = 'topic1',
    kafka_group_name = 'group1',
    kafka_format = 'CSV',
    kafka_row_delimiter = '\n',
    kafka_handle_error_mode='stream';

    相关参数说明如下表:

    参数

    参数说明

    kafka_broker_list

    Kafka集群Broker实例的IP和端口列表。例如:kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092。

    Kafka集群Broker实例IP获取方法如下:

    登录FusionInsight Manager页面,选择“集群 > 服务 > Kafka”。单击“实例”,查看Kafka角色实例的IP地址。

    kafka_topic_list

    消费Kafka的Topic。

    kafka_group_name

    Kafka消费组。

    kafka_format

    消费数据的格式化类型,JSONEachRow表示每行一条数据的json格式,CSV格式表示逗号分隔的一行数据。

    kafka_row_delimiter

    每个消息体(记录)之间的分隔符。

    kafka_handle_error_mode

    设置为stream,会把每条消息处理的异常打印出来。需要创建视图,通过视图查询异常数据的具体处理异常。

    创建视图语句,示例如下:

    CREATE MATERIALIZED VIEW default.kafka_errors2
    (
    `topic` String,
    `key` String,
    `partition` Int64,
    `offset` Int64,
    `timestamp` Date,
    `timestamp_ms` Int64,
    `raw` String,
    `error` String
    )
    ENGINE = MergeTree
    ORDER BY (topic, partition, offset)
    SETTINGS index_granularity = 8192 AS
    SELECT
    _topic AS topic,
    _key  AS key,
    _partition AS partition,
    _offset AS offset,
    _timestamp AS timestamp,
    _timestamp_ms AS timestamp_ms,
    _raw_message AS raw,
    _error AS error
    FROM default.queue1;

    查询视图,示例如下:

    host1 :) select * from kafka_errors2;  SELECT * FROM kafka_errors2  Query id: bf4d788f-bcb9-44f5-95d0-a6c83c591ddb  ┌─topic──┬─key─┬─partition─┬─offset─┬──timestamp─┬─timestamp_ms─┬─raw─┬─error────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ │ topic1 │     │         1 │      8 │ 2023-06-20 │   1687252213 │ 456 │ Cannot parse date: value is too short: (at row 1) Buffer has gone, cannot extract information about what has been parsed. │ └────────┴─────┴───────────┴────────┴────────────┴──────────────┴─────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘  1 rows in set. Elapsed: 0.003 sec.   host1 :)

    kafka_skip_broken_messages

    (可选)表示忽略解析异常的Kafka数据的条数。如果出现了N条异常后,后台线程结束,Materialized View会被重新安排后台线程去监测数据。

    kafka_num_consumers

    (可选)单个Kafka Engine的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应topic的partitions总数。

    其他配置可参考:https://clickhouse.com/docs/zh/engines/table-engines/integrations/kafka

  9. 通过客户端连接ClickHouse创建本地表,示例如下:

    CREATE TABLE daily1(
    key String,
    value String,
    event_date DateTime
    )ENGINE = MergeTree()
    ORDER BY key;

  10. 通过客户端连接ClickHouse创建物化视图,示例如下:

    CREATE MATERIALIZED VIEW default.consumer TO default.daily1 (
    `event_date` DateTime,
    `key` String,
    `value` String
    ) AS
    SELECT
    event_date,
    key,
    value
    FROM default.queue1;

  11. 再次执行5,进入Kafka客户端安装目录。
  12. 执行以下命令,在Kafka的Topic中产生消息。例如,如下命令向6中创建的Topic发送消息。

    kafka-console-producer.sh --broker-list kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092 --topic topic1
    >a1,b1,'2020-08-01 10:00:00'
    >a2,b2,'2020-08-02 10:00:00'
    >a3,b3,'2020-08-02 10:00:00'
    >a4,b4,'2023-09-02 10:00:00'

  13. 查询消费到的Kafka数据,查询上述的物化视图,示例如下:

    select * from daily1;