Sending and Receiving Normal Messages
This section describes how to send and receive normal messages and provides sample code. Normal messages can be sent in the synchronous or asynchronous mode.
- Synchronous transmission: After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.
- Asynchronous transmission: After sending a message, the sender sends the next message without waiting for a response from the server.
Before sending and receiving normal messages, collect RocketMQ connection information by referring to Collecting Connection Information.
Preparing the Environment
- Run the following command to check whether Go has been installed:
go version
If the following information is displayed, Go has been installed:
go version go1.16.5 linux/amd64
If Go is not installed, download and install it.
- Add the following code to go.mod to add the dependency:
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.1 )
Synchronous Transmission
After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.
The following code is an example. Replace the information in bold with the actual values.
package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "os" ) // implements a simple producer to send message. func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } msg := &primitive.Message{ Topic: "topic1", Body: []byte("Hello RocketMQ Go Client!"), } msg.WithTag("TagA") msg.WithKeys([]string{"KeyA"}) res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- 192.168.0.1:8100: instance address and port.
- topic1: topic name.
Asynchronous Transmission
After sending a message, the sender sends the next message without waiting for a response from the server.
Asynchronous transmission requires the SendCallback method to be supported on the client. After sending a message, the sender sends the next message without waiting for a server response. The sender calls the SendCallback method to receive the server's response and then processes the response.
The following code is an example. Replace the information in bold with the actual values.
package main import ( "context" "fmt" "os" "sync" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) // implements an async producer to send message. func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2)) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } var wg sync.WaitGroup wg.Add(1) callback := func(ctx context.Context, result *primitive.SendResult, e error) { if e != nil { fmt.Printf("receive message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", result.String()) } wg.Done() } message := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!")) err = p.SendAsync(context.Background(), callback, message) if err != nil { fmt.Printf("send message error: %s\n", err) wg.Done() } wg.Wait() err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- 192.168.0.1:8100: instance address and port.
- test: topic name.
Subscribing to Normal Messages
The following code is an example. Replace the information in bold with the actual values.
package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), ) err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("subscribe callback: %v \n", msgs[i]) } return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } }
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- testGroup: consumer group name.
- 192.168.0.1:8100: instance address and port.
- test: topic name.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot