Updated on 2024-04-29 GMT+08:00

ClickHouse Table Engines

Background

The table engine determines:

  • Where data is written to and read from.
  • Which queries are supported.
  • Whether concurrent data access is supported.
  • Whether indexes are supported.
  • Whether multi-thread requests can be executed.
  • Whether data replication is supported and which parameters it uses.

This section describes the MergeTree and Distributed engine families, which are the most important and most frequently used.

Overview

A table engine defines the type of a table. The table engine determines how data is stored and read, whether indexes are supported, and whether active/standby replication is supported. The following table lists ClickHouse table engines to help you get started with ClickHouse.

Table 1 Table engines

Engine Family

Family Description

Engine

Engine Description

MergeTree

  • MergeTree engines are the most versatile and full-featured ClickHouse engines and are mainly used for high-load tasks. They support fast writing of large volumes of data and subsequent data processing.
  • MergeTree engines support data replication, partitioning, and data sampling.

MergeTree

  • Data is stored in partitions and data blocks based on the partitioning key.
  • Data is sorted by the ORDER BY sorting key, and the primary key index is built on this sort order.
  • Data replication is supported by table engines prefixed with Replicated.
  • Data sampling is supported.

When data is written, a table using this engine splits the data into different folders based on the partitioning key. Within each folder, each column is stored as a separate file, and an index file records the sorted index. This structure reduces the volume of data that must be read, greatly improving query efficiency.

ReplacingMergeTree

This table engine removes duplicate rows that have the same sorting key (ORDER BY) value. The MergeTree table engine does not support this feature.

CollapsingMergeTree

CollapsingMergeTree defines a Sign column to record the status of each row. If Sign is 1, the row is valid. If Sign is -1, the row is to be deleted.

VersionedCollapsingMergeTree

This table engine adds a Version column to the CREATE TABLE statement. This solves the problem that the CollapsingMergeTree table engine cannot collapse or delete rows as expected when the rows are inserted in the wrong order.

SummingMergeTree

This table engine combines all rows that have the same sorting key into one row and sums their numeric columns (pre-aggregation). This reduces storage usage and improves aggregation performance.

AggregatingMergeTree

This table engine is a pre-aggregation engine used to improve aggregation performance. When data parts are merged, the AggregatingMergeTree engine combines rows that have the same sorting key by using the aggregate functions defined for its AggregateFunction columns, and stores the intermediate aggregation states in binary format.

GraphiteMergeTree

This table engine is used to store and roll up Graphite data. This helps reduce storage space and makes Graphite data queries more efficient.

Replicated*MergeTree

Adding the Replicated prefix to any engine of the MergeTree family produces a corresponding engine that supports replicas, for example, ReplicatedMergeTree.

Replicated*MergeTree series

Replicated engines use ZooKeeper to coordinate data synchronization. When a replicated table is created, all replicas of the same shard synchronize with each other based on the information registered in ZooKeeper.

Distributed

-

Distributed

This table engine does not store data and performs distributed queries on multiple servers.

MergeTree

  • Create a table.
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse cluster name]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
        ...
        INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
        INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
    ) ENGINE = MergeTree()
    ORDER BY expr
    [PARTITION BY expr]
    [PRIMARY KEY expr]
    [SAMPLE BY expr]
    [TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'], ...]
    [SETTINGS name=value, ...]
  • The following is an example.
    CREATE TABLE default.test (name1 DateTime,name2 String,name3 String,name4 String,name5 Date) ENGINE = MergeTree() PARTITION BY toYYYYMM(name5) ORDER BY (name1, name2) SETTINGS index_granularity = 8192;

    Parameters in the example are described as follows:

    Table 2 Parameter description

    Parameter

    Description

    ENGINE = MergeTree()

    MergeTree table engine.

    PARTITION BY toYYYYMM(name5)

    Partitioning key. The sample data is partitioned by the month of name5, and a separate folder is created for each month.

    ORDER BY (name1, name2)

    Sorting key. Multiple columns can be specified: rows are sorted by the first column, rows with the same value in the first column are sorted by the second column, and so on.

    index_granularity = 8192

    Index granularity. One index entry (mark) is recorded for every 8,192 rows.

    Queries that filter on the partitioning key or the sorting key are much more efficient, because ClickHouse can skip partitions and index granules that cannot contain matching data.
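To illustrate why filtering on the partitioning key or sorting key helps, the following Python sketch models two ideas from the example: toYYYYMM-style monthly partition IDs and a sparse index that records one mark per index_granularity rows. It is a simplified model for illustration only, not ClickHouse's implementation; the functions to_yyyymm, partition_rows, build_marks, and granules_to_read are hypothetical.

```python
from bisect import bisect_left, bisect_right
from collections import defaultdict
from datetime import date


def to_yyyymm(d: date) -> int:
    """Monthly partition ID, modeled on toYYYYMM: 2021-04-30 -> 202104."""
    return d.year * 100 + d.month


def partition_rows(rows, date_of):
    """Group rows into one 'folder' (list) per month, as PARTITION BY toYYYYMM does."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[to_yyyymm(date_of(row))].append(row)
    return dict(partitions)


def build_marks(sorted_keys, index_granularity=8192):
    """Sparse index: keep the first sorting-key value of every granule."""
    return [sorted_keys[i] for i in range(0, len(sorted_keys), index_granularity)]


def granules_to_read(marks, lo, hi):
    """Indexes of granules that may hold sorting-key values in [lo, hi].

    Granule g starts at marks[g]. Stepping back one position from bisect_left
    includes the granule that contains lo, or that may end with a value equal to lo.
    """
    first = max(bisect_left(marks, lo) - 1, 0)
    last = bisect_right(marks, hi) - 1
    return list(range(first, last + 1)) if last >= first else []


# Hypothetical data: 100,000 rows whose sorting key is 0..99,999.
keys = list(range(100_000))
marks = build_marks(keys)  # 13 marks instead of 100,000 keys
print(len(marks), granules_to_read(marks, 10_000, 20_000))
```

Only the granules whose key range overlaps the filter are read; every other granule, and every partition whose month does not match, can be skipped.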

ReplacingMergeTree

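ClickHouse provides the ReplacingMergeTree table engine to remove duplicate rows that have the same sorting key (ORDER BY) value. The MergeTree table engine does not support this feature. If ver is specified, the row with the largest ver value is kept; otherwise, the last inserted row is kept. Deduplication is performed only when data parts are merged in the background, so duplicates may be visible until a merge occurs.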

  • Create a table.
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse cluster name]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = ReplacingMergeTree([ver])
    [PARTITION BY expr]
    [ORDER BY expr]
    [SAMPLE BY expr]
    [SETTINGS name=value, ...]
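The deduplication rule can be modeled in a few lines of Python. This is a simplified sketch of which row a background merge keeps, assuming all rows are in the same partition; the function replacing_merge and the columns id, city, and ver are hypothetical. With ver, the row with the largest version wins and ties go to the row inserted last; without ver, the last inserted row wins.

```python
def replacing_merge(rows, key, ver=None):
    """Simplified model of a ReplacingMergeTree merge within one partition.

    rows: dicts in insertion order.
    key: function returning the ORDER BY (sorting key) value of a row.
    ver: optional version column name (hypothetical here).
    """
    kept = {}
    for row in rows:
        k = key(row)
        # A later row replaces the kept one, unless ver is used and its version is smaller.
        if k not in kept or ver is None or row[ver] >= kept[k][ver]:
            kept[k] = row
    return [kept[k] for k in sorted(kept)]


rows = [
    {"id": 1, "city": "Shenzhen", "ver": 2},
    {"id": 1, "city": "Beijing", "ver": 1},  # older version inserted later
    {"id": 2, "city": "Shanghai", "ver": 1},
]
print(replacing_merge(rows, key=lambda r: r["id"], ver="ver"))
print(replacing_merge(rows, key=lambda r: r["id"]))
```

The two calls differ only for id 1: with ver, the Shenzhen row (version 2) survives; without ver, the later Beijing row replaces it.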

CollapsingMergeTree

The CollapsingMergeTree table engine removes some limitations of the ReplacingMergeTree table engine. It requires a Sign column in the CREATE TABLE statement, which classifies rows into two types. If Sign is 1, the row is a "state" row that records the current state of an object. If Sign is -1, the row is a "cancel" row that cancels a "state" row with the same sorting key. During background merges, each pair of "state" and "cancel" rows is collapsed (deleted).

  • Create a table.
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse cluster name]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = CollapsingMergeTree(sign)
    [PARTITION BY expr]
    [ORDER BY expr]
    [SAMPLE BY expr]
    [SETTINGS name=value, ...]
  • Example
    • Sample data

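      Assume that you need to calculate how many pages users visited on a website and how long they stayed. When a user's activity is first recorded, a "state" row is written. When the activity changes, a "cancel" row for the old state and a new "state" row are written, as shown in the following table.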

      Table 3 Sample data

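      ┌──────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┐
      │ 4324182021466249494 │         5 │      146 │    1 │
      │ 4324182021466249494 │         5 │      146 │   -1 │
      │ 4324182021466249494 │         6 │      185 │    1 │
      └─────────────────────┴───────────┴──────────┴──────┘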

      • Sign: name of the column that indicates the row type. 1 indicates a "state" row and -1 indicates a "cancel" row.
    • Create the Test table.
      CREATE TABLE Test(UserID UInt64,PageViews UInt8,Duration UInt8,Sign Int8)ENGINE = CollapsingMergeTree(Sign) ORDER BY UserID;
    • Insert data.
      • Insert data for the first time.
        INSERT INTO Test VALUES (4324182021466249494, 5, 146, 1);
      • Insert data for the second time.
        INSERT INTO Test VALUES (4324182021466249494, 5, 146, -1),(4324182021466249494, 6, 185, 1);
    • View data.
      SELECT * FROM Test;

      The following query result is returned:

      ┌──────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┐
      │ 4324182021466249494 │         5 │      146 │   -1 │
      │ 4324182021466249494 │         6 │      185 │    1 │
      └─────────────────────┴───────────┴──────────┴──────┘
      ┌──────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┐
      │ 4324182021466249494 │         5 │      146 │    1 │
      └─────────────────────┴───────────┴──────────┴──────┘
    • Aggregate the data with Sign taken into account. This returns the correct result even if the parts have not been collapsed yet.
      SELECT UserID,sum(PageViews * Sign) AS PageViews,sum(Duration * Sign) AS Duration FROM Test GROUP BY UserID HAVING sum(Sign) > 0;

      The command output is as follows:

      ┌──────────────UserID─┬─PageViews─┬─Duration─┐
      │ 4324182021466249494 │         6 │      185 │
      └─────────────────────┴───────────┴──────────┘
    • Force collapsing at query time by using the FINAL modifier.
      SELECT * FROM Test FINAL;

      The command output is as follows:

      ┌──────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┐
      │ 4324182021466249494 │         6 │      185 │    1 │
      └─────────────────────┴───────────┴──────────┴──────┘
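The aggregation query in this example works whether or not the parts have been collapsed, because each "cancel" row subtracts exactly what its "state" row added. The following Python sketch reproduces that query over plain tuples to make the arithmetic visible. The function name sign_weighted_totals is hypothetical, and the sketch models only the query, not ClickHouse's merge algorithm.

```python
from collections import defaultdict

UID = 4324182021466249494


def sign_weighted_totals(rows):
    """Python equivalent of:
    SELECT UserID, sum(PageViews * Sign), sum(Duration * Sign)
    FROM Test GROUP BY UserID HAVING sum(Sign) > 0
    rows: (UserID, PageViews, Duration, Sign) tuples.
    """
    totals = defaultdict(lambda: [0, 0, 0])  # [PageViews, Duration, Sign]
    for user_id, page_views, duration, sign in rows:
        acc = totals[user_id]
        acc[0] += page_views * sign
        acc[1] += duration * sign
        acc[2] += sign
    return {uid: (pv, dur) for uid, (pv, dur, s) in totals.items() if s > 0}


part1 = [(UID, 5, 146, 1)]                     # first INSERT
part2 = [(UID, 5, 146, -1), (UID, 6, 185, 1)]  # second INSERT
collapsed = [(UID, 6, 185, 1)]                 # what remains after collapsing

print(sign_weighted_totals(part1 + part2))  # parts not yet collapsed
print(sign_weighted_totals(collapsed))      # parts already collapsed
```

Both calls return the same totals, which is why the query can be run at any time without waiting for background merges.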

VersionedCollapsingMergeTree

ClickHouse provides the VersionedCollapsingMergeTree table engine to solve the problem that the CollapsingMergeTree table engine cannot collapse or delete rows as expected if the rows are inserted in the wrong order. The VersionedCollapsingMergeTree table engine adds a Version column to the CREATE TABLE statement to record which "cancel" row corresponds to which "state" row. During background merges, pairs of rows that have the same primary key and Version but opposite Sign values are collapsed (deleted).

  • Create a table.
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse cluster name]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = VersionedCollapsingMergeTree(sign, version)
    [PARTITION BY expr]
    [ORDER BY expr]
    [SAMPLE BY expr]
    [SETTINGS name=value, ...]
  • Example
    • Sample data
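      Assume that you need to calculate how many pages users visited on a website and how long they stayed. Each state of the user's activity is written as a "state" row. When the activity changes, a "cancel" row with the same Version as the old state and a new "state" row with the next Version are written, as shown in the following table.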
      Table 4 Sample data

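      ┌──────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┬─Version─┐
      │ 4324182021466249494 │         5 │      146 │    1 │       1 │
      │ 4324182021466249494 │         5 │      146 │   -1 │       1 │
      │ 4324182021466249494 │         6 │      185 │    1 │       2 │
      └─────────────────────┴───────────┴──────────┴──────┴─────────┘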

      • Sign: name of the column that indicates the row type. 1 indicates a "state" row and -1 indicates a "cancel" row.
      • Version: Name of the column with the version of the object state.
    • Create the T table.
      CREATE TABLE T(UserID UInt64,PageViews UInt8,Duration UInt8,Sign Int8,Version UInt8)ENGINE = VersionedCollapsingMergeTree(Sign, Version)ORDER BY UserID;
    • Insert the data by using two INSERT statements, which create two separate data parts.
      INSERT INTO T VALUES (4324182021466249494, 5, 146, 1, 1);
      INSERT INTO T VALUES (4324182021466249494, 5, 146, -1, 1),(4324182021466249494, 6, 185, 1, 2);
    • View data.
      SELECT * FROM T;
    • Aggregate the data with Sign taken into account. This returns the correct result even if the parts have not been collapsed yet.
      SELECT UserID, sum(PageViews * Sign) AS PageViews,sum(Duration * Sign) AS Duration,Version FROM T GROUP BY UserID, Version HAVING sum(Sign) > 0;

      The query result is as follows:

      ┌──────────────UserID─┬─PageViews─┬─Duration─┬─Version─┐
      │ 4324182021466249494 │         6 │      185 │       2 │
      └─────────────────────┴───────────┴──────────┴─────────┘
    • Force collapsing at query time by using the FINAL modifier.
      SELECT * FROM T FINAL;

      The query result is as follows:

      ┌──────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┬─Version─┐
      │ 4324182021466249494 │         6 │      185 │    1 │       2 │
      └─────────────────────┴───────────┴──────────┴──────┴─────────┘
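Because a "cancel" row is matched to its "state" row by Version rather than by position, the result of collapsing does not depend on insertion order. The following Python sketch is a simplified model of that pairing rule, not ClickHouse's merge code; the function versioned_collapse is hypothetical and assumes Sign is always 1 or -1. Rows with the same UserID and Version and opposite Sign values cancel out, and unmatched rows survive.

```python
from collections import defaultdict

UID = 4324182021466249494


def versioned_collapse(rows):
    """Simplified VersionedCollapsingMergeTree merge.

    rows: (UserID, PageViews, Duration, Sign, Version) tuples, in any order.
    Assumes Sign is either 1 or -1.
    """
    groups = defaultdict(lambda: {1: [], -1: []})
    for row in rows:
        user_id, _, _, sign, version = row
        groups[(user_id, version)][sign].append(row)

    survivors = []
    for key in sorted(groups):
        states, cancels = groups[key][1], groups[key][-1]
        matched = min(len(states), len(cancels))  # each state/cancel pair cancels out
        survivors.extend(states[matched:] + cancels[matched:])
    return survivors


rows = [
    (UID, 5, 146, 1, 1),
    (UID, 5, 146, -1, 1),
    (UID, 6, 185, 1, 2),
]
print(versioned_collapse(rows))
print(versioned_collapse(list(reversed(rows))))  # same result in reverse order
```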

SummingMergeTree

The SummingMergeTree table engine combines all rows that have the same sorting key into one row and sums their numeric columns (pre-aggregation). This reduces storage usage and improves aggregation performance.

  • Create a table.
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse cluster name]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = SummingMergeTree([columns])
    [PARTITION BY expr]
    [ORDER BY expr]
    [SAMPLE BY expr]
    [SETTINGS name=value, ...]
  • Example
    • Create a SummingMergeTree table named testTable.
      CREATE TABLE testTable(id UInt32,value UInt32)ENGINE = SummingMergeTree() ORDER BY id;
    • Insert data into the testTable table.
      INSERT INTO testTable Values(5,9),(5,3),(4,6),(1,2),(2,5),(1,4),(3,8);
      INSERT INTO testTable Values(88,5),(5,5),(3,7),(3,5),(1,6),(2,6),(4,7),(4,6),(43,5),(5,9),(3,6);
    • Query all data before the two data parts are merged.
      SELECT * FROM testTable;

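      The following query result is returned. Rows within each part have already been summed, but the two parts have not yet been merged with each other: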

      ┌─id─┬─value─┐
      │  1 │     6 │
      │  2 │     5 │
      │  3 │     8 │
      │  4 │     6 │
      │  5 │    12 │
      └────┴───────┘
      ┌─id─┬─value─┐
      │  1 │     6 │
      │  2 │     6 │
      │  3 │    18 │
      │  4 │    13 │
      │  5 │    14 │
      │ 43 │     5 │
      │ 88 │     5 │
      └────┴───────┘
    • Because ClickHouse may not have summed up all rows yet, use the sum function and a GROUP BY clause to aggregate data by id.
      SELECT id, sum(value) FROM testTable GROUP BY id;

      The following query result is returned:

      ┌─id─┬─sum(value)─┐
      │  4 │         19 │
      │  3 │         26 │
      │ 88 │          5 │
      │  2 │         11 │
      │  5 │         26 │
      │  1 │         12 │
      │ 43 │          5 │
      └────┴────────────┘
    • Trigger a merge manually.
      OPTIMIZE TABLE testTable;

      Query data in the table.

      SELECT * FROM testTable;

      The following query result is returned:

      ┌─id─┬─value─┐
      │  1 │    12 │
      │  2 │    11 │
      │  3 │    26 │
      │  4 │    19 │
      │  5 │    26 │
      │ 43 │     5 │
      │ 88 │     5 │
      └────┴───────┘
      • SummingMergeTree uses the ORDER BY sorting key to determine which rows to aggregate. Rows with the same sorting key are merged into one row, and the columns specified in SummingMergeTree([columns]) are summed. If no columns are specified, all numeric columns that are not part of the primary key are summed.
      • Data is pre-aggregated only when parts are merged in the background, and the time of a merge cannot be predicted. Some data may therefore already be pre-aggregated while other data is not. For this reason, always use a GROUP BY clause when aggregating data.
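The following Python sketch replays the example with the same data to show why GROUP BY is required: summing per id over the unmerged parts gives the same totals as a completed merge, whereas reading the parts without GROUP BY returns one row per id per part. summing_merge is a hypothetical helper that models only the summing rule for the value column, not ClickHouse internals.

```python
from collections import Counter

insert1 = [(5, 9), (5, 3), (4, 6), (1, 2), (2, 5), (1, 4), (3, 8)]
insert2 = [(88, 5), (5, 5), (3, 7), (3, 5), (1, 6), (2, 6),
           (4, 7), (4, 6), (43, 5), (5, 9), (3, 6)]


def summing_merge(*parts):
    """Collapse rows with the same sorting key (id) into one, summing `value`."""
    totals = Counter()
    for part in parts:
        for id_, value in part:
            totals[id_] += value
    return sorted(totals.items())


# Mirror the query output above: each INSERT's part is already summed within itself.
part1 = summing_merge(insert1)
part2 = summing_merge(insert2)

# SELECT * before the parts merge: ids 1 to 5 appear twice.
unmerged_rows = part1 + part2

# SELECT id, sum(value) ... GROUP BY id gives the same totals as OPTIMIZE TABLE.
group_by_result = summing_merge(unmerged_rows)
after_optimize = summing_merge(part1, part2)
print(group_by_result == after_optimize, group_by_result)
```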

AggregatingMergeTree

The AggregatingMergeTree table engine is also used for pre-aggregation and can improve the aggregation performance.

  • Create a table.
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse cluster name]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = AggregatingMergeTree()
    [PARTITION BY expr]
    [ORDER BY expr]
    [SAMPLE BY expr]
    [TTL expr]
    [SETTINGS name=value, ...]
  • Example

    The AggregatingMergeTree engine takes no parameters. When data parts are merged, rows with the same ORDER BY sorting key are aggregated. The aggregate function to apply and the column it operates on are specified by declaring columns of the AggregateFunction type.

    • Create a table.
      create table test_table (name1 String,name2 String,name3 AggregateFunction(uniq,String),name4 AggregateFunction(sum,Int),name5 DateTime) ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(name5) ORDER BY (name1,name2) PRIMARY KEY name1;

    When data of the AggregateFunction type is written or queried, the *State and *Merge functions must be called, where the asterisk (*) stands for the aggregate function used in the column type definition. In the table creation example, the uniq and sum functions are specified for the name3 and name4 columns of test_table, respectively. Therefore, call uniqState and sumState in INSERT ... SELECT statements when writing data, and call uniqMerge and sumMerge when querying data.

    • Insert data.
      insert into test_table select '8','test1',uniqState('name1'),sumState(toInt32(100)),'2021-04-30 17:18:00';
      insert into test_table select '8','test1',uniqState('name1'),sumState(toInt32(200)),'2021-04-30 17:18:00';
    • Query the data.
      select name1,name2,uniqMerge(name3),sumMerge(name4) from test_table group by name1,name2;

      The following query result is returned:

      ┌─name1─┬─name2─┬─uniqMerge(name3)─┬─sumMerge(name4)─┐
      │ 8     │ test1 │                1 │             300 │
      └───────┴───────┴──────────────────┴─────────────────┘
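Columns of the AggregateFunction type store intermediate states rather than final values: *State functions build a state, merges combine states, and *Merge functions turn the combined state into a result. The following Python sketch models that life cycle for the test_table example (name5 is omitted). The classes UniqState and SumState and the helper functions are hypothetical stand-ins; in particular, the sketch uses an exact set for uniq, whereas ClickHouse's uniq function is approximate.

```python
from dataclasses import dataclass, field


@dataclass
class UniqState:
    """Stand-in for a uniq state (exact set here; ClickHouse's uniq is approximate)."""
    values: set = field(default_factory=set)

    def merge(self, other):
        return UniqState(self.values | other.values)

    def finalize(self):
        return len(self.values)


@dataclass
class SumState:
    """Stand-in for a sum state."""
    total: int = 0

    def merge(self, other):
        return SumState(self.total + other.total)

    def finalize(self):
        return self.total


def uniq_state(value):  # models uniqState(value) for one row
    return UniqState({value})


def sum_state(value):  # models sumState(value) for one row
    return SumState(value)


def merge_by_key(rows):
    """Models: SELECT name1, name2, uniqMerge(name3), sumMerge(name4) ... GROUP BY name1, name2."""
    grouped = {}
    for name1, name2, uniq, total in rows:
        key = (name1, name2)
        if key in grouped:
            prev_uniq, prev_total = grouped[key]
            grouped[key] = (prev_uniq.merge(uniq), prev_total.merge(total))
        else:
            grouped[key] = (uniq, total)
    return {key: (u.finalize(), s.finalize()) for key, (u, s) in grouped.items()}


# The two INSERT ... SELECT statements from the example.
rows = [
    ("8", "test1", uniq_state("name1"), sum_state(100)),
    ("8", "test1", uniq_state("name1"), sum_state(200)),
]
print(merge_by_key(rows))
```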

Replicated*MergeTree Engines

Adding the Replicated prefix to any engine of the MergeTree family produces a corresponding engine that supports replicas, for example, ReplicatedMergeTree.

Figure 1 MergeTree table engines
  • Template for creating a Replicated engine:
    ENGINE = Replicated*MergeTree('ZooKeeper storage path','Replica name', ...)
    Table 5 Parameters

    Parameter

    Description

    ZooKeeper storage path

    Path of the table in ZooKeeper, where the table's metadata and replication information are stored. The recommended path format is /clickhouse/tables/{shard}/Database name/Table name.

    Replica name

    Name of the replica. The {replica} macro is typically used, so that each server substitutes its own replica name from its configuration.
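{shard} and {replica} are macros that each ClickHouse server replaces with values from the macros section of its own configuration. Replicas of the same shard therefore register under the same ZooKeeper path but with different replica names. The following Python sketch illustrates the substitution; expand_macros and the macro values for node1 and node2 are hypothetical, and the path is the one used by the demo table example in this section.

```python
import re


def expand_macros(template, macros):
    """Replace each {name} placeholder with the value from `macros`."""
    def substitute(match):
        name = match.group(1)
        if name not in macros:
            raise KeyError(f"macro {name!r} is not defined")
        return macros[name]

    return re.sub(r"\{(\w+)\}", substitute, template)


ZK_PATH = "/clickhouse/tables/{shard}/default/demo"

# Hypothetical macro values for two servers holding replicas of the same shard.
node1 = {"shard": "01", "replica": "node1"}
node2 = {"shard": "01", "replica": "node2"}

for macros in (node1, node2):
    print(expand_macros(ZK_PATH, macros), expand_macros("{replica}", macros))
```

Both servers resolve to the same ZooKeeper path, so they replicate the same table, while their distinct replica names keep their entries apart.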

Distributed Table Engines

Tables with the Distributed engine do not store any data of their own. They act as a transparent proxy for data shards and can automatically route data to each node in the cluster. A distributed table works together with local tables: it forwards the read and write requests it receives to the local tables where the data is stored.

Figure 2 Distributed
  • Template for creating a distributed engine:
    ENGINE = Distributed(cluster_name, database_name, table_name, [sharding_key])
    Table 6 Distributed parameters

    Parameter

    Description

    cluster_name

    Cluster name. When a distributed table is read or written, the cluster configuration is used to locate the corresponding ClickHouse instance nodes.

    database_name

    Database name.

    table_name

    Name of a local table in the database. It is used to map a distributed table to a local table.

    sharding_key

    (Optional) Sharding key. When data is written, the distributed table uses this expression to decide which shard, and therefore which local table, each row is sent to.

  • Example
    • Create a local ReplicatedMergeTree table named demo.
      CREATE TABLE default.demo ON CLUSTER default_cluster( `EventDate` DateTime, `id` UInt64)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/demo', '{replica}') PARTITION BY toYYYYMM(EventDate) ORDER BY id;
    • Create a Distributed table named demo_all based on the local table demo.
      CREATE TABLE default.demo_all ON CLUSTER default_cluster( `EventDate` DateTime, `id` UInt64)ENGINE = Distributed(default_cluster, default, demo, rand());
  • Rules for creating a distributed table:
    • When creating a distributed table, add ON CLUSTER cluster_name to the table creation statement so that the statement can be executed once on a ClickHouse instance and then distributed to all instances in the cluster for execution.
    • By convention, a distributed table is named in the format Local table name_all, for example, demo_all for the local table demo. A distributed table maps to multiple local tables, so all of these local tables can be operated on through the distributed table.
    • Ensure that the structure of a distributed table is the same as that of local tables. If they are inconsistent, no error is reported during table creation, but an exception may be reported during data query or insertion.
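For inserts, the Distributed engine evaluates the sharding key for each row, divides the result by the total weight of the shards, and sends the row to the shard whose weight range contains the remainder. The following Python sketch illustrates that rule; choose_shard and the shard weights are hypothetical. With rand() as the sharding key, as in the demo_all example, rows are spread randomly across shards in proportion to their weights.

```python
from itertools import accumulate


def choose_shard(sharding_value: int, weights: list[int]) -> int:
    """Return the index of the shard that receives a row.

    The remainder of sharding_value divided by the total weight picks the shard
    whose half-open range [previous weights, previous weights + weight) contains it.
    """
    remainder = sharding_value % sum(weights)
    for shard, upper_bound in enumerate(accumulate(weights)):
        if remainder < upper_bound:
            return shard
    raise ValueError("weights must be positive")


# Hypothetical cluster with two shards weighted 9 and 10 (total weight 19).
weights = [9, 10]
print([choose_shard(v, weights) for v in range(20)])
```

Remainders 0 to 8 go to the first shard and 9 to 18 go to the second, so the second shard receives about 10/19 of the rows.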