Configuring Kafka
The data used by the Flink sample project is stored in Kafka. A user with Kafka permissions can send data to Kafka and receive data from it.
- Ensure that the required clusters, including HDFS, Yarn, Flink, and Kafka, are installed.
- Create a topic.
- Run a Linux command to create a topic. Before running the command, ensure that kinit, for example, kinit flinkuser, has been run for authentication.
The Flink user that you create must have permission to create Kafka topics.
The command format is as follows, where {zkQuorum} indicates the ZooKeeper cluster information in the format IP:port, and {Topic} indicates the topic name.
bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 5 --topic {Topic}
Assume that the topic name is topic1. The command for creating this topic is as follows:
/opt/client/Kafka/kafka/bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --replication-factor 1 --partitions 5 --topic topic1
The ZooKeeper cluster information is as follows:
- Service IP address of the ZooKeeper quorumpeer instance:
Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the page that is displayed, click the Instance tab and view the service IP addresses of all nodes where the quorumpeer instances reside.
- Port number of the ZooKeeper client:
Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the page that is displayed, click the Configurations tab. On this tab page, view the value of clientPort.
- Configure the permission of the topic on the server.
Set the allow.everyone.if.no.acl.found parameter of Kafka Broker to true.
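The command format and the ZooKeeper lookup steps above can be combined roughly as in the following sketch. It assumes bash; build_zk_quorum and print_create_topic_cmd are hypothetical helper names introduced here for illustration, and the script only prints the command instead of running it. The IP addresses and port are the example values from this section.

```shell
#!/usr/bin/env bash
# Hypothetical helper: join quorumpeer IPs with the clientPort value
# into the "IP:port,IP:port,..." form expected for {zkQuorum}.
# Usage: build_zk_quorum PORT IP [IP...]
build_zk_quorum() {
  local port="$1"
  shift
  local quorum=""
  local ip
  for ip in "$@"; do
    # Prefix a comma only when quorum already holds an entry.
    quorum="${quorum:+${quorum},}${ip}:${port}"
  done
  printf '%s\n' "$quorum"
}

# Hypothetical helper: print (not run) the topic creation command
# in the format shown above.
# Usage: print_create_topic_cmd ZK_QUORUM TOPIC
print_create_topic_cmd() {
  local zk_quorum="$1"
  local topic="$2"
  printf '%s\n' "bin/kafka-topics.sh --create --zookeeper ${zk_quorum}/kafka --replication-factor 1 --partitions 5 --topic ${topic}"
}

# Example values taken from this section.
zk="$(build_zk_quorum 2181 10.96.101.32 10.96.101.251 10.96.101.177 10.91.8.160)"
print_create_topic_cmd "$zk" topic1
```

Printing the command first makes it easy to review the ZooKeeper string before running it after kinit authentication.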
- Perform the security authentication.
Any of the following modes can be used: Kerberos authentication, SSL encryption, or Kerberos+SSL.
- Kerberos authentication
- Client configuration
In the Flink configuration file flink-conf.yaml, add the Kerberos authentication settings. For example, add KafkaClient to security.kerberos.login.contexts as follows:
security.kerberos.login.keytab: /home/demo/keytab/flinkuser.keytab
security.kerberos.login.principal: flinkuser
security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.use-ticket-cache: false
- Running parameters
Running parameters about the SASL_PLAINTEXT protocol are as follows:
--topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.System domain name.com
In these parameters, 10.96.101.32:21007 indicates the IP address and port number of the Kafka server.
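Before submitting a job with these parameters, it can help to confirm that the client configuration really lists KafkaClient in its login contexts. The following is a minimal sketch, assuming bash and standard grep, cut, and tr; has_kafka_client_context is a hypothetical helper name, and the demo runs against a temporary file rather than a real flink-conf.yaml.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed (exit status 0) if the given flink-conf.yaml
# lists KafkaClient in security.kerberos.login.contexts.
# Usage: has_kafka_client_context /path/to/flink-conf.yaml
has_kafka_client_context() {
  grep -E '^[[:space:]]*security\.kerberos\.login\.contexts:' "$1" \
    | cut -d: -f2- \
    | tr -d '[:space:]' \
    | tr ',' '\n' \
    | grep -qx 'KafkaClient'
}

# Demo against a temporary file holding the example settings from this section.
demo_conf="$(mktemp)"
cat > "$demo_conf" <<'EOF'
security.kerberos.login.keytab: /home/demo/keytab/flinkuser.keytab
security.kerberos.login.principal: flinkuser
security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.use-ticket-cache: false
EOF

if has_kafka_client_context "$demo_conf"; then
  echo "KafkaClient context is configured"
else
  echo "KafkaClient context is missing"
fi
rm -f "$demo_conf"
```

The check only inspects the text of the file. It does not verify that the keytab or principal is valid.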
- SSL encryption
- Configure the server.
Log in to FusionInsight Manager, choose Cluster > Services > Kafka > Configurations, and set Type to All. Search for ssl.mode.enable and set it to true.
- Configure the client.
- Log in to FusionInsight Manager and choose Cluster > Name of the desired cluster > Services > Kafka > More > Download Client to download the Kafka client.
- Use the ca.crt certificate file in the client root directory to generate the truststore file for the client.
Run the following command:
keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks
The command execution result is similar to the following:
- Set the running parameters.
The value of ssl.truststore.password must be the same as the password you entered when creating the truststore. Use the following running parameters. Commands that carry authentication passwords pose security risks. Disable command history recording before running such commands to prevent information leakage.
--topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX
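One way to follow the advice about command history is to switch off history recording and take the truststore password from an environment variable instead of typing it inline. The sketch below assumes bash; ssl_run_params and the TRUSTSTORE_PASSWORD variable are hypothetical names introduced for illustration and are not part of Flink or Kafka. The server address and truststore path are the example values from this section.

```shell
#!/usr/bin/env bash
# Turn off history recording for this session (bash only).
if [ -n "${BASH_VERSION:-}" ]; then
  set +o history
fi

# Hypothetical helper: print the SSL running parameters.
# Usage: ssl_run_params BOOTSTRAP_SERVER TRUSTSTORE_PATH TRUSTSTORE_PASSWORD
ssl_run_params() {
  local servers="$1"
  local truststore="$2"
  local password="$3"
  printf '%s\n' "--topic topic1 --bootstrap.servers ${servers} --security.protocol SSL --ssl.truststore.location ${truststore} --ssl.truststore.password ${password}"
}

# TRUSTSTORE_PASSWORD is an assumed variable name. XXX is the placeholder
# used in this section and is printed when the variable is unset.
ssl_run_params "10.96.101.32:9093" \
  "/home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks" \
  "${TRUSTSTORE_PASSWORD:-XXX}"
```

The sketch prints the parameters only to show their shape. In real use, pass them directly to the job rather than echoing the password to the terminal.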
- Kerberos+SSL encryption
After completing the preceding client and server configurations for both Kerberos and SSL, change the port number and protocol type in the running parameters to enable the Kerberos+SSL encryption mode.
--topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.System domain name.com --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX
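The three modes differ mainly in the port and the security.protocol value. The following sketch, assuming bash, collects the pairs used in the examples of this section. mode_to_endpoint is a hypothetical helper name, and the ports (21007, 9093, 21009) are the example values shown here, which may differ in a given cluster.

```shell
#!/usr/bin/env bash
# Hypothetical helper: map an authentication mode to the port and protocol
# used in the examples of this section. Prints "PORT PROTOCOL".
# Usage: mode_to_endpoint kerberos|ssl|kerberos+ssl
mode_to_endpoint() {
  case "$1" in
    kerberos)     printf '%s\n' "21007 SASL_PLAINTEXT" ;;
    ssl)          printf '%s\n' "9093 SSL" ;;
    kerberos+ssl) printf '%s\n' "21009 SASL_SSL" ;;
    *)            printf 'unknown mode: %s\n' "$1" >&2; return 1 ;;
  esac
}

# Split the helper output into port and protocol, then print the
# matching part of the running parameters.
read -r port protocol <<EOF
$(mode_to_endpoint kerberos+ssl)
EOF
printf '%s\n' "--bootstrap.servers 10.96.101.32:${port} --security.protocol ${protocol}"
```

Keeping the mapping in one place makes it harder to pair a port with the wrong protocol when switching modes.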