Updated on 2022-09-14 GMT+08:00

Scenario Description

In this Spark application, Structured Streaming is used to call Kafka APIs to obtain word records. The word records are then grouped by word to count the number of records for each word.

Data Planning

Data for the Structured Streaming sample project is stored in Kafka. A user with the Kafka permission sends the data to Kafka.

  1. Ensure that the cluster components are installed, including HDFS, Yarn, Spark, and Kafka.
  2. Set allow.everyone.if.no.acl.found to true in the Kafka Broker configuration. (This parameter does not need to be set for a normal cluster.)
  3. Create a topic.

    {zkQuorum} indicates the ZooKeeper cluster information, in IP:port format.

    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic}

  4. Start the Producer of Kafka to send data to Kafka.

{JAR_PATH} indicates the path for storing the JAR file of the project. The path is specified by users. For details, see Compiling and Running Applications.

java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{JAR_PATH} com.huawei.bigdata.spark.examples.KafkaWordCountProducer {BrokerList} {Topic} {messagesPerSec} {wordsPerMessage}

  • The value of {BrokerList} is in brokerIp:9092 format.
  • To connect to a security-mode Kafka cluster, add the KafkaClient configuration to the jaas.conf file in the conf directory on the Spark client. The following is an example:

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="./user.keytab"
      principal="leoB@HADOOP.COM"
      useTicketCache=false
      storeKey=true
      debug=true;
    };

    In Spark on Yarn mode, jaas.conf and user.keytab are distributed by Yarn to the container directory of the Spark application. Therefore, the keyTab path in KafkaClient must be a relative path matching the location of jaas.conf, for example, ./user.keytab. Change the principal to the username you created, followed by the domain name of the cluster.
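    The distribution described above can be sketched with a spark-submit invocation (a minimal sketch: the JAR name, main class, and arguments are illustrative placeholders, not the actual sample project values; --files and the java.security.auth.login.config JVM option are standard Spark and JAAS mechanisms):

    ```shell
    # Sketch only: adjust file names, class, and JAR to your project.
    # --files ships jaas.conf and user.keytab into each Yarn container's
    # working directory, which is why the relative path ./user.keytab
    # in the KafkaClient section resolves correctly at runtime.
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --files ./jaas.conf,./user.keytab \
      --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
      --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
      --class com.example.StructuredStreamingWordCount \
      ./structured-streaming-example.jar {BrokerList} {Topic}
    ```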

Development Guidelines

  1. Receive data from Kafka through a DataStreamReader and generate the corresponding streaming Dataset.
  2. Split the messages into words and group the records by word.
  3. Compute the count for each word and print the result.
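The steps above can be sketched in Java (a minimal sketch: the class name is illustrative and not the actual sample class; the Kafka source options follow the standard Spark Structured Streaming Kafka integration):

```java
import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class KafkaWordCountSketch {
  public static void main(String[] args) throws Exception {
    String bootstrapServers = args[0]; // e.g. brokerIp:9092
    String topic = args[1];

    SparkSession spark = SparkSession.builder()
        .appName("StructuredKafkaWordCount")
        .getOrCreate();

    // 1. Receive data from Kafka through a DataStreamReader;
    //    each Kafka message value is read as a string line.
    Dataset<String> lines = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING());

    // 2. Split each line into words and group the records by word.
    Dataset<Row> wordCounts = lines
        .flatMap((FlatMapFunction<String, String>) line ->
            Arrays.asList(line.split(" ")).iterator(), Encoders.STRING())
        .groupBy("value")
        .count();

    // 3. Compute the running counts and print them to the console
    //    on every trigger.
    StreamingQuery query = wordCounts.writeStream()
        .outputMode("complete")
        .format("console")
        .start();

    query.awaitTermination();
  }
}
```

This sketch uses the complete output mode because a streaming aggregation must re-emit the full result table; for an append-style sink, a watermark and windowed aggregation would be required instead.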