更新时间:2024-06-11 GMT+08:00
GO SDK接入示例
本文介绍使用GO SDK通过AMQP接入华为云物联网平台,接收服务端订阅消息的示例。
开发环境要求
本示例使用的开发环境为Go 1.16及以上版本。
添加依赖
在go.mod中添加以下依赖。
require (
    pack.ag/amqp v0.12.5 // 本示例使用v0.12.5,根据实际需要选择版本
)
代码示例
package main import ( "context" "crypto/tls" "fmt" "pack.ag/amqp" "time" ) type AmqpClient struct { Title string Host string AccessKey string AccessCode string InstanceId string QueueName string address string userName string password string client *amqp.Client session *amqp.Session receiver *amqp.Receiver } type MessageHandler interface { Handle(message *amqp.Message) } func (ac *AmqpClient) InitConnect() { if ac.QueueName == "" { ac.QueueName = "DefaultQueue" } ac.address = "amqps://" + ac.Host + ":5671" ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, time.Now().UnixNano()/1000000, ac.InstanceId) ac.password = ac.AccessCode } func (ac *AmqpClient) StartReceiveMessage(ctx context.Context, handler MessageHandler) { childCtx, _ := context.WithCancel(ctx) err := ac.generateReceiverWithRetry(childCtx) if nil != err { return } defer func() { _ = ac.receiver.Close(childCtx) _ = ac.session.Close(childCtx) _ = ac.client.Close() }() for { // 阻塞接受消息,如果ctx是background则不会被打断。 message, err := ac.receiver.Receive(ctx) if nil == err { go handler.Handle(message) _ = message.Accept() } else { fmt.Println("amqp receive data error: ", err) //如果是主动取消,则退出程序。 select { case <-childCtx.Done(): return default: } //非主动取消,则重新建立连接。 err := ac.generateReceiverWithRetry(childCtx) if nil != err { return } } } } func (ac *AmqpClient) generateReceiverWithRetry(ctx context.Context) error { // 退避重试,从10ms依次x2,直到20s。 duration := 10 * time.Millisecond maxDuration := 20000 * time.Millisecond times := 1 // 异常情况,退避重连。 for { select { case <-ctx.Done(): return amqp.ErrConnClosed default: } err := ac.generateReceiver() if nil != err { fmt.Println("amqp ac.generateReceiver error ", err) time.Sleep(duration) if duration < maxDuration { duration *= 2 } fmt.Println("amqp connect retry,times:", times, ",duration:", duration) times++ return nil } else { fmt.Println("amqp connect init success") return nil } } } // 由于包不可见,无法判断conn和session状态,重启连接获取。 func (ac *AmqpClient) 
generateReceiver() error { if ac.session != nil { receiver, err := ac.session.NewReceiver( amqp.LinkSourceAddress(ac.QueueName), amqp.LinkCredit(20), ) // 如果断网等行为发生,conn会关闭导致session建立失败,未关闭连接则建立成功。 if err == nil { ac.receiver = receiver return nil } } // 清理上一个连接。 if ac.client != nil { _ = ac.client.Close() } ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, time.Now().UnixNano()/1000000, ac.InstanceId) fmt.Println("[" + ac.Title + "] Dial... addr=[" + ac.address + "], username=[" + ac.userName + "], password=[" + ac.password + "]") client, err := amqp.Dial(ac.address, amqp.ConnSASLPlain(ac.userName, ac.password), amqp.ConnProperty("vhost", "default"), amqp.ConnServerHostname("default"), amqp.ConnTLSConfig(&tls.Config{InsecureSkipVerify: true, MaxVersion: tls.VersionTLS12, }), amqp.ConnConnectTimeout(8*time.Second)) if err != nil { fmt.Println("Dial", err) return err } ac.client = client session, err := client.NewSession() if err != nil { XDebug("Error: NewSession", err) return err } ac.session = session receiver, err := ac.session.NewReceiver( amqp.LinkTargetDurability(amqp.DurabilityUnsettledState), amqp.LinkSourceAddress(ac.QueueName), amqp.LinkCredit(100), ) if err != nil { XDebug("Error: NewReceiver", err) return err } ac.receiver = receiver return nil } func XDebug(s string, err error) { fmt.Println(s, err) } type CustomerMessageHandler struct { } func (c *CustomerMessageHandler) Handle(message *amqp.Message) { fmt.Println("AMQP收到消息:", message.Value) } func main() { // 以下参数配置请参考连接配置说明 // AMQP接入域名 amqpHost := "127.0.0.1" //接入凭证键值 amqpAccessKey := "your accessKey" // 接入凭证密钥 amqpAccessCode := "your accessCode" // 实例Id instanceId:= "your intanceId" // 订阅队列名称 amqpQueueName := "DefaultQueue" amqpClient := &AmqpClient{ Title: "test", Host: amqpHost, AccessKey: amqpAccessKey, AccessCode: amqpAccessCode, InstanceId: instanceId, QueueName: amqpQueueName, } handle := CustomerMessageHandler{} amqpClient.InitConnect() ctx := 
context.Background() amqpClient.StartReceiveMessage(ctx, &handle) }
父主题: 使用AMQP转发