
SparkStreaming Fails to Consume Kafka Messages, and "Error getting partition metadata" Is Displayed

Symptom

When SparkStreaming is used to consume messages of a specified Kafka topic, no data can be obtained and the message "Error getting partition metadata" is displayed.

Exception in thread "main" org.apache.spark.SparkException:  Error getting partition metadata for 'testtopic'. Does the topic exist?
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
scala.util.Either.fold(Either.scala:97)
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
com.xxx.bigdata.spark.examples.FemaleInfoCollectionPrint$.main(FemaleInfoCollectionPrint.scala:45)
com.xxx.bigdata.spark.examples.FemaleInfoCollectionPrint.main(FemaleInfoCollectionPrint.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:762)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
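
The stack trace above originates from KafkaUtils.createDirectStream. The following is a minimal sketch of that kind of consumer, assuming the Spark Streaming Kafka 0.8 direct API shown in the trace; the broker host names, topic name, and batch interval are placeholders rather than values from the actual application.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Non-secure access goes through Broker port 21005 (placeholder hosts).
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:21005,broker2:21005")

    // createDirectStream fetches the topic metadata first; the
    // "Error getting partition metadata" exception is raised from this call
    // when the metadata cannot be read.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("testtopic"))

    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}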

Possible Causes

  1. The Kafka service is abnormal.
  2. The consumer client uses non-secure access, but non-secure access is disabled on the server.
  3. The consumer client uses non-secure access, and ACLs are configured for the Kafka topic.

Cause Analysis

  1. Check the Kafka service status:
    • MRS Manager: Log in to MRS Manager and choose Services > Kafka. Check the Kafka status. The status is Good, and the monitoring metrics are correctly displayed.
    • FusionInsight Manager: Log in to FusionInsight Manager and choose Cluster > Services > Kafka. Check the Kafka status. The status is Good, and the monitoring metrics are correctly displayed.
  2. On Manager, check the current Kafka cluster configuration. The check shows that allow.everyone.if.no.acl.found is either not configured or is set to false.
  3. If this parameter is not configured (it defaults to false) or is set to false, the Kafka non-secure port 21005 cannot be used for access.
  4. Run the client command to check the ACL permission of the topic.
    [root@10-10-144-2 client]# kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --list --topic topic_acl
    Current ACLs for resource `Topic:topic_acl`:
     User:test_user has Allow permission for operations: Describe from hosts: *
     User:test_user has Allow permission for operations: Write from hosts: *

    If an ACL is configured for a topic, the Kafka non-secure port 21005 cannot be used to access the topic.

Solution

  1. Add the customized configuration allow.everyone.if.no.acl.found with the value true, or change its existing value to true, and then restart the Kafka service.
  2. Delete the ACL configured for the topic.

    This operation must be performed by the Kafka administrator (a user belonging to the kafkaadmin group).

    Example:

    kinit test_user

    kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --producer --topic topic_acl

    kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --consumer --topic topic_acl --group test
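
    After the ACLs are removed, you can run the list command from the cause analysis again to confirm that the removed entries no longer appear before retrying the SparkStreaming job:

    kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --list --topic topic_acl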