ClickHouse集群消费MRS Kafka数据到内表
应用场景
此章节为您介绍数据消费的最佳实践,通过MRS Kafka制造数据消费给CloudTable ClickHouse集群,实现Kafka实时入库到ClickHouse集群的过程。
- 了解Kafka请参见MRS产品介绍。
- 了解ClickHouse请参见CloudTable产品介绍。
图1 消息消费架构图
使用限制
- MRS集群未开启Kerberos认证。
- 为了确保网络连通,MRS集群必须与ClickHouse集群的安全组、区域、VPC、子网保持一致。
准备工作
- 已注册华为账号并开通华为云,具体请参见注册华为账号并开通华为云,且在使用CloudTable前检查账号状态,账号不能处于欠费或冻结状态。
- 已创建虚拟私有云和子网,参见创建虚拟私有云和子网。
- 创建MRS集群。
- 创建CloudTable ClickHouse集群。
- ClickHouse集群已安装客户端。
步骤一:在MRS集群中创建Topic
- 登录MRS控制台,下载并安装客户端。
- 在“概览”页面单击“IAM用户同步”,勾选“全部同步”后单击“同步”,等待IAM用户同步完成。
- 选择“组件管理 > Kafka > 实例”,查看Kafka Broker角色实例的IP地址,记录任意一个IP地址。
图2 查看Broker角色实例IP
- 单击“服务配置”,查看Kafka Broker连接端口“port”参数值。
- 在MRS安全组的入方向规则,添加本地主机IP。请参见安全组规则配置。
- 配置安全组后,以root用户登录MRS客户端所在节点。
- 执行以下命令切换到客户端安装目录。
cd /opt/client
- 执行以下命令配置环境变量。
source bigdata_env
- 进入bin目录。
cd Kafka/kafka/bin
- 创建主题test_topic。
步骤二:创建ClickHouse表
- 下载安装客户端。
- 使用SSH工具连接ClickHouse集群。
./clickhouse client --host 内网地址 --port 9000 --user admin --password password
- 创建消费消息表Kafka_table。
CREATE TABLE kafka_table ( id UInt32, name String, age UInt8 ) ENGINE = Kafka SETTINGS kafka_broker_list = '{broker_ip}:port', -- Kafka Broker 地址 kafka_topic_list = 'test_topic', -- Topic 名称 kafka_group_name = 'group1', -- 消费者组名称 kafka_format = 'JSONEachRow'; -- 消息格式
表1 参数说明 参数
是否必填
描述
kafka_broker_list
是
以逗号分隔的代理列表。
kafka_topic_list
是
Kafka主题的列表。
kafka_group_name
是
一组Kafka消费者。每个组的阅读边距分别被跟踪。如果您不希望消息在集群中重复,请在所有地方使用相同的组名。
kafka_format
是
消息格式。使用与SQL FORMAT函数相同的表示法,例如JSONEachRow。
kafka_schema
否
如果格式需要模式定义,则必须使用的参数。例如,Cap'n Proto需要模式文件的路径和根模式的名称。capnp:消息对象。
kafka_num_consumers
否
每个表的消费者数量。如果一个消费者的吞吐量不够,可以指定更多的消费者。消费者总数不应超过topic中的分区数,因为每个分区只能分配一个消费者,且不能大于ClickHouse所在服务器的物理核数。
缺省值:1。
kafka_max_block_size
否
轮询的最大批处理大小(以消息为单位)。默认值:max_insert_block_size。
kafka_skip_broken_messages
否
Kafka消息解析器对每个块的架构不兼容消息的容忍度。如果kafka_skip_broken_messages = N,则引擎会跳过N条无法解析的Kafka消息(一条消息等于一行数据)。缺省值:0
kafka_commit_each_batch
否
在写入整个数据块后,提交每个消费和处理的批处理,而不是单个提交。缺省值:0
kafka_client_id
否
客户端标识符。默认为空。
kafka_poll_timeout_ms
否
Kafka中单个轮询的超时时间。默认值:stream_poll_timeout_ms。
kafka_poll_max_batch_size
否
单个Kafka轮询中要轮询的最大消息量。默认值:max_block_size。
kafka_flush_interval_ms
否
从Kafka刷新数据的超时时间。默认值:stream_flush_interval_ms。
kafka_thread_per_consumer
否
为每个消费者提供独立的线程。启用后,每个消费者都会独立并行地刷新数据(多个消费者的行会挤成一个数据块)。缺省值:0。
kafka_handle_error_mode
否
如何处理Kafka引擎的错误。可能的值:default(如果解析消息失败,将抛出异常),stream(异常消息和原始消息将保存在虚拟列_error和_raw_message中)。
kafka_commit_on_select
否
在进行select查询时提交消息。默认值:false。
kafka_max_rows_per_message
否
对于基于行的格式,在一个kafka消息中写入的最大行数。缺省值:1。
- 创建目标表target_table,用户存储Kafka消费的数据。
CREATE TABLE target_table ( id UInt32, name String, age UInt8 ) ENGINE = MergeTree() ORDER BY id;