Updated on 2024-12-25 GMT+08:00

Sending and Receiving Normal Messages

This section describes how to send and receive normal messages and provides sample code. Normal messages can be sent in the synchronous or asynchronous mode.

  • Synchronous transmission: After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.
  • Asynchronous transmission: After sending a message, the sender sends the next message without waiting for a response from the server.

Before sending and receiving messages, collect RocketMQ connection information by referring to Collecting Connection Information.

Notes and Constraints

  • The gRPC protocol is only supported by RocketMQ v5.x but not v4.8.0.
  • To receive and send normal messages, ensure the topic message type is Normal before connecting a client to a RocketMQ instance of v5.x.

Preparing the Environment

  1. Run the following command to check whether Go has been installed:
    go version

    If the following information is displayed, Go has been installed:

    go version go1.16.5 linux/amd64

    If Go is not installed, download and install it.

  2. Add the following code to go.mod to add the dependency:
    module rocketmq-example-go
    
    go 1.13
    
    require (
    	github.com/apache/rocketmq-clients/golang/v5
    )

Synchronous Transmission

After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.

The following code is an example. Replace the information in bold with the actual values.

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    "github.com/apache/rocketmq-clients/golang"
    "github.com/apache/rocketmq-clients/golang/credentials"
)

const (
    Topic     = "topic01"
    Endpoint  = "192.168.xx.xx:8080"
    AccessKey = os.Getenv("ROCKETMQ_AK")  //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable.
    SecretKey = os.Getenv("ROCKETMQ_SK")
)

func main() {
    os.Setenv("mq.consoleAppender.enabled", "true")
    golang.ResetLogger()
    producer, err := golang.NewProducer(&golang.Config{
        Endpoint: Endpoint,
        Credentials: &credentials.SessionCredentials{
            AccessKey:    AccessKey,
            AccessSecret: SecretKey,
        },
    },
        golang.WithTopics(Topic),
    )
    if err != nil {
        log.Fatal(err)
    }
    err = producer.Start()
    if err != nil {
        log.Fatal(err)
    }
    defer producer.GracefulStop()

    for i := 0; i < 10; i++ {
        msg := &golang.Message{
            Topic: Topic,
            Body:  []byte("this is a message : " + strconv.Itoa(i)),
        }
        // Set the message key and tag.
        msg.SetKeys("a", "b")
        msg.SetTag("ab")
        resp, err := producer.Send(context.TODO(), msg)
        if err != nil {
            log.Fatal(err)
        }
        for i := 0; i < len(resp); i++ {
            fmt.Printf("%#v\n", resp[i])
        }

        time.Sleep(time.Second * 1)
    }
}

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • Topic: Enter a topic name.
  • Endpoint: Enter the gRPC address or gRPC public address.
  • AccessKey: Enter the username if ACL was enabled during instance creation.
  • SecretKey: Enter the user key if ACL was enabled during instance creation.
  • SetKeys: Enter the message key.
  • SetTag: Enter the message tag.

Asynchronous Transmission

After sending a message, the sender sends the next message without waiting for a response from the server.

Asynchronous transmission requires the SendCallback method to be supported on the client. After sending a message, the sender sends the next message without waiting for a server response. The sender calls the SendCallback method to receive the server's response and then processes the response.

The following code is an example. Replace the information in bold with the actual values.

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    "github.com/apache/rocketmq-clients/golang"
    "github.com/apache/rocketmq-clients/golang/credentials"
)

const (
    Topic     = "topic01"
    Endpoint  = "192.168.xx.xx:8080"
    AccessKey = os.Getenv("ROCKETMQ_AK")  //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable.
    SecretKey = os.Getenv("ROCKETMQ_SK")
)

func main() {
    os.Setenv("mq.consoleAppender.enabled", "true")
    golang.ResetLogger()
    producer, err := golang.NewProducer(&golang.Config{
        Endpoint: Endpoint,
        Credentials: &credentials.SessionCredentials{
            AccessKey:    AccessKey,
            AccessSecret: SecretKey,
        },
    },
        golang.WithTopics(Topic),
    )
    if err != nil {
        log.Fatal(err)
    }
    err = producer.Start()
    if err != nil {
        log.Fatal(err)
    }
    defer producer.GracefulStop()
    for i := 0; i < 10; i++ {
        msg := &golang.Message{
            Topic: Topic,
            Body:  []byte("this is a message : " + strconv.Itoa(i)),
        }
        // Set the message key and tag.
        msg.SetKeys("a", "b")
        msg.SetTag("ab")
        producer.SendAsync(context.TODO(), msg, func(ctx context.Context, resp []*golang.SendReceipt, err error) {
            if err != nil {
                log.Fatal(err)
            }
            for i := 0; i < len(resp); i++ {
                fmt.Printf("%#v\n", resp[i])
            }
        })

        time.Sleep(time.Second * 1)
    }
}

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • Topic: Enter a topic name.
  • Endpoint: Enter the gRPC address or gRPC public address.
  • AccessKey: Enter the username if ACL was enabled during instance creation.
  • SecretKey: Enter the user key if ACL was enabled during instance creation.
  • SetKeys: Enter the message key.
  • SetTag: Enter the message tag.

Subscribing to Normal Messages

The following code is an example. Replace the information in bold with the actual values.

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/apache/rocketmq-clients/golang"
    "github.com/apache/rocketmq-clients/golang/credentials"
)

const (
    Topic     = "topic01"
    GroupName = "groupname"
    Endpoint  = "192.168.xx.xx:8080"
    AccessKey = os.Getenv("ROCKETMQ_AK")  //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable.
    SecretKey = os.Getenv("ROCKETMQ_SK")
)

var (
    // Maximum duration to wait to receive a message request.
    awaitDuration = time.Second * 5
    // Maximum number of messages that can be received each time.
    maxMessageNum int32 = 16
    // Invisible duration when received messages are invisible to other consumers.
    invisibleDuration = time.Second * 20 
)

func main() {
    os.Setenv("mq.consoleAppender.enabled", "true")
    golang.ResetLogger()
    simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
        Endpoint: Endpoint,
        Group:    GroupName,
        Credentials: &credentials.SessionCredentials{
            AccessKey:    AccessKey,
            AccessSecret: SecretKey,
        },
    },
        golang.WithAwaitDuration(awaitDuration),
        golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{
            Topic: golang.SUB_ALL,
        }),
    )
    if err != nil {
        log.Fatal(err)
    }
    err = simpleConsumer.Start()
    if err != nil {
        log.Fatal(err)
    }
    defer simpleConsumer.GracefulStop()

    go func() {
        for {
            fmt.Println("start recevie message")
            mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
            if err != nil {
                fmt.Println(err)
            }
            for _, mv := range mvs {
                simpleConsumer.Ack(context.TODO(), mv)
                fmt.Println(mv)
            }
            fmt.Println("wait a moment")
            fmt.Println()
            time.Sleep(time.Second * 3)
        }
    }()

    time.Sleep(time.Minute)
}

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • Topic: Enter a topic name.
  • GroupName: Enter a consumer group name.
  • Endpoint: Enter the gRPC address or gRPC public address.
  • AccessKey: Enter the username if ACL was enabled during instance creation.
  • SecretKey: Enter the user key if ACL was enabled during instance creation.