Distributed Message Service for RocketMQ
Distributed Message Service for RocketMQ
- What's New
- Function Overview
- Product Bulletin
- Service Overview
- Getting Started
-
User Guide
- Process of Using RocketMQ
- Creating a User and Granting DMS for RocketMQ Permissions
- Buying a RocketMQ Instance
- Configuring a Topic
- Accessing an Instance
- Managing Messages
- Managing Consumer Groups
-
Managing Instances
- Viewing and Modifying Basic Information of a RocketMQ Instance
- Viewing Background Tasks of a RocketMQ Instance
- Configuring Tags for a RocketMQ Instance
- Exporting RocketMQ Instances
- Diagnosing a RocketMQ Instance
- Restarting Brokers of a RocketMQ Instance
- Deleting a RocketMQ Instance
- Configuring SSL of a RocketMQ Instance
- Modifying RocketMQ Specifications
- Migrating Metadata
- Viewing Monitoring Metrics and Configuring Alarms
- Viewing RocketMQ Audit Logs
- Best Practices
- Developer Guide
- API Reference
- SDK Reference
- FAQs
- Videos
On this page
Show all
Help Center/
Distributed Message Service for RocketMQ/
Developer Guide/
Java (TCP)/
Controlling Traffic on Consumers
Controlling Traffic on Consumers
Updated on 2024-12-25 GMT+08:00
If consumers consume messages too fast for downstream services to keep up, system stability will be affected. This section provides sample code for controlling consumer traffic to ensure system stability.
Preparing the Environment
You can connect an open-source Java client to DMS for RocketMQ. The recommended Java client version is 4.9.8.
Using Maven
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.8</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.9.8</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>29.0-jre</version> </dependency>
Sample Code
package org.apache.rocketmq.example.simple; import java.util.List; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.RateLimiter; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Enter the address. consumer.setNamesrvAddr("192.168.0.1:8100"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); RateLimiter rateLimiter = RateLimiter.create(200); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if (!rateLimiter.tryAcquire(msgs.size(),3, TimeUnit.SECONDS)) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
Parent topic: Java (TCP)
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.
The system is busy. Please try again later.