同步Kafka数据至ClickHouse
您可以通过创建Kafka引擎表将Kafka数据自动同步至ClickHouse集群,具体操作详见本章节描述。
前提条件
- 已创建Kafka集群。已安装Kafka客户端,详细可以参考安装客户端。
- 已创建ClickHouse集群,并且ClickHouse集群和Kafka集群在同一VPC下,网络可以互通,并安装ClickHouse客户端。
约束限制
当前ClickHouse不支持和开启安全模式的Kafka集群进行对接。
Kafka引擎表使用语法说明
- 语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'host1:port1,host2:port2', kafka_topic_list = 'topic1,topic2,...', kafka_group_name = 'group_name', kafka_format = 'data_format'; [kafka_row_delimiter = 'delimiter_symbol',] [kafka_schema = '',] [kafka_num_consumers = N]
- 参数说明
表1 Kafka引擎表参数说明 参数名
是否必选
参数说明
kafka_broker_list
是
Kafka集群broker实例的IP和端口列表。例如:kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092。
说明:启用Kerberos认证下,使用21005端口需要“allow.everyone.if.no.acl.found”参数值设置为true; 若不设置此参数,操作会报错。
Kafka集群broker实例IP获取方法如下:
- MRS 3.x及后续版本,登录FusionInsight Manager,然后选择“集群 > 待操作的集群名称 > 服务 > Kafka”。单击“实例”,查看Kafka角色实例的IP地址。
kafka_topic_list
是
Kafka的topic列表。
kafka_group_name
是
Kafka的Consumer Group名称,可以自己指定。
kafka_format
是
Kafka消息体格式。例如JSONEachRow、CSV、XML等。
kafka_row_delimiter
否
每个消息体(记录)之间的分隔符。
kafka_schema
否
如果解析格式需要一个schema时,此参数必填。
kafka_num_consumers
否
单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过topic中分区的数量,因为每个分区只能分配一个消费者。
Kafka数据同步至ClickHouse操作示例
- 参考Kafka客户端使用实践,切换到Kafka客户端安装目录。
- 执行以下命令,创建Kafka的Topic。详细的命令使用可以参考创建Kafka Topic。
kafka-topics.sh --topic kafkacktest2 --create --zookeeper ZooKeeper角色实例IP:ZooKeeper侦听客户端连接的端口/kafka --partitions 2 --replication-factor 1
- --topic参数值为要创建的Topic名称,本示例创建的名称为kafkacktest2 。
- --zookeeper:ZooKeeper角色实例所在节点IP地址,填写三个角色实例其中任意一个的IP地址即可。ZooKeeper角色实例所在节点IP获取参考如下。
- MRS 3.x之前版本,单击集群名称,登录集群详情页面,选择“组件管理 > ZooKeeper > 实例”。查看ZooKeeper角色实例的IP地址。
- MRS 3.x及后续版本,登录FusionInsight Manager,具体请参见访问FusionInsight Manager(MRS 3.x及之后版本)。然后选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 实例”。查看ZooKeeper角色实例的IP地址。
- --partitions主题分区数和--replication-factor主题备份个数不能大于Kafka角色实例数量。
- ZooKeeper侦听客户端连接的端口获取方式:登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。默认为24002。
- 参考使用ClickHouse客户端登录ClickHouse客户端。
- 执行以下命令,切换到客户端安装目录。
- 执行以下命令配置环境变量。
- 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户,当前用户需要具有创建ClickHouse表的权限,具体请参见ClickHouse用户及权限管理章节,为用户绑定对应角色。如果当前集群未启用Kerberos认证,则无需执行本步骤。
kinit 组件业务用户
例如,kinit clickhouseuser。
- 执行以下命令连接到要导入数据的ClickHouse实例节点。
clickhouse client --host ClickHouse的实例IP --user 登录名 --password --port ClickHouse的端口号 --database 数据库名 --multiline
输入用户密码
- 参考Kafka引擎表使用语法说明,在ClickHouse中创建Kafka引擎表。例如,如下建表语句在default数据库下,创建表名为kafka_src_tbl3,Topic名为kafkacktest2、消息格式为JSONEachRow的Kafka引擎表。
create table kafka_src_tbl3 on cluster default_cluster (id UInt32, age UInt32, msg String) ENGINE=Kafka() SETTINGS kafka_broker_list='kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092', kafka_topic_list='kafkacktest2', kafka_group_name='cg12', kafka_format='JSONEachRow';
- 创建ClickHouse本地复制表。例如,如下创建表名为kafka_dest_tbl3的ReplicatedMergeTree表。
create table kafka_dest_tbl3 on cluster default_cluster ( id UInt32, age UInt32, msg String ) engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/kafka_dest_tbl3', '{replica}') partition by age order by id;
- 创建MATERIALIZED VIEW,该视图会在后台转换Kafka引擎中的数据并将其放入创建的ClickHouse表中。
create materialized view consumer3 on cluster default_cluster to kafka_dest_tbl3 as select * from kafka_src_tbl3;
- 再次执行1,进入Kafka客户端安装目录。
- 执行以下命令,在Kafka的Topic中产生消息。例如,如下命令向2中创建的Topic发送消息。
kafka-console-producer.sh --broker-list kafka集群broker实例IP1:9092,kafka集群broker实例IP2:9092,kafka集群broker实例IP3:9092 --topic kafkacktest2
>{"id":31, "age":30, "msg":"31 years old"} >{"id":32, "age":30, "msg":"31 years old"} >{"id":33, "age":30, "msg":"31 years old"} >{"id":35, "age":30, "msg":"31 years old"}
- 使用ClickHouse客户端登录3中ClickHouse实例节点,查询ClickHouse表数据。例如,查询kafka_dest_tbl3本地复制表,Kafka消息中的数据已经同步到该表。
select * from kafka_dest_tbl3;