更新时间:2024-06-11 GMT+08:00

Go SDK接入示例

本文介绍使用Go SDK通过AMQP接入华为云物联网平台,接收服务端订阅消息的示例。

开发环境要求

本示例使用的开发环境为Go 1.16及以上版本。

添加依赖

在go.mod中添加以下依赖。

require (
    pack.ag/amqp v0.12.5 // 本示例使用v0.12.5,根据实际需要选择版本
)

代码示例

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"pack.ag/amqp"
	"time"
)

// AmqpClient holds the caller-supplied connection settings, the values derived
// from them by InitConnect, and the live AMQP client/session/receiver handles
// created by generateReceiver.
type AmqpClient struct {
	// Title is a label printed in the dial log line.
	Title      string
	// Host is the server host name; InitConnect builds the address as
	// "amqps://" + Host + ":5671".
	Host       string
	// AccessKey is embedded in the SASL user name.
	AccessKey  string
	// AccessCode is used as the SASL password.
	AccessCode string
	// InstanceId is embedded in the SASL user name.
        InstanceId string
	// QueueName is the link source address; InitConnect defaults it to
	// "DefaultQueue" when empty.
	QueueName  string

	// Derived connection parameters, filled in by InitConnect (userName is
	// refreshed with a new timestamp before every dial).
	address  string
	userName string
	password string

	// Live handles, assigned by generateReceiver.
	client   *amqp.Client
	session  *amqp.Session
	receiver *amqp.Receiver
}

// MessageHandler is the callback interface passed to StartReceiveMessage for
// processing received messages.
type MessageHandler interface {
	// Handle is called with each message received from the queue.
	Handle(message *amqp.Message)
}

// InitConnect derives the connection parameters (address, SASL user name and
// password) from the exported settings. It does not open any connection.
// An empty QueueName is replaced by "DefaultQueue".
func (ac *AmqpClient) InitConnect() {
	if len(ac.QueueName) == 0 {
		ac.QueueName = "DefaultQueue"
	}

	// The user name carries the current time as a millisecond timestamp.
	millis := time.Now().UnixNano() / int64(time.Millisecond)

	ac.password = ac.AccessCode
	ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, millis, ac.InstanceId)
	ac.address = "amqps://" + ac.Host + ":5671"
}

// StartReceiveMessage establishes the receiver and then blocks in a receive
// loop, passing every received message to handler in its own goroutine and
// accepting it. On a receive error it reconnects, unless ctx has been
// cancelled, in which case it returns. All AMQP resources are closed on return.
//
// ctx controls the lifetime of the loop; with context.Background() the loop is
// never interrupted.
func (ac *AmqpClient) StartReceiveMessage(ctx context.Context, handler MessageHandler) {
	// Keep the cancel func and release it on return; discarding it leaks the
	// child context's resources (reported by go vet's lostcancel check).
	childCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	if err := ac.generateReceiverWithRetry(childCtx); err != nil {
		return
	}
	// Registered after cancel, so it runs before childCtx is cancelled by the
	// deferred cancel above.
	defer func() {
		// Best-effort teardown: nothing useful can be done with close errors
		// here, so they are deliberately ignored.
		if ac.receiver != nil {
			_ = ac.receiver.Close(childCtx)
		}
		if ac.session != nil {
			_ = ac.session.Close(childCtx)
		}
		if ac.client != nil {
			_ = ac.client.Close()
		}
	}()

	for {
		// Defensive: never call Receive on a nil receiver; try to (re)connect.
		if ac.receiver == nil {
			if err := ac.generateReceiverWithRetry(childCtx); err != nil {
				return
			}
			continue
		}

		// Blocks until a message arrives, an error occurs, or the context is
		// cancelled.
		message, err := ac.receiver.Receive(childCtx)
		if err == nil {
			// NOTE(review): the message is accepted without waiting for the
			// handler to finish, so a handler failure cannot trigger redelivery.
			go handler.Handle(message)
			if acceptErr := message.Accept(); acceptErr != nil {
				fmt.Println("amqp accept message error: ", acceptErr)
			}
			continue
		}

		fmt.Println("amqp receive data error: ", err)

		// Deliberate cancellation: stop receiving.
		if childCtx.Err() != nil {
			return
		}

		// Otherwise the connection is presumed broken: rebuild it.
		if err := ac.generateReceiverWithRetry(childCtx); err != nil {
			return
		}
	}
}

