对接普通模式Kafka
操作场景
本章节主要介绍ClickHouse连接普通模式的Kafka,消费Kafka的数据。
前提条件
- 已创建Kafka集群,且为普通模式。
- 已创建ClickHouse集群,并且ClickHouse集群和Kafka集群网络可以互通,并安装ClickHouse客户端。
操作步骤
- 登录ClickHouse服务所在集群的Manager页面,选择“集群 > 服务 > ClickHouse > 配置 > 全部配置 > ClickHouseServer(角色) > 引擎”,修改如下参数:
参数
参数说明
kafka.security_protocol
参数值:plaintext
kafka_auth_mode
ClickHouse连接Kafka的认证方式,参数值选择NoAuth。
- 单击“保存”,在弹窗页面中单击“确定”,保存配置。单击“实例”,勾选ClickHouseServer实例,选择“更多 > 滚动重启实例”,重启ClickHouseServer实例。
- 参考使用Kafka客户端,登录到Kafka客户端安装目录。
- 执行以下命令,创建Kafka的Topic。详细的命令使用可以参考管理Kafka主题。
kafka-topics.sh --topic topic1 --create --zookeeper ZooKeeper角色实例IP:ZooKeeper侦听客户端连接的端口/kafka --partitions 2 --replication-factor 1
- --topic参数值为要创建的Topic名称,本示例创建的名称为topic1 。
- --zookeeper:ZooKeeper角色实例所在节点IP地址,填写三个角色实例其中任意一个的IP地址即可。ZooKeeper角色实例所在节点IP获取参考如下:
登录FusionInsight Manager页面,选择“集群 > 服务 > ZooKeeper > 实例”,查看ZooKeeper角色实例的IP地址。
- --partitions主题分区数和--replication-factor主题备份个数不能大于Kafka角色实例数量。
- ZooKeeper侦听客户端连接的端口获取方式:登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值,默认为24002。
- 登录ClickHouse客户端节点,连接ClickHouse服务端,具体请参考从零开始使用ClickHouse章节。
- 创建Kafka的表引擎,示例如下:
CREATE TABLE queue1 ( key String, value String, event_date DateTime ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'kafka_ip1:21005,kafka_ip2:21005,kafka_ip3:21005', kafka_topic_list = 'topic1', kafka_group_name = 'group2', kafka_format = 'CSV', kafka_row_delimiter = '\n', kafka_handle_error_mode='stream';
相关参数说明如下表:
参数
参数说明
kafka_broker_list
Kafka集群Broker实例的IP和端口列表。例如:kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092。
Kafka集群broker实例IP获取方法如下:
登录FusionInsight Manager页面,选择“集群 > 服务 > Kafka”。单击“实例”,查看Kafka角色实例的IP地址。
kafka_topic_list
消费Kafka的Topic。
kafka_group_name
Kafka消费组。
kafka_format
消费数据的格式化类型,JSONEachRow表示每行一条数据的json格式,CSV格式表示逗号分隔的一行数据。
kafka_row_delimiter
每个消息体(记录)之间的分隔符。
kafka_handle_error_mode
设置为stream,会把每条消息处理的异常打印出来。需要创建视图,通过视图查询异常数据的具体处理异常。
创建视图语句,示例如下:
CREATE MATERIALIZED VIEW default.kafka_errors2 ( `topic` String, `key` String, `partition` Int64, `offset` Int64, `timestamp` Date, `timestamp_ms` Int64, `raw` String, `error` String ) ENGINE = MergeTree ORDER BY (topic, partition, offset) SETTINGS index_granularity = 8192 AS SELECT _topic AS topic, _key AS key, _partition AS partition, _offset AS offset, _timestamp AS timestamp, _timestamp_ms AS timestamp_ms, _raw_message AS raw, _error AS error FROM default.queue1;
查询视图,示例如下:
host1 :) select * from kafka_errors2; SELECT * FROM kafka_errors2 Query id: bf4d788f-bcb9-44f5-95d0-a6c83c591ddb ┌─topic──┬─key─┬─partition─┬─offset─┬──timestamp─┬─timestamp_ms─┬─raw─┬─error────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ │ topic1 │ │ 1 │ 8 │ 2023-06-20 │ 1687252213 │ 456 │ Cannot parse date: value is too short: (at row 1) Buffer has gone, cannot extract information about what has been parsed. │ └────────┴─────┴───────────┴────────┴────────────┴──────────────┴─────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ 1 rows in set. Elapsed: 0.003 sec. host1 :)
kafka_skip_broken_messages
(可选)表示忽略解析异常的Kafka数据的条数。如果出现了N条异常后,后台线程结束,Materialized View会被重新安排后台线程去监测数据。
kafka_num_consumers
(可选)单个Kafka Engine的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应topic的partitions总数。
其他配置可参考https://clickhouse.com/docs/zh/engines/table-engines/integrations/kafka。
- 通过客户端连接ClickHouse创建本地表,示例如下:
CREATE TABLE daily1( key String, value String, event_date DateTime )ENGINE = MergeTree() ORDER BY key;
- 通过客户端连接ClickHouse创建物化视图,示例如下:
CREATE MATERIALIZED VIEW default.consumer1 TO default.daily1 ( `event_date` DateTime, `key` String, `value` String ) AS SELECT event_date, key, value FROM default.queue1;
- 再次执行3,进入Kafka客户端安装目录。
- 执行以下命令,在Kafka的Topic中产生消息。例如,如下命令向4中创建的Topic发送消息。
kafka-console-producer.sh --broker-list kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092 --topic topic1
>a1,b1,'2020-08-01 10:00:00' >a2,b2,'2020-08-02 10:00:00' >a3,b3,'2020-08-02 10:00:00' >a4,b4,'2023-09-02 10:00:00'
- 查询消费到的Kafka数据,查询上述的物化视图,示例如下:
select * from daily1;