Consumer初始化成功,但是无法从Kafka中获取指定Topic消息
问题背景与现象
使用MRS安装集群,主要安装ZooKeeper、Flume、Kafka、Storm、Spark。
使用Storm、Spark、Flume或者自己编写consumer代码来消费Kafka中指定Topic的消息时,发现消费不到任何数据。
可能原因
- Kafka服务异常。
- Consumer中ZooKeeper相关连接地址配置错误,导致无法消费。
- Consumer发生ConsumerRebalanceFailedException异常,导致无法消费。
- Consumer发生网络导致的ClosedChannelException异常,导致无法消费。
原因分析
Storm、Spark、Flume或者自定义Consumer代码可以都称为Consumer。
- 查看kafka服务状态:
- MRS Manager界面操作:登录MRS Manager,依次选择 "服务管理 > Kafka ,查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。
- FusionInsight Manager界面操作:登录FusionInsight Manager,选择“集群 > 待操作集群的名称 > 服务 > Kafka,
查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。
- 通过Kafka Client,判断是否可以正常消费数据。
假设客户端已经安装在/opt/client目录,test为需要消费的Topic名称, 192.168.234.231为ZooKeeper的IP地址。
cd /opt/client source bigdata_env kinit admin kafka-topics.sh --zookeeper 192.168.234.231:2181/kafka --describe --topic testkafka-console-consumer.sh --topic test --zookeeper 192.168.234.231:2181/kafka --from-beginning
当可以消费到数据时,表示集群服务正常。
- 查看Consumer相关配置,发现ZooKeeper连接地址错误。
- Flume
server.sources.Source02.type=org.apache.flume.source.kafka.KafkaSource server.sources.Source02.zookeeperConnect=192.168.234.231:2181 server.sources.Source02.topic = test server.sources.Source02.groupId = test_01
- Spark
val zkQuorum = "192.168.234.231:2181"
- Storm
BrokerHosts brokerHosts = new ZKHosts("192.168.234.231:2181");
- Consumer API
zookeeper.connect="192.168.234.231:2181"
MRS中Kafka在ZooKeeper存储的ZNode是以/kafka为根路径,有别于开源。Kafka对应的ZooKeeper连接配置为192.168.234.231:2181/kafka。
Consumer中配置为ZooKeeper连接配置为192.168.234.231:2181,导致无法正确获取Kafka中Topic相关信息。
解决方法参考1。
- Flume
- 查看Consumer相关日志,发现打印ConsumerRebalanceFailedException异常。
2016-02-03 15:55:32,557 | ERROR | [ZkClient-EventThread-75- 192.168.234.231:2181/kafka] | Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@34b41dfe] | org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:77) kafka.common.ConsumerRebalanceFailedException: pc-zjqbetl86-1454482884879-2ec95ed3 can't rebalance after 4 retries at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:633) at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:487) at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
通过异常信息,发现当前Consumer没有在指定的重试次数内完成Rebalance,使得Consumer没有被分配Kafka Topic-Partition,则无法消费消息。
解决方法参考3。
- 查看Consumer相关日志,发现打印Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:26,host:192-168-234-231,port:9092] failed错误信息和ClosedChannelException异常。
[2016-03-04 03:33:53,047] INFO Fetching metadata from broker id:26,host: 192-168-234-231,port:9092 with correlation id 0 for 1 topic(s) Set(test) (kafka.client.ClientUtils$) [2016-03-04 03:33:55,614] INFO Connected to 192-168-234-231:21005 for producing (kafka.producer.SyncProducer) [2016-03-04 03:33:55,614] INFO Disconnecting from 192-168-234-231:21005 (kafka.producer.SyncProducer) [2016-03-04 03:33:55,615] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:26,host: 192-168-234-231,port:21005] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) at kafka.producer.SyncProducer.send(SyncProducer.scala:113) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2016-03-04 03:33:55,615] INFO Disconnecting from 192-168-234-231:21005 (kafka.producer.SyncProducer)
通过异常信息,发现当前Consumer无法从Kafka Broker 192-168-234-231节点获取元数据,导致无法连接正确的Broker获取消息。
- 检查网络是否存在问题,如果网络没有问题,检查是否配置主机和IP的对应关系
- Windows
打开“C:\Windows\System32\drivers\etc\hosts”。
图2 示例2
解决方法参考4。
- Windows
解决办法
- ZooKeeper连接地址配置错误。
- 修改Consumer配置中的ZooKeeper连接地址信息,保证和MRS相一致。
- Flume
server.sources.Source02.type=org.apache.flume.source.kafka.KafkaSource server.sources.Source02.zookeeperConnect=192.168.234.231:2181/kafka server.sources.Source02.topic = test server.sources.Source02.groupId = test_01
- Spark
val zkQuorum = "192.168.234.231:2181/kafka"
- Storm
BrokerHosts brokerHosts = new ZKHosts("192.168.234.231:2181/kafka");
- Consumer API
zookeeper.connect="192.168.234.231:2181/kafka"
- Flume
- Rebalance异常。
同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时消费多个partition数据,consumer端也会有负载均衡(consumer个数小于partitions数量时)。
consumer实际上是靠存储在zk中的临时节点来表明针对哪个topic的那个partition拥有读权限的。所在路径为:/consumers/consumer-group-xxx/owners/topic-xxx/x。
当触发负载均衡后,原来的consumer会重新计算并释放已占用的partitions,此过程需要一定的处理时间,新来的consumer抢占该partitions时很有可能会失败。表1 参数说明 名称
作用
默认值
rebalance.max.retries
Rebalance最大重试次数
4
rebalance.backoff.ms
Rebalance每次重试间隔
2000
zookeeper.session.timeout.ms
Zookeeper连接会话超时时间
15000
可以适当调大上述三个参数,可以参考如下数值:
zookeeper.session.timeout.ms = 45000 rebalance.max.retries = 10 rebalance.backoff.ms = 5000
参数设置应遵循:
rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
- 网络异常。
在hosts文件中没有配置主机名和IP的对应关系,导致使用主机名进行访问时,无法获取信息。
- 在hosts文件中添加对应的主机名和IP的对应关系。
- Linux
图3 示例3
- Windows
图4 示例4
- Linux