Updated on 2022-11-18 GMT+08:00

Scenario Description

Scenario

Assume that a Flink service receives one message record every second.

A Flink application is developed to output prefixed message content in real time based on service requirements.

Data Preparation

Flink's sample project data is stored in Kafka. A user with Kafka permission can send data to Kafka and receive data from it.
  1. Ensure that clusters have been installed, including HDFS, Yarn, Flink, and Kafka.
  2. Create a topic.
    1. Configure the user permission for creating topics on the server.

      Change the value of the allow.everyone.if.no.acl.found parameter of Kafka Broker to true. See Figure 1. After the configuration is complete, restart the Kafka service.

      Figure 1 Setting the user permission for creating topics
    2. Run a Linux command line to create a topic. Before running a command, run the kinit command, for example, kinit flinkuser, to authenticate the human-machine account.

      To create a Flink user, you need to have the permission to create Kafka topics. For details, see Preparing a Developer Account.

      The command for creating a Kafka topic is as follows:

      bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic}

      Table 1 Parameters

      Parameter

      Description

      {zkQuorum}

      ZooKeeper cluster information in the IP:Port format

      {partitionNum}

      Number of topic partitions

      {replicationNum}

      Number of data replicas of each partition in a topic

      {Topic}

      Topic name

      For example, run the following command on the Kafka client. Assume that the values of IP address:port number for the ZooKeeper cluster are 10.96.101.32:2181, 10.96.101.251:2181, 10.96.101.177:2181, 10.91.8.160:2181, and the topic name is topic1.
      bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --partitions 5 --replication-factor 1 --topic topic1
  3. Perform security authentication.

    You can use Kerberos authentication, SSL encryption authentication, or Kerberos + SSL authentication.

    • Kerberos authentication
      1. Configure the client.

        In Flink configuration file flink-conf.yaml, add configurations about Kerberos authentication. For example, add KafkaClient in contexts as follows:

        security.kerberos.login.keytab: /home/demo/flink/release/flink-x.x.x/keytab/user.keytab
        security.kerberos.login.principal: flinkuser
        security.kerberos.login.contexts: Client,KafkaClient
        security.kerberos.login.use-ticket-cache: false
      2. Run parameters.

        Running parameters about the SASL_PLAINTEXT protocol are as follows:

        --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT  --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.System domain name.com
        • 10.96.101.32:21007: IP address and port number of the Kafka server.
        • System domain name: You can log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
    • SSL encryption
      • Configure the server.

        Set ssl.mode.enable to true. See Figure 2.

        Figure 2 Configuring the server
      • Configure the client.
        1. Log in to FusionInsight Manager and choose Cluster > Name of the desired cluster > Services > Kafka. On the page that is displayed, chosoe More > Download Client to download the Kafka client. See Figure 3.
          Figure 3 Configuring the client
        1. Use the ca.crt certificate file in the client root directory to generate the truststore file for the client.
          Run the following command:
          keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks 

          The command output is as follows:

        2. Run parameters.

          The value of ssl.truststore.password must be the same as the password you entered when creating truststore. Run the following command to run parameters:

          --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_XXX_Kafka_ClientConfig/truststore.jks --ssl.truststore.password huawei //10.96.101.32:9093 indicates the IP:Port of the Kafka server. XXX indicates the FusionInsight version.
    • Kerberos+SSL encryption

      After completing preceding configurations of the client and server of Kerberos and SSL, modify the port number and protocol type in running parameters to enable the Kerberos+SSL encryption mode.

      --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL  --sasl.kerberos.service.name kafka --ssl.truststore.location --kerberos.domain.name hadoop.System domain name.com /home/zgd/software/FusionInsight_XXX_Kafka_ClientConfig/truststore.jks --ssl.truststore.password huawei //10.96.101.32:21009 indicates the IP address:Port number of the Kafka server, and XXX indicates the FusionInsight version number.

Development Guideline

  1. Start Flink Kafka Producer to send data to Kafka.
  2. Start Flink Kafka Consumer to receive data from Kafka. Ensure that Flink Kafka Consumer and Flink Kafka Producer have the same topics.
  3. Add prefixes to data content and print the results.