Updated on 2023-11-30 GMT+08:00

SparkStreaming Fails to Consume Kafka Messages, and Message "Couldn't find leader offsets" Is Displayed

Symptom

When SparkStreaming consumes messages from a specified Kafka topic, no data can be obtained from Kafka.

The following error message is displayed: Couldn't find leader offsets.

Possible Causes

  • The Kafka service is abnormal.
  • The network is abnormal.
  • The Kafka topic is abnormal.

Cause Analysis

  1. On Manager, check the status of the Kafka cluster. In this case, the status is Good and the monitoring metrics are displayed correctly.
  2. In the SparkStreaming log, identify the topic that is reported in the error.

    Run the following Kafka command to obtain the partition assignment and replica synchronization information of the topic, and check the returned result.

    kafka-topics.sh --describe --zookeeper <zk_host:port/chroot> --topic <topic name>

    If the output resembles the following figure, in which every partition has a valid leader, the topic is normal.

    Figure 1 Topic distribution information and replica synchronization information

  3. Check whether the network connection between the client and the Kafka cluster is normal. If not, contact the network team to rectify the fault.
  4. Log in to the Kafka Broker node using SSH.

    Run the cd /var/log/Bigdata/kafka/broker command to go to the log directory.

    Check server.log. In this case, the log contains the following error:

    2018-05-30 12:02:00,246 | ERROR | [kafka-network-thread-6-PLAINTEXT-3] | Processor got uncaught exception. | kafka.network.Processor (Logging.scala:103)
    java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:694)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)

  5. On Manager, check the configuration of the current Kafka cluster.

    • MRS Manager: Log in to MRS Manager and choose Services > Kafka > Service Configuration. Set Type to All. In this case, -XX:MaxDirectMemorySize in KAFKA_JVM_PERFORMANCE_OPTS is set to 1G.
    • FusionInsight Manager: Log in to FusionInsight Manager and choose Cluster > Services > Kafka > Configurations > All Configurations. In this case, -XX:MaxDirectMemorySize in KAFKA_JVM_PERFORMANCE_OPTS is set to 1G.

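  6. If the configured direct memory is too small, the Broker reports the OutOfMemoryError: Direct buffer memory error found in step 4. Once the direct memory is exhausted, the node cannot process new requests. As a result, other nodes and clients time out when accessing the node, and SparkStreaming cannot obtain the leader offsets.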
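
The checks in steps 2, 4, and 5 can also be scripted. The following Python sketch is illustrative only and is not part of any MRS or Kafka tooling. The helper names are hypothetical, and the parser assumes that each partition row of the kafka-topics.sh --describe output looks like "Topic: demo  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2", with a leader of -1 or none marking a partition that has no leader. Verify that row format against the output of your Kafka version before relying on it.

```python
import re

# Assumed shape of a partition row in `kafka-topics.sh --describe` output:
#   Topic: demo  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
PARTITION_ROW = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(\S+)")

# Error text copied from the server.log excerpt in step 4.
DIRECT_OOM = "java.lang.OutOfMemoryError: Direct buffer memory"

# JVM flag from step 5: digits followed by an optional unit suffix.
MAX_DIRECT = re.compile(r"-XX:MaxDirectMemorySize=(\d+[kKmMgGtT]?)")


def leaderless_partitions(describe_output):
    """Return the partition numbers whose leader is reported as -1 or none."""
    missing = []
    for line in describe_output.splitlines():
        match = PARTITION_ROW.search(line)
        if match and match.group(2).lower() in ("-1", "none"):
            missing.append(int(match.group(1)))
    return missing


def has_direct_buffer_oom(log_text):
    """Return True if the log text contains the direct buffer memory error."""
    return DIRECT_OOM in log_text


def max_direct_memory(jvm_opts):
    """Return the MaxDirectMemorySize value (for example "1G"), or None if unset."""
    match = MAX_DIRECT.search(jvm_opts)
    return match.group(1) if match else None
```

For example, passing the content of server.log to has_direct_buffer_oom and the value of KAFKA_JVM_PERFORMANCE_OPTS to max_direct_memory reproduces the findings of steps 4 and 5 for one Broker.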

Solution

  1. Log in to Manager and go to the Kafka configuration page.
  2. Set Type to All, search for KAFKA_JVM_PERFORMANCE_OPTS, and increase the value of -XX:MaxDirectMemorySize in it.
  3. Save the configuration and restart the service or instance whose configuration has expired.
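
Step 2 amounts to editing a single flag inside the KAFKA_JVM_PERFORMANCE_OPTS string. The following Python sketch shows that edit on a plain string so that the other JVM options are left untouched. The function name is hypothetical, and the 2G value in the usage note is only a placeholder, not a recommended size. Choose a value that suits the memory available on the Broker nodes.

```python
import re

# Matches the existing flag together with its current value.
MAX_DIRECT = re.compile(r"-XX:MaxDirectMemorySize=\S+")


def with_max_direct_memory(jvm_opts, new_size):
    """Return jvm_opts with -XX:MaxDirectMemorySize set to new_size.

    The flag is replaced if it is already present and appended otherwise.
    """
    flag = "-XX:MaxDirectMemorySize=" + new_size
    if MAX_DIRECT.search(jvm_opts):
        # A function replacement keeps new_size from being read as a regex escape.
        return MAX_DIRECT.sub(lambda _match: flag, jvm_opts)
    return (jvm_opts + " " + flag).strip()
```

Calling with_max_direct_memory(current_opts, "2G") returns the string to paste back into the configuration field, where current_opts is the existing value copied from Manager.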