Updated on 2023-11-30 GMT+08:00

Code of the Sample Project

The DMS TCP sample project contains two examples:

DmsProducerDemo: demonstrates the message production process. DmsConsumerDemo: demonstrates the message consumption and acknowledgment processes.

Producing Messages

  1. Create and start DmsProducer.

    DmsProducer producer = new DmsProducerImpl();
    producer.start();

  2. Construct the content of the message to be sent.

    List<DmsMessage> messages = new ArrayList<>();
    final int messageNum = 10;
    for (int i = 0; i < messageNum; i++)
    {
        DmsMessage record = new DmsMessage();
        record.setBody(("Hello DMS syn produce: " + i).getBytes());
        messages.add(record);
    }

  3. Send the message.

    try
    {
        List<DmsProduceResult> result = producer.produce(queueId, messages);
        result.forEach(r ->
        {
            System.out.println(r.getState());
        });
    }
    catch (Throwable t)
    {
        t.printStackTrace();
    }

  4. Stop message production.

    producer.stop();

Consuming Messages

  1. Create and start a consumer instance.

    DmsConsumer consumer = new DmsConsumerImpl();
    consumer.start();

  2. Consume messages.

    List<DmsConsumeResult> records = consumer.consume(queueId, groupId);

  3. Construct a request for acknowledging the message consumption.

    DmsCommitRequest commitRequest = new DmsCommitRequest();
    List<DmsCommitItem> messages = new ArrayList<>();
    
    for (DmsConsumeResult record : records)
    {
        System.out.println(record.getHandler());
        DmsCommitItem commitItem = new DmsCommitItem();
        //The corresponding message handler needs to be filled in during acknowledgment.
        commitItem.setHandler(record.getHandler());
        //Set the message handling status to true or false.
        commitItem.setStatus(true);
        messages.add(commitItem);
    }
    //Multiple messages can be acknowledged at a time.
    commitRequest.setMessage(messages);

  4. Acknowledge the message consumption.

    DmsCommitResult commitResult = consumer.commit(queueId, groupId, commitRequest);

  5. Stop message consumption.

    consumer.stop();