// generateReceiverWithRetry calls generateReceiver until it succeeds, waiting
// between attempts with an exponential backoff that starts at 10ms, doubles
// after every failure and is capped at 20s.
//
// It returns nil once a receiver has been created, or amqp.ErrConnClosed if
// ctx is cancelled before that happens (including while waiting to retry).
func (ac *AmqpClient) generateReceiverWithRetry(ctx context.Context) error {
	const (
		initialBackoff = 10 * time.Millisecond
		maxBackoff     = 20 * time.Second
	)

	backoff := initialBackoff
	for attempt := 1; ; attempt++ {
		// Give up immediately if the caller has already cancelled.
		select {
		case <-ctx.Done():
			return amqp.ErrConnClosed
		default:
		}

		err := ac.generateReceiver()
		if err == nil {
			fmt.Println("amqp connect init success")
			return nil
		}
		fmt.Println("amqp ac.generateReceiver error ", err)
		fmt.Println("amqp connect retry,times:", attempt, ",duration:", backoff)

		// Wait out the backoff, but wake up early on cancellation instead of
		// sleeping unconditionally.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return amqp.ErrConnClosed
		case <-timer.C:
		}

		// Double the delay for the next attempt without exceeding the cap.
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

// generateReceiver (re)creates ac.receiver.
//
// The amqp package does not expose connection or session state, so liveness is
// probed by trying to open a receiver on the existing session; if that fails
// the old connection is closed and a new connection, session and receiver are
// created.
//
// It returns the error from Dial, NewSession or NewReceiver when the rebuild
// fails, and nil on success.
func (ac *AmqpClient) generateReceiver() error {
	if ac.session != nil {
		// NOTE(review): this path uses a link credit of 20 and no durability
		// option, unlike the fresh-connection path below (credit 100,
		// DurabilityUnsettledState) - confirm the difference is intended.
		receiver, err := ac.session.NewReceiver(
			amqp.LinkSourceAddress(ac.QueueName),
			amqp.LinkCredit(20),
		)
		// After a network drop the connection is closed and this fails; if the
		// connection is still open the receiver is created successfully.
		if err == nil {
			ac.receiver = receiver
			return nil
		}
	}

	// Tear down the previous connection before dialing again.
	if ac.client != nil {
		_ = ac.client.Close()
	}

	// Rebuild the user name so it carries a fresh millisecond timestamp.
	ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, time.Now().UnixNano()/1000000, ac.InstanceId)
	// The password (access code) is a secret and must not be written to logs,
	// so it is masked here.
	fmt.Println("[" + ac.Title + "] Dial... addr=[" + ac.address + "], username=[" + ac.userName + "], password=[******]")
	client, err := amqp.Dial(ac.address,
		amqp.ConnSASLPlain(ac.userName, ac.password),
		amqp.ConnProperty("vhost", "default"),
		amqp.ConnServerHostname("default"),
		// NOTE(review): InsecureSkipVerify disables server certificate
		// verification and MaxVersion caps the connection at TLS 1.2; for
		// production use, verify the server certificate instead.
		amqp.ConnTLSConfig(&tls.Config{InsecureSkipVerify: true,
			MaxVersion: tls.VersionTLS12,
		}),
		amqp.ConnConnectTimeout(8*time.Second))
	if err != nil {
		fmt.Println("Dial", err)
		return err
	}
	ac.client = client

	session, err := client.NewSession()
	if err != nil {
		XDebug("Error: NewSession", err)
		return err
	}
	ac.session = session

	receiver, err := ac.session.NewReceiver(
		amqp.LinkTargetDurability(amqp.DurabilityUnsettledState),
		amqp.LinkSourceAddress(ac.QueueName),
		amqp.LinkCredit(100),
	)
	if err != nil {
		XDebug("Error: NewReceiver", err)
		return err
	}
	ac.receiver = receiver

	return nil
}

// XDebug writes the label s followed by err to standard output as one
// space-separated line.
func XDebug(s string, err error) {
	fmt.Printf("%v %v\n", s, err)
}

// CustomerMessageHandler is a stateless sample handler; its Handle method
// prints each received message.
type CustomerMessageHandler struct {
}

// Handle prints the value of the received message to standard output.
func (c *CustomerMessageHandler) Handle(message *amqp.Message) {
	payload := message.Value
	fmt.Println("AMQP收到消息:", payload)
}

// main builds an AmqpClient from the placeholder settings below, derives the
// connection parameters and starts receiving messages with a sample handler.
func main() {
	// Replace the placeholder values below with your own connection settings.

	// AMQP access domain name.
	amqpHost := "127.0.0.1"

	// Access credential key.
	amqpAccessKey := "your accessKey"

	// Access credential secret.
	amqpAccessCode := "your accessCode"

	// Instance ID.
	instanceId := "your instanceId"

	// Name of the subscribed queue.
	amqpQueueName := "DefaultQueue"

	amqpClient := &AmqpClient{
		Title:      "test",
		Host:       amqpHost,
		AccessKey:  amqpAccessKey,
		AccessCode: amqpAccessCode,
		InstanceId: instanceId,
		QueueName:  amqpQueueName,
	}

	handle := CustomerMessageHandler{}
	amqpClient.InitConnect()

	// context.Background() is never cancelled; pass a cancellable context
	// instead if the receive loop must be stoppable.
	ctx := context.Background()
	amqpClient.StartReceiveMessage(ctx, &handle)
}