更新时间:2022-02-21 GMT+08:00
示例代码导读
DMS TCP Demo示例工程中包含DmsProducerDemo和DmsConsumerDemo两个示例,DmsProducerDemo用于展示生产消息流程,DmsConsumerDemo用于展示消费和确认消息流程。
生产消息
- 创建并启动DmsProducer。
DmsProducer producer = new DmsProducerImpl(); producer.start();
- 构建要发送的消息内容。
List<DmsMessage> messages = new ArrayList<>(); final int messageNum = 10; for (int i = 0; i < messageNum; i++) { DmsMessage record = new DmsMessage(); record.setBody(("Hello DMS syn produce: " + i).getBytes()); messages.add(record); }
- 发送消息。
try { List<DmsProduceResult> result = producer.produce(queueId, messages); result.forEach(r -> { System.out.println(r.getState()); }); } catch (Throwable t) { t.printStackTrace(); }
- 停止生产。
producer.stop();
消费消息
- 创建并启动消费实例。
DmsConsumer consumer = new DmsConsumerImpl(); consumer.start();
- 消费消息。
List<DmsConsumeResult> records = consumer.consume(queueId, groupId);
- 构建确认请求。
DmsCommitRequest commitRequest = new DmsCommitRequest(); List<DmsCommitItem> messages = new ArrayList<>(); for (DmsConsumeResult record : records) { System.out.println(record.getHandler()); DmsCommitItem commitItem = new DmsCommitItem(); //确认时需要回填对应消息handler commitItem.setHandler(record.getHandler()); //设置消息处理状态: true or false commitItem.setStatus(true); messages.add(commitItem); } //一次可确认多条消息 commitRequest.setMessage(messages);
- 确认消息。
DmsCommitResult commitResult = consumer.commit(queueId, groupId, commitRequest);
- 停止消费。
consumer.stop();
父主题: 普通队列开发指南