更新时间:2022-02-21 GMT+08:00

示例代码导读

DMS TCP Demo示例工程中包含DmsProducerDemo和DmsConsumerDemo两个示例,DmsProducerDemo用于展示生产消息流程,DmsConsumerDemo用于展示消费和确认消息流程。

生产消息

  1. 创建并启动DmsProducer。

    DmsProducer producer = new DmsProducerImpl();
    producer.start();

  2. 构建要发送的消息内容。

    List<DmsMessage> messages = new ArrayList<>();
    final int messageNum = 10;
    for (int i = 0; i < messageNum; i++)
    {
        DmsMessage record = new DmsMessage();
        record.setBody(("Hello DMS syn produce: " + i).getBytes());
        messages.add(record);
    }

  3. 发送消息。

    try
    {
        List<DmsProduceResult> result = producer.produce(queueId, messages);
        result.forEach(r ->
        {
            System.out.println(r.getState());
        });
    }
    catch (Throwable t)
    {
        t.printStackTrace();
    }

  4. 停止生产。

    producer.stop();

消费消息

  1. 创建并启动消费实例。

    DmsConsumer consumer = new DmsConsumerImpl();
    consumer.start();

  2. 消费消息。

    List<DmsConsumeResult> records = consumer.consume(queueId, groupId);

  3. 构建确认请求。

    DmsCommitRequest commitRequest = new DmsCommitRequest();
    List<DmsCommitItem> messages = new ArrayList<>();
    
    for (DmsConsumeResult record : records)
    {
        System.out.println(record.getHandler());
        DmsCommitItem commitItem = new DmsCommitItem();
        //确认时需要回填对应消息handler
        commitItem.setHandler(record.getHandler());
        //设置消息处理状态: true or false
        commitItem.setStatus(true);
        messages.add(commitItem);
    }
    //一次可确认多条消息
    commitRequest.setMessage(messages);

  4. 确认消息。

    DmsCommitResult commitResult = consumer.commit(queueId, groupId, commitRequest);

  5. 停止消费。

    consumer.stop();