Development Plan for Producing and Consuming Kafka Data
Assume that a service produces one message record per second. Develop a Flink application that reads these messages and outputs their content with a prefix in real time.
Data Planning
Data for the Flink sample project is stored in Kafka. Data is sent to and obtained from Kafka by a user with Kafka permissions.
- Ensure that a cluster containing HDFS, YARN, Flink, and Kafka has been successfully installed.
- Create a topic.
- Configure the user permission for creating topics on the server.
For a security cluster (Kerberos authentication enabled), set the Kafka broker configuration parameter allow.everyone.if.no.acl.found to true, and then restart Kafka for the change to take effect, as in the excerpt below. For a normal cluster (Kerberos authentication disabled), this parameter does not need to be configured.
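If your deployment edits the broker's server.properties file directly, the change might look like the following excerpt (the parameter name is the standard Kafka setting; the file path varies by installation, and some clusters apply this setting through the management console instead):
# Kafka broker configuration (excerpt)
# Grant access when no ACL is found for a resource
allow.everyone.if.no.acl.found=true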
- Run a Linux command to create a topic. Before running the command, run the kinit command (for example, kinit flinkuser) to authenticate the human-machine account.
flinkuser is a user you have created with permission to create Kafka topics. For details, see Preparing a Flink Application Development User.
The command for creating a topic is as follows:
bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic}
Table 1 Parameters
- {zkQuorum}: ZooKeeper cluster information, in IP:port format
- {partitionNum}: Number of topic partitions
- {replicationNum}: Number of data replicas of each partition in the topic
- {Topic}: Topic name
For example, run the following command in the Kafka client path. In this example, the ZooKeeper cluster information is 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181 and the topic name is topic1.
bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181/kafka --partitions 5 --replication-factor 1 --topic topic1
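To verify that the topic was created, you can describe it (this command is a suggestion, assuming the same client path and ZooKeeper quorum as above):
bin/kafka-topics.sh --describe --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181/kafka --topic topic1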
- If Kerberos authentication is enabled for the cluster, perform this step for security authentication. Otherwise, skip this step.
- Configuration of Kerberos authentication
- Client configuration
In the Flink configuration file flink-conf.yaml, add the Kerberos authentication settings. For example, add KafkaClient to security.kerberos.login.contexts as follows:
security.kerberos.login.keytab: /home/demo/flink/release/flink-1.2.1/keytab/admin.keytab
security.kerberos.login.principal: admin
security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.use-ticket-cache: false
- Running parameters
The following is an example of running parameters for the SASL_PLAINTEXT protocol, where 10.96.101.32:21007 is the IP:port of the Kafka server (a sketch of how an application can parse these parameters follows this list):
--topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka
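A minimal sketch, assuming Flink's ParameterTool utility (org.apache.flink.api.java.utils.ParameterTool), of how the running parameters shown above can be turned into Kafka client properties; the class name RunningParams is hypothetical:

import java.util.Properties;
import org.apache.flink.api.java.utils.ParameterTool;

public class RunningParams {
    public static void main(String[] args) {
        // Example arguments:
        // --topic topic1 --bootstrap.servers 10.96.101.32:21007
        // --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka
        ParameterTool params = ParameterTool.fromArgs(args);
        String topic = params.getRequired("topic");

        // Every remaining "--key value" pair maps directly to a Kafka client property.
        Properties kafkaProps = params.getProperties();
        kafkaProps.remove("topic"); // "topic" is not a Kafka client property
        System.out.println("topic=" + topic + ", kafka properties=" + kafkaProps);
    }
}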
Development Guidelines
- Start the Flink Kafka Producer application to send data to Kafka.
- Start the Flink Kafka Consumer application to receive data from Kafka. Ensure that the topic the Kafka Consumer reads from is the same as the one the Kafka Producer writes to.
- Add a prefix to the data content and print the result. A combined sketch of both applications is shown after this list.
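A minimal sketch of both applications in one class, not the official sample code: it assumes the universal Kafka connector classes FlinkKafkaConsumer and FlinkKafkaProducer (older Flink releases use versioned classes such as FlinkKafkaConsumer010), and a hypothetical --mode argument to select the producer or consumer pipeline.

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaPrefixExample {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        String topic = params.getRequired("topic");
        String mode = params.get("mode", "producer"); // hypothetical pipeline switch
        // Forward all --key value pairs (bootstrap.servers, security.protocol, ...)
        // to the Kafka client; unknown keys such as "mode" are ignored with a warning.
        Properties props = params.getProperties();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        if ("producer".equals(mode)) {
            // Producer pipeline: emit one record per second and write it to the topic.
            DataStream<String> source = env.addSource(new SourceFunction<String>() {
                private volatile boolean running = true;
                @Override
                public void run(SourceContext<String> ctx) throws Exception {
                    long i = 0;
                    while (running) {
                        ctx.collect("element-" + i++);
                        Thread.sleep(1000L);
                    }
                }
                @Override
                public void cancel() {
                    running = false;
                }
            });
            source.addSink(new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props));
        } else {
            // Consumer pipeline: read from the same topic, add a prefix, and print.
            DataStream<String> messages =
                    env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props));
            messages.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return "Prefix: " + value;
                }
            }).print();
        }

        env.execute("Kafka prefix example (" + mode + ")");
    }
}

Submitted twice, once with --mode producer and once with --mode consumer, together with the running parameters shown earlier, this mirrors the three steps above.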