更新时间:2025-09-22 GMT+08:00
分享

ClickHouse集群消费MRS Kafka数据到内表

应用场景

此章节为您介绍数据消费的最佳实践,通过MRS Kafka制造数据消费给CloudTable ClickHouse集群,实现Kafka实时入库到ClickHouse集群的过程。

使用限制

  • MRS集群未开启Kerberos认证。
  • 为了确保网络连通,MRS集群必须与ClickHouse集群的安全组、区域、VPC、子网保持一致。

准备工作

步骤一:在MRS集群中创建Topic

  1. 登录MRS控制台下载并安装客户端
  2. 在“概览”页面单击“IAM用户同步”,勾选“全部同步”后单击“同步”,等待IAM用户同步完成。
  3. 选择“组件管理 > Kafka > 实例”,查看Kafka Broker角色实例的IP地址,记录任意一个IP地址。
    图2 查看Broker角色实例IP
  4. 单击“服务配置”,查看Kafka Broker连接端口“port”参数值。
  5. 在MRS安全组的入方向规则,添加本地主机IP。请参见安全组规则配置
  6. 配置安全组后,以root用户登录MRS客户端所在节点。
  7. 执行以下命令切换到客户端安装目录。
    cd /opt/client
  8. 执行以下命令配置环境变量。
    source bigdata_env
  9. 进入bin目录。
    cd Kafka/kafka/bin
  10. 创建主题test_topic。
    获取IP和端口:进入集群的Manager页面,单击“集群 > Kafka > 配置 > 全部配置”进入全部配置页面,在页面右上角搜索框搜索“default.bootstrap.severs”参数可获取IP和端口。
    ./kafka-topics.sh --create --topic test_topic --bootstrap-server {broker_ip}:port

步骤二:创建ClickHouse表

  1. 下载安装客户端
  2. 使用SSH工具连接ClickHouse集群。
    ./clickhouse client --host 内网地址   --port 9000 --user admin --password password
  3. 创建消费消息表Kafka_table。
    CREATE TABLE kafka_table (
        id UInt32,
        name String,
        age UInt8
    ) ENGINE = Kafka
    SETTINGS kafka_broker_list = '{broker_ip}:port',  -- Kafka Broker 地址
             kafka_topic_list = 'test_topic',          -- Topic 名称
             kafka_group_name = 'group1',              -- 消费者组名称
             kafka_format = 'JSONEachRow';             -- 消息格式
    表1 参数说明

    参数

    是否必填

    描述

    kafka_broker_list

    以逗号分隔的代理列表。

    kafka_topic_list

    Kafka主题的列表。

    kafka_group_name

    一组Kafka消费者。每个组的阅读边距分别被跟踪。如果您不希望消息在集群中重复,请在所有地方使用相同的组名。

    kafka_format

    消息格式。使用与SQL FORMAT函数相同的表示法,例如JSONEachRow。

    kafka_schema

    如果格式需要模式定义,则必须使用的参数。例如,Cap'n Proto需要模式文件的路径和根模式的名称。capnp:消息对象。

    kafka_num_consumers

    每个表的消费者数量。如果一个消费者的吞吐量不够,可以指定更多的消费者。消费者总数不应超过topic中的分区数,因为每个分区只能分配一个消费者,且不能大于ClickHouse所在服务器的物理核数。

    缺省值:1。

    kafka_max_block_size

    轮询的最大批处理大小(以消息为单位)。默认值:max_insert_block_size。

    kafka_skip_broken_messages

    Kafka消息解析器对每个块的架构不兼容消息的容忍度。如果kafka_skip_broken_messages = N,则引擎会跳过N条无法解析的Kafka消息(一条消息等于一行数据)。缺省值:0

    kafka_commit_each_batch

    在写入整个数据块后,提交每个消费和处理的批处理,而不是单个提交。缺省值:0

    kafka_client_id

    客户端标识符。默认为空。

    kafka_poll_timeout_ms

    Kafka中单个轮询的超时时间。默认值:stream_poll_timeout_ms。

    kafka_poll_max_batch_size

    单个Kafka轮询中要轮询的最大消息量。默认值:max_block_size。

    kafka_flush_interval_ms

    从Kafka刷新数据的超时时间。默认值:stream_flush_interval_ms。

    kafka_thread_per_consumer

    为每个消费者提供独立的线程。启用后,每个消费者都会独立并行地刷新数据(多个消费者的行会挤成一个数据块)。缺省值:0。

    kafka_handle_error_mode

    如何处理Kafka引擎的错误。可能的值:default(如果解析消息失败,将抛出异常),stream(异常消息和原始消息将保存在虚拟列_error和_raw_message中)。

    kafka_commit_on_select

    在进行select查询时提交消息。默认值:false。

    kafka_max_rows_per_message

    对于基于行的格式,在一个kafka消息中写入的最大行数。缺省值:1。

  4. 创建目标表target_table,用户存储Kafka消费的数据。
    CREATE TABLE target_table (
        id UInt32,
        name String,
        age UInt8
    ) ENGINE = MergeTree()
    ORDER BY id;

步骤三:向test_topic发送消息

使用SSH工具连接MRS集群,Producer向test_topic发送消息。
./kafka-console-producer.sh --broker-list {broker_ip}:9092 --topic test_topic

{broker_ip}为客户的Kafka Broker地址。

{"id": 1, "name": "Alice", "age": 25}

步骤四:结果验证

  1. 给主题发送消息后,进入ClickHouse集群命令操作后台操作以下命令。
    允许ClickHouse识别并操作流式引擎表。
    SET stream_like_engine_allow_direct_select = 1;
  2. 给目标表target_table写入数据。
    INSERT INTO target_table SELECT * FROM kafka_table;
  3. 查询目标表。
    select * from target_table;

相关文档