Help Center/
Distributed Message Service for RocketMQ/
Developer Guide/
Java (TCP)/
Controlling Traffic on Consumers
Updated on 2024-05-15 GMT+08:00
Controlling Traffic on Consumers
If consumers consume messages too fast for downstream services to keep up, system stability will be affected. This section provides sample code for controlling consumer traffic to ensure system stability.
package org.apache.rocketmq.example.simple; import java.util.List; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.RateLimiter; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); RateLimiter rateLimiter = RateLimiter.create(200); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if (!rateLimiter.tryAcquire(msgs.size(),3, TimeUnit.SECONDS)) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
Parent topic: Java (TCP)
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot