Updated on 2024-05-15 GMT+08:00

Sending and Receiving Normal Messages

This section describes how to send and receive normal messages and provides sample code. Normal messages can be sent in synchronous or asynchronous mode.

  • Synchronous transmission: After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.
  • Asynchronous transmission: After sending a message, the sender sends the next message without waiting for a response from the server.

Before sending and receiving normal messages, collect RocketMQ connection information by referring to Collecting Connection Information.

Before connecting a client to a RocketMQ 5.x instance to send and receive normal messages, ensure that the message type of the topic is Normal.

Preparing the Environment

  1. Run the following command to check whether Go has been installed:
    go version

    If the following information is displayed, Go has been installed:

    go version go1.16.5 linux/amd64

    If Go is not installed, download and install it.

  2. Add the following content to go.mod to declare the dependency:
    module rocketmq-example-go
    
    go 1.13
    
    require (
    	github.com/apache/rocketmq-client-go/v2 v2.1.1
    )

Synchronous Transmission

After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.

The following code is an example. Replace the instance address and topic name with the actual values.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

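// A simple producer that sends one message synchronously.
func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
		producer.WithRetry(2),
	)
	// Check the creation error instead of discarding it; p is unusable if creation failed.
	if err != nil {
		fmt.Printf("create producer error: %s\n", err.Error())
		os.Exit(1)
	}
	err = p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s\n", err.Error())
		os.Exit(1)
	}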
	msg := &primitive.Message{
		Topic: "topic1",
		Body:  []byte("Hello RocketMQ Go Client!"),
	}
	msg.WithTag("TagA")
	msg.WithKeys([]string{"KeyA"})
	res, err := p.SendSync(context.Background(), msg)

	if err != nil {
		fmt.Printf("send message error: %s\n", err)
	} else {
		fmt.Printf("send message success: result=%s\n", res.String())
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}
}

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • 192.168.0.1:8100: instance address and port.
  • topic1: topic name.

Asynchronous Transmission

After sending a message, the sender sends the next message without waiting for a response from the server.

Asynchronous transmission requires the client to supply a callback (SendCallback). In the Go client, this is the callback function passed to SendAsync: the client invokes it with the server's response or an error, and the sender processes the result there.

The following code is an example. Replace the instance address and topic name with the actual values.

package main

import (
	"context"
	"fmt"
	"os"
	"sync"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

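// An asynchronous producer that sends one message and handles the result in a callback.
func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
		producer.WithRetry(2))
	// Check the creation error instead of discarding it; p is unusable if creation failed.
	if err != nil {
		fmt.Printf("create producer error: %s\n", err.Error())
		os.Exit(1)
	}

	err = p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s\n", err.Error())
		os.Exit(1)
	}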
	var wg sync.WaitGroup
	wg.Add(1)

	callback := func(ctx context.Context, result *primitive.SendResult, e error) {
		if e != nil {
			// Print the callback's own error (e), not the outer err variable.
			fmt.Printf("send message error: %s\n", e)
		} else {
			fmt.Printf("send message success: result=%s\n", result.String())
		}
		wg.Done()
	}
	message := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!"))
	err = p.SendAsync(context.Background(), callback, message)
	if err != nil {
		fmt.Printf("send message error: %s\n", err)
		wg.Done()
	}

	wg.Wait()
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}
}

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • 192.168.0.1:8100: instance address and port.
  • test: topic name.

Subscribing to Normal Messages

The following code is an example. Replace the consumer group name, instance address, and topic name with the actual values.

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

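// A push consumer that subscribes to a topic and prints each received message.
func main() {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
	)
	// Check the creation error instead of discarding it; c is unusable if creation failed.
	if err != nil {
		fmt.Printf("create consumer error: %s\n", err.Error())
		os.Exit(1)
	}
	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,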
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
		}

		return consumer.ConsumeSuccess, nil
	})
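	if err != nil {
		// Without a subscription the consumer receives nothing, so stop here.
		fmt.Printf("subscribe error: %s\n", err.Error())
		os.Exit(1)
	}
	// Note: Start must be called after Subscribe.
	err = c.Start()
	if err != nil {
		fmt.Printf("start consumer error: %s\n", err.Error())
		os.Exit(1)
	}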
	// Keep the process alive so that the consumer can receive messages.
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("Shutdown Consumer error: %s", err.Error())
	}
}

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • testGroup: consumer group name.
  • 192.168.0.1:8100: instance address and port.
  • test: topic name.