通过用户密码对接Kafka
操作场景
本章节主要介绍ClickHouse通过用户名和密码的方式连接Kafka,消费Kafka的数据。
前提条件
- 已创建Kafka集群,且为安全模式。
- 已安装集群客户端。
- 如果ClickHouse与Kafka不在同一个集群需要建立跨集群互信。
操作步骤
- 登录Kafka服务所在Manager页面,选择“系统 > 权限 > 用户 > 添加用户”,创建一个具有Kafka权限的人机用户,例如创建人机用户ck_user1,首次使用需要修改初始密码。Kafka用户权限介绍请参考管理Kafka用户权限。
- 选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索参数“sasl.enabled.mechanisms”,修改参数值为“GSSAPI,PLAIN”,单击“保存”。
- 登录ClickHouse服务所在Manager页面,选择“集群 > 服务 > ClickHouse > 配置 > 全部配置 > ClickHouseServer(角色) > 引擎”,修改如下参数,配置连接Kafka的用户名和密码。
参数
参数说明
kafka.sasl_mechanisms
指定连接Kafka使用的SASL认证机制,参数值为PLAIN。
kafka.sasl_password
连接Kafka用户的密码,新建的用户ck_user1需要先修改初始密码,否则会导致认证失败。
kafka.sasl_username
连接Kafka的用户名,输入1创建的用户名。
kafka_auth_mode
ClickHouse连接Kafka的认证方式,参数值选择UserPassword。
- 单击“保存”,在弹窗页面中单击“确定”,保存配置。单击“实例”,勾选ClickHouseServer实例,选择“更多 > 滚动重启实例”,重启ClickHouseServer实例。
- 参考使用Kafka客户端,登录到Kafka客户端安装目录。
- 执行以下命令,创建Kafka的Topic。详细的命令使用可以参考管理Kafka主题。
kafka-topics.sh --topic topic1 --create --zookeeper ZooKeeper角色实例IP:ZooKeeper侦听客户端连接的端口/kafka --partitions 2 --replication-factor 1
- --topic参数值为要创建的Topic名称,本示例创建的名称为topic1 。
- --zookeeper:ZooKeeper角色实例所在节点IP地址,填写三个角色实例其中任意一个的IP地址即可。ZooKeeper角色实例所在节点IP获取参考如下:
登录FusionInsight Manager页面,选择“集群 > 服务 > ZooKeeper > 实例”,查看ZooKeeper角色实例的IP地址。
- --partitions主题分区数和--replication-factor主题备份个数不能大于Kafka角色实例数量。
- ZooKeeper侦听客户端连接的端口获取方式:登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值,默认为24002。
- 登录ClickHouse客户端节点,连接ClickHouse服务端,具体请参考从零开始使用ClickHouse章节。
- 创建Kafka的表引擎,示例如下:
CREATE TABLE queue1 ( key String, value String, event_date DateTime ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'kafka_ip1:21007,kafka_ip2:21007,kafka_ip3:21007', kafka_topic_list = 'topic1', kafka_group_name = 'group1', kafka_format = 'CSV', kafka_row_delimiter = '\n', kafka_handle_error_mode='stream';
相关参数说明如下表:
参数
参数说明
kafka_broker_list
Kafka集群Broker实例的IP和端口列表。例如:kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092。
Kafka集群Broker实例IP获取方法如下:
登录FusionInsight Manager页面,选择“集群 > 服务 > Kafka”。单击“实例”,查看Kafka角色实例的IP地址。
kafka_topic_list
消费Kafka的Topic。
kafka_group_name
Kafka消费组。
kafka_format
消费数据的格式化类型,JSONEachRow表示每行一条数据的json格式,CSV格式表示逗号分隔的一行数据。
kafka_row_delimiter
每个消息体(记录)之间的分隔符。
kafka_handle_error_mode
设置为stream,会把每条消息处理的异常打印出来。需要创建视图,通过视图查询异常数据的具体处理异常。
创建视图语句,示例如下:
CREATE MATERIALIZED VIEW default.kafka_errors2 ( `topic` String, `key` String, `partition` Int64, `offset` Int64, `timestamp` Date, `timestamp_ms` Int64, `raw` String, `error` String ) ENGINE = MergeTree ORDER BY (topic, partition, offset) SETTINGS index_granularity = 8192 AS SELECT _topic AS topic, _key AS key, _partition AS partition, _offset AS offset, _timestamp AS timestamp, _timestamp_ms AS timestamp_ms, _raw_message AS raw, _error AS error FROM default.queue1;
查询视图,示例如下:
host1 :) select * from kafka_errors2; SELECT * FROM kafka_errors2 Query id: bf4d788f-bcb9-44f5-95d0-a6c83c591ddb ┌─topic──┬─key─┬─partition─┬─offset─┬──timestamp─┬─timestamp_ms─┬─raw─┬─error────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ │ topic1 │ │ 1 │ 8 │ 2023-06-20 │ 1687252213 │ 456 │ Cannot parse date: value is too short: (at row 1) Buffer has gone, cannot extract information about what has been parsed. │ └────────┴─────┴───────────┴────────┴────────────┴──────────────┴─────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ 1 rows in set. Elapsed: 0.003 sec. host1 :)
kafka_skip_broken_messages
(可选)表示忽略解析异常的Kafka数据的条数。如果出现了N条异常后,后台线程结束,Materialized View会被重新安排后台线程去监测数据。
kafka_num_consumers
(可选)单个Kafka Engine的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应topic的partitions总数。
其他配置可参考:https://clickhouse.com/docs/zh/engines/table-engines/integrations/kafka。
- 通过客户端连接ClickHouse创建本地表,示例如下:
CREATE TABLE daily1( key String, value String, event_date DateTime )ENGINE = MergeTree() ORDER BY key;
- 通过客户端连接ClickHouse创建物化视图,示例如下:
CREATE MATERIALIZED VIEW default.consumer TO default.daily1 ( `event_date` DateTime, `key` String, `value` String ) AS SELECT event_date, key, value FROM default.queue1;
- 再次执行5,进入Kafka客户端安装目录。
- 执行以下命令,在Kafka的Topic中产生消息。例如,如下命令向6中创建的Topic发送消息。
kafka-console-producer.sh --broker-list kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092 --topic topic1
>a1,b1,'2020-08-01 10:00:00' >a2,b2,'2020-08-02 10:00:00' >a3,b3,'2020-08-02 10:00:00' >a4,b4,'2023-09-02 10:00:00'
- 查询消费到的Kafka数据,查询上述的物化视图,示例如下:
select * from daily1;