Updated on 2023-09-14 GMT+08:00

Configuring Kafka

Sample project data of Flink is stored in Kafka. A user with Kafka permission can send data to Kafka and receive data from it.

  1. Ensure that clusters, including HDFS, Yarn, Flink, and Kafka are installed.
  2. Create a topic.

    • Run Linux command line to create a topic. Before running commands, ensure that the kinit command, for example, kinit flinkuser, is run for authentication.

      To create a Flink user, you need to have the permission to create Kafka topics.

      The format of the command is shown as follows, in which {zkQuorum} indicates ZooKeeper cluster information and the format is IP:port, and {Topic} indicates the topic name.

      bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 5 --topic {Topic}

      Assume the topic name is topic 1. The command for creating this topic is displayed as follows:
      /opt/client/Kafka/kafka/bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --replication-factor 1 --partitions 5 --topic topic1

      The ZooKeeper cluster information is as follows:

      • Service IP address of the ZooKeeper quorumpeer instance:

        Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the page that is displayed, click the Instance tab and view the service IP addresses of all nodes where the quorumpeer instances reside.

      • Port number of the ZooKeeper client:

        Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the page that is displayed, click the Configurations tab. On this tab page, view the value of clientPort.

    • Configure the permission of the topic on the server.

      Set the allow.everyone.if.no.acl.found parameter of Kafka Broker to true.

  3. Perform the security authentication.

    The Kerberos authentication, SSL encryption authentication, or Kerberos + SSL authentication mode can be used.

    • Kerberos authentication
      • Client configuration

        In the Flink configuration file flink-conf.yaml, add configurations about Kerberos authentication. For example, add KafkaClient in contexts as follows:

        security.kerberos.login.keytab: /home/demo/keytab/flinkuser.keytab
        security.kerberos.login.principal: flinkuser
        security.kerberos.login.contexts: Client,KafkaClient
        security.kerberos.login.use-ticket-cache: false
      • Running parameter

        Running parameters about the SASL_PLAINTEXT protocol are as follows:

        --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT  --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.System domain name.com //10.96.101.32:21007 indicates the IP:port of the Kafka server.
    • SSL encryption
      • Configure the server.

        Log in to FusionInsight Manager, choose Cluster > Services > Kafka > Configurations, and set Type to All. Search for ssl.mode.enable and set it to true.

      • Configure the client.
        1. Log in to FusionInsight Manager, choose Cluster > Name of the desired cluster > Services > Kafka > More > Download Client to download Kafka client.
        2. Use the ca.crt certificate file in the client root directory to generate the truststore file for the client.
          Run the following command:
          keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks 

          The command execution result is similar to the following:

        3. Run parameters.

          The value of ssl.truststore.password must be the same as the password you entered when creating truststore. Run the following command to run parameters:

          --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX
    • Kerberos+SSL encryption

      After completing preceding configurations of the client and server of Kerberos and SSL, modify the port number and protocol type in running parameters to enable the Kerberos+SSL encryption mode.

      --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL  --sasl.kerberos.service.name kafka --ssl.truststore.location --kerberos.domain.name hadoop.System domain name.com /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX