文档首页/ MapReduce服务 MRS/ 故障排除/ 使用Kafka/ SparkStreaming消费Kafka消息失败,提示“Error getting partition metadata”
更新时间:2023-12-22 GMT+08:00

SparkStreaming消费Kafka消息失败,提示“Error getting partition metadata”

问题现象

使用SparkStreaming来消费Kafka中指定Topic的消息时,发现无法从Kafka中获取到数据。提示如下错误: Error getting partition metadata。

Exception in thread "main" org.apache.spark.SparkException:  Error getting partition metadata for 'testtopic'. Does the topic exist?
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
scala.util.Either.fold(Either.scala:97)
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
com.xxx.bigdata.spark.examples.FemaleInfoCollectionPrint$.main(FemaleInfoCollectionPrint.scala:45)
com.xxx.bigdata.spark.examples.FemaleInfoCollectionPrint.main(FemaleInfoCollectionPrint.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:762)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

可能原因

  1. Kafka服务异常。
  2. 客户端Consumer侧采用非安全访问,服务端配置禁止访问。
  3. 客户端Consumer侧采用非安全访问,Kafka Topic设置ACL。

原因分析

  1. 查看Kafka服务状态:
    • MRS Manager界面操作:登录MRS Manager,依次选择“服务管理 > Kafka”,查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。
    • FusionInsight Manager界面操作:登录FusionInsight Manager,选择“集群 > 服务 > Kafka”,查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。
  2. 通过Manager页面,查看当前Kafka集群配置,发现未配置“allow.everyone.if.no.acl.found”或配置为“false”
  3. 当ACL设置为false不允许采用Kafka非安全端口21005来进行访问。
  4. 通过客户端命令查看topic的ACL权限设置信息:
    [root@10-10-144-2 client]# kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --list --topic topic_acl
    Current ACLs for resource `Topic:topic_acl`: 
     User:test_user has Allow permission for operations: Describe from hosts: *
    User:test_user has Allow permission for operations: Write from hosts: * 

    Topic设置ACL,则不允许采用Kafka非安全端口21005来访问。

解决办法

  1. 修改或者添加自定义配置“allow.everyone.if.no.acl.found”参数为“true”,重启Kafka服务。
  2. 删除Topic设置的ACL。

    例如:

    kinit test_user

    需要使用Kafka管理员用户(属于kafkaadmin组)操作。

    例如:

    kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --producer --topic topic_acl

    kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --consumer --topic topic_acl --group test