Updated on 2022-06-01 GMT+08:00

Scenario Description

Assume that, in a service, Flink receives one message record per second.

Develop a Flink application that outputs the message content with a prefix in real time.

Data Planning

Data for the Flink sample project is stored in Kafka. The application sends data to Kafka and reads data from Kafka, which requires a user with Kafka permissions.

  1. Ensure that a cluster containing HDFS, YARN, Flink, and Kafka has been successfully installed.
  2. Create a topic.

    1. Configure the user permission for creating topics on the server.

      For a security cluster with Kerberos authentication enabled, set the Kafka broker configuration parameter allow.everyone.if.no.acl.found to true, and then restart Kafka. You do not need to configure this parameter for normal clusters, where Kerberos authentication is disabled.

    2. Run a Linux command to create a topic. Before running the command, run the kinit command (for example, kinit flinkuser) to authenticate the human-machine account.

      flinkuser is a user that you create yourself and that has permission to create Kafka topics. For details, see Preparing a Development User.

      The command for creating a topic is as follows:

      bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic}

      Table 1 Parameters

      Parameter            Description
      {zkQuorum}           ZooKeeper cluster information in the IP:port format
      {partitionNum}       Number of topic partitions
      {replicationNum}     Number of data replicas of each partition in a topic
      {Topic}              Topic name

      For example, run the following command in the Kafka client directory. In this example, the IP:port list of the ZooKeeper cluster is 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181, and the topic name is topic1.

      bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181/kafka --partitions 5 --replication-factor 1 --topic topic1

  3. If Kerberos authentication is enabled for the cluster, perform this step for security authentication. Otherwise, skip this step.

    • Configuration of Kerberos authentication
      1. Client configuration

        In the Flink configuration file flink-conf.yaml, add the Kerberos authentication settings. For example, add KafkaClient to security.kerberos.login.contexts as follows:

        security.kerberos.login.keytab: /home/demo/flink/release/flink-1.2.1/keytab/admin.keytab
        security.kerberos.login.principal: admin
        security.kerberos.login.contexts: Client,KafkaClient
        security.kerberos.login.use-ticket-cache: false
      2. Running parameters

        The following is an example of the running parameters for the SASL_PLAINTEXT protocol:

        --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka

        In this example, 10.96.101.32:21007 is the IP:port of the Kafka server.
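
The running parameters in the example are plain "--key value" pairs. The following is a minimal sketch, in plain Java with no Flink dependency, of how such pairs could be read into a key/value map. The class name RunArgs and the method parse are hypothetical and are not part of the sample project; in a real Flink job, the ParameterTool utility in Flink plays a similar role, although whether the sample project uses it is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the sample project):
// turns "--key value" pairs into an ordered map.
class RunArgs {
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but found: " + args[i]);
            }
            String key = args[i].substring(2);
            // Every key must be followed by a value, not by another key.
            if (i + 1 >= args.length || args[i + 1].startsWith("--")) {
                throw new IllegalArgumentException("Missing value for --" + key);
            }
            params.put(key, args[++i]);
        }
        return params;
    }

    public static void main(String[] args) {
        // Same running parameters as in the example above.
        String line = "--topic topic1 --bootstrap.servers 10.96.101.32:21007 "
                + "--security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka";
        Map<String, String> params = parse(line.trim().split("\\s+"));
        params.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Rejecting a key that has no value catches a mistyped command line early, before the application tries to connect to Kafka.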

Development Guidelines

  1. Start the Flink Kafka Producer application to send data to Kafka.
  2. Start the Flink Kafka Consumer application to receive data from Kafka. Ensure that the Kafka Consumer reads from the same topic that the Kafka Producer writes to.
  3. Add a prefix to the data content and print the result.
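
Step 3 amounts to a transformation applied to each record. The following sketch shows only that logic in plain Java, without Flink or Kafka dependencies. The class name PrefixSketch, the prefix string, and the sample messages are hypothetical; the actual prefix used by the sample project is not stated here. In a Flink job, a function like addPrefix would typically be applied to the consumed stream in a map operation before the result is printed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of the "add a prefix and print" step.
class PrefixSketch {
    // Assumed prefix text; replace with whatever the application requires.
    static final String PREFIX = "PREFIX: ";

    // Per-record transformation: prepend the prefix to one message.
    static String addPrefix(String message) {
        return PREFIX + message;
    }

    // Applies the transformation to a batch of received messages, in order.
    static List<String> process(List<String> messages) {
        return messages.stream()
                .map(PrefixSketch::addPrefix)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in for records consumed from the Kafka topic.
        List<String> received = Arrays.asList("message 1", "message 2");
        process(received).forEach(System.out::println);
    }
}
```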