文档首页/
MapReduce服务 MRS/
故障排除/
使用Kafka/
SparkStreaming消费Kafka消息失败,提示“Error getting partition metadata”
更新时间:2023-12-22 GMT+08:00
SparkStreaming消费Kafka消息失败,提示“Error getting partition metadata”
问题现象
使用SparkStreaming来消费Kafka中指定Topic的消息时,发现无法从Kafka中获取到数据。提示如下错误: Error getting partition metadata。
Exception in thread "main" org.apache.spark.SparkException: Error getting partition metadata for 'testtopic'. Does the topic exist? org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) scala.util.Either.fold(Either.scala:97) org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422) com.xxx.bigdata.spark.examples.FemaleInfoCollectionPrint$.main(FemaleInfoCollectionPrint.scala:45) com.xxx.bigdata.spark.examples.FemaleInfoCollectionPrint.main(FemaleInfoCollectionPrint.scala) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:498) org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:762) org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183) org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208) org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123) org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
可能原因
- Kafka服务异常。
- 客户端Consumer侧采用非安全访问,服务端配置禁止访问。
- 客户端Consumer侧采用非安全访问,Kafka Topic设置ACL。
原因分析
- 查看Kafka服务状态:
- MRS Manager界面操作:登录MRS Manager,依次选择“服务管理 > Kafka”,查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。
- FusionInsight Manager界面操作:登录FusionInsight Manager,选择“集群 > 服务 > Kafka”,查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。
- 通过Manager页面,查看当前Kafka集群配置,发现未配置“allow.everyone.if.no.acl.found”或配置为“false”。
- 当ACL设置为false不允许采用Kafka非安全端口21005来进行访问。
- 通过客户端命令查看topic的ACL权限设置信息:
[root@10-10-144-2 client]# kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --list --topic topic_acl Current ACLs for resource `Topic:topic_acl`: User:test_user has Allow permission for operations: Describe from hosts: * User:test_user has Allow permission for operations: Write from hosts: *
Topic设置ACL,则不允许采用Kafka非安全端口21005来访问。
解决办法
- 修改或者添加自定义配置“allow.everyone.if.no.acl.found”参数为“true”,重启Kafka服务。
- 删除Topic设置的ACL。
例如:
kinit test_user
需要使用Kafka管理员用户(属于kafkaadmin组)操作。
例如:
kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --producer --topic topic_acl
kafka-acls.sh --authorizer-properties zookeeper.connect=10.5.144.2:2181/kafka --remove --allow-principal User:test_user --consumer --topic topic_acl --group test
父主题: 使用Kafka