Configuring Kafka
Sample project data of Flink is stored in Kafka. A user with Kafka permission can send data to Kafka and receive data from it.
- Ensure that clusters, including HDFS, Yarn, Flink, and Kafka are installed.
- Create a topic.
- Run Linux command line to create a topic. Before running commands, ensure that the kinit command, for example, kinit flinkuser, is run for authentication.
To create a Flink user, you need to have the permission to create Kafka topics.
The format of the command is shown as follows, in which {zkQuorum} indicates ZooKeeper cluster information and the format is IP:port, and {Topic} indicates the topic name.
bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 5 --topic {Topic}
Assume the topic name is topic 1. The command for creating this topic is displayed as follows:/opt/client/Kafka/kafka/bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --replication-factor 1 --partitions 5 --topic topic1
The ZooKeeper cluster information is as follows:
- Service IP address of the ZooKeeper quorumpeer instance:
Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the page that is displayed, click the Instance tab and view the service IP addresses of all nodes where the quorumpeer instances reside.
- Port number of the ZooKeeper client:
Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the page that is displayed, click the Configurations tab. On this tab page, view the value of clientPort.
- Service IP address of the ZooKeeper quorumpeer instance:
- Configure the permission of the topic on the server.
Set the allow.everyone.if.no.acl.found parameter of Kafka Broker to true.
- Run Linux command line to create a topic. Before running commands, ensure that the kinit command, for example, kinit flinkuser, is run for authentication.
- Perform the security authentication.
The Kerberos authentication, SSL encryption authentication, or Kerberos + SSL authentication mode can be used.
For versions earlier than MRS 3.x, only Kerberos authentication is supported.
- Kerberos authentication
- Client configuration
In the Flink configuration file flink-conf.yaml, add configurations about Kerberos authentication. For example, add KafkaClient in contexts as follows:
security.kerberos.login.keytab: /home/demo/keytab/flinkuser.keytab security.kerberos.login.principal: flinkuser security.kerberos.login.contexts: Client,KafkaClient security.kerberos.login.use-ticket-cache: false
For versions earlier than MRS 3.x, set security.kerberos.login.keytab to /home/demo/flink/release/keytab/flinkuser.keytab.
- Running parameter
Running parameters about the SASL_PLAINTEXT protocol are as follows:
--topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka //10.96.101.32:21007 indicates the IP:port of the Kafka server.
- Client configuration
- SSL encryption
- Configure the server.
Log in to FusionInsight Manager, choose Cluster > Services > Kafka > Configurations, and set Type to All. Search for ssl.mode.enable and set it to true.
- Configure the client.
- Log in to FusionInsight Manager, choose Cluster > Name of the desired cluster > Services > Kafka > More > Download Client to download Kafka client.
- Use the ca.crt certificate file in the client root directory to generate the truststore file for the client.
Run the following command:
keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks
The command execution result is similar to the following:
- Run parameters.
The value of ssl.truststore.password must be the same as the password you entered when creating truststore. Run the following command to run parameters:
--topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX
- Configure the server.
- Kerberos+SSL encryption
After completing preceding configurations of the client and server of Kerberos and SSL, modify the port number and protocol type in running parameters to enable the Kerberos+SSL encryption mode.
--topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX
- Kerberos authentication
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.