SparkStreaming Fails to Consume Kafka Messages, and "Error getting partition metadata" Is Displayed
Symptom
When SparkStreaming is used to consume messages of a specified topic in Kafka, data cannot be obtained from Kafka. The message "Error getting partition metadata" is displayed.
Exception in thread "main" org.apache.spark.SparkException: Error getting partition metadata for 'testtopic'. Does the topic exist? org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) scala.util.Either.fold(Either.scala:97) org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422) com.xxxxxx.bigdata.spark.examples.FemaleInfoCollectionPrint$.main(FemaleInfoCollectionPrint.scala:45) com.xxxxxx.bigdata.spark.examples.FemaleInfoCollectionPrint.main(FemaleInfoCollectionPrint.scala) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:498) org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:762) org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183) org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208) org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123) org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Possible Causes
- The Kafka service is abnormal.
- The Consumer client adopts non-security access and access is disabled on the server.
- The Consumer client adopts non-security access and ACL is set for Kafka topics.
Cause Analysis
- Check the Kafka service status:
- MRS Manager: Log in to MRS Manager and choose Services > Kafka. Check the Kafka status. The status is Good, and the monitoring metrics are correctly displayed.
- FusionInsight Manager: Log in to FusionInsight Manager and choose Cluster > Name of the target cluster > Service > Kafka.
Check the Kafka status. It is found that the status is good and the monitoring metrics are correctly displayed.
- On Manager, check the current Kafka cluster configuration. It is found that allow.everyone.if.no.acl.found is not configured or is set to false.
- MRS Manager portal: Log in to MRS Manager and choose Services > Kafka > Configuration.
- FusionInsight Manager: Log in to FusionInsight Manager and choose Cluster > Name of the target cluster > Service > Kafka > Configuration.
- If it is set to false, the Kafka non-secure port 21005 cannot be used for access.
- Run the client command to check the ACL permission of the topic.
[root@10-10-144-2 client]# kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --list --topic topic_acl Current ACLs for resource `Topic:topic_acl`: User:test_user has Allow permission for operations: Describe from hosts: * User:test_user has Allow permission for operations: Write from hosts: *
If an ACL is configured for a topic, the Kafka non-secure port 21005 cannot be used to access the topic.
Solution
- Add the customized configuration allow.everyone.if.no.acl.found or change its value to true and restart the Kafka service.
- Delete the ACL configured for the topic.
Example:
kinit test_user
This operation must be performed by the Kafka administrator (belonging to the kafkaadmin group).
Example:
kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --producer --topic topic_acl
kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --consumer --topic topic_acl --group test
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot