Help Center/ MapReduce Service/ Troubleshooting/ Using Kafka/ Consumer Fails to Consume Data and Remains in the Waiting State
Updated on 2024-12-18 GMT+08:00

Consumer Fails to Consume Data and Remains in the Waiting State

Symptom

An MRS cluster is installed, and ZooKeeper and Kafka are installed in the cluster.

When the Consumer consumes data from Kafka, the client stays in the Waiting state.

Possible Causes

  1. The Kafka service is abnormal.
  2. The Consumer of the client adopts non-security access and access is disabled on the server.
  3. The Consumer of the client adopts non-security access and ACL is set for Kafka topics.

Cause Analysis

The possible reasons why the Consumer fails to consume data from Kafka may be related to the Consumer or Kafka.

  1. Check the Kafka service status:
    • MRS Manager: Log in to MRS Manager and choose Services > Kafka. Check the Kafka status. The status is Good, and the monitoring metrics are correctly displayed.
    • FusionInsight Manager: Log in to FusionInsight Manager and choose Cluster > Services > Kafka. Check the Kafka status. The status is Good, and the monitoring metrics are correctly displayed.
  2. Check the Consumer client log. It is found that the information about the frequent connections and disconnections to the Broker node is printed, as shown in the following output.
    [root@10-10-144-2 client]# kafka-console-consumer.sh --topic test --zookeeper 10.5.144.2:2181/kafka --from-beginning 
    [2017-03-07 09:22:00,658] INFO Fetching metadata from broker BrokerEndPoint(1,10.5.144.2,9092) with correlation id 26 for 1 topic(s) Set(test) (kafka.client.ClientUtils$)
    [2017-03-07 09:22:00,659] INFO Connected to 10.5.144.2:9092 for producing (kafka.producer.SyncProducer)
    [2017-03-07 09:22:00,659] INFO Disconnecting from 10.5.144.2:9092 (kafka.producer.SyncProducer)

    Consumer accesses Kafka using port 9092, which is a non-security port.

  3. On Manager, check the current Kafka cluster configuration. It is found that the customized configuration allow.everyone.if.no.acl.found=false is not configured.
    • MRS Manager: Log in to MRS Manager and choose Services > Kafka > Configurations.
    • FusionInsight Manager: Log in to FusionInsight Manager and choose Cluster > Services > Kafka > Configurations.
  4. If ACL is set to false, port 9092 cannot be used for access.
  5. Check the Consumer client log. It is found that the information about the frequent connections and disconnections to the Broker node is printed, as shown in the following output.
    [root@10-10-144-2 client]# kafka-console-consumer.sh --topic test_acl --zookeeper 10.5.144.2:2181/kafka --from-beginning
    [2017-03-07 09:49:16,992] INFO Fetching metadata from broker BrokerEndPoint(2,10.5.144.3,9092) with correlation id 16 for 1 topic(s) Set(topic_acl) (kafka.client.ClientUtils$)
    [2017-03-07 09:49:16,993] INFO Connected to 10.5.144.3:9092 for producing (kafka.producer.SyncProducer)
    [2017-03-07 09:49:16,994] INFO Disconnecting from 10.5.144.3:9092 (kafka.producer.SyncProducer)

    The Consumer accesses Kafka using port 21005, which is a non-security port.

  6. Run the client command to check the ACL permission of the topic.
    [root@10-10-144-2 client]# kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --list --topic topic_acl
    Current ACLs for resource `Topic:topic_acl`: 
     User:test_user has Allow permission for operations: Describe from hosts: *
    User:test_user has Allow permission for operations: Write from hosts: * 

    If ACL is set for the topic, port 9092 cannot be used for access.

  7. The following information is printed in the Consumer client log:
    [root@10-10-144-2 client]# kafka-console-consumer.sh --topic topic_acl --bootstrap-server 10.5.144.2:21007 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --new-consumer
    [2017-03-07 10:19:18,478] INFO Kafka version : 0.9.0.0 (org.apache.kafka.common.utils.AppInfoParser)
    [2017-03-07 10:19:18,478] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser)

    The Consumer uses port 21007 to access Kafka.

  8. Run the client command klist to query the current authenticated user.
    [root@10-10-144-2 client]# klist
    Ticket cache: FILE:/tmp/krb5cc_0
    Default principal: test@HADOOP.COM
    
    Valid starting     Expires            Service principal
    01/25/17 11:06:48  01/26/17 11:06:45  krbtgt/HADOOP.COM@HADOOP.COM

    The test user is used in this example.

  9. Run the client command to check the ACL permission of the topic.
    [root@10-10-144-2 client]# kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:24002/kafka --list --topic topic_acl
    Current ACLs for resource `Topic:topic_acl`: 
     User:test_user has Allow permission for operations: Describe from hosts: *
     User:test_user has Allow permission for operations: Write from hosts: * 
     User:ttest_user has Allow permission for operations: Read from hosts: *

    If ACL is set for the topic, user test does not have the permission to perform the Consumer operation.

    For details about the solution, see 2.

  10. Log in to Kafka Broker using SSH.

    Run the cd /var/log/Bigdata/kafka/broker command to go to the log directory.

    Check the kafka-authorizer.log file. It shows that the user does not belong to the kafka or kafkaadmin group.

    2017-01-25 13:26:33,648 | INFO  | [kafka-request-handler-0] | The principal is test, belongs to Group : [hadoop, ficommon] | kafka.authorizer.logger (SimpleAclAuthorizer.scala:169)
    2017-01-25 13:26:33,648 | WARN  | [kafka-request-handler-0] | The user is not belongs to kafka or kafkaadmin group, authorize failed! | kafka.authorizer.logger (SimpleAclAuthorizer.scala:170)

    For details about the solution, see 3.

Solution

  1. Set allow.everyone.if.no.acl.found to true and restart the Kafka service.
  2. Use the account with permission for login.

    Example:

    kinit test_user

    Alternatively, grant the user with related permission.

    This operation must be performed by the Kafka administrator (belonging to the kafkaadmin group).

    Example:

    kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --topic topic_acl --consumer --add --allow-principal User:test --group test

    [root@10-10-144-2 client]# kafka-acls.sh --authorizer-properties zookeeper.connect=8.5.144.2:2181/kafka --list --topic topic_acl
    Current ACLs for resource `Topic:topic_acl`: 
     User:test_user has Allow permission for operations: Describe from hosts: *
     User:test_user has Allow permission for operations: Write from hosts: *
      User:test has Allow permission for operations: Describe from hosts: *
      User:test has Allow permission for operations: Write from hosts: *
      User:test has Allow permission for operations: Read from hosts: *

  3. Add the user to the kafka or kafkaadmin group.