SparkStreaming Fails to Consume Kafka Messages, and "Error getting partition metadata" Is Displayed
Symptom
When SparkStreaming is used to consume messages of a specified topic in Kafka, data cannot be obtained from Kafka. The message "Error getting partition metadata" is displayed.
Exception in thread "main" org.apache.spark.SparkException: Error getting partition metadata for 'testtopic'. Does the topic exist?
    org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    scala.util.Either.fold(Either.scala:97)
    org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
    com.xxx.bigdata.spark.examples.FemaleInfoCollectionPrint$.main(FemaleInfoCollectionPrint.scala:45)
    com.xxx.bigdata.spark.examples.FemaleInfoCollectionPrint.main(FemaleInfoCollectionPrint.scala)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:498)
    org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:762)
    org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)
    org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)
    org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123)
    org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Possible Causes
- The Kafka service is abnormal.
- The Consumer client uses non-secure access, but non-secure access is disabled on the server.
- The Consumer client uses non-secure access, but an ACL is set for the Kafka topic.
Cause Analysis
- Check the Kafka service status:
  - MRS Manager: Log in to MRS Manager and choose Services > Kafka. Check the Kafka status. In this case, the status is Good and the monitoring metrics are displayed correctly.
  - FusionInsight Manager: Log in to FusionInsight Manager and choose Cluster > Services > Kafka. Check the Kafka status. In this case, the status is Good and the monitoring metrics are displayed correctly.
- On Manager, check the current Kafka cluster configuration. In this case, allow.everyone.if.no.acl.found is either not configured or set to false.
- If the parameter is not configured (Kafka then uses its default, false) or is set to false, the Kafka non-secure port 21005 cannot be used for access.
- Run the client command to check the ACL permission of the topic.
[root@10-10-144-2 client]# kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --list --topic topic_acl
Current ACLs for resource `Topic:topic_acl`:
    User:test_user has Allow permission for operations: Describe from hosts: *
    User:test_user has Allow permission for operations: Write from hosts: *
If an ACL is configured for a topic, the Kafka non-secure port 21005 cannot be used to access the topic.
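The ACL check can also be scripted. The following is a minimal sketch, not part of the product tooling: the helper name count_acl_entries is hypothetical, and it assumes the kafka-acls.sh --list output uses the "has Allow permission for operations:" line format shown above (other Kafka versions may print ACL entries differently).

```shell
#!/usr/bin/env bash
# Hypothetical helper: count ACL entries in `kafka-acls.sh --list` output.
# Reads the listing on stdin and prints the number of Allow/Deny entries.
# Assumes the line format shown in this document's sample output.
count_acl_entries() {
  # grep -c prints the number of matching lines; it exits non-zero when
  # nothing matches, so "|| true" keeps the helper from reporting failure.
  grep -c -E 'has (Allow|Deny) permission for operations:' || true
}
```

Usage, with the same ZooKeeper address and topic as in the example above: kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --list --topic topic_acl | count_acl_entries. A result greater than 0 suggests that an ACL is set for the topic, so access through the non-secure port would be rejected.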
Solution
- Add the custom configuration item allow.everyone.if.no.acl.found and set it to true, or change its existing value to true. Then restart the Kafka service.
- Delete the ACL configured for the topic.
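This operation must be performed by a Kafka administrator (a user belonging to the kafkaadmin group). Authenticate as that user before running the commands. Example:
kinit test_user
Then remove the producer and consumer ACLs from the topic. Example: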
kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --producer --topic topic_acl
kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --consumer --topic topic_acl --group test
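For the first solution item, the effective value of allow.everyone.if.no.acl.found can be confirmed from a shell after the restart. The following is a minimal sketch under stated assumptions: the helper name acl_fallback_status is hypothetical, and it assumes you have a readable copy of the broker's properties file (its location depends on the installation and is not given in this document).

```shell
#!/usr/bin/env bash
# Hypothetical helper: report how allow.everyone.if.no.acl.found is set
# in a Kafka broker properties file passed as the first argument.
# Prints "true", "false", or "unset" (property absent or commented out).
acl_fallback_status() {
  local file="$1" value
  # Keep only uncommented assignments; if the key appears more than once,
  # use the last one, as later entries override earlier ones.
  value=$(grep -E '^[[:space:]]*allow\.everyone\.if\.no\.acl\.found[[:space:]]*=' "$file" 2>/dev/null \
            | tail -n 1 | cut -d '=' -f 2- | tr -d '[:space:]' | tr 'A-Z' 'a-z')
  if [ -z "$value" ]; then
    echo "unset"
  else
    echo "$value"
  fi
}
```

Usage: acl_fallback_status /path/to/server.properties (a placeholder path). A result of "unset" or "false" corresponds to the condition described under Cause Analysis, where the non-secure port 21005 cannot be used.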