更新时间:2024-12-11 GMT+08:00

ClickHouse数据入库规范

规则

  • 写本地表,查询分布式表,提升写入和查询性能,保证写入和查询的数据一致性。
  • 只有在去重诉求的场景下,可以使用分布式表插入,通过sharding key将要去重的数据转发到同一个shard,便于后续去重查询。
  • 外部模块保证数据导入的幂等性。

    ClickHouse不支持数据写入的事务保证。通过外部导入数据模块控制数据的幂等性,比如某个批次的数据导入异常,则drop对应的分区数据或清理掉导入的数据后,重新导入该分区或批次数据。

  • 大批量少频次的写入。

    ClickHouse的每次数据插入,都会生成一到多个part文件,如果data part过多, merge压力会变大,甚至出现各种异常影响数据插入。建议每个批次5k到100k行,写入字段不能太多,太多字段情况下要减少写入行数,以降低对写入节点的内存和CPU压力,每秒不超过1次插入。

  • 多副本并行导入。

    有大数据的导入场景,建议将数据提前拆分成多份,在一个shard内的多个副本同时导入,以分摊一个节点导入数据的压力,同时能提升数据入库的性能,缩短入库时间。

    常见错误:

    Too many parts(304). Merges are processing significantly slower than inserts

    原因分析:MergeTree的merge的速度跟不上目录生成的速度,数据目录越来越多就会抛出这个异常。

建议

  • 一次只插入一个分区内的数据

    如果数据属于不同的分区,则每次插入,不同分区的数据会独立生成part文件,导致part总数量膨胀,建议一批插入的数据属于同一个分区。

  • 写入速率

    单节点写入速度为50~200MB/S,如果写入的数据每行为1Kb,那么写入的速度为50,000到200,000行每秒,如果行数据容量更小,那么写入速度将更高,如果写入性能不够,可以使用多个副本同时写入,同一时间每个副本写入的数据保持均衡。

  • 慎用分布式表批量插入
    • 写分布式表,数据会分发到集群的所有本地表,每个本地表插入的数据量是总插入量的1/N,batch size可能比较小,导致data part过多,merge压力变大,甚至出现异常影响数据插入;
    • 数据的一致性问题:数据先在分布式表写入节点的主机落盘,然后数据被异步地发送到本地表所在主机进行存储,中间没有一致性的校验,如果分布式表写入数据的主机出现异常,会存在数据丢失风险;
    • 对于数据写分布式表和数据写本地表相比,分布式表数据写入性能也会变慢,单批次分布式表写,写入节点的磁盘和网络IO会成为性能瓶颈点。
    • 分布式表转发给各个shard成功与否,插入数据的客户端无法感知,转发失败的数据会不断重试转发,消耗CPU。
  • 大批量数据导入要分时、分节点、扩容

    如果数据盘为SATA盘,当大批量数据集中插入时候,会抢占磁盘,使得磁盘长时间处于繁忙状态,影响其他alter类操作的效率。

    尽量避免批量导数据的SQL并发执行,会给磁盘和ClickHouse并发能力带来冲击。

  • Kafka数据入库

    不建议建ClickHouse kafka表引擎,进行数据同步到ClickHouse中,当前CK的kafka引擎有会导致kafka引擎数据入库产生性能等诸多问题,通过用户使用经验,需要应用侧自己写kafka的数据消费,攒批写入ClickHouse,提升ClickHouse的入库性能。

  • 使用分区替换或增加的方式写入数据

    为避免目标表写入脏数据导致的删改,先将数据写入临时表,再从临时表写入目标表。

    操作步骤如下:

    1. 创建一张与目标表table_dest结构、分区键、排序键、主键、存储策略、引擎都一致的临时表table_source。
    2. 先把数据写到临时表,一次只写入一个分区的数据,检查临时表的数据准确无误。
    3. 使用以下SQL查看目标表的分区:

      SELECT partition AS `partition`,sum(rows) AS `count` FROM system.parts WHERE active AND database=='数据库名' AND table=='表名' GROUP BY partition ORDER BY partition ASC;

    4. 如果目标表存在该分区,将分区替换到目标表,到集群的每个节点上执行如下语法:

      ALTER TABLE table_dest REPLACE PARTITION partition_expr FROM table_source;

    5. 如果目标表不存在该分区,将分区增加到目标表,到集群的每个节点上执行如下语法:

      ALTER TABLE table_dest REPLACE PARTITION tuple() partition_expr FROM table_source;