Connecting ClickHouse to Kafka Using a Username and Password
Scenarios
This section describes how to connect ClickHouse to Kafka using a username and password so that ClickHouse can consume Kafka data.
Notes and Constraints
This section applies only to MRS 3.3.0-LTS or later.
Prerequisites
- A Kafka cluster has been created and is in security mode (Kerberos authentication is enabled).
- The cluster client has been installed.
- If ClickHouse and Kafka are not in the same cluster, establish cross-cluster mutual trust between them.
Procedure
- Log in to FusionInsight Manager, select Kafka, choose System > Permission > User, and click Create User. Create a human-machine user with Kafka permissions, for example, ck_user1, and change its initial password upon first login. For details about Kafka user permissions, see Kafka User Permissions.
- Choose Cluster > Services > Kafka and click Configurations and then All Configurations. Search for sasl.enabled.mechanisms, and change the value to GSSAPI,PLAIN. Click Save.

- Click Instances, select the Broker instances, and choose More > Instance Rolling Restart.
- Log in to FusionInsight Manager, select ClickHouse, choose Cluster > Services > ClickHouse, and click Configurations and then All Configurations. Choose ClickHouseServer(Role) > Engine and modify the parameters listed in the following table to configure the username and password for connecting to Kafka.
Parameter | Description
kafka.sasl_mechanisms | SASL mechanism used to connect to Kafka. Set this parameter to PLAIN.
kafka.sasl_password | Password for connecting to Kafka. Change the initial password of the new user (for example, ck_user1) before using it here; otherwise, authentication fails.
kafka.sasl_username | Username for connecting to Kafka. Enter the name of the Kafka user created in the first step, for example, ck_user1.
kafka_auth_mode | Authentication mode used by ClickHouse to connect to Kafka. Set this parameter to UserPassword.
Figure 1 Modifying parameters
- Click Save. In the displayed dialog box, click OK to save the configuration. Click Instances, select the ClickHouseServer instances, and choose More > Instance Rolling Restart.
- Go to the Kafka client installation directory. For details, see Using the Kafka Client.
- Log in to the node where the Kafka client is installed as the Kafka client installation user.
- Run the following command to go to the client installation directory:
cd {Client installation directory}
- Run the following command to configure environment variables:
source bigdata_env
- If Kerberos authentication is enabled for the cluster, run the following command to authenticate the user. If Kerberos authentication is disabled for the cluster, skip this step.
kinit Component service user
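The client preparation steps above can be wrapped in a small shell function. The sketch below rests on a few assumptions: the function name setup_kafka_client_env is hypothetical, the client directory is passed as an argument, and kinit runs only when you pass a Kerberos user (omit it when Kerberos is disabled). Define the function in your current shell, for example by sourcing a file that contains it, so that the variables loaded from bigdata_env stay set after it returns.

```shell
#!/usr/bin/env bash
# Sketch only: setup_kafka_client_env is a hypothetical helper, not an MRS tool.
# Assumes the client directory contains bigdata_env, as in the steps above.

# setup_kafka_client_env DIR [PRINCIPAL]
#   DIR       - Kafka client installation directory
#   PRINCIPAL - optional Kerberos user; omit it if Kerberos is disabled
setup_kafka_client_env() {
  local client_dir="$1"
  local principal="${2:-}"

  # Go to the client installation directory.
  cd "$client_dir" || { echo "cannot cd to $client_dir" >&2; return 1; }

  # Load the client environment variables into the current shell.
  if [ ! -f bigdata_env ]; then
    echo "bigdata_env not found in $client_dir" >&2
    return 1
  fi
  # shellcheck source=/dev/null
  source bigdata_env

  # Authenticate only when a principal is given (Kerberos enabled).
  if [ -n "$principal" ]; then
    kinit "$principal" || return 1
  fi
}
```

Keeping kinit optional mirrors the step above, which skips authentication for clusters without Kerberos.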
- Run the following command to create a Kafka topic. For details, see Creating a Kafka Topic.
kafka-topics.sh --topic topic1 --create --bootstrap-server <Kafka cluster IP address:9092> --command-config Kafka/kafka/config/client.properties --partitions 2 --replication-factor 1
Table 1 Parameter description
Parameter | Description
--topic | Name of the topic to be created, for example, topic1.
--bootstrap-server | IP address and port of a Broker instance in the Kafka cluster. To obtain the IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, and click Instances to query the IP address of the Broker role instance.
--partitions | Number of topic partitions. The value cannot be greater than the number of Kafka role instances.
--replication-factor | Number of topic replicas. The value cannot be greater than the number of Kafka role instances.
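As a sketch of how the Table 1 constraints apply, the hypothetical shell function build_topic_cmd below rejects a partition count or replication factor that exceeds the number of Kafka role instances, and otherwise prints the kafka-topics.sh command from this step with your values filled in. The broker address and the instance count are inputs you supply; the function does not query the cluster.

```shell
#!/usr/bin/env bash
# Sketch only: build_topic_cmd is a hypothetical helper.
# BROKER (ip:port) and BROKER_COUNT must be supplied by you.

# build_topic_cmd TOPIC BROKER BROKER_COUNT PARTITIONS REPLICAS
build_topic_cmd() {
  local topic="$1" broker="$2" broker_count="$3" partitions="$4" replicas="$5"

  # Table 1: neither value may exceed the number of Kafka role instances.
  if [ "$partitions" -gt "$broker_count" ] || [ "$replicas" -gt "$broker_count" ]; then
    echo "partitions and replication factor must be <= $broker_count" >&2
    return 1
  fi

  # Print (do not run) the command from the step above.
  echo "kafka-topics.sh --topic $topic --create --bootstrap-server $broker" \
       "--command-config Kafka/kafka/config/client.properties" \
       "--partitions $partitions --replication-factor $replicas"
}
```

For example, build_topic_cmd topic1 192.0.2.10:9092 3 2 1 prints the command used in this step with a placeholder broker address. The function only prints the command so that you can review it before running it.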
- Log in to the ClickHouse client node and connect it to the ClickHouse server. For details, see ClickHouse Client Practices.
- Create a table that uses the Kafka table engine. The following is an example:
CREATE TABLE queue1 ( key String, value String, event_date DateTime ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'kafka_ip1:21007,kafka_ip2:21007,kafka_ip3:21007', kafka_topic_list = 'topic1', kafka_group_name = 'group1', kafka_format = 'CSV', kafka_row_delimiter = '\n', kafka_handle_error_mode='stream';
The following table lists the related parameters.
Parameter | Description
kafka_broker_list | List of IP addresses and port numbers of the Kafka Broker instances, for example, IP address 1 of Kafka broker instance:9092,IP address 2 of Kafka broker instance:9092,IP address 3 of Kafka broker instance:9092. To obtain the IP addresses, log in to FusionInsight Manager, choose Cluster > Services > Kafka, and click the Instances tab.
kafka_topic_list | Kafka topic from which data is consumed.
kafka_group_name | Name of the Kafka consumer group.
kafka_format | Format of the consumed data. JSONEachRow indicates JSON with one record per line. CSV indicates one record per line with fields separated by commas (,).
kafka_row_delimiter | Delimiter character that marks the end of a message.
kafka_handle_error_mode | If this parameter is set to stream, the exception raised while processing each message is recorded: the error text is saved in the _error virtual column and the raw message in the _raw_message virtual column. To query the specific exception for abnormal data, create a materialized view on the Kafka table and query the view.
The following example shows how to create such a view:
CREATE MATERIALIZED VIEW default.kafka_errors2 ( `topic` String, `key` String, `partition` Int64, `offset` Int64, `timestamp` Date, `timestamp_ms` Int64, `raw` String, `error` String ) ENGINE = MergeTree ORDER BY (topic, partition, offset) SETTINGS index_granularity = 8192 AS SELECT _topic AS topic, _key AS key, _partition AS partition, _offset AS offset, _timestamp AS timestamp, _timestamp_ms AS timestamp_ms, _raw_message AS raw, _error AS error FROM default.queue1;
Query the view. The following is an example:
host1 :) select * from kafka_errors2;
SELECT * FROM kafka_errors2
Query id: bf4d788f-bcb9-44f5-95d0-a6c83c591ddb
┌─topic──┬─key─┬─partition─┬─offset─┬──timestamp─┬─timestamp_ms─┬─raw─┬─error────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ topic1 │     │         1 │      8 │ 2023-06-20 │   1687252213 │ 456 │ Cannot parse date: value is too short: (at row 1) Buffer has gone, cannot extract information about what has been parsed. │
└────────┴─────┴───────────┴────────┴────────────┴──────────────┴─────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
1 rows in set. Elapsed: 0.003 sec.
host1 :)
kafka_skip_broken_messages | (Optional) Number of messages with parsing exceptions that are skipped per block. If more than N (the value of this parameter) exceptions occur, the background thread stops and the materialized view is rescheduled to monitor the data.
kafka_num_consumers | (Optional) Number of consumers per Kafka engine table. A larger value improves consumption throughput, but the value cannot exceed the total number of partitions of the topic.
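The kafka_broker_list value and the kafka_num_consumers limit can be checked before running CREATE TABLE. The following sketch uses two hypothetical helper functions: join_brokers builds the comma-separated ip:port list, and check_num_consumers verifies that the consumer count does not exceed the partition count of the topic. The IP addresses in the usage note are documentation placeholders, not real broker addresses; use the Broker instance IPs and port of your own cluster.

```shell
#!/usr/bin/env bash
# Sketch only: join_brokers and check_num_consumers are hypothetical helpers.

# join_brokers PORT IP...  -> prints "ip1:port,ip2:port,..."
join_brokers() {
  local port="$1"; shift
  local list="" ip
  for ip in "$@"; do
    # Add a comma only between entries, never at the start or end.
    list="${list:+$list,}$ip:$port"
  done
  echo "$list"
}

# check_num_consumers CONSUMERS PARTITIONS
#   kafka_num_consumers must not exceed the number of partitions of the topic.
check_num_consumers() {
  if [ "$1" -gt "$2" ]; then
    echo "kafka_num_consumers ($1) exceeds partition count ($2)" >&2
    return 1
  fi
}
```

For example, join_brokers 21007 192.0.2.1 192.0.2.2 192.0.2.3 prints 192.0.2.1:21007,192.0.2.2:21007,192.0.2.3:21007, which has the same layout as the kafka_broker_list value in the CREATE TABLE example above. For a topic with two partitions, check_num_consumers 2 2 succeeds and check_num_consumers 3 2 fails.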
- Use the ClickHouse client to create a local table. The following is an example:
CREATE TABLE daily1( key String, value String, event_date DateTime ) ENGINE = MergeTree() ORDER BY key;
- Use the ClickHouse client to create a materialized view that writes the data consumed from queue1 to daily1. The following is an example:
CREATE MATERIALIZED VIEW default.consumer TO default.daily1 ( `event_date` DateTime, `key` String, `value` String ) AS SELECT event_date, key, value FROM default.queue1;
- Go to the Kafka client installation directory again, as described above.
- Run the following command to send messages to the topic created earlier (topic1 in this example):
kafka-console-producer.sh --broker-list IP address 1 of the Kafka Broker instance:9092,IP address 2 of the Kafka Broker instance:9092,IP address 3 of the Kafka Broker instance:9092 --topic topic1 --producer.config Kafka/kafka/config/producer.properties
At the > prompt, enter the messages one per line. For example:
>a1,b1,'2020-08-01 10:00:00'
>a2,b2,'2020-08-02 10:00:00'
>a3,b3,'2020-08-02 10:00:00'
>a4,b4,'2023-09-02 10:00:00'
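Instead of typing messages at the prompt, you can pipe generated lines into the producer. The sketch below uses a hypothetical gen_messages function that prints COUNT messages in the key,value,'YYYY-MM-DD hh:mm:ss' layout used above, in the column order of queue1 (key, value, event_date). The a/b prefixes and the default timestamp are sample data only.

```shell
#!/usr/bin/env bash
# Sketch only: gen_messages is a hypothetical helper that prints sample data.

# gen_messages COUNT [TIMESTAMP]
gen_messages() {
  local count="$1" ts="${2:-2020-08-01 10:00:00}" i
  for ((i = 1; i <= count; i++)); do
    # One CSV message per line: key, value, event_date.
    printf "a%d,b%d,'%s'\n" "$i" "$i" "$ts"
  done
}
```

For example, gen_messages 4 piped into kafka-console-producer.sh, with the same --broker-list, --topic, and --producer.config options as in the command above, sends four messages; the producer reads lines from standard input and exits at end of input.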
- Query the local table to check the Kafka data written to it by the materialized view. The following is an example:
select * from daily1;
