文档首页> MapReduce服务 MRS> 用户指南(阿布扎比区域)> 故障排除> 使用Kafka> Consumer初始化成功,但是无法从Kafka中获取指定Topic消息
更新时间:2023-03-17 GMT+08:00

Consumer初始化成功,但是无法从Kafka中获取指定Topic消息

问题背景与现象

使用MRS安装集群,主要安装ZooKeeper、Flume、Kafka、Storm、Spark。

使用Storm、Spark、Flume或者自己编写consumer代码来消费Kafka中指定Topic的消息时,发现消费不到任何数据。

可能原因

  1. Kafka服务异常。
  2. Consumer中ZooKeeper相关连接地址配置错误,导致无法消费。
  3. Consumer发生ConsumerRebalanceFailedException异常,导致无法消费。
  4. Consumer发生网络导致的ClosedChannelException异常,导致无法消费。

原因分析

Storm、Spark、Flume或者自定义Consumer代码可以都称为Consumer。

  1. 查看kafka服务状态:
    • MRS Manager界面操作:登录MRS Manager,依次选择 "服务管理 > Kafka ,查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。
    • FusionInsight Manager界面操作:登录FusionInsight Manager,选择“集群 > 待操作集群的名称 > 服务 > Kafka,

      查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。

  2. 通过Kafka Client,判断是否可以正常消费数据。

    假设客户端已经安装在/opt/client目录,test为需要消费的Topic名称, 192.168.234.231为ZooKeeper的IP地址。

    cd /opt/client
    source bigdata_env
    kinit admin
    kafka-topics.sh --zookeeper 192.168.234.231:2181/kafka --describe --topic testkafka-console-consumer.sh --topic test --zookeeper 192.168.234.231:2181/kafka --from-beginning

    当可以消费到数据时,表示集群服务正常。

  3. 查看Consumer相关配置,发现ZooKeeper连接地址错误。
    • Flume
      server.sources.Source02.type=org.apache.flume.source.kafka.KafkaSource                                            
      server.sources.Source02.zookeeperConnect=192.168.234.231:2181
      server.sources.Source02.topic = test
      server.sources.Source02.groupId = test_01
    • Spark
      val zkQuorum = "192.168.234.231:2181"
    • Storm
      BrokerHosts brokerHosts = new ZKHosts("192.168.234.231:2181");
    • Consumer API
      zookeeper.connect="192.168.234.231:2181"

    MRS中Kafka在ZooKeeper存储的ZNode是以/kafka为根路径,有别于开源。Kafka对应的ZooKeeper连接配置为192.168.234.231:2181/kafka。

    Consumer中配置为ZooKeeper连接配置为192.168.234.231:2181,导致无法正确获取Kafka中Topic相关信息。

    解决方法参考1

  4. 查看Consumer相关日志,发现打印ConsumerRebalanceFailedException异常。
    2016-02-03 15:55:32,557 | ERROR | [ZkClient-EventThread-75- 192.168.234.231:2181/kafka] |  Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@34b41dfe]  | org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:77)
    kafka.common.ConsumerRebalanceFailedException: pc-zjqbetl86-1454482884879-2ec95ed3 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:633)
    at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:487)
    at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

    通过异常信息,发现当前Consumer没有在指定的重试次数内完成Rebalance,使得Consumer没有被分配Kafka Topic-Partition,则无法消费消息。

    解决方法参考3

  5. 查看Consumer相关日志,发现打印Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:26,host:192-168-234-231,port:9092] failed错误信息和ClosedChannelException异常。
    [2016-03-04 03:33:53,047] INFO Fetching metadata from broker id:26,host: 192-168-234-231,port:9092 with correlation id 0 for 1 topic(s) Set(test) (kafka.client.ClientUtils$)
    [2016-03-04 03:33:55,614] INFO Connected to 192-168-234-231:21005 for producing (kafka.producer.SyncProducer)
    [2016-03-04 03:33:55,614] INFO Disconnecting from 192-168-234-231:21005 (kafka.producer.SyncProducer)
    [2016-03-04 03:33:55,615] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:26,host: 192-168-234-231,port:21005] failed (kafka.client.ClientUtils$)
    java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    [2016-03-04 03:33:55,615] INFO Disconnecting from 192-168-234-231:21005 (kafka.producer.SyncProducer)

    通过异常信息,发现当前Consumer无法从Kafka Broker 192-168-234-231节点获取元数据,导致无法连接正确的Broker获取消息。

  6. 检查网络是否存在问题,如果网络没有问题,检查是否配置主机和IP的对应关系
    • Linux

      执行cat /etc/hosts命令。

      图1 示例1
    • Windows

      打开“C:\Windows\System32\drivers\etc\hosts”。

      图2 示例2

      解决方法参考4

解决办法

  1. ZooKeeper连接地址配置错误。
  2. 修改Consumer配置中的ZooKeeper连接地址信息,保证和MRS相一致。

    • Flume
      server.sources.Source02.type=org.apache.flume.source.kafka.KafkaSource
      server.sources.Source02.zookeeperConnect=192.168.234.231:2181/kafka
      server.sources.Source02.topic = test
      server.sources.Source02.groupId = test_01
    • Spark
      val zkQuorum = "192.168.234.231:2181/kafka"
    • Storm
      BrokerHosts brokerHosts = new ZKHosts("192.168.234.231:2181/kafka");
    • Consumer API
      zookeeper.connect="192.168.234.231:2181/kafka"

  3. Rebalance异常。

    同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时消费多个partition数据,consumer端也会有负载均衡(consumer个数小于partitions数量时)。

    consumer实际上是靠存储在zk中的临时节点来表明针对哪个topic的那个partition拥有读权限的。所在路径为:/consumers/consumer-group-xxx/owners/topic-xxx/x。

    当触发负载均衡后,原来的consumer会重新计算并释放已占用的partitions,此过程需要一定的处理时间,新来的consumer抢占该partitions时很有可能会失败。
    表1 参数说明

    名称

    作用

    默认值

    rebalance.max.retries

    Rebalance最大重试次数

    4

    rebalance.backoff.ms

    Rebalance每次重试间隔

    2000

    zookeeper.session.timeout.ms

    Zookeeper连接会话超时时间

    15000

    可以适当调大上述三个参数,可以参考如下数值:

    zookeeper.session.timeout.ms = 45000
    rebalance.max.retries = 10
    rebalance.backoff.ms = 5000

    参数设置应遵循:

    rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms

  4. 网络异常。

    在hosts文件中没有配置主机名和IP的对应关系,导致使用主机名进行访问时,无法获取信息。

  5. 在hosts文件中添加对应的主机名和IP的对应关系。

    • Linux
      图3 示例3
    • Windows
      图4 示例4