更新时间:2025-09-22 GMT+08:00
分享

ClickHouse集群消费DMS Kafka数据到内表

应用场景

此章节为您介绍数据消费的最佳实践,通过DMS Kafka制造数据消费给CloudTable ClickHouse集群,实现Kafka实时入库到ClickHouse集群的过程。

使用限制

  • DMS集群为非安全集群。
  • 为了确保网络连通,DMS Kafka实例必须与ClickHouse集群的安全组、区域、VPC、子网保持一致。

准备工作

步骤一:在Kafka中创建Topic

  1. 登录DMS服务控制台,进入Kafka实例列表页面。
  2. 单击实例名称,进入实例详情页面。
  3. 单击左侧导航栏“Topic管理”,进入Topic管理页面。
  4. 单击“创建Topic”,弹出“创建Topic”页面,配置参数后单击“确定”。

    具体参数配置请参见创建Kafka Topic

步骤二:创建ClickHouse表

  1. 使用SSH工具连接ClickHouse集群。
    ./clickhouse client --host 内网地址   --port 9000 --user admin --password password
  2. 创建消费消息表Kafka_table。
    获取{broker_ip}:port:单击目标实例操作列“更多 > 查看连接地址”,弹出“查看连接地址”窗口,可查看{broker_ip}:port。
    CREATE TABLE kafka_table (
        id UInt32,
        name String,
        age UInt8
    ) ENGINE = Kafka
    SETTINGS kafka_broker_list = '{broker_ip}:port',  -- Kafka Broker 地址
             kafka_topic_list = 'test_topic',          -- Topic 名称
             kafka_group_name = 'group1',              -- 消费者组名称
             kafka_format = 'JSONEachRow';             -- 消息格式
    表1 参数说明

    参数

    是否必填

    描述

    kafka_broker_list

    以逗号分隔的代理列表。

    kafka_topic_list

    Kafka主题的列表。

    kafka_group_name

    一组Kafka消费者。每个组的阅读边距分别被跟踪。如果您不希望消息在集群中重复,请在所有地方使用相同的组名。

    kafka_format

    消息格式。使用与SQL FORMAT函数相同的表示法,例如JSONEachRow。

    kafka_schema

    如果格式需要模式定义,则必须使用的参数。例如,Cap'n Proto需要模式文件的路径和根模式的名称。capnp:消息对象。

    kafka_num_consumers

    每个表的消费者数量。如果一个消费者的吞吐量不够,可以指定更多的消费者。消费者总数不应超过topic中的分区数,因为每个分区只能分配一个消费者,且不能大于ClickHouse所在服务器的物理核数。

    缺省值:1。

    kafka_max_block_size

    轮询的最大批处理大小(以消息为单位)。默认值:max_insert_block_size。

    kafka_skip_broken_messages

    Kafka消息解析器对每个块的架构不兼容消息的容忍度。如果kafka_skip_broken_messages = N,则引擎会跳过N条无法解析的Kafka消息(一条消息等于一行数据)。缺省值:0

    kafka_commit_each_batch

    在写入整个数据块后,提交每个消费和处理的批处理,而不是单个提交。缺省值:0

    kafka_client_id

    客户端标识符。默认为空。

    kafka_poll_timeout_ms

    Kafka中单个轮询的超时时间。默认值:stream_poll_timeout_ms。

    kafka_poll_max_batch_size

    单个Kafka轮询中要轮询的最大消息量。默认值:max_block_size。

    kafka_flush_interval_ms

    从Kafka刷新数据的超时时间。默认值:stream_flush_interval_ms。

    kafka_thread_per_consumer

    为每个消费者提供独立的线程。启用后,每个消费者都会独立并行地刷新数据(多个消费者的行会挤成一个数据块)。缺省值:0。

    kafka_handle_error_mode

    如何处理Kafka引擎的错误。可能的值:default(如果解析消息失败,将抛出异常),stream(异常消息和原始消息将保存在虚拟列_error和_raw_message中)。

    kafka_commit_on_select

    在进行select查询时提交消息。默认值:false。

    kafka_max_rows_per_message

    对于基于行的格式,在一个kafka消息中写入的最大行数。缺省值:1。

  3. 创建目标表target_table,用户存储Kafka消费的数据。
    CREATE TABLE target_table (
        id UInt32,
        name String,
        age UInt8
    ) ENGINE = MergeTree()
    ORDER BY id;

步骤三:使用test_topic生产消息

  1. 登录DMS服务控制台,进入Kafka实例列表页面。
  2. 单击实例名称,进入实例详情页面。
  3. 单击左侧导航栏“Topic管理”,进入Topic管理页面。
  4. 选择需要操作的Topic,单击操作列的“生产消息”,弹出生产消息页面。
  5. 在生产消息页面填写参数后,确认无误,单击“确定”,生产消息发送至ClickHouse集群。
    图2 生产消息

步骤四:结果验证

  1. 生产消息后,进入ClickHouse集群命令操作后台执行以下命令。
    允许ClickHouse识别并操作流式引擎表。
    SET stream_like_engine_allow_direct_select = 1;
  2. 给目标表target_table写入数据。
    INSERT INTO target_table SELECT * FROM kafka_table;
  3. 查询目标表。
    select * from target_table;

相关文档