更新时间:2024-05-15 GMT+08:00

收发普通消息

本章节介绍普通消息的收发方法和示例代码。普通消息发送方式分为同步发送和异步发送。

  • 同步发送:消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。
  • 异步发送:消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息。

收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

客户端连接RocketMQ实例5.x版本收发普通消息前,需要确保Topic的消息类型为“普通”。

准备环境

  1. 执行以下命令,检查是否已安装Go。
    go version

    返回如下回显时,说明Go已经安装。

    go version go1.16.5 linux/amd64

    如果未安装Go,请下载并安装

  2. 在“go.mod”中增加以下代码,添加依赖。
    module rocketmq-example-go
    
    go 1.13
    
    require (
    	github.com/apache/rocketmq-clients/golang/v5
    )

同步发送

同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。

参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    "github.com/apache/rocketmq-clients/golang"
    "github.com/apache/rocketmq-clients/golang/credentials"
)

const (
    Topic     = "topic01"
    Endpoint  = "192.168.xx.xx:8080"
    AccessKey = os.Getenv("ROCKETMQ_AK")  //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
    SecretKey = os.Getenv("ROCKETMQ_SK")
)

func main() {
    os.Setenv("mq.consoleAppender.enabled", "true")
    golang.ResetLogger()
    producer, err := golang.NewProducer(&golang.Config{
        Endpoint: Endpoint,
        Credentials: &credentials.SessionCredentials{
            AccessKey:    AccessKey,
            AccessSecret: SecretKey,
        },
    },
        golang.WithTopics(Topic),
    )
    if err != nil {
        log.Fatal(err)
    }
    err = producer.Start()
    if err != nil {
        log.Fatal(err)
    }
    defer producer.GracefulStop()

    for i := 0; i < 10; i++ {
        msg := &golang.Message{
            Topic: Topic,
            Body:  []byte("this is a message : " + strconv.Itoa(i)),
        }
        // 设置消息的Key和Tag
        msg.SetKeys("a", "b")
        msg.SetTag("ab")
        resp, err := producer.Send(context.TODO(), msg)
        if err != nil {
            log.Fatal(err)
        }
        for i := 0; i < len(resp); i++ {
            fmt.Printf("%#v\n", resp[i])
        }

        time.Sleep(time.Second * 1)
    }
}

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • Topic:输入Topic名称。
  • Endpoint:输入grpc连接地址/grpc公网连接地址。
  • AccessKey:创建实例时,如果开启了ACL,需要输入用户名。
  • SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。
  • SetKeys:输入消息的Key。
  • SetTag:输入消息的Tag。

异步发送

异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。

使用异步发送需要客户端实现异步发送回调接口(SendCallback)。即消息发送方在发送了一条消息后,不需要等待服务端响应接着发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    "github.com/apache/rocketmq-clients/golang"
    "github.com/apache/rocketmq-clients/golang/credentials"
)

const (
    Topic     = "topic01"
    Endpoint  = "192.168.xx.xx:8080"
    AccessKey = os.Getenv("ROCKETMQ_AK")  //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
    SecretKey = os.Getenv("ROCKETMQ_SK")
)

func main() {
    os.Setenv("mq.consoleAppender.enabled", "true")
    golang.ResetLogger()
    producer, err := golang.NewProducer(&golang.Config{
        Endpoint: Endpoint,
        Credentials: &credentials.SessionCredentials{
            AccessKey:    AccessKey,
            AccessSecret: SecretKey,
        },
    },
        golang.WithTopics(Topic),
    )
    if err != nil {
        log.Fatal(err)
    }
    err = producer.Start()
    if err != nil {
        log.Fatal(err)
    }
    defer producer.GracefulStop()
    for i := 0; i < 10; i++ {
        msg := &golang.Message{
            Topic: Topic,
            Body:  []byte("this is a message : " + strconv.Itoa(i)),
        }
        // 设置消息的Key和Tag
        msg.SetKeys("a", "b")
        msg.SetTag("ab")
        producer.SendAsync(context.TODO(), msg, func(ctx context.Context, resp []*golang.SendReceipt, err error) {
            if err != nil {
                log.Fatal(err)
            }
            for i := 0; i < len(resp); i++ {
                fmt.Printf("%#v\n", resp[i])
            }
        })

        time.Sleep(time.Second * 1)
    }
}

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • Topic:输入Topic名称。
  • Endpoint:输入grpc连接地址/grpc公网连接地址。
  • AccessKey:创建实例时,如果开启了ACL,需要输入用户名。
  • SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。
  • SetKeys:输入消息的Key。
  • SetTag:输入消息的Tag。

订阅普通消息

参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/apache/rocketmq-clients/golang"
    "github.com/apache/rocketmq-clients/golang/credentials"
)

const (
    Topic     = "topic01"
    GroupName = "groupname"
    Endpoint  = "192.168.xx.xx:8080"
    AccessKey = os.Getenv("ROCKETMQ_AK")  //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
    SecretKey = os.Getenv("ROCKETMQ_SK")
)

var (
    // 接收消息请求的最大等待时间
    awaitDuration = time.Second * 5
    // 每次能接收的最大消息数
    maxMessageNum int32 = 16
    // 消息不可见时间,在消息被接收后对其他消费者不可见,直到超时。
    invisibleDuration = time.Second * 20 
)

func main() {
    os.Setenv("mq.consoleAppender.enabled", "true")
    golang.ResetLogger()
    simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
        Endpoint: Endpoint,
        Group:    GroupName,
        Credentials: &credentials.SessionCredentials{
            AccessKey:    AccessKey,
            AccessSecret: SecretKey,
        },
    },
        golang.WithAwaitDuration(awaitDuration),
        golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{
            Topic: golang.SUB_ALL,
        }),
    )
    if err != nil {
        log.Fatal(err)
    }
    err = simpleConsumer.Start()
    if err != nil {
        log.Fatal(err)
    }
    defer simpleConsumer.GracefulStop()

    go func() {
        for {
            fmt.Println("start recevie message")
            mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
            if err != nil {
                fmt.Println(err)
            }
            for _, mv := range mvs {
                simpleConsumer.Ack(context.TODO(), mv)
                fmt.Println(mv)
            }
            fmt.Println("wait a moment")
            fmt.Println()
            time.Sleep(time.Second * 3)
        }
    }()

    time.Sleep(time.Minute)
}

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • Topic:输入Topic名称。
  • GroupName:输入消费组名称。
  • Endpoint:输入grpc连接地址/grpc公网连接地址。
  • AccessKey:创建实例时,如果开启了ACL,需要输入用户名。
  • SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。