Updated on 2022-06-01 GMT+08:00

Scenario Description

Assume that a service writes one word record to Kafka every second.

To meet the service requirements, a Spark application must be developed to implement the following function:

Calculate the cumulative number of records of each word in real time.

The following is an example of the log1.txt file.

LiuYang
YuanJing
GuoYijun
CaiXuyu
Liyuan
FangBo
LiuYang
YuanJing
GuoYijun
CaiXuyu
FangBo
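
For reference, the totals expected from the sample above can be checked with a short, Spark-independent Java sketch (the class name SampleWordCount is illustrative and not part of the sample project):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SampleWordCount {
    public static void main(String[] args) {
        // The word records from the log1.txt example above.
        List<String> records = Arrays.asList(
            "LiuYang", "YuanJing", "GuoYijun", "CaiXuyu", "Liyuan",
            "FangBo", "LiuYang", "YuanJing", "GuoYijun", "CaiXuyu", "FangBo");

        // Total number of records of each word.
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : records) {
            counts.merge(word, 1, Integer::sum);
        }
        System.out.println(counts);
        // prints {LiuYang=2, YuanJing=2, GuoYijun=2, CaiXuyu=2, Liyuan=1, FangBo=2}
    }
}
```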

Data Planning

Data for the Spark Streaming sample project is stored in Kafka. A user with Kafka permissions is required to send data to Kafka.

  1. Ensure that the HDFS, Yarn, Spark, and Kafka services are installed in the cluster.
  2. Create the input_data1.txt file on the local PC and copy the content of the log1.txt file to input_data1.txt.

    Create the /home/data directory on the client installation node. Upload the preceding file to the /home/data directory.

  3. Set the allow.everyone.if.no.acl.found parameter of the Kafka broker to true. (This parameter does not need to be set for a normal cluster, that is, a cluster with Kerberos authentication disabled.)
  4. Create a topic.

    {zkQuorum} indicates ZooKeeper cluster information in the IP:port format.

    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}

  5. Start the Kafka producer to send data to Kafka.

    java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:$KAFKA_HOME/libs/*:{JAR_PATH} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}

    • JAR_PATH indicates the path of the JAR package. The value of BrokerList is in the brokerIp:9092 format.
    • You need to change the value of kerberos.domain.name in the SecurityKafkaWordCount class to the value of kerberos.domain.name in the $KAFKA_HOME/config/consumer.properties file.
    • To connect to Kafka in a security cluster, add the KafkaClient configuration to the jaas.conf file in the conf directory on the Spark client. The following is an example:
      KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="./user.keytab"
        principal="leoB@HADOOP.COM"
        useTicketCache=false
        storeKey=true
        debug=true;
      };

      In Spark on Yarn mode, Yarn distributes jaas.conf and user.keytab to the container directory of Spark on Yarn. Therefore, the keyTab path in KafkaClient must be relative to the location of jaas.conf, for example, ./user.keytab. Change principal to the username you created, followed by the domain name of the cluster.

Development Guidelines

  1. Receive data from Kafka and generate the corresponding DStream.
  2. Classify word records.
  3. Calculate the result and print it.
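
The three steps above can be exercised without a running cluster. The following plain-Java sketch simulates the per-batch, cumulative counting that the Spark Streaming application performs; the Kafka-backed DStream and Spark's state store are replaced here by a list of micro-batches and a plain map, and all names are illustrative:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CumulativeWordCount {
    public static void main(String[] args) {
        // Step 1 (simulated): two micro-batches of word records, as if received from Kafka.
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("LiuYang", "YuanJing", "LiuYang"),
            Arrays.asList("YuanJing", "CaiXuyu"));

        // Running state: total number of records of each word across all batches so far.
        Map<String, Long> state = new HashMap<>();

        for (List<String> batch : batches) {
            // Step 2: classify the records in this batch by word and count them.
            Map<String, Long> batchCounts = new HashMap<>();
            for (String word : batch) {
                batchCounts.merge(word, 1L, Long::sum);
            }
            // Step 3: merge the batch counts into the running totals and print the result.
            batchCounts.forEach((word, n) -> state.merge(word, n, Long::sum));
            System.out.println(state);
        }
    }
}
```

After the second batch, the running totals are LiuYang=2, YuanJing=2, CaiXuyu=1, which is the cumulative result the real application would emit via updateStateByKey-style stateful counting.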